[ 
https://issues.apache.org/jira/browse/GOBBLIN-2175?focusedWorklogId=946827&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-946827
 ]

ASF GitHub Bot logged work on GOBBLIN-2175:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Dec/24 09:09
            Start Date: 05/Dec/24 09:09
    Worklog Time Spent: 10m 
      Work Description: pratapaditya04 commented on code in PR #4078:
URL: https://github.com/apache/gobblin/pull/4078#discussion_r1870954169


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -72,10 +73,33 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
     searchAttributes = 
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
 
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = 
createProcessingWorkflow(workSpec, searchAttributes);
-    int workunitsProcessed =
-        processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, 
workSpec.getTuning().getMaxBranchesPerTree(),
-            workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
-    if (workunitsProcessed > 0) {
+
+    Optional<Integer> workunitsProcessed = Optional.empty();
+    try {
+     workunitsProcessed = 
Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+          workSpec.getTuning().getMaxBranchesPerTree(), 
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()));
+    } catch (Exception e) {
+      log.error("Exception occurred in performing workload,proceeding with 
commit step", e);
+      // We want to mark the GaaS flow as failure, in case performWorkFlow 
fails, but we still want to go ahead with commiting the workunits which were 
processed before failure
+      sendFailureEventToGaaS(workSpec);
+      return proceedWithCommitStepAndReturnCommitStats(workSpec, 
searchAttributes, workunitsProcessed);
+    }
+    return proceedWithCommitStepAndReturnCommitStats(workSpec, 
searchAttributes, workunitsProcessed);
+  }
+
+  private void sendFailureEventToGaaS(WUProcessingSpec workSpec) {
+    TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+    timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit();
+  }
+
+  private CommitStats 
proceedWithCommitStepAndReturnCommitStats(WUProcessingSpec workSpec,

Review Comment:
   makes sense , addressed



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -72,10 +73,33 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
     searchAttributes = 
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
 
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = 
createProcessingWorkflow(workSpec, searchAttributes);
-    int workunitsProcessed =
-        processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, 
workSpec.getTuning().getMaxBranchesPerTree(),
-            workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
-    if (workunitsProcessed > 0) {
+
+    Optional<Integer> workunitsProcessed = Optional.empty();
+    try {
+     workunitsProcessed = 
Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+          workSpec.getTuning().getMaxBranchesPerTree(), 
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()));
+    } catch (Exception e) {
+      log.error("Exception occurred in performing workload,proceeding with 
commit step", e);
+      // We want to mark the GaaS flow as failure, in case performWorkFlow 
fails, but we still want to go ahead with commiting the workunits which were 
processed before failure
+      sendFailureEventToGaaS(workSpec);
+      return proceedWithCommitStepAndReturnCommitStats(workSpec, 
searchAttributes, workunitsProcessed);
+    }
+    return proceedWithCommitStepAndReturnCommitStats(workSpec, 
searchAttributes, workunitsProcessed);
+  }
+
+  private void sendFailureEventToGaaS(WUProcessingSpec workSpec) {
+    TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+    timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit();
+  }
+
+  private CommitStats 
proceedWithCommitStepAndReturnCommitStats(WUProcessingSpec workSpec,
+      Map<String, Object> searchAttributes, Optional<Integer> 
workunitsProcessed) {
+    /*
+    !workunitsProcessed.isPresent() condition helps in case of partial commit,
+     workunitsProcessed will be Optional.Empty() only in cases performWorkload 
throws an exception
+     we are only inhibiting commit when workunitsProcessed actually known to 
be zero
+    * */
+    if (!workunitsProcessed.isPresent() || workunitsProcessed.get() > 0) {

Review Comment:
   addressed





Issue Time Tracking
-------------------

    Worklog Id:     (was: 946827)
    Time Spent: 1h  (was: 50m)

> Fix partial commit in temporal flow
> -----------------------------------
>
>                 Key: GOBBLIN-2175
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2175
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Aditya Pratap Singh
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Fix partial commit in temporal flow



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to