phet commented on code in PR #4078:
URL: https://github.com/apache/gobblin/pull/4078#discussion_r1876806190


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java:
##########
@@ -34,11 +36,13 @@
 @Data
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 @RequiredArgsConstructor
+@Accessors(chain = true)
 public class CommitStats {
   @NonNull private Map<String, DatasetStats> datasetStats;
   @NonNull private int numCommittedWorkUnits;
+  @NonNull private Optional<Exception> optFailure;

Review Comment:
   let's treat these `@Data` POJOs as immutable and construct them fully initialized.
   
   all these fields actually would have been `private final`, but that doesn't play well w/ jackson's JSON deserialization, so they're forced to be `@NonNull private` instead
   
   (hence no `@Accessors`)
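Here is a minimal sketch of the fully initialized, immutable shape suggested above, in plain Java with no Lombok or Jackson. `CommitStatsSketch` is a hypothetical stand-in for `CommitStats`, and `Integer` stands in for `DatasetStats`. The defensive copy of the map is an extra choice made in this sketch, not something the review asks for. Per the comment, the real class cannot use `final` fields because of Jackson deserialization.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for CommitStats: every field is final and set once in the constructor,
// so there are no setters (and hence no need for chained @Accessors).
final class CommitStatsSketch {
  private final Map<String, Integer> datasetStats; // Integer is a stand-in for DatasetStats
  private final int numCommittedWorkUnits;
  private final Optional<Exception> optFailure;

  CommitStatsSketch(Map<String, Integer> datasetStats, int numCommittedWorkUnits, Optional<Exception> optFailure) {
    // Reject nulls up front, mirroring the intent of @NonNull on the real fields;
    // the copy keeps later changes to the caller's map from leaking in (a choice of this sketch)
    this.datasetStats = Collections.unmodifiableMap(new HashMap<>(Objects.requireNonNull(datasetStats)));
    this.numCommittedWorkUnits = numCommittedWorkUnits;
    this.optFailure = Objects.requireNonNull(optFailure);
  }

  Map<String, Integer> getDatasetStats() { return datasetStats; }
  int getNumCommittedWorkUnits() { return numCommittedWorkUnits; }
  Optional<Exception> getOptFailure() { return optFailure; }
}
```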



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -67,6 +67,8 @@
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;

Review Comment:
   please alphabetize



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java:
##########
@@ -59,10 +61,19 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow {
   @Override
   public CommitStats commit(WUProcessingSpec workSpec) {
     CommitStats commitGobblinStats = activityStub.commit(workSpec);
-    TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
-    timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
-        .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
-        .submit();
+
+    if(!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) {
+      TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+      timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
+          .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(
+              convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
+          .submit();
+    }
+    if(commitGobblinStats.getOptFailure().isPresent()){

Review Comment:
   `if` needs a space before the opening paren, and also a space before the `{`
   
   worth a comment like:
   ```
   // emit job summary info on both full and partial commit (ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
   ```
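The guard in the diff emits the summary in two cases: when the commit fully succeeded, and when it failed but still committed at least one work unit (a partial commit). The sketch below gives that condition a name and uses the requested spacing. `JobSummaryGate` and `shouldEmitJobSummary` are hypothetical names, not part of the PR.

```java
import java.util.Optional;

// Hypothetical helper (not in the PR) isolating the condition that guards the JOB_SUMMARY event.
final class JobSummaryGate {
  private JobSummaryGate() {}

  static boolean shouldEmitJobSummary(Optional<Exception> optFailure, int numCommittedWorkUnits) {
    // emit job summary info on both full and partial commit (ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
    return !optFailure.isPresent() || numCommittedWorkUnits > 0;
  }
}
```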



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -97,12 +99,20 @@ public CommitStats commit(WUProcessingSpec workSpec) {
       Map<String, JobState.DatasetState> datasetStatesByUrns = jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates), Lists.newArrayList());
       TaskState firstTaskState = taskStates.get(0);
       log.info("TaskState (commit) [{}] (**first of {}**): {}", firstTaskState.getTaskId(), taskStates.size(), firstTaskState.toJsonString(true));
-      commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+      CommitStats commitStats = CommitStats.createEmpty();
+      try {
+        commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+      } catch (FailedDatasetUrnsException exception) {
+        log.info("Some datasets failed to be committed, proceeding with publishing commit step");
+        commitStats.setOptFailure(Optional.of(exception));
+      }
 
       boolean shouldIncludeFailedTasks = PropertiesUtils.getPropAsBoolean(jobState.getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
 
       Map<String, DatasetStats> datasetTaskSummaries = summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), shouldIncludeFailedTasks);
-      return new CommitStats(datasetTaskSummaries, datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());
+      return commitStats.setDatasetStats(datasetTaskSummaries)
+          .setNumCommittedWorkUnits(
+              datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());

Review Comment:
   here, please treat `CommitStats` as immutable:
   ```
   Optional<Exception> optFailure = Optional.empty();
   try {
     commitTaskStates(jobState, datasetStatesByUrns, jobContext);
   } catch (FailedDatasetUrnsException e) {
     log.warn("Some datasets failed to be committed, proceeding with publishing commit step", e);
     optFailure = Optional.of(e);
   }
   
   boolean shouldIncludeFailedTasks = PropertiesUtils.getPropAsBoolean(jobState.getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
   
   Map<String, DatasetStats> datasetTaskSummaries = summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), shouldIncludeFailedTasks);
   return new CommitStats(
       datasetTaskSummaries,
       datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum(),
       optFailure
   );
   ```
   NOTE: also upgrading the logging to `.warn` and including the specific exception
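Here is a self-contained sketch of the suggested flow. It records the failure in a local `Optional` and then builds the result once. `PartialCommitFailure`, `CommitOutcome`, and `commitAll` are hypothetical stand-ins for `FailedDatasetUrnsException`, `CommitStats`, and `commitTaskStates`. `java.util.logging` replaces the class's own `log` field.

```java
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for FailedDatasetUrnsException
class PartialCommitFailure extends IOException {
  PartialCommitFailure(String msg) { super(msg); }
}

// Hypothetical stand-in for the immutable CommitStats
final class CommitOutcome {
  final int numCommitted;
  final Optional<Exception> optFailure;

  CommitOutcome(int numCommitted, Optional<Exception> optFailure) {
    this.numCommitted = numCommitted;
    this.optFailure = optFailure;
  }
}

final class CommitFlowSketch {
  private static final Logger LOG = Logger.getLogger(CommitFlowSketch.class.getName());

  private CommitFlowSketch() {}

  // Hypothetical stand-in for commitTaskStates(...): fails if any URN is "bad"
  static void commitAll(List<String> urns) throws PartialCommitFailure {
    if (urns.contains("bad")) {
      throw new PartialCommitFailure("Failed to process the following dataset URNs: bad");
    }
  }

  static CommitOutcome commit(List<String> urns) {
    Optional<Exception> optFailure = Optional.empty();
    try {
      commitAll(urns);
    } catch (PartialCommitFailure e) {
      // warn (not info) and log the exception itself, per the review
      LOG.log(Level.WARNING, "Some datasets failed to be committed, proceeding with publishing commit step", e);
      optFailure = Optional.of(e);
    }
    // Stand-in for summing DatasetStats::getNumCommittedWorkunits
    int numCommitted = (int) urns.stream().filter(u -> !u.equals("bad")).count();
    // Construct the result exactly once, fully initialized
    return new CommitOutcome(numCommitted, optFailure);
  }
}
```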



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/exception/FailedDatasetUrnsException.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.exception;
+
+import java.io.IOException;
+
+/**
+ * An exception thrown when a set of dataset URNs fail to be processed.
+ */
+public class FailedDatasetUrnsException extends IOException {
+
+
+  /**
+   * Creates a new instance of this exception with the failed dataset URNs.
+   *
+   * @param failedDatasetUrns the String of failed dataset URNs joined by comma
+   */
+  public FailedDatasetUrnsException(String failedDatasetUrns) {
+    super("Failed to process the following dataset URNs: " + failedDatasetUrns);
+  }

Review Comment:
   rather than passing in a string with multiple URNs already joined, have the ctor take a `List<String>` that keeps them separate.  also add a member through which whoever catches the exception may access the list.  you can still format the `super` call w/ the joined URNs as you currently are.
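Here is a sketch of the suggested constructor shape. It assumes a comma-and-space separator in the message, because the original Javadoc only says "joined by comma". The getter name `getFailedDatasetUrns` is also an assumption. Visibility modifiers are trimmed so the sketch compiles as a single file.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * An exception thrown when a set of dataset URNs fail to be processed.
 */
class FailedDatasetUrnsException extends IOException {
  // URNs kept separate (not pre-joined) so callers can act on each one
  private final List<String> failedDatasetUrns;

  /**
   * @param failedDatasetUrns the URNs of the datasets that failed
   */
  FailedDatasetUrnsException(List<String> failedDatasetUrns) {
    // The message still shows the URNs joined; ", " as separator is an assumption
    super("Failed to process the following dataset URNs: " + String.join(", ", failedDatasetUrns));
    this.failedDatasetUrns = Collections.unmodifiableList(new ArrayList<>(failedDatasetUrns));
  }

  // Hypothetical accessor name
  List<String> getFailedDatasetUrns() {
    return failedDatasetUrns;
  }
}
```

A caller that catches the exception can then iterate `getFailedDatasetUrns()` directly instead of re-parsing the message.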



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -72,20 +72,33 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
     searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
 
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes);
-    int workunitsProcessed =
-        processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(),
-            workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
-    if (workunitsProcessed > 0) {
-      CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes);
-      CommitStats result = commitWorkflow.commit(workSpec);
-      if (result.getNumCommittedWorkUnits() == 0) {
-        log.warn("No work units committed at the job level. They could have been committed at the task level.");
-      }
-      return result;
-    } else {
+
+    Optional<Integer> workunitsProcessed = Optional.empty();
+    try {
+      workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+          workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(),
+          Optional.empty()));
+    } catch (Exception e) {
+      log.error("ProcessWorkUnits failure - will attempt partial commit before announcing error", e);
+      performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed);
+      throw e; //We want to proceed with partial commit and throw exception so that the parent workflow ExecuteGobblinWorkflowImpl can throw the failure event

Review Comment:
   needs a space after `//`, and the comment could be streamlined:
   ```
   throw e; // re-throw after any partial commit, to fail the parent workflow
   ```
   
   but anyway, that `throw` on L84 won't run, will it?  the partial commit (`CommitStepWorkflow`) will throw an exception that unwinds the stack beforehand.  am I correct that at best this `throw` is a "should never happen" fallback?
   
   in terms of which one we'd prefer to surface (possibly both): what specific info is in the original exception relative to the one currently being thrown?  would it make sense to combine messages from the two, or simply throw one or the other?
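On surfacing both exceptions, one plain-Java option is shown below. The PR does not do this. The idea is to keep the original failure as the primary exception and attach any failure from the partial commit with `Throwable.addSuppressed`. `RethrowAfterPartialCommit` and its `Step` interface are hypothetical. The sketch does not model how Temporal converts exceptions into failures across workflow boundaries, so it is unverified whether the suppressed exception would reach the parent workflow.

```java
// Hypothetical sketch (not the PR's code): attempt a partial commit after a processing failure,
// then re-throw the ORIGINAL failure with any commit failure attached as suppressed.
final class RethrowAfterPartialCommit {
  private RethrowAfterPartialCommit() {}

  // Hypothetical stand-in for the processing workflow and the commit step
  interface Step {
    void run() throws Exception;
  }

  static void processThenCommit(Step processing, Step commit) throws Exception {
    try {
      processing.run();
    } catch (Exception processingFailure) {
      try {
        commit.run(); // attempt partial commit before announcing the error
      } catch (Exception commitFailure) {
        // keep the original as primary; the commit failure rides along
        processingFailure.addSuppressed(commitFailure);
      }
      throw processingFailure; // re-throw after any partial commit, to fail the parent
    }
    commit.run(); // normal (full) commit path
  }
}
```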



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java:
##########
@@ -59,10 +61,19 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow {
   @Override
   public CommitStats commit(WUProcessingSpec workSpec) {
     CommitStats commitGobblinStats = activityStub.commit(workSpec);
-    TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
-    timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
-        .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
-        .submit();
+
+    if(!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) {
+      TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+      timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
+          .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(
+              convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
+          .submit();
+    }
+    if(commitGobblinStats.getOptFailure().isPresent()){
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          String.format("Failed to commit dataset state for some dataset(s)"), FailedDatasetUrnsException.class.toString(),
+          commitGobblinStats.getOptFailure().get());
+    }

Review Comment:
   this may not be type-safe at runtime, if the type of `getOptFailure` is not `FailedDatasetUrnsException`.  if that's the only possibility, then change the field type in `CommitStats`.  otherwise:
   ```
     Exception failure = commitGobblinStats.getOptFailure().get();
     throw ApplicationFailure.newNonRetryableFailureWithCause(
         ...,
         failure.getClass().getName(),
         failure);
   ```
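Some context on the `type` string. `Class.toString()` prefixes the name with `class ` (e.g. `class java.io.IOException`), while `Class.getName()` returns only the fully qualified name. Reading the name from the runtime object reports whichever exception `getOptFailure()` actually holds. `FailureTypeNames` is a hypothetical helper.

```java
import java.io.IOException;

// Hypothetical helper contrasting a hard-coded class string with one derived at runtime.
final class FailureTypeNames {
  private FailureTypeNames() {}

  // Hard-coded form, as in the diff: yields "class java.io.IOException" (note the "class " prefix)
  static String hardCoded() {
    return IOException.class.toString();
  }

  // Derived from the actual failure: fully qualified name of its runtime class, no prefix
  static String typeOf(Exception failure) {
    return failure.getClass().getName();
  }
}
```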



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -205,7 +215,7 @@ private Map<String, DatasetStats> summarizeDatasetOutcomes(Map<String, JobState.
     // Only process successful datasets unless configuration to process failed datasets is set
     for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
       if (datasetState.getState() == JobState.RunningState.COMMITTED || (datasetState.getState() == JobState.RunningState.FAILED
-          && commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+          && (commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS || commitPolicy == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS))) {

Review Comment:
   good catch, the impl previously missed the equivalent "other" naming.  given the potential for a similar omission in the future, I suggest adding a field on the `enum`:
   ```
   @Getter private final boolean allowPartialCommit;
   ```
   then it's encapsulated via `commitPolicy.isAllowPartialCommit()` (or `commitPolicy.allowPartialCommit()`, if you like the "fluent" form)
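Here is a sketch of the suggested enum field in plain Java. A hand-written getter replaces Lombok's `@Getter`. `CommitPolicySketch` and `DatasetFilter` are hypothetical. `COMMIT_ON_FULL_SUCCESS` is assumed to be the policy that disallows partial commits. The `RunningState` values are a trimmed stand-in for `JobState.RunningState`.

```java
// Hypothetical stand-in for JobCommitPolicy; COMMIT_ON_FULL_SUCCESS is an assumed third constant.
enum CommitPolicySketch {
  COMMIT_ON_FULL_SUCCESS(false),
  COMMIT_ON_PARTIAL_SUCCESS(true),
  COMMIT_SUCCESSFUL_TASKS(true);

  // Encapsulates "may commit despite some failures", so call sites need not list constants
  private final boolean allowPartialCommit;

  CommitPolicySketch(boolean allowPartialCommit) {
    this.allowPartialCommit = allowPartialCommit;
  }

  boolean isAllowPartialCommit() {
    return allowPartialCommit;
  }
}

final class DatasetFilter {
  private DatasetFilter() {}

  // Trimmed, hypothetical stand-in for JobState.RunningState
  enum RunningState { COMMITTED, FAILED, PENDING }

  // Mirrors the summarizeDatasetOutcomes condition, with the enum field replacing the OR of constants
  static boolean shouldSummarize(RunningState state, CommitPolicySketch policy) {
    return state == RunningState.COMMITTED
        || (state == RunningState.FAILED && policy.isAllowPartialCommit());
  }
}
```

A newly added policy then has to declare its `allowPartialCommit` value in its constructor call. That avoids the omission the comment describes.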



-- 
This is an automated message from the Apache Git Service.