This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new fbd4ee8e8 [GOBBLIN-2020] Fix number of failure scenarios and bugs in 
temporal workflows (#3900)
fbd4ee8e8 is described below

commit fbd4ee8e8b7a57d5f44f6cf0e887d0eb029b94aa
Author: William Lo <[email protected]>
AuthorDate: Fri Mar 22 14:01:33 2024 -0400

    [GOBBLIN-2020] Fix number of failure scenarios and bugs in temporal 
workflows (#3900)
    
    Fix a number of failure scenarios and bugs in temporal workflows
---
 .../ddm/activity/impl/CommitActivityImpl.java      | 22 +++++++-
 .../ddm/activity/impl/GenerateWorkUnitsImpl.java   |  2 +-
 .../gobblin/temporal/ddm/work/assistance/Help.java |  6 +--
 .../workflow/impl/ExecuteGobblinWorkflowImpl.java  | 63 +++++++++++++---------
 4 files changed, 61 insertions(+), 32 deletions(-)

diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index 8874bccd7..07c81227d 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -25,10 +25,12 @@ import com.google.common.collect.Maps;
 import io.temporal.failure.ApplicationFailure;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import javax.annotation.Nullable;
@@ -60,13 +62,17 @@ public class CommitActivityImpl implements CommitActivity {
 
   static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
   static int DEFAULT_NUM_COMMIT_THREADS = 1;
+  static String UNDEFINED_JOB_NAME = "<job_name_stub>";
+
   @Override
   public int commit(WUProcessingSpec workSpec) {
     // TODO: Make this configurable
     int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+    String jobName = UNDEFINED_JOB_NAME;
     try {
       FileSystem fs = Help.loadFileSystem(workSpec);
       JobState jobState = Help.loadJobState(workSpec, fs);
+      jobName = jobState.getJobName();
       SharedResourcesBroker<GobblinScopeTypes> instanceBroker = 
JobStateUtils.getSharedResourcesBroker(jobState);
       JobContext globalGobblinContext = new 
JobContext(jobState.getProperties(), log, instanceBroker, null);
       // TODO: Task state dir is a stub with the assumption it is always 
colocated with the workunits dir (as in the case of MR which generates 
workunits)
@@ -86,7 +92,7 @@ public class CommitActivityImpl implements CommitActivity {
     } catch (Exception e) {
       //TODO: IMPROVE GRANULARITY OF RETRIES
       throw ApplicationFailure.newNonRetryableFailureWithCause(
-          "Failed to commit dataset state for some dataset(s) of job 
<jobStub>",
+          String.format("Failed to commit dataset state for some dataset(s) of 
job %s", jobName),
           IOException.class.toString(),
           new IOException(e),
           null
@@ -135,9 +141,21 @@ public class CommitActivityImpl implements CommitActivity {
 
       IteratorExecutor.logFailures(result, null, 10);
 
+      Set<String> failedDatasetUrns = new HashSet<>();
+      for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
+        // Set the overall job state to FAILED if the job failed to process 
any dataset
+        if (datasetState.getState() == JobState.RunningState.FAILED) {
+          failedDatasetUrns.add(datasetState.getDatasetUrn());
+        }
+      }
+      if (!failedDatasetUrns.isEmpty()) {
+        String allFailedDatasets = String.join(", ", failedDatasetUrns);
+        log.error("Failed to commit dataset state for dataset(s) {}" + 
allFailedDatasets);
+        throw new IOException("Failed to commit dataset state for " + 
allFailedDatasets);
+      }
       if (!IteratorExecutor.verifyAllSuccessful(result)) {
         // TODO: propagate cause of failure and determine whether or not this 
is retryable to throw a non-retryable failure exception
-        String jobName = 
jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, 
"<job_name_stub>");
+        String jobName = 
jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, 
UNDEFINED_JOB_NAME);
         throw new IOException("Failed to commit dataset state for some 
dataset(s) of job " + jobName);
       }
     } catch (InterruptedException exc) {
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 2cb95a5d8..c3de4146b 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -75,7 +75,7 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
       JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
       JobStateUtils.writeJobState(jobState, workDirRoot, fs);
 
-      return workUnits.size();
+      return jobState.getTaskCount();
     } catch (ReflectiveOperationException roe) {
       String errMsg = "Unable to construct a source for generating workunits 
for job " + jobState.getJobId();
       log.error(errMsg, roe);
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index 09da984a6..23121bb0f 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -77,9 +77,9 @@ public class Help {
   }
 
   /** @return execution-specific name, incorporating any {@link 
ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from `workerConfig` */
-  public static String qualifyNamePerExecWithFlowExecId(String name, Config 
workerConfig) {
-    Optional<String> optFlowExecId = 
Optional.ofNullable(ConfigUtils.getString(workerConfig, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
-    return name + "_" + calcPerExecQualifierWithOptFlowExecId(optFlowExecId, 
workerConfig);
+  public static String qualifyNamePerExecWithFlowExecId(String name, Config 
jobProps) {
+    Optional<String> optFlowExecId = 
Optional.ofNullable(ConfigUtils.getString(jobProps, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
+    return name + "_" + calcPerExecQualifierWithOptFlowExecId(optFlowExecId, 
jobProps);
   }
 
   public static String calcPerExecQualifierWithOptFlowExecId(Optional<String> 
optFlowExecId, Config workerConfig) {
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 39f401a05..c66ff47c1 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -26,6 +26,7 @@ import com.typesafe.config.ConfigFactory;
 import io.temporal.activity.ActivityOptions;
 import io.temporal.api.enums.v1.ParentClosePolicy;
 import io.temporal.common.RetryOptions;
+import io.temporal.failure.ApplicationFailure;
 import io.temporal.workflow.ChildWorkflowOptions;
 import io.temporal.workflow.Workflow;
 
@@ -33,8 +34,9 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.hadoop.fs.Path;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.temporal.cluster.WorkerConfig;
 import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
 import org.apache.gobblin.temporal.ddm.launcher.ProcessWorkUnitsJobLauncher;
 import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
@@ -73,38 +75,47 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
   public int execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
     TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
     EventTimer timer = timerFactory.createJobTimer();
-
-    int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
-    if (numWUsGenerated > 0) {
-      ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow();
-
-      JobState jobState = new JobState(jobProps);
-      URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
-      Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
-      WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
-      // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
-      if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
 &&
-          
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
-        int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
-        int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
-        wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
-      }
-
-      int numWUsProcessed = processWUsWorkflow.process(wuSpec);
-      if (numWUsProcessed != numWUsGenerated) {
-        log.warn("Not all work units generated were processed: {} != {}", 
numWUsGenerated, numWUsProcessed);
-        // TODO provide more robust indication that things went wrong!  
(retryable or non-retryable error??)
+    int numWUsGenerated = 0;
+    try {
+      numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
+      if (numWUsGenerated > 0) {
+        JobState jobState = new JobState(jobProps);
+        URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+        Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+        ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow(jobProps);
+        WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
+        // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
+        if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+            && 
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
+          int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+          int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+          wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
+        }
+
+        int numWUsProcessed = processWUsWorkflow.process(wuSpec);
+        if (numWUsProcessed != numWUsGenerated) {
+          log.warn("Not all work units generated were processed: {} != {}", 
numWUsGenerated, numWUsProcessed);
+          // TODO provide more robust indication that things went wrong!  
(retryable or non-retryable error??)
+        }
       }
+      timer.stop();
+    } catch (Exception e) {
+      // Emit a failed GobblinTrackingEvent to record job failures
+      timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).stop();
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          String.format("Failed Gobblin job %s", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
+          e.getClass().toString(),
+          e,
+          null
+      );
     }
-    timer.stop();
     return numWUsGenerated;
-
   }
 
-  protected ProcessWorkUnitsWorkflow createProcessWorkUnitsWorkflow() {
+  protected ProcessWorkUnitsWorkflow createProcessWorkUnitsWorkflow(Properties 
jobProps) {
     ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
         .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
-        
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(PROCESS_WORKFLOW_ID_BASE, 
WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+        
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(PROCESS_WORKFLOW_ID_BASE, 
ConfigFactory.parseProperties(jobProps)))
         .build();
     return Workflow.newChildWorkflowStub(ProcessWorkUnitsWorkflow.class, 
childOpts);
   }

Reply via email to