[ 
https://issues.apache.org/jira/browse/GOBBLIN-1968?focusedWorklogId=893103&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-893103
 ]

ASF GitHub Bot logged work on GOBBLIN-1968:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Nov/23 09:04
            Start Date: 30/Nov/23 09:04
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3829:
URL: https://github.com/apache/gobblin/pull/3829#discussion_r1410356037


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+  /** Default number of threads used by {@code commit} to deserialize task states. */
+  static final int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
+  /** Default number of threads used by {@code commitTaskStates} to run dataset commits. */
+  static final int DEFAULT_NUM_COMMIT_THREADS = 1;
+  /**
+   * Loads the job state and the task states found under the job output path, then commits them via
+   * {@code commitTaskStates}.
+   *
+   * @param workSpec spec used to load the file system, the job state and the task state store; its work units
+   *                 dir is used to derive the job output path
+   * @return the number of task states that were deserialized and handed to the commit
+   * @throws ApplicationFailure non-retryable failure wrapping any exception raised while loading or committing
+   */
+  @Override
+  public int commit(WUProcessingSpec workSpec) {
+    // TODO: Make this configurable
+    int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+    try {
+      FileSystem fs = Help.loadFileSystem(workSpec);
+      JobState jobState = Help.loadJobState(workSpec, fs);
+      SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState);
+      JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, null);
+      // TODO: Task state dir is a stub with the assumption it is always colocated with the workunits dir
+      // (as in the case of MR which generates workunits)
+      Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+      // Resolves to <parent of workunits dir>/output/<name of that parent>
+      Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName());
+      log.info("Output path at: {} with fs at {}", jobOutputPath, fs.getUri());
+      StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+      // Snapshot the deserialized task states into an immutable collection before committing
+      Collection<TaskState> taskStateQueue = ImmutableList.copyOf(
+          TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath,
+              numDeserializationThreads));
+      commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+      return taskStateQueue.size();
+    } catch (Exception e) {
+      //TODO: IMPROVE GRANULARITY OF RETRIES
+      // NOTE(review): Class#toString() yields "class java.io.IOException"; confirm whether getName() was intended
+      // for the failure type. Left unchanged so the reported type string stays the same.
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          "Failed to commit dataset state for some dataset(s) of job <jobStub>",
+          IOException.class.toString(),
+          new IOException(e),
+          null
+      );
+    }
+  }
+
+  /**
+   * Commit task states to the dataset state store.
+   *
+   * <p>Task states are grouped by dataset urn via {@code createDatasetStatesByUrns}, and one
+   * {@link SafeDatasetCommit} per dataset is executed on {@code DEFAULT_NUM_COMMIT_THREADS} thread(s).
+   *
+   * @param jobState state consulted by {@code JobContext.shouldCommitDataInJob} and for the job name used in the
+   *                 failure message
+   * @param taskStates task states to organize by dataset urn and commit
+   * @param jobContext job context passed to every {@link SafeDatasetCommit}
+   * @throws IOException if not every dataset commit succeeded, or if the calling thread was interrupted while
+   *                     the commits were executing
+   */
+  void commitTaskStates(State jobState, Collection<TaskState> taskStates, JobContext jobContext) throws IOException {
+    Map<String, JobState.DatasetState> datasetStatesByUrns = createDatasetStatesByUrns(taskStates);
+    final boolean shouldCommitDataInJob = JobContext.shouldCommitDataInJob(jobState);
+    final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE;
+    //TODO: Make this configurable
+    final int numCommitThreads = DEFAULT_NUM_COMMIT_THREADS;
+    if (!shouldCommitDataInJob) {
+      log.info("Job will not commit data since data are committed by tasks.");
+    }
+
+    try {
+      if (!datasetStatesByUrns.isEmpty()) {
+        log.info("Persisting {} dataset urns.", datasetStatesByUrns.size());
+      }
+
+      // Lazily map each (urn, dataset state) entry to a commit callable and run them on the commit thread pool
+      List<Either<Void, ExecutionException>> result = new IteratorExecutor<>(Iterables
+          .transform(datasetStatesByUrns.entrySet(),
+              new Function<Map.Entry<String, JobState.DatasetState>, Callable<Void>>() {
+                @Nullable
+                @Override
+                public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry) {
+                  return new SafeDatasetCommit(shouldCommitDataInJob, false, deliverySemantics, entry.getKey(),
+                      entry.getValue(), false, jobContext);
+                }
+              }).iterator(), numCommitThreads,
+          ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("Commit-thread-%d")))
+          .executeAndGetResults();
+
+      IteratorExecutor.logFailures(result, null, 10);
+
+      if (!IteratorExecutor.verifyAllSuccessful(result)) {
+        // TODO: propagate cause of failure
+        String jobName = jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, "<job_name_stub>");
+        throw new IOException("Failed to commit dataset state for some dataset(s) of job " + jobName);
+      }
+    } catch (InterruptedException exc) {
+      // Restore the interrupt flag so callers up the stack can still observe the interruption
+      Thread.currentThread().interrupt();
+      throw new IOException(exc);
+    }
+  }
+
+  /**
+   * Organize task states by dataset urns.
+   * @param taskStates
+   * @return

Review Comment:
   (describe)





Issue Time Tracking
-------------------

    Worklog Id:     (was: 893103)
    Time Spent: 1h 10m  (was: 1h)

> Gobblin Commit Step Runs on Temporal
> ------------------------------------
>
>                 Key: GOBBLIN-1968
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1968
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-core
>            Reporter: William Lo
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Temporal is the next-gen distributed computing platform Gobblin is using to 
> replace MapReduce for many of its data movement workflows. We want to 
> integrate Gobblin's commit step that tracks task states and reports their 
> status with Temporal



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to