This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new fee5a0e74 [GOBBLIN-2010] Implement Distributed Data Movement (DDM) 
Gobblin-on-Temporal end-to-end workflow for arbitrary job config (#3886)
fee5a0e74 is described below

commit fee5a0e747812b7676f0be1fef357fbc9d3e0502
Author: Kip Kohn <[email protected]>
AuthorDate: Fri Mar 1 17:00:53 2024 -0800

    [GOBBLIN-2010] Implement Distributed Data Movement (DDM) 
Gobblin-on-Temporal end-to-end workflow for arbitrary job config (#3886)
    
    Implement a Distributed Data Movement (DDM) Gobblin-on-Temporal end-to-end workflow for an arbitrary job config
---
 .../ddm/activity/impl/GenerateWorkUnitsImpl.java   |   5 -
 .../ddm/activity/impl/ProcessWorkUnitImpl.java     |  65 ++++++------
 .../ddm/launcher/ExecuteGobblinJobLauncher.java    |  96 +++++++++++++++++
 .../ddm/launcher/ProcessWorkUnitsJobLauncher.java  |  15 ++-
 .../gobblin/temporal/ddm/util/JobStateUtils.java   |  30 +++++-
 .../ddm/work/PriorJobStateWUProcessingSpec.java    | 117 +++++++++++++++++++++
 .../temporal/ddm/work/WUProcessingSpec.java        |  21 ++--
 .../temporal/ddm/worker/WorkFulfillmentWorker.java |   4 +-
 .../workflow/ExecuteGobblinWorkflow.java}          |  29 +++--
 .../workflow/impl/ExecuteGobblinWorkflowImpl.java  | 111 +++++++++++++++++++
 .../NestingExecOfProcessWorkUnitWorkflowImpl.java  |   3 +-
 .../impl/ProcessWorkUnitsWorkflowImpl.java         |  89 +++-------------
 .../workflows/metrics/SubmitGTEActivityImpl.java   |  17 ++-
 13 files changed, 460 insertions(+), 142 deletions(-)

diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 797ba7951..2cb95a5d8 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -19,12 +19,10 @@ package org.apache.gobblin.temporal.ddm.activity.impl;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Optional;
 import java.util.Properties;
 
 import com.google.api.client.util.Lists;
 import com.google.common.io.Closer;
-import com.typesafe.config.Config;
 
 import io.temporal.failure.ApplicationFailure;
 
@@ -43,7 +41,6 @@ import org.apache.gobblin.source.WorkUnitStreamSource;
 import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnitStream;
-import org.apache.gobblin.temporal.cluster.WorkerConfig;
 import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
 import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
@@ -59,8 +56,6 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
     // TODO: provide for job cancellation (unless handling at the 
temporal-level of parent workflows)!
     JobState jobState = new JobState(jobProps);
     log.info("Created jobState: {}", jobState.toJsonString(true));
-    Optional<Config> thisClassConfig = WorkerConfig.of(this);
-    log.info("Obtained class config: {}", thisClassConfig.isPresent() ? 
thisClassConfig.get() : "NO WORKER CONFIG: ERROR!");
 
     Path workDirRoot = JobStateUtils.getWorkDirRoot(jobState);
     log.info("Using work dir root path for job '{}' - '{}'", 
jobState.getJobId(), workDirRoot);
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
index 640d78da1..1aebf4318 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
@@ -44,7 +44,6 @@ import org.apache.gobblin.runtime.AbstractTaskStateTracker;
 import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.Task;
-import org.apache.gobblin.runtime.TaskCreationException;
 import org.apache.gobblin.runtime.TaskExecutor;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.TaskStateTracker;
@@ -62,11 +61,14 @@ import org.apache.gobblin.util.JobLauncherUtils;
 public class ProcessWorkUnitImpl implements ProcessWorkUnit {
   private static final int LOG_EXTENDED_PROPS_EVERY_WORK_UNITS_STRIDE = 100;
 
+  private static final String MAX_SOURCE_PATHS_TO_LOG_PER_MULTI_WORK_UNIT = 
ProcessWorkUnitImpl.class.getName() + ".maxSourcePathsToLogPerMultiWorkUnit";
+  private static final int DEFAULT_MAX_SOURCE_PATHS_TO_LOG_PER_MULTI_WORK_UNIT 
= 5;
+
   @Override
   public int processWorkUnit(WorkUnitClaimCheck wu) {
     try (FileSystem fs = Help.loadFileSystemForce(wu)) {
       List<WorkUnit> workUnits = loadFlattenedWorkUnits(wu, fs);
-      log.info("WU [{}] - loaded {} workUnits", wu.getCorrelator(), 
workUnits.size());
+      log.info("(M)WU [{}] - loaded; found {} workUnits", wu.getCorrelator(), 
workUnits.size());
       JobState jobState = Help.loadJobState(wu, fs);
       return execute(workUnits, wu, jobState, fs);
     } catch (IOException | InterruptedException e) {
@@ -99,32 +101,21 @@ public class ProcessWorkUnitImpl implements 
ProcessWorkUnit {
     troubleshooter.start();
 
     List<String> fileSourcePaths = workUnits.stream()
-        .map(workUnit -> describeAsCopyableFile(workUnit, 
wu.getWorkUnitPath()))
+        .map(workUnit -> getCopyableFileSourcePathDesc(workUnit, 
wu.getWorkUnitPath()))
         .collect(Collectors.toList());
-    log.info("WU [{}] - submitting {} workUnits for copying files: {}", 
wu.getCorrelator(),
-        workUnits.size(), fileSourcePaths);
+    List<String> pathsToLog = getSourcePathsToLog(fileSourcePaths, jobState);
+    log.info("WU [{}] - submitting {} workUnits for copying source files: 
{}{}",
+        wu.getCorrelator(),
+        workUnits.size(),
+        pathsToLog.size() == workUnits.size() ? "" : ("**first " + 
pathsToLog.size() + " only** "),
+        pathsToLog);
     log.debug("WU [{}] - (first) workUnit: {}", wu.getCorrelator(), 
workUnits.get(0).toJsonString());
 
-    try {
-      GobblinMultiTaskAttempt taskAttempt = 
GobblinMultiTaskAttempt.runWorkUnits(
-          jobState.getJobId(), containerId, jobState, workUnits,
-          taskStateTracker, taskExecutor, taskStateStore, 
multiTaskAttemptCommitPolicy,
-          resourcesBroker, troubleshooter.getIssueRepository(), 
createInterruptionPredicate(fs, jobState));
-      return taskAttempt.getNumTasksCreated();
-    } catch (TaskCreationException tce) { // derived type of `IOException` 
that ought not be caught!
-      throw tce;
-    } catch (IOException ioe) {
-      // presume execution already occurred, with `TaskState` written to 
reflect outcome
-      log.warn("WU [" + wu.getCorrelator() + "] - continuing on despite 
IOException:", ioe);
-      return 0;
-    }
-  }
-
-  /** Demonstration processing, to isolate debugging of WU loading and 
deserialization */
-  protected int countSumProperties(List<WorkUnit> workUnits, 
WorkUnitClaimCheck wu) {
-    int totalNumProps = workUnits.stream().mapToInt(workUnit -> 
workUnit.getPropertyNames().size()).sum();
-    log.info("opened WU [{}] to find {} properties total at '{}'", 
wu.getCorrelator(), totalNumProps, wu.getWorkUnitPath());
-    return totalNumProps;
+    GobblinMultiTaskAttempt taskAttempt = GobblinMultiTaskAttempt.runWorkUnits(
+        jobState.getJobId(), containerId, jobState, workUnits,
+        taskStateTracker, taskExecutor, taskStateStore, 
multiTaskAttemptCommitPolicy,
+        resourcesBroker, troubleshooter.getIssueRepository(), 
createInterruptionPredicate(fs, jobState));
+    return taskAttempt.getNumTasksCreated();
   }
 
   protected TaskStateTracker 
createEssentializedTaskStateTracker(WorkUnitClaimCheck wu) {
@@ -157,7 +148,7 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit 
{
     };
   }
 
-  protected String describeAsCopyableFile(WorkUnit workUnit, String 
workUnitPath) {
+  protected String getCopyableFileSourcePathDesc(WorkUnit workUnit, String 
workUnitPath) {
     return getOptFirstCopyableFile(Lists.newArrayList(workUnit), workUnitPath)
         .map(copyableFile -> copyableFile.getOrigin().getPath().toString())
         .orElse(
@@ -173,12 +164,6 @@ public class ProcessWorkUnitImpl implements 
ProcessWorkUnit {
     return getOptCopyableFile(taskState, "taskState '" + taskState.getTaskId() 
+ "'");
   }
 
-  protected Optional<CopyableFile> getOptFirstCopyableFile(List<WorkUnit> 
workUnits, String workUnitPath) {
-    return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus ->
-      getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'")
-    );
-  }
-
   protected Optional<CopyableFile> getOptCopyableFile(State state, String 
logDesc) {
     return getOptCopyEntityClass(state, logDesc).flatMap(copyEntityClass -> {
       log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(), 
copyEntityClass.getName());
@@ -192,6 +177,12 @@ public class ProcessWorkUnitImpl implements 
ProcessWorkUnit {
     });
   }
 
+  protected Optional<CopyableFile> getOptFirstCopyableFile(List<WorkUnit> 
workUnits, String workUnitPath) {
+    return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus ->
+      getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'")
+    );
+  }
+
   protected Optional<Class<?>> getOptCopyEntityClass(State state, String 
logDesc) {
     try {
       return Optional.of(CopySource.getCopyEntityClass(state));
@@ -225,4 +216,14 @@ public class ProcessWorkUnitImpl implements 
ProcessWorkUnit {
       return false;
     }
   }
+
+  private static List<String> getSourcePathsToLog(List<String> sourcePaths, 
JobState jobState) {
+    int maxPathsToLog = getMaxSourcePathsToLogPerMultiWorkUnit(jobState);
+    int numPathsToLog = Math.min(sourcePaths.size(), maxPathsToLog);
+    return sourcePaths.subList(0, numPathsToLog);
+  }
+
+  private static int getMaxSourcePathsToLogPerMultiWorkUnit(State jobState) {
+    return jobState.getPropAsInt(MAX_SOURCE_PATHS_TO_LOG_PER_MULTI_WORK_UNIT, 
DEFAULT_MAX_SOURCE_PATHS_TO_LOG_PER_MULTI_WORK_UNIT);
+  }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
new file mode 100644
index 000000000..7d4fcb295
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.launcher;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import io.temporal.client.WorkflowOptions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
+import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
+import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A {@link JobLauncher} for the initial triggering of a Temporal workflow 
that executes a full Gobblin job workflow of:
+ *   * Work Discovery (via an arbitrary and configurable {@link 
org.apache.gobblin.source.Source})
+ *   * Work Fulfillment/Processing
+ *   * Commit
+ *
+ * see: {@link ExecuteGobblinWorkflow}
+ *
+ * <p>
+ *   This class is instantiated by the {@link 
GobblinTemporalJobScheduler#buildJobLauncher(Properties)} on every job 
submission to launch the Gobblin job.
+ *   The actual task execution happens in the {@link 
GobblinTemporalTaskRunner}, usually in a different process.
+ * </p>
+ */
+@Slf4j
+public class ExecuteGobblinJobLauncher extends GobblinTemporalJobLauncher {
+
+  public static final String WORKFLOW_ID_BASE = "ExecuteGobblin";
+
+  public ExecuteGobblinJobLauncher(
+      Properties jobProps,
+      Path appWorkDir,
+      List<? extends Tag<?>> metadataTags,
+      ConcurrentHashMap<String, Boolean> runningMap,
+      EventBus eventBus
+  ) throws Exception {
+    super(jobProps, appWorkDir, metadataTags, runningMap, eventBus);
+  }
+
+  @Override
+  public void submitJob(List<WorkUnit> workunits) {
+    try {
+      WorkflowOptions options = WorkflowOptions.newBuilder()
+          .setTaskQueue(this.queueName)
+          
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, 
ConfigFactory.parseProperties(jobProps)))
+          .build();
+      ExecuteGobblinWorkflow workflow = 
this.client.newWorkflowStub(ExecuteGobblinWorkflow.class, options);
+
+      Config jobConfigWithOverrides = 
applyJobLauncherOverrides(ConfigUtils.propertiesToConfig(this.jobProps));
+
+      Help.propagateGaaSFlowExecutionContext(this.jobProps);
+
+      EventSubmitterContext eventSubmitterContext = new 
EventSubmitterContext(this.eventSubmitter);
+
+      int numWorkUnits = 
workflow.execute(ConfigUtils.configToProperties(jobConfigWithOverrides), 
eventSubmitterContext);
+      log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", numWorkUnits);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index 006572dc0..d943013e5 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -22,14 +22,15 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.hadoop.fs.Path;
-
 import com.google.common.eventbus.EventBus;
 import com.typesafe.config.ConfigFactory;
 
 import io.temporal.client.WorkflowOptions;
+
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.hadoop.fs.Path;
+
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.Tag;
@@ -37,11 +38,13 @@ import org.apache.gobblin.runtime.JobLauncher;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.ddm.work.PriorJobStateWUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
 import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
 import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 import org.apache.gobblin.util.PropertiesUtils;
 
 import static 
org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX;
@@ -82,15 +85,14 @@ public class ProcessWorkUnitsJobLauncher extends 
GobblinTemporalJobLauncher {
       URI nameNodeUri = new URI(PropertiesUtils.getRequiredProp(this.jobProps, 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_NAME_NODE_URI));
       // NOTE: `Path` is challenging for temporal to ser/de, but nonetheless 
do pre-construct as `Path`, to pre-validate this prop string's contents
       Path workUnitsDir = new 
Path(PropertiesUtils.getRequiredProp(this.jobProps, 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_UNITS_DIR));
-      WUProcessingSpec wuSpec = new WUProcessingSpec(nameNodeUri, 
workUnitsDir.toString());
+      EventSubmitterContext eventSubmitterContext = new 
EventSubmitterContext(this.eventSubmitter);
+      PriorJobStateWUProcessingSpec wuSpec = new 
PriorJobStateWUProcessingSpec(nameNodeUri, workUnitsDir.toString(), 
eventSubmitterContext);
       if 
(this.jobProps.containsKey(GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
 &&
           
this.jobProps.containsKey(GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
         int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(this.jobProps, 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
         int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(this.jobProps, 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
         wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
       }
-      Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec, 
Help.loadFileSystem(wuSpec)));
-
       wuSpec.setTags(GobblinMetrics.getCustomTagsFromState(new 
State(jobProps)));
       wuSpec.setMetricsSuffix(this.jobProps.getProperty(
           GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX,
@@ -100,6 +102,9 @@ public class ProcessWorkUnitsJobLauncher extends 
GobblinTemporalJobLauncher {
           .setTaskQueue(this.queueName)
           
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, wuSpec, 
ConfigFactory.parseProperties(jobProps)))
           .build();
+
+      Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec, 
Help.loadFileSystem(wuSpec)));
+
       ProcessWorkUnitsWorkflow workflow = 
this.client.newWorkflowStub(ProcessWorkUnitsWorkflow.class, options);
       workflow.process(wuSpec);
     } catch (Exception e) {
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
index ae6a8b711..37d1f9047 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
@@ -73,8 +73,7 @@ public class JobStateUtils {
 
   /** @return the {@link FileSystem} indicated by {@link 
ConfigurationKeys#FS_URI_KEY} */
   public static FileSystem openFileSystem(JobState jobState) throws 
IOException {
-    URI fsUri = URI.create(jobState.getProp(ConfigurationKeys.FS_URI_KEY, 
ConfigurationKeys.LOCAL_FS_URI));
-    return Help.loadFileSystemForUriForce(fsUri, jobState);
+    return Help.loadFileSystemForUriForce(getFileSystemUri(jobState), 
jobState);
   }
 
   /** @return a new instance of {@link Source} identified by {@link 
ConfigurationKeys#SOURCE_CLASS_KEY} */
@@ -104,10 +103,15 @@ public class JobStateUtils {
     return new FsStateStore<>(fs, taskStateStorePath.toUri().getPath(), 
TaskState.class);
   }
 
+  /** @return the {@link URI} indicated by {@link 
ConfigurationKeys#FS_URI_KEY} */
+  public static URI getFileSystemUri(JobState jobState) {
+    return URI.create(jobState.getProp(ConfigurationKeys.FS_URI_KEY, 
ConfigurationKeys.LOCAL_FS_URI));
+  }
+
   /**
    * ATTENTION: derives path according to {@link 
org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
    * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
-   * @return "base" dir root path for work dir (parent of inputs, output task 
states, etc.)
+   * @return "base" dir root {@link Path} for work dir (parent of inputs, 
output task states, etc.)
    */
   public static Path getWorkDirRoot(JobState jobState) {
     return new Path(
@@ -118,7 +122,23 @@ public class JobStateUtils {
   /**
    * ATTENTION: derives path according to {@link 
org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
    * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
-   * @return path to {@link FsStateStore<TaskState>} backing dir
+   * @return {@link Path} where "input" {@link WorkUnit}s should reside
+   */
+  public static Path getWorkUnitsPath(JobState jobState) {
+    return getWorkUnitsPath(getWorkDirRoot(jobState));
+  }
+
+  /**
+   * @return {@link Path} where "input" {@link WorkUnit}s should reside
+   */
+  public static Path getWorkUnitsPath(Path workDirRoot) {
+    return new Path(workDirRoot, INPUT_DIR_NAME);
+  }
+
+  /**
+   * ATTENTION: derives path according to {@link 
org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
+   * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
+   * @return {@link Path} to {@link FsStateStore<TaskState>} backing dir
    */
   public static Path getTaskStateStorePath(JobState jobState, FileSystem fs) {
     Path jobOutputPath = new Path(getWorkDirRoot(jobState), OUTPUT_DIR_NAME);
@@ -129,7 +149,7 @@ public class JobStateUtils {
   public static void writeWorkUnits(List<WorkUnit> workUnits, Path 
workDirRootPath, JobState jobState, FileSystem fs)
       throws IOException {
     String jobId = jobState.getJobId();
-    Path targetDirPath = new Path(workDirRootPath, INPUT_DIR_NAME);
+    Path targetDirPath = getWorkUnitsPath(workDirRootPath);
 
     int numThreads = 
ParallelRunner.getNumThreadsConfig(jobState.getProperties());
     Closer closer = Closer.create(); // (NOTE: try-with-resources syntax 
wouldn't allow `catch { closer.rethrow(t) }`)
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
new file mode 100644
index 000000000..98c375564
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.gobblin.instrumented.GobblinMetricsKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.JobMetrics;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
+
+/**
+ * Same as {@link WUProcessingSpec}, but for a "Work Fulfillment-only" 
workflow that leverages the {@link JobState} and
+ * {@link org.apache.gobblin.source.workunit.WorkUnit}s previously persisted 
by another separate job execution.
+ * Accordingly we wish to adjust/"spoof" our {@link EventSubmitterContext} to 
carry identifiers from that original job,
+ * and to indicate that the processing workflow ought to perform job-level 
timing.
+ */
+@Data
+@EqualsAndHashCode(callSuper = true) // to prevent findbugs warning - "equals 
method overrides equals in superclass and may not be symmetric"
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+public class PriorJobStateWUProcessingSpec extends WUProcessingSpec {
+  @NonNull
+  private List<Tag<?>> tags = new ArrayList<>();
+  @NonNull private String metricsSuffix = 
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX;
+
+  public PriorJobStateWUProcessingSpec(URI fileSystemUri, String workUnitsDir, 
EventSubmitterContext eventSubmitterContext) {
+    super(fileSystemUri, workUnitsDir, eventSubmitterContext);
+  }
+
+  @Override
+  public boolean isToDoJobLevelTiming() {
+    return true;
+  }
+
+  @Override
+  public @NonNull EventSubmitterContext getEventSubmitterContext() {
+    // NOTE: Tags come primarily from the current job's props (its AZ runtime settings are those of the job launcher that
+    // launched this YARN job), NOT from the jobState deserialized from HDFS (created by the original distcp job);
+    // only the flow/job identifier tags are taken from that jobState - see `calcMergedTags`.
+    try {
+      FileSystem fs = Help.loadFileSystemForce(this);
+      JobState jobState = Help.loadJobStateUncached(this, fs);
+      List<Tag<?>> tagsFromCurrentJob = this.getTags();
+      String metricsSuffix = this.getMetricsSuffix();
+      List<Tag<?>> tags = this.calcMergedTags(tagsFromCurrentJob, 
metricsSuffix, jobState);
+      return new EventSubmitterContext(tags, JobMetrics.NAMESPACE);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  private List<Tag<?>> calcMergedTags(List<Tag<?>> tagsFromCurJob, String 
metricsSuffix, JobState jobStateFromHdfs) {
+    // Construct a new tags list by combining a subset of tags from the HDFS job state with the rest of the fields from the current job
+    Map<String, Tag<?>> tagsMap = new HashMap<>();
+    Set<String> tagKeysFromJobState = new HashSet<>(Arrays.asList(
+        TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+        TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+        TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+        TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
+        TimingEvent.FlowEventConstants.JOB_GROUP_FIELD));
+
+    // Step 1. Add tags from the AZ props using the original job (the one that launched this YARN app)
+    tagsFromCurJob.forEach(tag -> tagsMap.put(tag.getKey(), tag));
+
+    // Step 2. Add tags from the jobState (the original MR job on HDFS)
+    List<String> targetKeysToAddSuffix = 
Arrays.asList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+    GobblinMetrics.getCustomTagsFromState(jobStateFromHdfs).stream()
+        .filter(tag -> tagKeysFromJobState.contains(tag.getKey()))
+        .forEach(tag -> {
+          // Step 2a (optional): Add a suffix to the FLOW_NAME_FIELD and FLOW_GROUP_FIELD values to prevent collisions when testing
+          String value = targetKeysToAddSuffix.contains(tag.getKey())
+              ? tag.getValue() + metricsSuffix
+              : String.valueOf(tag.getValue());
+          tagsMap.put(tag.getKey(), new Tag<>(tag.getKey(), value));
+        });
+
+    // Step 3. Overwrite any pre-existing class metadata with the name of the current caller
+    tagsMap.put(GobblinMetricsKeys.CLASS_META, new 
Tag<>(GobblinMetricsKeys.CLASS_META, getClass().getCanonicalName()));
+    return new ArrayList<>(tagsMap.values());
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
index 5e3ba3dd1..30e67eb34 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
@@ -18,42 +18,45 @@
 package org.apache.gobblin.temporal.ddm.work;
 
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Optional;
 
-import org.apache.hadoop.fs.Path;
-
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 
+import org.apache.hadoop.fs.Path;
+
 import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
-import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
 import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
 import org.apache.gobblin.temporal.util.nesting.work.Workload;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 
 
 /**
  * Intended to reference multiple {@link 
org.apache.gobblin.source.workunit.WorkUnit}s to process, where `workUnitsDir`
- * is resolved against the {@link org.apache.hadoop.fs.FileSystem} given by 
`nameNodeUri`.  see:
+ * is resolved against the {@link org.apache.hadoop.fs.FileSystem} given by 
`nameNodeUri`
  */
 @Data
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 @RequiredArgsConstructor
+@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, 
property = "@class") // to handle extensions
 public class WUProcessingSpec implements FileSystemApt, FileSystemJobStateful {
   @NonNull private URI fileSystemUri;
   @NonNull private String workUnitsDir;
+  @NonNull private EventSubmitterContext eventSubmitterContext;
   @NonNull private Tuning tuning = Tuning.DEFAULT;
-  @NonNull private List<Tag<?>> tags = new ArrayList<>();
-  @NonNull private String metricsSuffix = 
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX;
+
+  /** whether to conduct job-level timing (and send results via GTE) */
+  public boolean isToDoJobLevelTiming() {
+    return false;
+  }
 
   @JsonIgnore // (because no-arg method resembles 'java bean property')
   @Override
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index 02631719c..5256b8b2b 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -28,6 +28,7 @@ import 
org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl;
 import org.apache.gobblin.temporal.ddm.activity.impl.GenerateWorkUnitsImpl;
 import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
 import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl;
+import 
org.apache.gobblin.temporal.ddm.workflow.impl.ExecuteGobblinWorkflowImpl;
 import 
org.apache.gobblin.temporal.ddm.workflow.impl.GenerateWorkUnitsWorkflowImpl;
 import 
org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
 import 
org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
@@ -45,7 +46,8 @@ public class WorkFulfillmentWorker extends 
AbstractTemporalWorker {
 
     @Override
     protected Class<?>[] getWorkflowImplClasses() {
-        return new Class[] { CommitStepWorkflowImpl.class, 
GenerateWorkUnitsWorkflowImpl.class, 
NestingExecOfProcessWorkUnitWorkflowImpl.class, 
ProcessWorkUnitsWorkflowImpl.class };
+        return new Class[] { CommitStepWorkflowImpl.class, 
ExecuteGobblinWorkflowImpl.class, GenerateWorkUnitsWorkflowImpl.class,
+            NestingExecOfProcessWorkUnitWorkflowImpl.class, 
ProcessWorkUnitsWorkflowImpl.class };
     }
 
     @Override
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
similarity index 51%
copy from 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
copy to 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
index 63bce421a..d764f3742 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
@@ -15,20 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.temporal.workflows.metrics;
+package org.apache.gobblin.temporal.ddm.workflow;
 
-import org.slf4j.Logger;
+import java.util.Properties;
 
-import io.temporal.workflow.Workflow;
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
 
-import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 
 
-public class SubmitGTEActivityImpl implements SubmitGTEActivity {
-    private static Logger log = 
Workflow.getLogger(SubmitGTEActivityImpl.class);
-
-    @Override
-    public void submitGTE(GobblinEventBuilder eventBuilder, 
EventSubmitterContext eventSubmitterContext) {
-        eventSubmitterContext.create().submit(eventBuilder);
-    }
+/**
+ *  Workflow for executing an end-to-end Gobblin job, including:
+ *   * Work Discovery (via an arbitrary and configurable {@link 
org.apache.gobblin.source.Source})
+ *   * Work Fulfillment/Processing
+ *   * Commit
+ *
+ */
+@WorkflowInterface
+public interface ExecuteGobblinWorkflow {
+  /** @return the number of {@link WorkUnit}s discovered and successfully 
processed */
+  @WorkflowMethod
+  int execute(Properties props, EventSubmitterContext eventSubmitterContext);
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
new file mode 100644
index 000000000..39f401a05
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.Properties;
+
+import com.typesafe.config.ConfigFactory;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.api.enums.v1.ParentClosePolicy;
+import io.temporal.common.RetryOptions;
+import io.temporal.workflow.ChildWorkflowOptions;
+import io.temporal.workflow.Workflow;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
+import org.apache.gobblin.temporal.ddm.launcher.ProcessWorkUnitsJobLauncher;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
+import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
+import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+@Slf4j
+public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
+  public static final String PROCESS_WORKFLOW_ID_BASE = "ProcessWorkUnits";
+
+  public static final Duration genWUsStartToCloseTimeout = 
Duration.ofMinutes(90); // TODO: make configurable
+
+  private static final RetryOptions GEN_WUS_ACTIVITY_RETRY_OPTS = 
RetryOptions.newBuilder()
+      .setInitialInterval(Duration.ofSeconds(3))
+      .setMaximumInterval(Duration.ofSeconds(100))
+      .setBackoffCoefficient(2)
+      .setMaximumAttempts(4)
+      .build();
+
+  private static final ActivityOptions GEN_WUS_ACTIVITY_OPTS = 
ActivityOptions.newBuilder()
+      .setStartToCloseTimeout(genWUsStartToCloseTimeout)
+      .setRetryOptions(GEN_WUS_ACTIVITY_RETRY_OPTS)
+      .build();
+
+  private final GenerateWorkUnits genWUsActivityStub = 
Workflow.newActivityStub(GenerateWorkUnits.class,
+      GEN_WUS_ACTIVITY_OPTS);
+
+  @Override
+  public int execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
+    TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
+    EventTimer timer = timerFactory.createJobTimer();
+
+    int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
+    if (numWUsGenerated > 0) {
+      ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow();
+
+      JobState jobState = new JobState(jobProps);
+      URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+      Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+      WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
+      // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
+      if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
 &&
+          
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
+        int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+        int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+        wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
+      }
+
+      int numWUsProcessed = processWUsWorkflow.process(wuSpec);
+      if (numWUsProcessed != numWUsGenerated) {
+        log.warn("Not all work units generated were processed: {} != {}", 
numWUsGenerated, numWUsProcessed);
+        // TODO provide more robust indication that things went wrong!  
(retryable or non-retryable error??)
+      }
+    }
+    timer.stop();
+    return numWUsGenerated;
+
+  }
+
+  protected ProcessWorkUnitsWorkflow createProcessWorkUnitsWorkflow() {
+    ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
+        .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
+        
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(PROCESS_WORKFLOW_ID_BASE, 
WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+        .build();
+    return Workflow.newChildWorkflowStub(ProcessWorkUnitsWorkflow.class, 
childOpts);
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
index 074bb4609..c9b330255 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
@@ -31,6 +31,7 @@ import 
org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWork
 
 /** {@link 
org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow} for 
{@link ProcessWorkUnit} */
 public class NestingExecOfProcessWorkUnitWorkflowImpl extends 
AbstractNestingExecWorkflowImpl<WorkUnitClaimCheck, Integer> {
+  public static final Duration processWorkUnitStartToCloseTimeout = 
Duration.ofMinutes(20); // TODO: make configurable
 
   // RetryOptions specify how to automatically handle retries when Activities 
fail.
   private static final RetryOptions ACTIVITY_RETRY_OPTS = 
RetryOptions.newBuilder()
@@ -41,7 +42,7 @@ public class NestingExecOfProcessWorkUnitWorkflowImpl extends 
AbstractNestingExe
       .build();
 
   private static final ActivityOptions ACTIVITY_OPTS = 
ActivityOptions.newBuilder()
-      .setStartToCloseTimeout(Duration.ofSeconds(999))
+      .setStartToCloseTimeout(processWorkUnitStartToCloseTimeout)
       .setRetryOptions(ACTIVITY_RETRY_OPTS)
       .build();
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 679a39dae..411a4b6a0 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -16,32 +16,16 @@
  */
 package org.apache.gobblin.temporal.ddm.workflow.impl;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.jetbrains.annotations.NotNull;
 
 import com.typesafe.config.ConfigFactory;
 
 import io.temporal.api.enums.v1.ParentClosePolicy;
 import io.temporal.workflow.ChildWorkflowOptions;
 import io.temporal.workflow.Workflow;
+
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.instrumented.GobblinMetricsKeys;
-import org.apache.gobblin.metrics.GobblinMetrics;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.runtime.util.JobMetrics;
 import org.apache.gobblin.temporal.cluster.WorkerConfig;
 import 
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
@@ -65,29 +49,10 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
 
   @Override
   public int process(WUProcessingSpec workSpec) {
-    try {
-      EventSubmitterContext eventSubmitterContext = 
getEventSubmitterContext(workSpec);
-      TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
-      try (EventTimer timer = timerFactory.createJobTimer()) {
-        return performWork(workSpec);
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @NotNull
-  private EventSubmitterContext getEventSubmitterContext(WUProcessingSpec 
workSpec)
-      throws IOException {
-    // NOTE: We are using the metrics tags from Job Props to create the metric 
context for the timer and NOT
-    // the deserialized jobState from HDFS that is created by the real distcp 
job. This is because the AZ runtime
-    // settings we want are for the job launcher that launched this Yarn job.
-    FileSystem fs = Help.loadFileSystemForce(workSpec);
-    JobState jobState = Help.loadJobStateUncached(workSpec, fs);
-    List<Tag<?>> tagsFromCurrentJob = workSpec.getTags();
-    String metricsSuffix = workSpec.getMetricsSuffix();
-    List<Tag<?>> tags = getTags(tagsFromCurrentJob, metricsSuffix, jobState);
-    return new EventSubmitterContext(tags, JobMetrics.NAMESPACE);
+    Optional<EventTimer> timer = this.createOptJobEventTimer(workSpec);
+    int result = performWork(workSpec);
+    timer.ifPresent(EventTimer::stop);
+    return result;
   }
 
   private int performWork(WUProcessingSpec workSpec) {
@@ -101,7 +66,7 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
       CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
       int result = commitWorkflow.commit(workSpec);
       if (result == 0) {
-        log.warn("No work units committed at the job level. They could be 
committed at a task level.");
+        log.warn("No work units committed at the job level. They could have 
been committed at the task level.");
       }
       return result;
     } else {
@@ -110,13 +75,23 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
     }
   }
 
+  private Optional<EventTimer> createOptJobEventTimer(WUProcessingSpec 
workSpec) {
+    if (workSpec.isToDoJobLevelTiming()) {
+      EventSubmitterContext eventSubmitterContext = 
workSpec.getEventSubmitterContext();
+      TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
+      return Optional.of(timerFactory.createJobTimer());
+    } else {
+      return Optional.empty();
+    }
+  }
+
   protected Workload<WorkUnitClaimCheck> createWorkload(WUProcessingSpec 
workSpec) {
     return new 
EagerFsDirBackedWorkUnitClaimCheckWorkload(workSpec.getFileSystemUri(), 
workSpec.getWorkUnitsDir());
   }
 
   protected NestingExecWorkflow<WorkUnitClaimCheck> 
createProcessingWorkflow(FileSystemJobStateful f) {
     ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
-        .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
+        .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
         
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f, 
WorkerConfig.of(this).orElse(ConfigFactory.empty())))
         .build();
     // TODO: to incorporate multiple different concrete `NestingExecWorkflow` 
sub-workflows in the same super-workflow... shall we use queues?!?!?
@@ -131,34 +106,4 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
 
     return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts);
   }
-
-  private List<Tag<?>> getTags(List<Tag<?>> tagsFromCurJob, String 
metricsSuffix, JobState jobStateFromHdfs) {
-    // Construct new tags list by combining subset of tags on HDFS job state 
and the rest of the fields from the current job
-    Map<String, Tag<?>> tagsMap = new HashMap<>();
-    Set<String> tagKeysFromJobState = new HashSet<>(Arrays.asList(
-        TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
-        TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
-        TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
-        TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
-        TimingEvent.FlowEventConstants.JOB_GROUP_FIELD));
-
-    // Step 1, Add tags from the AZ props using the original job (the one that 
launched this yarn app)
-    tagsFromCurJob.forEach(tag -> tagsMap.put(tag.getKey(), tag));
-
-    // Step 2. Add tags from the jobState (the original MR job on HDFS)
-    List<String> targetKeysToAddSuffix = 
Arrays.asList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
-    GobblinMetrics.getCustomTagsFromState(jobStateFromHdfs).stream()
-        .filter(tag -> tagKeysFromJobState.contains(tag.getKey()))
-        .forEach(tag -> {
-          // Step 2a (optional): Add a suffix to the FLOW_NAME_FIELD AND 
FLOW_GROUP_FIELDS to prevent collisions when testing
-          String value = targetKeysToAddSuffix.contains(tag.getKey())
-              ? tag.getValue() + metricsSuffix
-              : String.valueOf(tag.getValue());
-          tagsMap.put(tag.getKey(), new Tag<>(tag.getKey(), value));
-        });
-
-    // Step 3: Overwrite any pre-existing metadata with name of the current 
caller
-    tagsMap.put(GobblinMetricsKeys.CLASS_META, new 
Tag<>(GobblinMetricsKeys.CLASS_META, getClass().getCanonicalName()));
-    return new ArrayList<>(tagsMap.values());
-  }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
index 63bce421a..097e279c5 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
@@ -17,11 +17,15 @@
 
 package org.apache.gobblin.temporal.workflows.metrics;
 
-import org.slf4j.Logger;
+import java.util.Map;
 
 import io.temporal.workflow.Workflow;
 
+import org.slf4j.Logger;
+
+import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.metrics.event.TimingEvent;
 
 
 public class SubmitGTEActivityImpl implements SubmitGTEActivity {
@@ -29,6 +33,17 @@ public class SubmitGTEActivityImpl implements 
SubmitGTEActivity {
 
     @Override
     public void submitGTE(GobblinEventBuilder eventBuilder, 
EventSubmitterContext eventSubmitterContext) {
+        log.info("submitting GTE - {}", 
summarizeEventMetadataForLogging(eventBuilder));
         eventSubmitterContext.create().submit(eventBuilder);
     }
+
+    private static String summarizeEventMetadataForLogging(GobblinEventBuilder 
eventBuilder) {
+        Map<String, String> metadata = eventBuilder.getMetadata();
+        return String.format("name: '%s'; namespace: '%s'; type: %s; start: 
%s; end: %s",
+            eventBuilder.getName(),
+            eventBuilder.getNamespace(),
+            metadata.getOrDefault(EventSubmitter.EVENT_TYPE, "<<event type not 
indicated>>"),
+            metadata.getOrDefault(TimingEvent.METADATA_START_TIME, "<<no start 
time>>"),
+            metadata.getOrDefault(TimingEvent.METADATA_END_TIME, "<<no end 
time>>"));
+    }
 }

Reply via email to