[ https://issues.apache.org/jira/browse/GOBBLIN-2175?focusedWorklogId=948163&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-948163 ]
ASF GitHub Bot logged work on GOBBLIN-2175: ------------------------------------------- Author: ASF GitHub Bot Created on: 13/Dec/24 06:39 Start Date: 13/Dec/24 06:39 Worklog Time Spent: 10m Work Description: phet commented on code in PR #4078: URL: https://github.com/apache/gobblin/pull/4078#discussion_r1883352538 ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java: ########## @@ -59,13 +60,21 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow { @Override public CommitStats commit(WUProcessingSpec workSpec) { CommitStats commitGobblinStats = activityStub.commit(workSpec); - TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext()); - timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY) - .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats()))) - .submit(); + + if (!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) { + TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext()); + timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY) + .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson( + convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats()))) + .submit();// emit job summary info on both full and partial commit (ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`) + } + if (commitGobblinStats.getOptFailure().isPresent()) { + throw ApplicationFailure.newNonRetryableFailureWithCause( + String.format("Failed to commit dataset state for some dataset(s)"), commitGobblinStats.getOptFailure().get().getClass().toString(), Review Comment: NBD, but no need for `String.format`, when a string literal would do ########## 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java: ########## @@ -72,20 +73,48 @@ private CommitStats performWork(WUProcessingSpec workSpec) { searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties()); NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes); - int workunitsProcessed = - processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(), - workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()); - if (workunitsProcessed > 0) { - CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes); - CommitStats result = commitWorkflow.commit(workSpec); - if (result.getNumCommittedWorkUnits() == 0) { - log.warn("No work units committed at the job level. They could have been committed at the task level."); + + Optional<Integer> workunitsProcessed = Optional.empty(); + try { + workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, + workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), + Optional.empty())); + } catch (Exception e) { + log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e); + + try { + performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed);// Attempt partial commit before surfacing the failure Review Comment: needs space before `//` ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java: ########## @@ -72,20 +73,48 @@ private CommitStats performWork(WUProcessingSpec workSpec) { searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties()); NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes); - int 
workunitsProcessed = - processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(), - workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()); - if (workunitsProcessed > 0) { - CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes); - CommitStats result = commitWorkflow.commit(workSpec); - if (result.getNumCommittedWorkUnits() == 0) { - log.warn("No work units committed at the job level. They could have been committed at the task level."); + + Optional<Integer> workunitsProcessed = Optional.empty(); + try { + workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, + workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), + Optional.empty())); + } catch (Exception e) { + log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e); + + try { + performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed);// Attempt partial commit before surfacing the failure + } catch (Exception commitException) { + // Combine current and commit exception messages for a more complete context + String combinedMessage = String.format( + "Processing failure: %s. Commit workflow failure: %s", + e.getMessage(), + commitException.getMessage() + ); + log.error(combinedMessage); + throw ApplicationFailure.newNonRetryableFailureWithCause( + String.format("Processing failure: %s. Partial commit failure: %s", combinedMessage, commitException), Review Comment: 1. maybe add intro context plus a newline to separate the msgs. e.g. ``` "ProcessWorkUnits failure (as expected) led to failure during partial commit attempt -\n ProcessWorkUnits failure: %s\n CommitStep failure: %s" ``` 2. also, can't you reuse `combinedMessage` on L96? or is more of `commitException` than just the msg getting used the second time? 3. 
`e` will not lose its stack trace, so no need to wrap it as `new Exception(e)`, unless you want someone to know you rethrew it from this particular place. That said, I'd avoid wrapping, since that just adds more layers to peel back while debugging. ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java: ########## @@ -72,20 +73,48 @@ private CommitStats performWork(WUProcessingSpec workSpec) { searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties()); NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes); - int workunitsProcessed = - processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(), - workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()); - if (workunitsProcessed > 0) { - CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes); - CommitStats result = commitWorkflow.commit(workSpec); - if (result.getNumCommittedWorkUnits() == 0) { - log.warn("No work units committed at the job level. They could have been committed at the task level."); + + Optional<Integer> workunitsProcessed = Optional.empty(); + try { + workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, + workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), + Optional.empty())); + } catch (Exception e) { + log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e); Review Comment: I may even have suggested this text, but reading again, it's ambiguous (e.g. was the failure *while* attempting partial commit?). This would be clearer: ``` "ProcessWorkUnits failure - will attempt partial commit..."
``` Issue Time Tracking ------------------- Worklog Id: (was: 948163) Time Spent: 2h 50m (was: 2h 40m) > Fix partial commit in temporal flow > ----------------------------------- > > Key: GOBBLIN-2175 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2175 > Project: Apache Gobblin > Issue Type: Bug > Reporter: Aditya Pratap Singh > Priority: Major > Time Spent: 2h 50m > Remaining Estimate: 0h > > Fix partial commit in temporal flow -- This message was sent by Atlassian Jira (v8.20.10#820010)