This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 77f9d3896 [GOBBLIN-2054] Fix `CommitActivityImpl` to succeed for `TaskState`s that did not originate with `CopySource` (#3934)
77f9d3896 is described below

commit 77f9d3896c7b6249fca52e481d4f62bce0a06831
Author: Kip Kohn <[email protected]>
AuthorDate: Thu Apr 25 08:28:02 2024 -0700

    [GOBBLIN-2054] Fix `CommitActivityImpl` to succeed for `TaskState`s that did not originate with `CopySource` (#3934)
---
 .../org/apache/gobblin/runtime/JobContext.java     |   3 +-
 .../java/org/apache/gobblin/runtime/JobState.java  |   9 +-
 .../ddm/activity/impl/CommitActivityImpl.java      | 114 ++++++++++-----------
 .../ddm/activity/impl/ProcessWorkUnitImpl.java     |   2 +-
 .../workflow/impl/ExecuteGobblinWorkflowImpl.java  |   3 +-
 5 files changed, 63 insertions(+), 68 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index a7b7fd0f5..7a8d42c09 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -544,8 +544,7 @@ public class JobContext implements Closeable {
    * or {@link ConfigurationKeys#PUBLISH_DATA_AT_JOB_LEVEL} is set to true.
    */
   public static boolean shouldCommitDataInJob(State state) {
-    boolean jobCommitPolicyIsFull =
-        JobCommitPolicy.getCommitPolicy(state.getProperties()) == 
JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
+    boolean jobCommitPolicyIsFull = JobCommitPolicy.getCommitPolicy(state) == 
JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
     boolean publishDataAtJobLevel = 
state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL,
         ConfigurationKeys.DEFAULT_PUBLISH_DATA_AT_JOB_LEVEL);
     boolean jobDataPublisherSpecified =
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index c58641e89..d61777f83 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -392,16 +392,21 @@ public class JobState extends SourceState implements JobProgress {
    * @return a {@link Map} from dataset URNs to {@link DatasetState}s 
representing the dataset states
    */
   public Map<String, DatasetState> createDatasetStatesByUrns() {
+    return calculateDatasetStatesByUrns(this.taskStates.values(), 
this.skippedTaskStates.values());
+  }
+
+  /** {@see JobState#createDatasetStatesByUrns} */
+  public Map<String, DatasetState> 
calculateDatasetStatesByUrns(Collection<TaskState> allTaskStates, 
Collection<TaskState> allSkippedTaskStates) {
     Map<String, DatasetState> datasetStatesByUrns = Maps.newHashMap();
 
-    for (TaskState taskState : this.taskStates.values()) {
+    for (TaskState taskState : allTaskStates) {
       String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
 
       datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
       datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
     }
 
-    for (TaskState taskState : this.skippedTaskStates.values()) {
+    for (TaskState taskState : allSkippedTaskStates) {
       String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
       datasetStatesByUrns.get(datasetUrn).addSkippedTaskState(taskState);
     }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index f945af1d0..94b5420ee 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -17,14 +17,7 @@
 
 package org.apache.gobblin.temporal.ddm.activity.impl;
 
-import com.google.common.base.Function;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import io.temporal.failure.ApplicationFailure;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -33,13 +26,24 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
 import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
+
+import com.google.api.client.util.Lists;
+import com.google.common.base.Function;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterables;
+import io.temporal.failure.ApplicationFailure;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.commit.DeliverySemantics;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.JobContext;
@@ -57,8 +61,6 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.executors.IteratorExecutor;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 
 @Slf4j
@@ -72,37 +74,27 @@ public class CommitActivityImpl implements CommitActivity {
   public int commit(WUProcessingSpec workSpec) {
     // TODO: Make this configurable
     int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
-    String jobName = UNDEFINED_JOB_NAME;
+    Optional<String> optJobName = Optional.empty();
     AutomaticTroubleshooter troubleshooter = null;
     try {
       FileSystem fs = Help.loadFileSystem(workSpec);
       JobState jobState = Help.loadJobState(workSpec, fs);
-      jobName = jobState.getJobName();
+      optJobName = Optional.ofNullable(jobState.getJobName());
       SharedResourcesBroker<GobblinScopeTypes> instanceBroker = 
JobStateUtils.getSharedResourcesBroker(jobState);
       troubleshooter = 
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties()));
       troubleshooter.start();
-      JobContext globalGobblinContext = new 
JobContext(jobState.getProperties(), log, instanceBroker, 
troubleshooter.getIssueRepository());
-      // TODO: Task state dir is a stub with the assumption it is always 
colocated with the workunits dir (as in the case of MR which generates 
workunits)
-      Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
-      Path jobOutputPath = new Path(new Path(jobIdParent, "output"), 
jobIdParent.getName());
-      log.info("Output path at: " + jobOutputPath + " with fs at " + 
fs.getUri());
-      StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, 
fs);
-      Optional<Queue<TaskState>> taskStateQueueOpt =
-              
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, 
jobOutputPath.getName(), numDeserializationThreads);
-      if (!taskStateQueueOpt.isPresent()) {
-        log.error("No task states found at " + jobOutputPath);
-        return 0;
+      List<TaskState> taskStates = loadTaskStates(workSpec, fs, jobState, 
numDeserializationThreads);
+      if (!taskStates.isEmpty()) {
+        JobContext jobContext = new JobContext(jobState.getProperties(), log, 
instanceBroker, troubleshooter.getIssueRepository());
+        commitTaskStates(jobState, taskStates, jobContext);
       }
-      Queue<TaskState> taskStateQueue = taskStateQueueOpt.get();
-      commitTaskStates(jobState, ImmutableList.copyOf(taskStateQueue), 
globalGobblinContext);
-      return taskStateQueue.size();
+      return taskStates.size();
     } catch (Exception e) {
       //TODO: IMPROVE GRANULARITY OF RETRIES
       throw ApplicationFailure.newNonRetryableFailureWithCause(
-          String.format("Failed to commit dataset state for some dataset(s) of 
job %s", jobName),
+          String.format("Failed to commit dataset state for some dataset(s) of 
job %s", optJobName.orElse(UNDEFINED_JOB_NAME)),
           IOException.class.toString(),
-          new IOException(e),
-          null
+          new IOException(e)
       );
     } finally {
       String errCorrelator = String.format("Commit [%s]", 
calcCommitId(workSpec));
@@ -118,8 +110,13 @@ public class CommitActivityImpl implements CommitActivity {
    * @param jobContext
    * @throws IOException
    */
-  private void commitTaskStates(State jobState, Collection<TaskState> 
taskStates, JobContext jobContext) throws IOException {
-    Map<String, JobState.DatasetState> datasetStatesByUrns = 
createDatasetStatesByUrns(taskStates);
+  private void commitTaskStates(JobState jobState, List<TaskState> taskStates, 
JobContext jobContext) throws IOException {
+    if (!taskStates.isEmpty()) {
+      TaskState firstTaskState = taskStates.get(0);
+      log.info("TaskState (commit) [{}] (**first of {}**): {}", 
firstTaskState.getTaskId(), taskStates.size(), 
firstTaskState.toJsonString(true));
+    }
+    //TODO: handle skipped tasks?
+    Map<String, JobState.DatasetState> datasetStatesByUrns = 
jobState.calculateDatasetStatesByUrns(taskStates, Lists.newArrayList());
     final boolean shouldCommitDataInJob = 
JobContext.shouldCommitDataInJob(jobState);
     final DeliverySemantics deliverySemantics = 
DeliverySemantics.AT_LEAST_ONCE;
     //TODO: Make this configurable
@@ -148,7 +145,7 @@ public class CommitActivityImpl implements CommitActivity {
               }).iterator(), numCommitThreads,
           // TODO: Rewrite executorUtils to use java util optional
           
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log), 
com.google.common.base.Optional.of("Commit-thread-%d")))
-          .executeAndGetResults();
+              .executeAndGetResults();
 
       IteratorExecutor.logFailures(result, null, 10);
 
@@ -166,7 +163,7 @@ public class CommitActivityImpl implements CommitActivity {
       }
       if (!IteratorExecutor.verifyAllSuccessful(result)) {
         // TODO: propagate cause of failure and determine whether or not this 
is retryable to throw a non-retryable failure exception
-        String jobName = 
jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, 
UNDEFINED_JOB_NAME);
+        String jobName = jobState.getProp(ConfigurationKeys.JOB_NAME_KEY, 
UNDEFINED_JOB_NAME);
         throw new IOException("Failed to commit dataset state for some 
dataset(s) of job " + jobName);
       }
     } catch (InterruptedException exc) {
@@ -174,36 +171,31 @@ public class CommitActivityImpl implements CommitActivity {
     }
   }
 
+  /** @return {@link TaskState}s loaded from the {@link StateStore<TaskState>} 
indicated by the {@link WUProcessingSpec} and {@link FileSystem} */
+  private List<TaskState> loadTaskStates(WUProcessingSpec workSpec, FileSystem 
fs, JobState jobState, int numThreads) throws IOException {
+    // TODO - decide whether to replace this method by adapting 
TaskStateCollectorService::collectOutputTaskStates (whence much of this code 
was drawn)
+    StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, 
fs);
+    // NOTE: TaskState dir is assumed to be a sibling to the workunits dir 
(following conventions of `MRJobLauncher`)
+    String jobIdPathName = new 
Path(workSpec.getWorkUnitsDir()).getParent().getName();
+    log.info("TaskStateStore path (name component): '{}' (fs: '{}')", 
jobIdPathName, fs.getUri());
+    Optional<Queue<TaskState>> taskStateQueueOpt = 
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, 
jobIdPathName, numThreads);
+    return taskStateQueueOpt.map(taskStateQueue ->
+        taskStateQueue.stream().peek(taskState ->
+                // CRITICAL: although some `WorkUnit`s, like those created by 
`CopySource::FileSetWorkUnitGenerator` for each `CopyEntity`
+                // already themselves contain every prop of their `JobState`, 
not all do.
+                // `TaskState extends WorkUnit` serialization will include its 
constituent `WorkUnit`, but not the constituent `JobState`.
+                // given some `JobState` props may be essential for 
commit/publish, deserialization must re-associate each `TaskState` w/ `JobState`
+                taskState.setJobState(jobState)
+                // TODO - decide whether something akin necessary to 
streamline cumulative in-memory size of all issues: 
consumeTaskIssues(taskState);
+            ).collect(Collectors.toList())
+    ).orElseGet(() -> {
+      log.error("TaskStateStore successfully opened, but no task states found 
under (name) '{}'", jobIdPathName);
+      return Lists.newArrayList();
+    });
+  }
+
   /** @return id/correlator for this particular commit activity */
   private static String calcCommitId(WUProcessingSpec workSpec) {
     return new Path(workSpec.getWorkUnitsDir()).getParent().getName();
   }
-
-  /**
-   * Organize task states by dataset urns.
-   * @param taskStates
-   * @return A map of dataset urns to dataset task states.
-   */
-  public static Map<String, JobState.DatasetState> 
createDatasetStatesByUrns(Collection<TaskState> taskStates) {
-    Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
-
-    //TODO: handle skipped tasks?
-    for (TaskState taskState : taskStates) {
-      String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
-      datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
-      datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
-    }
-
-    return datasetStatesByUrns;
-  }
-
-  private static String createDatasetUrn(Map<String, JobState.DatasetState> 
datasetStatesByUrns, TaskState taskState) {
-    String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY, 
ConfigurationKeys.DEFAULT_DATASET_URN);
-    if (!datasetStatesByUrns.containsKey(datasetUrn)) {
-      JobState.DatasetState datasetState = new JobState.DatasetState();
-      datasetState.setDatasetUrn(datasetUrn);
-      datasetStatesByUrns.put(datasetUrn, datasetState);
-    }
-    return datasetUrn;
-  }
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
index aff935dd8..f11ac7068 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
@@ -138,7 +138,7 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit {
         // TODO: if metrics configured, report them now
         log.info("WU [{} = {}] - finished commit after {}ms with state {}{}", 
wu.getCorrelator(), task.getTaskId(),
             taskState.getTaskDuration(), taskState.getWorkingState(),
-            
taskState.getWorkingState().equals(WorkUnitState.WorkingState.SUCCESSFUL)
+            
taskState.getWorkingState().equals(WorkUnitState.WorkingState.SUCCESSFUL) && 
taskState.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)
                 ? (" to: " + 
taskState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR)) : "");
         log.debug("WU [{} = {}] - task state: {}", wu.getCorrelator(), 
task.getTaskId(),
             taskState.toJsonString(shouldUseExtendedLogging(wu)));
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index f855b19a9..5da320de9 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -104,8 +104,7 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
       throw ApplicationFailure.newNonRetryableFailureWithCause(
           String.format("Failed Gobblin job %s", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
           e.getClass().toString(),
-          e,
-          null
+          e
       );
     }
     return numWUsGenerated;

Reply via email to