[ 
https://issues.apache.org/jira/browse/GOBBLIN-1968?focusedWorklogId=893100&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-893100
 ]

ASF GitHub Bot logged work on GOBBLIN-1968:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Nov/23 09:02
            Start Date: 30/Nov/23 09:02
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3829:
URL: https://github.com/apache/gobblin/pull/3829#discussion_r1410345417


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -193,39 +189,11 @@ protected void shutDown() throws Exception {
    * @throws IOException if it fails to collect the output {@link TaskState}s
    */
   private void collectOutputTaskStates() throws IOException {
-    List<String> taskStateNames = 
taskStateStore.getTableNames(outputTaskStateDir.getName(), new 
Predicate<String>() {
-      @Override
-      public boolean apply(String input) {
-        return 
input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
-        && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
-      }});
 
-    if (taskStateNames == null || taskStateNames.size() == 0) {
-      LOGGER.debug("No output task state files found in " + 
this.outputTaskStateDir);
+    final Queue<TaskState> taskStateQueue = 
deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir, 
this.stateSerDeRunnerThreads);
+    if (taskStateQueue == null) {

Review Comment:
   I'd lean heavily toward returning an empty queue rather than `null`
   
if the issue is that the queue may be built asynchronously, such that 
`.isEmpty()` at the start might not always hold... in that case, use 
`Optional`



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+  static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
+  static int DEFAULT_NUM_COMMIT_THREADS = 1;
+  @Override
+  public int commit(WUProcessingSpec workSpec) {
+    // TODO: Make this configurable
+    int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+    try {
+      FileSystem fs = Help.loadFileSystem(workSpec);
+      JobState jobState = Help.loadJobState(workSpec, fs);
+      SharedResourcesBroker<GobblinScopeTypes> instanceBroker = 
JobStateUtils.getSharedResourcesBroker(jobState);
+      JobContext globalGobblinContext = new 
JobContext(jobState.getProperties(), log, instanceBroker, null);
+      // TODO: Task state dir is a stub with the assumption it is always 
colocated with the workunits dir (as in the case of MR which generates 
workunits)
+      Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+      Path jobOutputPath = new Path(new Path(jobIdParent, "output"), 
jobIdParent.getName());
+      log.info("Output path at: " + jobOutputPath + " with fs at " + 
fs.getUri());
+      StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, 
fs);
+      Collection<TaskState> taskStateQueue =
+          ImmutableList.copyOf(
+              
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, 
jobOutputPath, numDeserializationThreads));
+      commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+      return taskStateQueue.size();
+    } catch (Exception e) {
+      //TODO: IMPROVE GRANULARITY OF RETRIES
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          "Failed to commit dataset state for some dataset(s) of job 
<jobStub>",
+          IOException.class.toString(),
+          new IOException(e),
+          null
+      );
+    }
+  }
+
+  /**
+   * Commit task states to the dataset state store.
+   * @param jobState
+   * @param taskStates
+   * @param jobContext
+   * @throws IOException
+   */
+  void commitTaskStates(State jobState, Collection<TaskState> taskStates, 
JobContext jobContext) throws IOException {

Review Comment:
   `protected`, or perhaps `private`?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+  static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
+  static int DEFAULT_NUM_COMMIT_THREADS = 1;
+  @Override
+  public int commit(WUProcessingSpec workSpec) {
+    // TODO: Make this configurable
+    int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+    try {
+      FileSystem fs = Help.loadFileSystem(workSpec);
+      JobState jobState = Help.loadJobState(workSpec, fs);
+      SharedResourcesBroker<GobblinScopeTypes> instanceBroker = 
JobStateUtils.getSharedResourcesBroker(jobState);
+      JobContext globalGobblinContext = new 
JobContext(jobState.getProperties(), log, instanceBroker, null);
+      // TODO: Task state dir is a stub with the assumption it is always 
colocated with the workunits dir (as in the case of MR which generates 
workunits)
+      Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+      Path jobOutputPath = new Path(new Path(jobIdParent, "output"), 
jobIdParent.getName());
+      log.info("Output path at: " + jobOutputPath + " with fs at " + 
fs.getUri());
+      StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, 
fs);
+      Collection<TaskState> taskStateQueue =
+          ImmutableList.copyOf(
+              
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, 
jobOutputPath, numDeserializationThreads));
+      commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+      return taskStateQueue.size();
+    } catch (Exception e) {
+      //TODO: IMPROVE GRANULARITY OF RETRIES
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          "Failed to commit dataset state for some dataset(s) of job 
<jobStub>",
+          IOException.class.toString(),
+          new IOException(e),
+          null
+      );
+    }
+  }
+
+  /**
+   * Commit task states to the dataset state store.
+   * @param jobState
+   * @param taskStates
+   * @param jobContext
+   * @throws IOException
+   */
+  void commitTaskStates(State jobState, Collection<TaskState> taskStates, 
JobContext jobContext) throws IOException {
+    Map<String, JobState.DatasetState> datasetStatesByUrns = 
createDatasetStatesByUrns(taskStates);
+    final boolean shouldCommitDataInJob = 
JobContext.shouldCommitDataInJob(jobState);
+    final DeliverySemantics deliverySemantics = 
DeliverySemantics.AT_LEAST_ONCE;
+    //TODO: Make this configurable
+    final int numCommitThreads = DEFAULT_NUM_COMMIT_THREADS;
+    if (!shouldCommitDataInJob) {
+      log.info("Job will not commit data since data are committed by tasks.");
+    }
+
+    try {
+      if (!datasetStatesByUrns.isEmpty()) {
+        log.info("Persisting {} dataset urns.", datasetStatesByUrns.size());
+      }
+
+      List<Either<Void, ExecutionException>> result = new 
IteratorExecutor<>(Iterables
+          .transform(datasetStatesByUrns.entrySet(),
+              new Function<Map.Entry<String, JobState.DatasetState>, 
Callable<Void>>() {
+                @Nullable
+                @Override
+                public Callable<Void> apply(final Map.Entry<String, 
JobState.DatasetState> entry) {
+                  return new SafeDatasetCommit(shouldCommitDataInJob, false, 
deliverySemantics, entry.getKey(),
+                      entry.getValue(), false, jobContext);
+                }
+              }).iterator(), numCommitThreads,
+          ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("Commit-thread-%d")))
+          .executeAndGetResults();
+
+      IteratorExecutor.logFailures(result, null, 10);
+
+      if (!IteratorExecutor.verifyAllSuccessful(result)) {
+        // TODO: propagate cause of failure
+        String jobName = 
jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, 
"<job_name_stub>");
+        throw new IOException("Failed to commit dataset state for some 
dataset(s) of job " + jobName);
+      }
+    } catch (InterruptedException exc) {
+      throw new IOException(exc);
+    }
+  }
+
+  /**
+   * Organize task states by dataset urns.
+   * @param taskStates
+   * @return
+   */
+  public static Map<String, JobState.DatasetState> 
createDatasetStatesByUrns(Collection<TaskState> taskStates) {
+    Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
+
+    //TODO: handle skipped tasks?

Review Comment:
   I'm not clear on which ones might be skipped, so please consider elaborating 
in the comment



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+  @Override
+  public int commit(WUProcessingSpec workSpec) {
+    int numDeserializationThreads = 1;
+    try {
+      FileSystem fs = Help.loadFileSystem(workSpec);
+      JobState jobState = Help.loadJobState(workSpec, fs);
+      SharedResourcesBroker<GobblinScopeTypes> instanceBroker = 
createDefaultInstanceBroker(jobState.getProperties());
+      JobContext globalGobblinContext = new 
JobContext(jobState.getProperties(), log, instanceBroker, null);
+      // TODO: Task state dir is a stub with the assumption it is always 
colocated with the workunits dir (as in the case of MR which generates 
workunits)
+      Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+      Path jobOutputPath = new Path(new Path(jobIdParent, "output"), 
jobIdParent.getName());
+      log.info("Output path at: " + jobOutputPath + " with fs at " + 
fs.getUri());
+      StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, 
fs);
+      Collection<TaskState> taskStateQueue =
+          ImmutableList.copyOf(
+              
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, 
jobOutputPath, numDeserializationThreads));
+      commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+
+    } catch (Exception e) {
+      //TODO: IMPROVE GRANULARITY OF RETRIES
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          "Failed to commit dataset state for some dataset(s) of job 
<jobStub>",
+          IOException.class.toString(),
+          new IOException(e),
+          null
+      );
+    }
+
+    return 0;
+  }
+
+  protected FileSystem loadFileSystemForUri(URI fsUri, State stateConfig) 
throws IOException {
+    return HadoopUtils.getFileSystem(fsUri, stateConfig);
+  }
+
+  void commitTaskStates(State jobState, Collection<TaskState> taskStates, 
JobContext jobContext) throws IOException {
+    Map<String, JobState.DatasetState> datasetStatesByUrns = 
createDatasetStatesByUrns(taskStates);
+    final boolean shouldCommitDataInJob = shouldCommitDataInJob(jobState);
+    final DeliverySemantics deliverySemantics = 
DeliverySemantics.AT_LEAST_ONCE;
+    final int numCommitThreads = 1;
+
+    if (!shouldCommitDataInJob) {
+      log.info("Job will not commit data since data are committed by tasks.");

Review Comment:
   I agree and understand... yet I was also noting the non-specificity: the 
decision is based not only on the configured commit policy, but also on 
whether `ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE` may have been forgotten



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -259,6 +227,52 @@ public Void call() throws Exception {
     this.eventBus.post(new 
NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  /**
+   * Reads in a {@link FsStateStore} folder used to store Task state outputs, 
and returns a queue of {@link TaskState}s
+   * Task State files are populated by the {@link GobblinMultiTaskAttempt} to 
record the output of remote concurrent tasks (e.g. MR mappers)
+   * @param taskStateStore
+   * @param outputTaskStateDir
+   * @param numDeserializerThreads
+   * @return Queue of TaskStates

Review Comment:
   ...or `null` apparently (which is very much worth documenting here!)



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java:
##########
@@ -17,19 +17,20 @@
 
 package org.apache.gobblin.temporal.ddm.workflow;
 
-import io.temporal.workflow.Promise;
 import io.temporal.workflow.WorkflowInterface;
 import io.temporal.workflow.WorkflowMethod;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
-import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
 
 
+/**
+ * Workflow for committing the output of work done by {@link 
org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
+ */
 @WorkflowInterface
 public interface CommitStepWorkflow {
 
     /**
-     * This is the method that is executed when the Workflow Execution is 
started. The Workflow
-     * Execution completes when this method finishes execution.
+     * Commit the output of the work done by {@link 
org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
+     * Returns the number of workunits committed

Review Comment:
   nit: `@return`



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -23,30 +23,40 @@
 import io.temporal.api.enums.v1.ParentClosePolicy;
 import io.temporal.workflow.ChildWorkflowOptions;
 import io.temporal.workflow.Workflow;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.temporal.cluster.WorkerConfig;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
+import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
 import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
 import 
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
 import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
 import org.apache.gobblin.temporal.util.nesting.work.Workload;
 import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
 
 
+@Slf4j
 public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
   public static final String CHILD_WORKFLOW_ID_BASE = "NestingExecWorkUnits";
+  public static final String COMMIT_STEP_WORKFLOW_ID_BASE = 
"CommitStepWorkflow";
 
   @Override
   public int process(WUProcessingSpec workSpec) {
     Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = 
createProcessingWorkflow(workSpec);
-    return processingWorkflow.performWorkload(
+    int workunitsProcessed = processingWorkflow.performWorkload(
         WorkflowAddr.ROOT, workload, 0,
         workSpec.getTuning().getMaxBranchesPerTree(), 
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()
     );
+    if (workunitsProcessed > 0) {
+      CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
+      int result = commitWorkflow.commit(workSpec);
+      return result;
+    }
+    return workunitsProcessed;

Review Comment:
   are we certain `commitWorkflow.commit()` would always return > 0?  if not, 
let's log when we started with `workunitsProcessed == 0` vs. when `result == 0`



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+  static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
+  static int DEFAULT_NUM_COMMIT_THREADS = 1;
+  @Override
+  public int commit(WUProcessingSpec workSpec) {
+    // TODO: Make this configurable
+    int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+    try {
+      FileSystem fs = Help.loadFileSystem(workSpec);
+      JobState jobState = Help.loadJobState(workSpec, fs);
+      SharedResourcesBroker<GobblinScopeTypes> instanceBroker = 
JobStateUtils.getSharedResourcesBroker(jobState);
+      JobContext globalGobblinContext = new 
JobContext(jobState.getProperties(), log, instanceBroker, null);
+      // TODO: Task state dir is a stub with the assumption it is always 
colocated with the workunits dir (as in the case of MR which generates 
workunits)
+      Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+      Path jobOutputPath = new Path(new Path(jobIdParent, "output"), 
jobIdParent.getName());
+      log.info("Output path at: " + jobOutputPath + " with fs at " + 
fs.getUri());
+      StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, 
fs);
+      Collection<TaskState> taskStateQueue =
+          ImmutableList.copyOf(
+              
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, 
jobOutputPath, numDeserializationThreads));
+      commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+      return taskStateQueue.size();
+    } catch (Exception e) {
+      //TODO: IMPROVE GRANULARITY OF RETRIES
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          "Failed to commit dataset state for some dataset(s) of job 
<jobStub>",
+          IOException.class.toString(),
+          new IOException(e),
+          null
+      );
+    }
+  }
+
+  /**
+   * Commit task states to the dataset state store.
+   * @param jobState
+   * @param taskStates
+   * @param jobContext
+   * @throws IOException
+   */
+  void commitTaskStates(State jobState, Collection<TaskState> taskStates, 
JobContext jobContext) throws IOException {
+    Map<String, JobState.DatasetState> datasetStatesByUrns = 
createDatasetStatesByUrns(taskStates);
+    final boolean shouldCommitDataInJob = 
JobContext.shouldCommitDataInJob(jobState);
+    final DeliverySemantics deliverySemantics = 
DeliverySemantics.AT_LEAST_ONCE;
+    //TODO: Make this configurable
+    final int numCommitThreads = DEFAULT_NUM_COMMIT_THREADS;
+    if (!shouldCommitDataInJob) {
+      log.info("Job will not commit data since data are committed by tasks.");
+    }
+
+    try {
+      if (!datasetStatesByUrns.isEmpty()) {
+        log.info("Persisting {} dataset urns.", datasetStatesByUrns.size());
+      }
+
+      List<Either<Void, ExecutionException>> result = new 
IteratorExecutor<>(Iterables
+          .transform(datasetStatesByUrns.entrySet(),
+              new Function<Map.Entry<String, JobState.DatasetState>, 
Callable<Void>>() {
+                @Nullable
+                @Override
+                public Callable<Void> apply(final Map.Entry<String, 
JobState.DatasetState> entry) {
+                  return new SafeDatasetCommit(shouldCommitDataInJob, false, 
deliverySemantics, entry.getKey(),
+                      entry.getValue(), false, jobContext);
+                }
+              }).iterator(), numCommitThreads,
+          ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("Commit-thread-%d")))
+          .executeAndGetResults();
+
+      IteratorExecutor.logFailures(result, null, 10);
+
+      if (!IteratorExecutor.verifyAllSuccessful(result)) {
+        // TODO: propagate cause of failure
+        String jobName = 
jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, 
"<job_name_stub>");
+        throw new IOException("Failed to commit dataset state for some 
dataset(s) of job " + jobName);
+      }
+    } catch (InterruptedException exc) {
+      throw new IOException(exc);
+    }
+  }
+
+  /**
+   * Organize task states by dataset urns.
+   * @param taskStates
+   * @return

Review Comment:
   fill in



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+  @Override
+  public int commit(WUProcessingSpec workSpec) {
+    int numDeserializationThreads = 1;
+    try {
+      FileSystem fs = Help.loadFileSystem(workSpec);
+      JobState jobState = Help.loadJobState(workSpec, fs);
+      SharedResourcesBroker<GobblinScopeTypes> instanceBroker = 
createDefaultInstanceBroker(jobState.getProperties());
+      JobContext globalGobblinContext = new 
JobContext(jobState.getProperties(), log, instanceBroker, null);
+      // TODO: Task state dir is a stub with the assumption it is always 
colocated with the workunits dir (as in the case of MR which generates 
workunits)
+      Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+      Path jobOutputPath = new Path(new Path(jobIdParent, "output"), 
jobIdParent.getName());
+      log.info("Output path at: " + jobOutputPath + " with fs at " + 
fs.getUri());
+      StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, 
fs);
+      Collection<TaskState> taskStateQueue =
+          ImmutableList.copyOf(
+              
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, 
jobOutputPath, numDeserializationThreads));
+      commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+
+    } catch (Exception e) {
+      //TODO: IMPROVE GRANULARITY OF RETRIES
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          "Failed to commit dataset state for some dataset(s) of job 
<jobStub>",
+          IOException.class.toString(),
+          new IOException(e),
+          null
+      );
+    }
+
+    return 0;
+  }
+
  /**
   * Loads a Hadoop {@link FileSystem} for the given URI using the supplied state as configuration.
   * Thin overridable seam around {@link HadoopUtils#getFileSystem} (useful for tests/subclasses).
   *
   * @param fsUri URI identifying the target file system
   * @param stateConfig state whose properties configure the file system
   * @return the resolved {@link FileSystem}
   * @throws IOException if the file system cannot be instantiated
   */
  protected FileSystem loadFileSystemForUri(URI fsUri, State stateConfig) throws IOException {
    return HadoopUtils.getFileSystem(fsUri, stateConfig);
  }
+
+  void commitTaskStates(State jobState, Collection<TaskState> taskStates, 
JobContext jobContext) throws IOException {
+    Map<String, JobState.DatasetState> datasetStatesByUrns = 
createDatasetStatesByUrns(taskStates);
+    final boolean shouldCommitDataInJob = shouldCommitDataInJob(jobState);
+    final DeliverySemantics deliverySemantics = 
DeliverySemantics.AT_LEAST_ONCE;
+    final int numCommitThreads = 1;
+
+    if (!shouldCommitDataInJob) {
+      log.info("Job will not commit data since data are committed by tasks.");
+    }
+
+    try {
+      if (!datasetStatesByUrns.isEmpty()) {
+        log.info("Persisting dataset urns.");
+      }
+
+      List<Either<Void, ExecutionException>> result = new 
IteratorExecutor<>(Iterables
+          .transform(datasetStatesByUrns.entrySet(),
+              new Function<Map.Entry<String, JobState.DatasetState>, 
Callable<Void>>() {
+                @Nullable
+                @Override
+                public Callable<Void> apply(final Map.Entry<String, 
JobState.DatasetState> entry) {
+                  return new SafeDatasetCommit(shouldCommitDataInJob, false, 
deliverySemantics, entry.getKey(),
+                      entry.getValue(), false, jobContext);
+                }
+              }).iterator(), numCommitThreads,
+          ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("Commit-thread-%d")))
+          .executeAndGetResults();
+
+      IteratorExecutor.logFailures(result, null, 10);
+
+      if (!IteratorExecutor.verifyAllSuccessful(result)) {
+        // TODO: propagate cause of failure
+        throw new IOException("Failed to commit dataset state for some 
dataset(s) of job <jobStub>");
+      }
+    } catch (InterruptedException exc) {
+      throw new IOException(exc);
+    }
+  }
+
+  public Map<String, JobState.DatasetState> 
createDatasetStatesByUrns(Collection<TaskState> taskStates) {
+    Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
+
+    //TODO: handle skipped tasks?
+    for (TaskState taskState : taskStates) {
+      String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
+      datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
+      datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
+    }
+
+    return datasetStatesByUrns;
+  }
+
+  private String createDatasetUrn(Map<String, JobState.DatasetState> 
datasetStatesByUrns, TaskState taskState) {
+    String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY, 
ConfigurationKeys.DEFAULT_DATASET_URN);
+    if (!datasetStatesByUrns.containsKey(datasetUrn)) {
+      JobState.DatasetState datasetState = new JobState.DatasetState();
+      datasetState.setDatasetUrn(datasetUrn);
+      datasetStatesByUrns.put(datasetUrn, datasetState);
+    }
+    return datasetUrn;
+  }
+
+  private static boolean shouldCommitDataInJob(State state) {
+    boolean jobCommitPolicyIsFull =
+        JobCommitPolicy.getCommitPolicy(state.getProperties()) == 
JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
+    boolean publishDataAtJobLevel = 
state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL,
+        ConfigurationKeys.DEFAULT_PUBLISH_DATA_AT_JOB_LEVEL);
+    boolean jobDataPublisherSpecified =
+        
!Strings.isNullOrEmpty(state.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE));
+    return jobCommitPolicyIsFull || publishDataAtJobLevel || 
jobDataPublisherSpecified;
+  }
+
+  private static SharedResourcesBroker<GobblinScopeTypes> 
createDefaultInstanceBroker(Properties jobProps) {

Review Comment:
   great!  this method isn't used then, is it?  if not, let's remove



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -259,6 +227,52 @@ public Void call() throws Exception {
     this.eventBus.post(new 
NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  /**
+   * Reads in a {@link FsStateStore} folder used to store Task state outputs, 
and returns a queue of {@link TaskState}s

Review Comment:
   it's not truly required to be FS-backed, is it?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 893100)
    Time Spent: 40m  (was: 0.5h)

> Gobblin Commit Step Runs on Temporal
> ------------------------------------
>
>                 Key: GOBBLIN-1968
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1968
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-core
>            Reporter: William Lo
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Temporal is the next-gen distributed computing platform Gobblin is using to 
> replace MapReduce for many of its data movement workflows. We want to 
> integrate Gobblin's commit step that tracks task states and reports their 
> status with Temporal



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Reply via email to