[ https://issues.apache.org/jira/browse/GOBBLIN-2175?focusedWorklogId=946801&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-946801 ]
ASF GitHub Bot logged work on GOBBLIN-2175:
-------------------------------------------

Author: ASF GitHub Bot
Created on: 05/Dec/24 07:33
Start Date: 05/Dec/24 07:33
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4078:
URL: https://github.com/apache/gobblin/pull/4078#discussion_r1870771472


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -72,10 +73,33 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
     searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes);
-    int workunitsProcessed =
-        processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(),
-            workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
-    if (workunitsProcessed > 0) {
+
+    Optional<Integer> workunitsProcessed = Optional.empty();
+    try {
+      workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+          workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()));
+    } catch (Exception e) {
+      log.error("Exception occurred in performing workload,proceeding with commit step", e);
+      // We want to mark the GaaS flow as failure, in case performWorkFlow fails, but we still want to go ahead with commiting the workunits which were processed before failure
+      sendFailureEventToGaaS(workSpec);
+      return proceedWithCommitStepAndReturnCommitStats(workSpec, searchAttributes, workunitsProcessed);
+    }
+    return proceedWithCommitStepAndReturnCommitStats(workSpec, searchAttributes, workunitsProcessed);
+  }
+
+  private void sendFailureEventToGaaS(WUProcessingSpec workSpec) {
+    TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+    timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit();
+  }
+
+  private CommitStats proceedWithCommitStepAndReturnCommitStats(WUProcessingSpec workSpec,
+      Map<String, Object> searchAttributes, Optional<Integer> workunitsProcessed) {
+    /*
+      !workunitsProcessed.isPresent() condition helps in case of partial commit,
+      workunitsProcessed will be Optional.Empty() only in cases performWorkload throws an exception
+      we are only inhibiting commit when workunitsProcessed actually known to be zero
+    * */
+    if (!workunitsProcessed.isPresent() || workunitsProcessed.get() > 0) {

Review Comment:
Might be clearer:
```
if (workunitsProcessed.filter(n -> n == 0).isPresent()) {
  log.error("No work units processed, so no commit attempted.");
  return CommitStats.createEmpty();
} else {
  ...
}
```


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -72,10 +73,33 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
     searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes);
-    int workunitsProcessed =
-        processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(),
-            workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
-    if (workunitsProcessed > 0) {
+
+    Optional<Integer> workunitsProcessed = Optional.empty();
+    try {
+      workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+          workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()));
+    } catch (Exception e) {
+      log.error("Exception occurred in performing workload,proceeding with commit step", e);
+      // We want to mark the GaaS flow as failure, in case performWorkFlow fails, but we still want to go ahead with commiting the workunits which were processed before failure
+      sendFailureEventToGaaS(workSpec);
+      return proceedWithCommitStepAndReturnCommitStats(workSpec, searchAttributes, workunitsProcessed);

Review Comment:
I was about to suggest you omit this line and "fall through", but the timing is actually crucial. What you're doing here results in sending a failure GTE before doing the commit. That would lead GaaS in turn to emit the job observability event *and* complete the flow execution before the commit even runs. In that case, the datasetsMetrics definitely won't be filled in the job observability event. Much better would be to wait to send the JobFailed GTE until *after* the JobSummary GTE is [sent](https://github.com/apache/gobblin/blob/4df6a52d520b4c8718a5df5c71dcef1248b1161e/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java#L63), hence:
```
performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed);
throw e; // [see comment for L90 about why we throw rather than sending a GTE directly]
```


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -72,10 +73,33 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
     searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes);
-    int workunitsProcessed =
-        processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(),
-            workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
-    if (workunitsProcessed > 0) {
+
+    Optional<Integer> workunitsProcessed = Optional.empty();
+    try {
+      workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+          workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()));
+    } catch (Exception e) {
+      log.error("Exception occurred in performing workload,proceeding with commit step", e);
+      // We want to mark the GaaS flow as failure, in case performWorkFlow fails, but we still want to go ahead with commiting the workunits which were processed before failure
+      sendFailureEventToGaaS(workSpec);
+      return proceedWithCommitStepAndReturnCommitStats(workSpec, searchAttributes, workunitsProcessed);
+    }
+    return proceedWithCommitStepAndReturnCommitStats(workSpec, searchAttributes, workunitsProcessed);
+  }
+
+  private void sendFailureEventToGaaS(WUProcessingSpec workSpec) {
+    TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+    timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit();
+  }
+
+  private CommitStats proceedWithCommitStepAndReturnCommitStats(WUProcessingSpec workSpec,

Review Comment:
`performCommitIfAnyWorkUnitsProcessed`?
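The suggestions in the review comments above (the `filter`-based condition, the `performCommitIfAnyWorkUnitsProcessed` name, and committing before rethrowing) can be combined as in the minimal Java sketch below. It is not Gobblin code: `CommitStats`, `performWork`, `parentWorkflow` and the `events` log are hypothetical stand-ins, the workload is a plain `Supplier<Integer>`, and the sketch does not model how the Temporal SDK turns a thrown exception into a failed workflow. It assumes the parent reports success only on a normal return and reports failure once the exception reaches it. It shows two things: for non-negative counts the PR's condition and the suggested `filter` condition agree, and when the workload throws, the commit runs before the failure propagates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

// Sketch only: every type and method here is a hypothetical stand-in, not Gobblin or Temporal API.
class PartialCommitSketch {

  // Stand-in for CommitStats; it only records whether a commit was attempted.
  static final class CommitStats {
    final boolean attempted;
    CommitStats(boolean attempted) { this.attempted = attempted; }
    static CommitStats createEmpty() { return new CommitStats(false); }
  }

  // Ordered log of what happened, so the sequencing can be inspected.
  static final List<String> events = new ArrayList<>();

  // Condition from the PR: commit when the count is unknown (workload threw) or positive.
  static boolean shouldCommitAsInPr(Optional<Integer> workunitsProcessed) {
    return !workunitsProcessed.isPresent() || workunitsProcessed.get() > 0;
  }

  // Condition suggested in review: skip the commit only when the count is known to be exactly zero.
  static boolean knownToBeZero(Optional<Integer> workunitsProcessed) {
    return workunitsProcessed.filter(n -> n == 0).isPresent();
  }

  static CommitStats performCommitIfAnyWorkUnitsProcessed(Optional<Integer> workunitsProcessed) {
    if (knownToBeZero(workunitsProcessed)) {
      events.add("no-commit");
      return CommitStats.createEmpty();
    }
    // Stand-in for running the commit step (where a JobSummary event would be emitted).
    events.add("commit");
    return new CommitStats(true);
  }

  // Stand-in for performWork: commit whatever finished, then fail by rethrowing.
  static CommitStats performWork(Supplier<Integer> performWorkload) {
    Optional<Integer> workunitsProcessed = Optional.empty();
    try {
      workunitsProcessed = Optional.of(performWorkload.get());
    } catch (RuntimeException e) {
      // The count is unknown here (still empty), so a commit is attempted for the partial work.
      performCommitIfAnyWorkUnitsProcessed(workunitsProcessed);
      throw e; // no JobFailed event from here; the caller sees the failure instead
    }
    return performCommitIfAnyWorkUnitsProcessed(workunitsProcessed);
  }

  // Stand-in for the parent workflow. Assumption: success is reported only on a normal
  // return, and failure is reported once the exception reaches the parent.
  static void parentWorkflow(Supplier<Integer> performWorkload) {
    try {
      performWork(performWorkload);
      events.add("JobSucceeded");
    } catch (RuntimeException e) {
      events.add("JobFailed");
    }
  }

  public static void main(String[] args) {
    parentWorkflow(() -> { throw new IllegalStateException("workload failed midway"); });
    System.out.println(events); // prints [commit, JobFailed]
  }
}
```

The sketch catches `RuntimeException` only because a `Supplier` cannot throw checked exceptions. For a negative count the two conditions would differ (the PR's version skips the commit, the suggested one commits), but a count of processed work units presumably cannot be negative.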


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -72,10 +73,33 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
     searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes);
-    int workunitsProcessed =
-        processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(),
-            workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
-    if (workunitsProcessed > 0) {
+
+    Optional<Integer> workunitsProcessed = Optional.empty();
+    try {
+      workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+          workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()));
+    } catch (Exception e) {
+      log.error("Exception occurred in performing workload,proceeding with commit step", e);
+      // We want to mark the GaaS flow as failure, in case performWorkFlow fails, but we still want to go ahead with commiting the workunits which were processed before failure
+      sendFailureEventToGaaS(workSpec);
+      return proceedWithCommitStepAndReturnCommitStats(workSpec, searchAttributes, workunitsProcessed);
+    }
+    return proceedWithCommitStepAndReturnCommitStats(workSpec, searchAttributes, workunitsProcessed);
+  }
+
+  private void sendFailureEventToGaaS(WUProcessingSpec workSpec) {

Review Comment:
I would advise against mentioning GaaS in the name (maybe `sendJobFailedEvent` instead)...

...but the bigger issue is that we don't want to send a JobFailed GTE here and then successfully complete this sub-workflow. If we did, the parent (`ExecuteGobblinWorkflowImpl`) [will send a JobSuccess GTE](https://github.com/apache/gobblin/blob/4e38b2fd25eb4ca84227f654a0f949c8f598f75e/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java#L134) and the two would be in conflict. Instead, we should fail this sub-workflow by throwing an exception.




Issue Time Tracking
-------------------

Worklog Id: (was: 946801)
Time Spent: 50m (was: 40m)

> Fix partial commit in temporal flow
> -----------------------------------
>
>                 Key: GOBBLIN-2175
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2175
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Aditya Pratap Singh
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Fix partial commit in temporal flow



--
This message was sent by Atlassian Jira
(v8.20.10#820010)