phet commented on code in PR #4078:
URL: https://github.com/apache/gobblin/pull/4078#discussion_r1876806190
########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java: ##########
@@ -34,11 +36,13 @@
 @Data
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 @RequiredArgsConstructor
+@Accessors(chain = true)
 public class CommitStats {
   @NonNull private Map<String, DatasetStats> datasetStats;
   @NonNull private int numCommittedWorkUnits;
+  @NonNull private Optional<Exception> optFailure;

Review Comment:
let's treat these `@Data` POJOs as immutable and construct them fully initialized. all these fields actually would have been `private final`, but that doesn't play well w/ Jackson's JSON deserialization, so they're forced to be `@NonNull private` instead (hence no `@Accessors`).

########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java: ##########
@@ -67,6 +67,8 @@
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;

Review Comment:
please alphabetize

########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java: ##########
@@ -59,10 +61,19 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow {
   @Override
   public CommitStats commit(WUProcessingSpec workSpec) {
     CommitStats commitGobblinStats = activityStub.commit(workSpec);
-    TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
-    timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
-        .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
-        .submit();
+
+    if(!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) {
+      TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+      timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
+          .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(
+              convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
+          .submit();
+    }
+    if(commitGobblinStats.getOptFailure().isPresent()){

Review Comment:
`if` needs a space before the opening paren, and also a space before the `{`.

worth a comment like:
```
// emit job summary info on both full and partial commit (ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
```

########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java: ##########
@@ -97,12 +99,20 @@ public CommitStats commit(WUProcessingSpec workSpec) {
     Map<String, JobState.DatasetState> datasetStatesByUrns = jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates), Lists.newArrayList());
     TaskState firstTaskState = taskStates.get(0);
     log.info("TaskState (commit) [{}] (**first of {}**): {}", firstTaskState.getTaskId(), taskStates.size(), firstTaskState.toJsonString(true));
-    commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+    CommitStats commitStats = CommitStats.createEmpty();
+    try {
+      commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+    } catch (FailedDatasetUrnsException exception) {
+      log.info("Some datasets failed to be committed, proceeding with publishing commit step");
+      commitStats.setOptFailure(Optional.of(exception));
+    }
     boolean shouldIncludeFailedTasks = PropertiesUtils.getPropAsBoolean(jobState.getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
     Map<String, DatasetStats> datasetTaskSummaries = summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), shouldIncludeFailedTasks);
-    return new CommitStats(datasetTaskSummaries, datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());
+    return commitStats.setDatasetStats(datasetTaskSummaries)
+        .setNumCommittedWorkUnits(
+            datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());

Review Comment:
here, please treat `CommitStats` as immutable:
```
Optional<Exception> optFailure = Optional.empty();
try {
  commitTaskStates(jobState, datasetStatesByUrns, jobContext);
} catch (FailedDatasetUrnsException e) {
  log.warn("Some datasets failed to be committed, proceeding with publishing commit step", e);
  optFailure = Optional.of(e);
}
boolean shouldIncludeFailedTasks = PropertiesUtils.getPropAsBoolean(jobState.getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
Map<String, DatasetStats> datasetTaskSummaries = summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), shouldIncludeFailedTasks);
return new CommitStats(
    datasetTaskSummaries,
    datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum(),
    optFailure
);
```
NOTE: also upgrading the logging to `.warn` and including the specific exception

########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/exception/FailedDatasetUrnsException.java: ##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.exception;
+
+import java.io.IOException;
+
+/**
+ * An exception thrown when a set of dataset URNs fail to be processed.
+ */
+public class FailedDatasetUrnsException extends IOException {
+
+
+  /**
+   * Creates a new instance of this exception with the failed dataset URNs.
+   *
+   * @param failedDatasetUrns the String of failed dataset URNs joined by comma
+   */
+  public FailedDatasetUrnsException(String failedDatasetUrns) {
+    super("Failed to process the following dataset URNs: " + failedDatasetUrns);
+  }

Review Comment:
rather than passing in a string with multiple URNs already joined, have the ctor take a `List<String>` that keeps them separate. also add a member so that whoever catches the exception can access the list. you can still format the `super` call w/ the joined URNs as you currently are.

########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java: ##########
@@ -72,20 +72,33 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
     searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes);
-    int workunitsProcessed =
-        processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(),
-            workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
-    if (workunitsProcessed > 0) {
-      CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes);
-      CommitStats result = commitWorkflow.commit(workSpec);
-      if (result.getNumCommittedWorkUnits() == 0) {
-        log.warn("No work units committed at the job level. They could have been committed at the task level.");
-      }
-      return result;
-    } else {
+
+    Optional<Integer> workunitsProcessed = Optional.empty();
+    try {
+      workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+          workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(),
+          Optional.empty()));
+    } catch (Exception e) {
+      log.error("ProcessWorkUnits failure - will attempt partial commit before announcing error", e);
+      performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed);
+      throw e; //We want to proceed with partial commit and throw exception so that the parent workflow ExecuteGobblinWorkflowImpl can throw the failure event

Review Comment:
need a space after `//`. the comment could also be streamlined:
```
throw e; // re-throw after any partial commit, to fail the parent workflow
```
but anyway, that `throw` on L84 won't run, will it? the partial commit (`CommitStepWorkflow`) will throw an exception that unwinds the stack beforehand. am I correct that at best this `throw` is a "should never happen" fallback?

as for which we'd prefer to surface (possibly both): what specific info is in the orig exception relative to the one currently being thrown? would it make sense to combine messages from the two, or simply throw one or the other?
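The question of which exception to surface can be made concrete with a small, self-contained sketch. It is one possible option, not the PR's behavior. `Work` and `Commit` are hypothetical stand-ins for `performWorkload(...)` and the commit step, and `FailedDatasetUrnsException` is reshaped as suggested earlier in this review: a `List<String>` ctor plus an accessor, whose name `getFailedDatasetUrns()` is an assumption. The partial-commit failure is caught and attached to the processing failure via `Throwable.addSuppressed`, so the final `throw` actually runs and neither exception is lost.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical reshaping of FailedDatasetUrnsException per the List<String> suggestion:
 * URNs stay separate and are joined only to build the message.
 */
class FailedDatasetUrnsException extends IOException {
  private final List<String> failedDatasetUrns;

  FailedDatasetUrnsException(List<String> failedDatasetUrns) {
    super("Failed to process the following dataset URNs: " + String.join(",", failedDatasetUrns));
    // Defensive, read-only copy so catchers cannot mutate the exception's state.
    this.failedDatasetUrns = Collections.unmodifiableList(new ArrayList<>(failedDatasetUrns));
  }

  /** Lets whoever catches the exception inspect exactly which URNs failed (accessor name assumed). */
  List<String> getFailedDatasetUrns() {
    return failedDatasetUrns;
  }
}

/** Sketch: the processing failure stays primary; a partial-commit failure is attached as suppressed. */
final class PartialCommitSketch {
  /** Hypothetical stand-in for performWorkload(...): returns the number of work units processed. */
  interface Work {
    int run() throws Exception;
  }

  /** Hypothetical stand-in for the (partial) commit step. */
  interface Commit {
    void run() throws Exception;
  }

  static int processThenCommit(Work work, Commit commit) throws Exception {
    int processed;
    try {
      processed = work.run();
    } catch (Exception workFailure) {
      try {
        commit.run(); // attempt a partial commit of whatever did complete
      } catch (Exception commitFailure) {
        if (commitFailure != workFailure) { // self-suppression is not permitted
          workFailure.addSuppressed(commitFailure);
        }
      }
      throw workFailure; // always re-throw, so the parent workflow still fails
    }
    commit.run(); // full commit on the success path
    return processed;
  }
}
```

Keeping the processing failure primary puts the root cause at the top of the stack trace; flipping the roles is equally reasonable if the commit error (and its URN list) is what callers act on. This sketch does not model how Temporal converts exceptions into failures across workflow boundaries.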
########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java: ##########
@@ -59,10 +61,19 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow {
   @Override
   public CommitStats commit(WUProcessingSpec workSpec) {
     CommitStats commitGobblinStats = activityStub.commit(workSpec);
-    TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
-    timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
-        .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
-        .submit();
+
+    if(!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) {
+      TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+      timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
+          .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(
+              convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
+          .submit();
+    }
+    if(commitGobblinStats.getOptFailure().isPresent()){
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          String.format("Failed to commit dataset state for some dataset(s)"), FailedDatasetUrnsException.class.toString(),
+          commitGobblinStats.getOptFailure().get());
+    }

Review Comment:
this may not be type-safe at runtime: the failure type is hard-coded as `FailedDatasetUrnsException`, which would be wrong whenever `getOptFailure()` holds some other exception type. if that's the only possibility, then change the field type in `CommitStats`.
otherwise:
```
Exception failure = commitGobblinStats.getOptFailure().get();
throw ApplicationFailure.newNonRetryableFailureWithCause(
    ..., failure.getClass().getName(), failure);
```

########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java: ##########
@@ -205,7 +215,7 @@ private Map<String, DatasetStats> summarizeDatasetOutcomes(Map<String, JobState.
     // Only process successful datasets unless configuration to process failed datasets is set
     for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
       if (datasetState.getState() == JobState.RunningState.COMMITTED || (datasetState.getState() == JobState.RunningState.FAILED
-          && commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+          && (commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS || commitPolicy == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS))) {

Review Comment:
good catch, the impl previously missed the equivalent "other" naming. given the potential for a similar omission in the future, I suggest adding a field on the `enum`:
```
@Getter
private final boolean allowPartialCommit;
```
then it's encapsulated via `commitPolicy.isAllowPartialCommit()` (or use the "fluent" form, if you like)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
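The `allowPartialCommit` suggestion in the last review comment can be sketched without Lombok. `CommitPolicySketch`, `RunningStateSketch`, and `SummarizeFilterSketch` are hypothetical stand-ins for `JobCommitPolicy`, `JobState.RunningState`, and the loop condition in `summarizeDatasetOutcomes`. The two `true` constants mirror the diff; the `COMMIT_ON_FULL_SUCCESS` constant and its `false` value are assumptions. `isAllowPartialCommit()` is the accessor Lombok's `@Getter` generates for a `boolean` field.

```java
/** Hypothetical stand-in for JobCommitPolicy, carrying the suggested allowPartialCommit flag. */
enum CommitPolicySketch {
  COMMIT_ON_FULL_SUCCESS(false), // assumed full-success-only policy: no partial commit
  COMMIT_ON_PARTIAL_SUCCESS(true),
  COMMIT_SUCCESSFUL_TASKS(true);

  private final boolean allowPartialCommit;

  CommitPolicySketch(boolean allowPartialCommit) {
    this.allowPartialCommit = allowPartialCommit;
  }

  /** Equivalent to what Lombok's @Getter would generate for this boolean field. */
  boolean isAllowPartialCommit() {
    return allowPartialCommit;
  }
}

/** Hypothetical stand-in for JobState.RunningState, limited to a few states. */
enum RunningStateSketch { COMMITTED, FAILED, RUNNING }

final class SummarizeFilterSketch {
  private SummarizeFilterSketch() {}

  /** The summarizeDatasetOutcomes condition, with the policy check moved onto the enum. */
  static boolean shouldSummarize(RunningStateSketch state, CommitPolicySketch policy) {
    return state == RunningStateSketch.COMMITTED
        || (state == RunningStateSketch.FAILED && policy.isAllowPartialCommit());
  }
}
```

With the flag on the enum, a future policy has to declare its partial-commit behavior where the constant is defined, rather than relying on every comparison site to list it.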