[ 
https://issues.apache.org/jira/browse/GOBBLIN-2175?focusedWorklogId=948163&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-948163
 ]

ASF GitHub Bot logged work on GOBBLIN-2175:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Dec/24 06:39
            Start Date: 13/Dec/24 06:39
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4078:
URL: https://github.com/apache/gobblin/pull/4078#discussion_r1883352538


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java:
##########
@@ -59,13 +60,21 @@ public class CommitStepWorkflowImpl implements 
CommitStepWorkflow {
   @Override
   public CommitStats commit(WUProcessingSpec workSpec) {
     CommitStats commitGobblinStats = activityStub.commit(workSpec);
-    TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
-    timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
-        .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, 
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
-        .submit();
+
+    if (!commitGobblinStats.getOptFailure().isPresent() || 
commitGobblinStats.getNumCommittedWorkUnits() > 0) {
+      TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+      timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
+          .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, 
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(
+              
convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
+          .submit();// emit job summary info on both full and partial commit 
(ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
+    }
+    if (commitGobblinStats.getOptFailure().isPresent()) {
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          String.format("Failed to commit dataset state for some dataset(s)"), 
commitGobblinStats.getOptFailure().get().getClass().toString(),

Review Comment:
   Not a big deal, but there's no need for `String.format` when a string literal would do.



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -72,20 +73,48 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
     searchAttributes = 
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
 
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = 
createProcessingWorkflow(workSpec, searchAttributes);
-    int workunitsProcessed =
-        processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, 
workSpec.getTuning().getMaxBranchesPerTree(),
-            workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
-    if (workunitsProcessed > 0) {
-      CommitStepWorkflow commitWorkflow = 
createCommitStepWorkflow(searchAttributes);
-      CommitStats result = commitWorkflow.commit(workSpec);
-      if (result.getNumCommittedWorkUnits() == 0) {
-        log.warn("No work units committed at the job level. They could have 
been committed at the task level.");
+
+    Optional<Integer> workunitsProcessed = Optional.empty();
+    try {
+      workunitsProcessed = 
Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+          workSpec.getTuning().getMaxBranchesPerTree(), 
workSpec.getTuning().getMaxSubTreesPerTree(),
+          Optional.empty()));
+    } catch (Exception e) {
+      log.error("ProcessWorkUnits failure - attempting partial commit before 
re-throwing exception", e);
+
+      try {
+        performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, 
workunitsProcessed);// Attempt partial commit before surfacing the failure

Review Comment:
   This needs a space before the `//`.



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -72,20 +73,48 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
     searchAttributes = 
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
 
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = 
createProcessingWorkflow(workSpec, searchAttributes);
-    int workunitsProcessed =
-        processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, 
workSpec.getTuning().getMaxBranchesPerTree(),
-            workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
-    if (workunitsProcessed > 0) {
-      CommitStepWorkflow commitWorkflow = 
createCommitStepWorkflow(searchAttributes);
-      CommitStats result = commitWorkflow.commit(workSpec);
-      if (result.getNumCommittedWorkUnits() == 0) {
-        log.warn("No work units committed at the job level. They could have 
been committed at the task level.");
+
+    Optional<Integer> workunitsProcessed = Optional.empty();
+    try {
+      workunitsProcessed = 
Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+          workSpec.getTuning().getMaxBranchesPerTree(), 
workSpec.getTuning().getMaxSubTreesPerTree(),
+          Optional.empty()));
+    } catch (Exception e) {
+      log.error("ProcessWorkUnits failure - attempting partial commit before 
re-throwing exception", e);
+
+      try {
+        performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, 
workunitsProcessed);// Attempt partial commit before surfacing the failure
+      } catch (Exception commitException) {
+        // Combine current and commit exception messages for a more complete 
context
+        String combinedMessage = String.format(
+            "Processing failure: %s. Commit workflow failure: %s",
+            e.getMessage(),
+            commitException.getMessage()
+        );
+        log.error(combinedMessage);
+        throw ApplicationFailure.newNonRetryableFailureWithCause(
+            String.format("Processing failure: %s. Partial commit failure: 
%s", combinedMessage, commitException),

Review Comment:
   1. Maybe add introductory context plus a newline to separate the messages, e.g.:
   ```
   "ProcessWorkUnits failure (as expected) led to failure during partial commit 
attempt -\n  ProcessWorkUnits failure: %s\n  CommitStep failure: %s"
   ```
   
   2. Also, can't you reuse `combinedMessage` on L96? Or is more of 
`commitException` than just its message being used the second time?
   
   3. `e` will not lose its stack trace, so there's no need to wrap it as `new 
Exception(e)` unless you want readers to know it was rethrown from this 
particular place. That said, I'd avoid wrapping, since it just adds more 
layers to peel back while debugging.



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -72,20 +73,48 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
     searchAttributes = 
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
 
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = 
createProcessingWorkflow(workSpec, searchAttributes);
-    int workunitsProcessed =
-        processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, 
workSpec.getTuning().getMaxBranchesPerTree(),
-            workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
-    if (workunitsProcessed > 0) {
-      CommitStepWorkflow commitWorkflow = 
createCommitStepWorkflow(searchAttributes);
-      CommitStats result = commitWorkflow.commit(workSpec);
-      if (result.getNumCommittedWorkUnits() == 0) {
-        log.warn("No work units committed at the job level. They could have 
been committed at the task level.");
+
+    Optional<Integer> workunitsProcessed = Optional.empty();
+    try {
+      workunitsProcessed = 
Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+          workSpec.getTuning().getMaxBranchesPerTree(), 
workSpec.getTuning().getMaxSubTreesPerTree(),
+          Optional.empty()));
+    } catch (Exception e) {
+      log.error("ProcessWorkUnits failure - attempting partial commit before 
re-throwing exception", e);

Review Comment:
   I may even have suggested this text, but on rereading, it's ambiguous (e.g., 
did the failure occur *while* attempting the partial commit?).
   
   This would be clearer:
   ```
   "ProcessWorkUnits failure - will attempt partial commit..."
   ```





Issue Time Tracking
-------------------

    Worklog Id:     (was: 948163)
    Time Spent: 2h 50m  (was: 2h 40m)

> Fix partial commit in temporal flow
> -----------------------------------
>
>                 Key: GOBBLIN-2175
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2175
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Aditya Pratap Singh
>            Priority: Major
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Fix partial commit in temporal flow



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to