[ https://issues.apache.org/jira/browse/GOBBLIN-2175?focusedWorklogId=946827&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-946827 ]
ASF GitHub Bot logged work on GOBBLIN-2175: ------------------------------------------- Author: ASF GitHub Bot Created on: 05/Dec/24 09:09 Start Date: 05/Dec/24 09:09 Worklog Time Spent: 10m Work Description: pratapaditya04 commented on code in PR #4078: URL: https://github.com/apache/gobblin/pull/4078#discussion_r1870954169 ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java: ########## @@ -72,10 +73,33 @@ private CommitStats performWork(WUProcessingSpec workSpec) { searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties()); NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes); - int workunitsProcessed = - processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(), - workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()); - if (workunitsProcessed > 0) { + + Optional<Integer> workunitsProcessed = Optional.empty(); + try { + workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, + workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty())); + } catch (Exception e) { + log.error("Exception occurred in performing workload,proceeding with commit step", e); + // We want to mark the GaaS flow as failure, in case performWorkFlow fails, but we still want to go ahead with commiting the workunits which were processed before failure + sendFailureEventToGaaS(workSpec); + return proceedWithCommitStepAndReturnCommitStats(workSpec, searchAttributes, workunitsProcessed); + } + return proceedWithCommitStepAndReturnCommitStats(workSpec, searchAttributes, workunitsProcessed); + } + + private void sendFailureEventToGaaS(WUProcessingSpec workSpec) { + TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext()); + 
timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit(); + } + + private CommitStats proceedWithCommitStepAndReturnCommitStats(WUProcessingSpec workSpec, Review Comment: Makes sense, addressed. ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java: ########## @@ -72,10 +73,33 @@ private CommitStats performWork(WUProcessingSpec workSpec) { searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties()); NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes); - int workunitsProcessed = - processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(), - workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()); - if (workunitsProcessed > 0) { + + Optional<Integer> workunitsProcessed = Optional.empty(); + try { + workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, + workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty())); + } catch (Exception e) { + log.error("Exception occurred in performing workload,proceeding with commit step", e); + // We want to mark the GaaS flow as failure, in case performWorkFlow fails, but we still want to go ahead with commiting the workunits which were processed before failure + sendFailureEventToGaaS(workSpec); + return proceedWithCommitStepAndReturnCommitStats(workSpec, searchAttributes, workunitsProcessed); + } + return proceedWithCommitStepAndReturnCommitStats(workSpec, searchAttributes, workunitsProcessed); + } + + private void sendFailureEventToGaaS(WUProcessingSpec workSpec) { + TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext()); + timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit(); + } + + private CommitStats 
proceedWithCommitStepAndReturnCommitStats(WUProcessingSpec workSpec, + Map<String, Object> searchAttributes, Optional<Integer> workunitsProcessed) { + /* + !workunitsProcessed.isPresent() condition helps in case of partial commit, + workunitsProcessed will be Optional.Empty() only in cases performWorkload throws an exception + we are only inhibiting commit when workunitsProcessed actually known to be zero + * */ + if (!workunitsProcessed.isPresent() || workunitsProcessed.get() > 0) { Review Comment: addressed Issue Time Tracking ------------------- Worklog Id: (was: 946827) Time Spent: 1h (was: 50m) > Fix partial commit in temporal flow > ----------------------------------- > > Key: GOBBLIN-2175 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2175 > Project: Apache Gobblin > Issue Type: Bug > Reporter: Aditya Pratap Singh > Priority: Major > Time Spent: 1h > Remaining Estimate: 0h > > Fix partial commit in temporal flow -- This message was sent by Atlassian Jira (v8.20.10#820010)