Will-Lo commented on code in PR #4052:
URL: https://github.com/apache/gobblin/pull/4052#discussion_r1757366881


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -57,21 +59,25 @@ public CommitStats process(WUProcessingSpec workSpec) {
   }
 
   private CommitStats performWork(WUProcessingSpec workSpec) {
-    Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
-    NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = 
createProcessingWorkflow(workSpec);
-    int workunitsProcessed = processingWorkflow.performWorkload(
-        WorkflowAddr.ROOT, workload, 0,
-        workSpec.getTuning().getMaxBranchesPerTree(), 
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()
-    );
-    if (workunitsProcessed > 0) {
-      CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
-      CommitStats result = commitWorkflow.commit(workSpec);
-      if (result.getNumCommittedWorkUnits() == 0) {
-        log.warn("No work units committed at the job level. They could have 
been committed at the task level.");
+    try {
+      Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
+      JobState jobState = Help.loadJobState(workSpec, 
Help.loadFileSystem(workSpec));
+      NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = 
createProcessingWorkflow(workSpec, jobState);
+      int workunitsProcessed = 
processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+          workSpec.getTuning().getMaxBranchesPerTree(), 
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
+      if (workunitsProcessed > 0) {
+        CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(jobState);
+        CommitStats result = commitWorkflow.commit(workSpec);
+        if (result.getNumCommittedWorkUnits() == 0) {
+          log.warn("No work units committed at the job level. They could have 
been committed at the task level.");
+        }
+        return result;
+      } else {
+        log.error("No work units processed, so no commit attempted.");
+        return CommitStats.createEmpty();
       }
-      return result;
-    } else {
-      log.error("No work units processed, so no commit attempted.");
+    } catch (Exception ignored) {
+      log.error("Exception occured during performing Work", ignored);
       return CommitStats.createEmpty();

Review Comment:
   If it throws an exception when loading the jobState (which I presume would 
lead to an empty jobState) we should just throw an ApplicationException in this 
scenario, so that we can push the Temporal framework to retry the workflow 
rather than progressing silently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to