phet commented on code in PR #4078: URL: https://github.com/apache/gobblin/pull/4078#discussion_r1883352538
########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java: ########## @@ -59,13 +60,21 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow { @Override public CommitStats commit(WUProcessingSpec workSpec) { CommitStats commitGobblinStats = activityStub.commit(workSpec); - TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext()); - timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY) - .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats()))) - .submit(); + + if (!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) { + TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext()); + timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY) + .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson( + convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats()))) + .submit();// emit job summary info on both full and partial commit (ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`) + } + if (commitGobblinStats.getOptFailure().isPresent()) { + throw ApplicationFailure.newNonRetryableFailureWithCause( + String.format("Failed to commit dataset state for some dataset(s)"), commitGobblinStats.getOptFailure().get().getClass().toString(), Review Comment: NBD, but no need for `String.format`, when a string literal would do ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java: ########## @@ -72,20 +73,48 @@ private CommitStats performWork(WUProcessingSpec workSpec) { searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties()); NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes); - int workunitsProcessed = - processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(), - workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()); - if (workunitsProcessed > 0) { - CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes); - CommitStats result = commitWorkflow.commit(workSpec); - if (result.getNumCommittedWorkUnits() == 0) { - log.warn("No work units committed at the job level. They could have been committed at the task level."); + + Optional<Integer> workunitsProcessed = Optional.empty(); + try { + workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, + workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), + Optional.empty())); + } catch (Exception e) { + log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e); + + try { + performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed);// Attempt partial commit before surfacing the failure Review Comment: needs space before `//` ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java: ########## @@ -72,20 +73,48 @@ private CommitStats performWork(WUProcessingSpec workSpec) { searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties()); NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes); - int workunitsProcessed = - processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(), - workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()); - if (workunitsProcessed > 0) { - CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes); - CommitStats result = commitWorkflow.commit(workSpec); - if (result.getNumCommittedWorkUnits() == 0) { - log.warn("No work units committed at the job level. They could have been committed at the task level."); + + Optional<Integer> workunitsProcessed = Optional.empty(); + try { + workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, + workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), + Optional.empty())); + } catch (Exception e) { + log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e); + + try { + performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed);// Attempt partial commit before surfacing the failure + } catch (Exception commitException) { + // Combine current and commit exception messages for a more complete context + String combinedMessage = String.format( + "Processing failure: %s. Commit workflow failure: %s", + e.getMessage(), + commitException.getMessage() + ); + log.error(combinedMessage); + throw ApplicationFailure.newNonRetryableFailureWithCause( + String.format("Processing failure: %s. Partial commit failure: %s", combinedMessage, commitException), Review Comment: 1. maybe add intro context plus a newline to separate the msgs. e.g. ``` "ProcessWorkUnits failure (as expected) led to failure during partial commit attempt -\n ProcessWorkUnits failure: %s\n CommitStep failure: %s" ``` 2. also, can't you reuse `combinedMessage` on L96? or is more of `commitException` than just the msg getting used the second time? 3. `e` will not lose its stack trace, so no need to wrap it as `new Exception(e)`, unless you want someone to know you rethrew it from this particular place. that said, I'd avoid wrapping, since that just adds more layers to peel back while debugging ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java: ########## @@ -72,20 +73,48 @@ private CommitStats performWork(WUProcessingSpec workSpec) { searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties()); NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes); - int workunitsProcessed = - processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(), - workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()); - if (workunitsProcessed > 0) { - CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes); - CommitStats result = commitWorkflow.commit(workSpec); - if (result.getNumCommittedWorkUnits() == 0) { - log.warn("No work units committed at the job level. They could have been committed at the task level."); + + Optional<Integer> workunitsProcessed = Optional.empty(); + try { + workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, + workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), + Optional.empty())); + } catch (Exception e) { + log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e); Review Comment: I may even have suggested this text, but reading again, it's ambiguous (e.g. was the failure *while* attempting partial commit?) this would be clearer: ``` "ProcessWorkUnits failure - will attempt partial commit..." ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org