This is an automated email from the ASF dual-hosted git repository.

kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bf770c2cd [GOBBLIN-2175] Support partial commit with Gobblin-on-Temporal execution (#4078)
0bf770c2cd is described below

commit 0bf770c2cde24c98c30ee6c9af18f58a9589f585
Author: pratapaditya04 <[email protected]>
AuthorDate: Fri Dec 13 12:13:14 2024 +0530

    [GOBBLIN-2175] Support partial commit with Gobblin-on-Temporal execution (#4078)
    
    ---------
    
    Co-authored-by: Aditya Pratap Singh <[email protected]>
---
 .../gobblin/source/extractor/JobCommitPolicy.java  | 12 +++--
 .../ddm/activity/impl/CommitActivityImpl.java      | 22 ++++++--
 .../gobblin/temporal/ddm/work/CommitStats.java     |  6 ++-
 .../ddm/workflow/impl/CommitStepWorkflowImpl.java  | 19 +++++--
 .../impl/ProcessWorkUnitsWorkflowImpl.java         | 49 ++++++++++++++----
 .../exception/FailedDatasetUrnsException.java      | 60 ++++++++++++++++++++++
 6 files changed, 143 insertions(+), 25 deletions(-)

diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java
index f60acb6030..24a5323d27 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.source.extractor;
 
 import java.util.Properties;
 
+import lombok.Getter;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 
@@ -33,7 +35,7 @@ public enum JobCommitPolicy {
   /**
    * Commit output data of a job if and only if all of its tasks successfully 
complete.
    */
-  COMMIT_ON_FULL_SUCCESS("full"),
+  COMMIT_ON_FULL_SUCCESS("full", false),
 
   /**
    * Commit a job even if some of its tasks fail. It's up to the {@link 
org.apache.gobblin.publisher.DataPublisher} to
@@ -43,7 +45,7 @@ public enum JobCommitPolicy {
    *             and should cover most use cases when {@link 
#COMMIT_ON_FULL_SUCCESS} is not appropriate.
    */
   @Deprecated
-  COMMIT_ON_PARTIAL_SUCCESS("partial"),
+  COMMIT_ON_PARTIAL_SUCCESS("partial", true),
 
   /**
    * Commit output data of tasks that successfully complete.
@@ -51,12 +53,14 @@ public enum JobCommitPolicy {
    * It is recommended to use this commit policy in conjunction with 
task-level data publishing (i.e., when
    * {@link ConfigurationKeys#PUBLISH_DATA_AT_JOB_LEVEL} is set to {@code 
false}).
    */
-  COMMIT_SUCCESSFUL_TASKS("successful");
+  COMMIT_SUCCESSFUL_TASKS("successful", true);
 
   private final String name;
+  @Getter private final boolean allowPartialCommit;// Indicates if the commit 
policy allows partial task commits
 
-  JobCommitPolicy(String name) {
+  JobCommitPolicy(String name, boolean allowPartialCommit) {
     this.name = name;
+    this.allowPartialCommit = allowPartialCommit;
   }
 
   /**
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index 97699f2ce2..ae85a6a083 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -63,11 +63,13 @@ import org.apache.gobblin.temporal.ddm.work.CommitStats;
 import org.apache.gobblin.temporal.ddm.work.DatasetStats;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
 import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.gobblin.util.executors.IteratorExecutor;
 
+
 @Slf4j
 public class CommitActivityImpl implements CommitActivity {
 
@@ -97,12 +99,22 @@ public class CommitActivityImpl implements CommitActivity {
       Map<String, JobState.DatasetState> datasetStatesByUrns = 
jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates), 
Lists.newArrayList());
       TaskState firstTaskState = taskStates.get(0);
       log.info("TaskState (commit) [{}] (**first of {}**): {}", 
firstTaskState.getTaskId(), taskStates.size(), 
firstTaskState.toJsonString(true));
-      commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+      Optional<FailedDatasetUrnsException> optFailure = Optional.empty();
+      try {
+        commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+      } catch (FailedDatasetUrnsException exception) {
+        log.warn("Some datasets failed to be committed, proceeding with 
publishing commit step", exception);
+        optFailure = Optional.of(exception);
+      }
 
       boolean shouldIncludeFailedTasks = 
PropertiesUtils.getPropAsBoolean(jobState.getProperties(), 
ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
 
       Map<String, DatasetStats> datasetTaskSummaries = 
summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), 
shouldIncludeFailedTasks);
-      return new CommitStats(datasetTaskSummaries, 
datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());
+      return new CommitStats(
+          datasetTaskSummaries,
+          
datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum(),
+          optFailure
+      );
     } catch (Exception e) {
       //TODO: IMPROVE GRANULARITY OF RETRIES
       throw ApplicationFailure.newNonRetryableFailureWithCause(
@@ -164,8 +176,8 @@ public class CommitActivityImpl implements CommitActivity {
 
       if (!failedDatasetUrns.isEmpty()) {
         String allFailedDatasets = String.join(", ", failedDatasetUrns);
-        log.error("Failed to commit dataset state for dataset(s) {}" + 
allFailedDatasets);
-        throw new IOException("Failed to commit dataset state for " + 
allFailedDatasets);
+        log.error("Failed to commit dataset state for dataset(s) {}", 
allFailedDatasets);
+        throw new FailedDatasetUrnsException(failedDatasetUrns);
       }
       if (!IteratorExecutor.verifyAllSuccessful(result)) {
         // TODO: propagate cause of failure and determine whether or not this 
is retryable to throw a non-retryable failure exception
@@ -205,7 +217,7 @@ public class CommitActivityImpl implements CommitActivity {
     // Only process successful datasets unless configuration to process failed 
datasets is set
     for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
       if (datasetState.getState() == JobState.RunningState.COMMITTED || 
(datasetState.getState() == JobState.RunningState.FAILED
-          && commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+          && commitPolicy.isAllowPartialCommit())) {
         long totalBytesWritten = 0;
         long totalRecordsWritten = 0;
         int totalCommittedTasks = 0;
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
index f929831133..6d1416f514 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
@@ -19,12 +19,15 @@ package org.apache.gobblin.temporal.ddm.work;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 
+import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
+
 
 /**
  * Data structure representing the stats for a committed dataset, and the 
total number of committed workunits in the Gobblin Temporal job
@@ -37,8 +40,9 @@ import lombok.RequiredArgsConstructor;
 public class CommitStats {
   @NonNull private Map<String, DatasetStats> datasetStats;
   @NonNull private int numCommittedWorkUnits;
+  @NonNull private Optional<FailedDatasetUrnsException> optFailure;
 
   public static CommitStats createEmpty() {
-    return new CommitStats(new HashMap<>(), 0);
+    return new CommitStats(new HashMap<>(), 0, Optional.empty());
   }
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
index d568ea4e5b..c2c21d2825 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.temporal.ddm.workflow.impl;
 
 import io.temporal.activity.ActivityOptions;
 import io.temporal.common.RetryOptions;
+import io.temporal.failure.ApplicationFailure;
 import io.temporal.workflow.Workflow;
 
 import java.time.Duration;
@@ -59,13 +60,21 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow {
   @Override
   public CommitStats commit(WUProcessingSpec workSpec) {
     CommitStats commitGobblinStats = activityStub.commit(workSpec);
-    TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
-    timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
-        .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, 
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
-        .submit();
+
+    if (!commitGobblinStats.getOptFailure().isPresent() || 
commitGobblinStats.getNumCommittedWorkUnits() > 0) {
+      TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+      timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
+          .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, 
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(
+              
convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
+          .submit();// emit job summary info on both full and partial commit 
(ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
+    }
+    if (commitGobblinStats.getOptFailure().isPresent()) {
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          String.format("Failed to commit dataset state for some dataset(s)"), 
commitGobblinStats.getOptFailure().get().getClass().toString(),
+          commitGobblinStats.getOptFailure().get());
+    }
     return commitGobblinStats;
   }
-
   private List<DatasetTaskSummary> 
convertDatasetStatsToTaskSummaries(Map<String, DatasetStats> datasetStats) {
     List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
     for (Map.Entry<String, DatasetStats> entry : datasetStats.entrySet()) {
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 6402e473bf..97c1c0a767 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -22,6 +22,7 @@ import java.util.Optional;
 import com.typesafe.config.ConfigFactory;
 
 import io.temporal.api.enums.v1.ParentClosePolicy;
+import io.temporal.failure.ApplicationFailure;
 import io.temporal.workflow.ChildWorkflowOptions;
 import io.temporal.workflow.Workflow;
 import lombok.extern.slf4j.Slf4j;
@@ -72,20 +73,48 @@ public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
     searchAttributes = 
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
 
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = 
createProcessingWorkflow(workSpec, searchAttributes);
-    int workunitsProcessed =
-        processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, 
workSpec.getTuning().getMaxBranchesPerTree(),
-            workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
-    if (workunitsProcessed > 0) {
-      CommitStepWorkflow commitWorkflow = 
createCommitStepWorkflow(searchAttributes);
-      CommitStats result = commitWorkflow.commit(workSpec);
-      if (result.getNumCommittedWorkUnits() == 0) {
-        log.warn("No work units committed at the job level. They could have 
been committed at the task level.");
+
+    Optional<Integer> workunitsProcessed = Optional.empty();
+    try {
+      workunitsProcessed = 
Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+          workSpec.getTuning().getMaxBranchesPerTree(), 
workSpec.getTuning().getMaxSubTreesPerTree(),
+          Optional.empty()));
+    } catch (Exception e) {
+      log.error("ProcessWorkUnits failure - attempting partial commit before 
re-throwing exception", e);
+
+      try {
+        performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, 
workunitsProcessed);// Attempt partial commit before surfacing the failure
+      } catch (Exception commitException) {
+        // Combine current and commit exception messages for a more complete 
context
+        String combinedMessage = String.format(
+            "Processing failure: %s. Commit workflow failure: %s",
+            e.getMessage(),
+            commitException.getMessage()
+        );
+        log.error(combinedMessage);
+        throw ApplicationFailure.newNonRetryableFailureWithCause(
+            String.format("Processing failure: %s. Partial commit failure: 
%s", combinedMessage, commitException),
+            Exception.class.toString(),
+            new Exception(e)); // Wrap the original exception for stack trace 
preservation
       }
-      return result;
-    } else {
+      throw e;// Re-throw after any partial commit, to fail the parent 
workflow in case commitWorkflow didn't flow (unlikely)
+    }
+    return performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, 
workunitsProcessed);
+  }
+
+  private CommitStats performCommitIfAnyWorkUnitsProcessed(WUProcessingSpec 
workSpec,
+      Map<String, Object> searchAttributes, Optional<Integer> 
workunitsProcessed) {
+    //  we are only inhibiting commit when workunitsProcessed is actually 
known to be zero
+    if (workunitsProcessed.filter(n -> n == 0).isPresent()) {
       log.error("No work units processed, so no commit attempted.");
       return CommitStats.createEmpty();
     }
+    CommitStepWorkflow commitWorkflow = 
createCommitStepWorkflow(searchAttributes);
+    CommitStats result = commitWorkflow.commit(workSpec);
+    if (result.getNumCommittedWorkUnits() == 0) {
+      log.warn("No work units committed at the job level. They could have been 
committed at the task level.");
+    }
+    return result;
   }
 
   private Optional<EventTimer> createOptJobEventTimer(WUProcessingSpec 
workSpec) {
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/exception/FailedDatasetUrnsException.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/exception/FailedDatasetUrnsException.java
new file mode 100644
index 0000000000..ea8e1def41
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/exception/FailedDatasetUrnsException.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.exception;
+
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Getter;
+
+
+/**
+ * An exception thrown when a set of dataset URNs fail to be processed.
+ */
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class FailedDatasetUrnsException extends IOException {
+
+  @Getter
+  private final Set<String> failedDatasetUrns;
+
+  /**
+   * Creates a new instance of this exception with the failed dataset URNs.
+   *
+   * @param failedDatasetUrns a set containing the URNs of the datasets that 
failed to process
+   */
+  public FailedDatasetUrnsException(Set<String> failedDatasetUrns) {
+    super("Failed to process the following dataset URNs: " + String.join(",", 
failedDatasetUrns));
+    this.failedDatasetUrns = failedDatasetUrns;
+  }
+
+  /**
+   * Default constructor for {@code FailedDatasetUrnsException}.
+   * <p>
+   * This constructor initializes an empty {@link HashSet} for {@code 
failedDatasetUrns}.
+   * It is provided to support frameworks like Jackson that require a 
no-argument constructor
+   * for deserialization purposes.
+   * </p>
+   * */
+  public FailedDatasetUrnsException() {
+    super();
+    this.failedDatasetUrns = new HashSet<>();
+  }
+}

Reply via email to