phet commented on code in PR #4078:
URL: https://github.com/apache/gobblin/pull/4078#discussion_r1883352538


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java:
##########
@@ -59,13 +60,21 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow {
   @Override
   public CommitStats commit(WUProcessingSpec workSpec) {
     CommitStats commitGobblinStats = activityStub.commit(workSpec);
-    TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
-    timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
-        .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
-        .submit();
+
+    if (!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) {
+      TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+      timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
+          .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(
+              convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
+          .submit();// emit job summary info on both full and partial commit (ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
+    }
+    if (commitGobblinStats.getOptFailure().isPresent()) {
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          String.format("Failed to commit dataset state for some dataset(s)"), commitGobblinStats.getOptFailure().get().getClass().toString(),

Review Comment:
   NBD, but there's no need for `String.format` when a string literal would do.
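
To illustrate the point above, here is a minimal sketch (the class and method names are hypothetical, not from the PR). It assumes only standard `java.lang.String` behavior: with no format arguments, `String.format` returns the same text as the literal, but it still parses the string, so a stray `%` would throw.

```java
import java.util.IllegalFormatException;

public class FormatVsLiteral {

  // With no format arguments, String.format just returns the text it was given...
  static String viaFormat() {
    return String.format("Failed to commit dataset state for some dataset(s)");
  }

  // ...so the plain literal is equivalent and skips parsing the format string.
  static String viaLiteral() {
    return "Failed to commit dataset state for some dataset(s)";
  }

  // Hypothetical message (not from the PR): a stray '%' makes String.format throw.
  static boolean strayPercentThrows() {
    try {
      String.format("100% done");
      return false;
    } catch (IllegalFormatException ex) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(viaFormat().equals(viaLiteral()));
    System.out.println(strayPercentThrows());
  }
}
```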



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -72,20 +73,48 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
     searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
 
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes);
-    int workunitsProcessed =
-        processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(),
-            workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
-    if (workunitsProcessed > 0) {
-      CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes);
-      CommitStats result = commitWorkflow.commit(workSpec);
-      if (result.getNumCommittedWorkUnits() == 0) {
-        log.warn("No work units committed at the job level. They could have been committed at the task level.");
+
+    Optional<Integer> workunitsProcessed = Optional.empty();
+    try {
+      workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+          workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(),
+          Optional.empty()));
+    } catch (Exception e) {
+      log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e);
+
+      try {
+        performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed);// Attempt partial commit before surfacing the failure

Review Comment:
   Needs a space before `//`.



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -72,20 +73,48 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
     searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
 
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes);
-    int workunitsProcessed =
-        processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(),
-            workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
-    if (workunitsProcessed > 0) {
-      CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes);
-      CommitStats result = commitWorkflow.commit(workSpec);
-      if (result.getNumCommittedWorkUnits() == 0) {
-        log.warn("No work units committed at the job level. They could have been committed at the task level.");
+
+    Optional<Integer> workunitsProcessed = Optional.empty();
+    try {
+      workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+          workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(),
+          Optional.empty()));
+    } catch (Exception e) {
+      log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e);
+
+      try {
+        performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed);// Attempt partial commit before surfacing the failure
+      } catch (Exception commitException) {
+        // Combine current and commit exception messages for a more complete context
+        String combinedMessage = String.format(
+            "Processing failure: %s. Commit workflow failure: %s",
+            e.getMessage(),
+            commitException.getMessage()
+        );
+        log.error(combinedMessage);
+        throw ApplicationFailure.newNonRetryableFailureWithCause(
+            String.format("Processing failure: %s. Partial commit failure: %s", combinedMessage, commitException),

Review Comment:
   1. Maybe add intro context plus a newline to separate the messages, e.g.:
   ```
   "ProcessWorkUnits failure (as expected) led to failure during partial commit attempt -\n  ProcessWorkUnits failure: %s\n  CommitStep failure: %s"
   ```
   
   2. Also, can't you reuse `combinedMessage` from L96? Or is more of `commitException` than just its message being used the second time?
   
   3. `e` will not lose its stack trace, so there is no need to wrap it as `new Exception(e)`, unless you want someone to know you rethrew it from this particular place. That said, I'd avoid wrapping, since that just adds more layers to peel back while debugging.
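
As a rough sketch of points 1 and 2 above: the message could be built once and then reused for both the log line and the thrown failure. The class name, the `combine` helper, and the sample exceptions below are hypothetical, and plain JDK exceptions stand in for the Temporal `ApplicationFailure` so the snippet stays self-contained.

```java
public class PartialCommitFailureMessage {

  // Hypothetical helper (not part of the PR): formats both failures under one intro line,
  // using the newline-separated layout suggested in the review comment.
  static String combine(Exception processingFailure, Exception commitFailure) {
    return String.format(
        "ProcessWorkUnits failure (as expected) led to failure during partial commit attempt -\n"
            + "  ProcessWorkUnits failure: %s\n  CommitStep failure: %s",
        processingFailure.getMessage(), commitFailure.getMessage());
  }

  public static void main(String[] args) {
    // Stand-in exceptions; in the PR these would be `e` and `commitException`.
    Exception processing = new RuntimeException("activity timed out");
    Exception commit = new IllegalStateException("dataset state store unavailable");

    // Build the message once, then reuse it for logging and for the thrown failure.
    String combinedMessage = combine(processing, commit);
    System.out.println(combinedMessage);
  }
}
```

Building the string in one place means the log entry and the failure message cannot drift apart.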



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -72,20 +73,48 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
     searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
 
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes);
-    int workunitsProcessed =
-        processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(),
-            workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
-    if (workunitsProcessed > 0) {
-      CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes);
-      CommitStats result = commitWorkflow.commit(workSpec);
-      if (result.getNumCommittedWorkUnits() == 0) {
-        log.warn("No work units committed at the job level. They could have been committed at the task level.");
+
+    Optional<Integer> workunitsProcessed = Optional.empty();
+    try {
+      workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+          workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(),
+          Optional.empty()));
+    } catch (Exception e) {
+      log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e);

Review Comment:
   I may even have suggested this text, but reading it again, it's ambiguous (e.g. was the failure *while* attempting partial commit?).
   
   This would be clearer:
   ```
   "ProcessWorkUnits failure - will attempt partial commit..."
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
