This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f5199d84 [GOBBLIN-1945] Implement Distributed Data Movement (DDM) 
Gobblin-on-Temporal `WorkUnit` evaluation (#3816)
6f5199d84 is described below

commit 6f5199d8476fb74879725d8a93e8dc6d080cb0a1
Author: Kip Kohn <[email protected]>
AuthorDate: Tue Oct 31 23:41:45 2023 -0700

    [GOBBLIN-1945] Implement Distributed Data Movement (DDM) 
Gobblin-on-Temporal `WorkUnit` evaluation (#3816)
    
    * Implement Distributed Data Movement (DDM) Gobblin-on-Temporal `WorkUnit` 
evaluation
    
    * Adjust work unit processing tuning for start-to-close timeout and nested 
execution branching
    
    * Rework `ProcessWorkUnitImpl` and fix `FileSystem` misuse; plus 
convenience abstractions to load `FileSystem`, `JobState`, and 
`StateStore<TaskState>`
    
    * Fix `FileSystem` resource lifecycle, uniquely name each workflow, and 
drastically reduce worker concurrent task execution
    
    * Heed findbugs advice
    
    * prep before commit
    
    * Improve processing of required props
    
    * Update comment in response to PR feedback
---
 .../org/apache/gobblin/configuration/State.java    |   5 +-
 gobblin-temporal/build.gradle                      |   1 +
 .../temporal/ddm/activity/ProcessWorkUnit.java     |  31 +++
 .../ddm/activity/impl/ProcessWorkUnitImpl.java     | 228 +++++++++++++++++++++
 .../ddm/launcher/ProcessWorkUnitsJobLauncher.java  |  96 +++++++++
 .../gobblin/temporal/ddm/util/JobStateUtils.java   |  98 +++++++++
 .../ddm/work/AbstractEagerFsDirBackedWorkload.java | 146 +++++++++++++
 ...EagerFsDirBackedWorkUnitClaimCheckWorkload.java |  55 +++++
 .../temporal/ddm/work/WUProcessingSpec.java        |  74 +++++++
 .../temporal/ddm/work/WorkUnitClaimCheck.java      |  57 ++++++
 .../gobblin/temporal/ddm/work/assistance/Help.java | 210 +++++++++++++++++++
 .../temporal/ddm/work/styles/FileSystemApt.java    |  32 +++
 .../ddm/work/styles/FileSystemJobStateful.java     |  23 +++
 .../temporal/ddm/work/styles/JobStateful.java      |  26 +++
 .../temporal/ddm/worker/WorkFulfillmentWorker.java |  61 ++++++
 .../ddm/workflow/ProcessWorkUnitsWorkflow.java     |  32 +++
 .../NestingExecOfProcessWorkUnitWorkflowImpl.java  |  54 +++++
 .../impl/ProcessWorkUnitsWorkflowImpl.java         |  64 ++++++
 .../java/org/apache/gobblin/util/HadoopUtils.java  |   9 +
 .../org/apache/gobblin/util/JobLauncherUtils.java  |   7 +-
 .../org/apache/gobblin/util/PropertiesUtils.java   |  14 +-
 21 files changed, 1312 insertions(+), 11 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
index eab5665c9..b3d225710 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
@@ -53,6 +53,7 @@ public class State implements WritableShim {
 
   private static final Joiner LIST_JOINER = Joiner.on(",");
   private static final Splitter LIST_SPLITTER = 
Splitter.on(",").trimResults().omitEmptyStrings();
+  private static final JsonParser JSON_PARSER = new JsonParser();
 
   private String id;
 
@@ -62,8 +63,6 @@ public class State implements WritableShim {
   @Getter
   private Properties specProperties;
 
-  private final JsonParser jsonParser = new JsonParser();
-
   public State() {
     this.specProperties = new Properties();
     this.commonProperties = new Properties();
@@ -476,7 +475,7 @@ public class State implements WritableShim {
    * @return {@link JsonArray} value associated with the key
    */
   public JsonArray getPropAsJsonArray(String key) {
-    JsonElement jsonElement = this.jsonParser.parse(getProp(key));
+    JsonElement jsonElement = this.JSON_PARSER.parse(getProp(key));
     Preconditions.checkArgument(jsonElement.isJsonArray(),
         "Value for key " + key + " is malformed, it must be a JsonArray: " + 
jsonElement);
     return jsonElement.getAsJsonArray();
diff --git a/gobblin-temporal/build.gradle b/gobblin-temporal/build.gradle
index 25f9bf835..1832ac909 100644
--- a/gobblin-temporal/build.gradle
+++ b/gobblin-temporal/build.gradle
@@ -27,6 +27,7 @@ dependencies {
     compile project(":gobblin-api")
     compile project(":gobblin-cluster")
     compile project(":gobblin-core")
+    compile project(":gobblin-data-management")
     compile project(":gobblin-metrics-libs:gobblin-metrics")
     compile project(":gobblin-metastore")
     compile project(":gobblin-runtime")
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ProcessWorkUnit.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ProcessWorkUnit.java
new file mode 100644
index 000000000..bdd9772da
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ProcessWorkUnit.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+
+
+/** Activity for processing/executing a {@link 
org.apache.gobblin.source.workunit.WorkUnit}, provided by claim-check */
+@ActivityInterface
+public interface ProcessWorkUnit {
+  @ActivityMethod
+  // CAUTION: void return type won't work, as apparently it mayn't be the 
return type for `io.temporal.workflow.Functions.Func1`!
+  int processWorkUnit(WorkUnitClaimCheck wu);
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
new file mode 100644
index 000000000..640d78da1
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopySource;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.AbstractTaskStateTracker;
+import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.Task;
+import org.apache.gobblin.runtime.TaskCreationException;
+import org.apache.gobblin.runtime.TaskExecutor;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateTracker;
+import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
+import org.apache.gobblin.runtime.troubleshooter.NoopAutomaticTroubleshooter;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.util.JobLauncherUtils;
+
+
+@Slf4j
+public class ProcessWorkUnitImpl implements ProcessWorkUnit {
+  private static final int LOG_EXTENDED_PROPS_EVERY_WORK_UNITS_STRIDE = 100;
+
+  @Override
+  public int processWorkUnit(WorkUnitClaimCheck wu) {
+    try (FileSystem fs = Help.loadFileSystemForce(wu)) {
+      List<WorkUnit> workUnits = loadFlattenedWorkUnits(wu, fs);
+      log.info("WU [{}] - loaded {} workUnits", wu.getCorrelator(), 
workUnits.size());
+      JobState jobState = Help.loadJobState(wu, fs);
+      return execute(workUnits, wu, jobState, fs);
+    } catch (IOException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  protected List<WorkUnit> loadFlattenedWorkUnits(WorkUnitClaimCheck wu, 
FileSystem fs) throws IOException {
+    Path wuPath = new Path(wu.getWorkUnitPath());
+    WorkUnit workUnit = 
JobLauncherUtils.createEmptyWorkUnitPerExtension(wuPath);
+    Help.deserializeStateWithRetries(fs, wuPath, workUnit, wu);
+    return JobLauncherUtils.flattenWorkUnits(Lists.newArrayList(workUnit));
+  }
+
+  /**
+   * NOTE: adapted from {@link 
org.apache.gobblin.runtime.mapreduce.MRJobLauncher.TaskRunner#run(org.apache.hadoop.mapreduce.Mapper.Context)}
+   * @return count of how many tasks executed (0 if execution ultimately 
failed, but we *believe* TaskState should already have been recorded beforehand)
+   */
+  protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, 
JobState jobState, FileSystem fs) throws IOException, InterruptedException {
+    String containerId = "container-id-for-wu-" + wu.getCorrelator();
+    StateStore<TaskState> taskStateStore = Help.openTaskStateStore(wu, fs);
+
+    TaskStateTracker taskStateTracker = 
createEssentializedTaskStateTracker(wu);
+    TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+    GobblinMultiTaskAttempt.CommitPolicy multiTaskAttemptCommitPolicy = 
GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE; // as no speculative exec
+
+    SharedResourcesBroker<GobblinScopeTypes> resourcesBroker = 
JobStateUtils.getSharedResourcesBroker(jobState);
+    AutomaticTroubleshooter troubleshooter = new NoopAutomaticTroubleshooter();
+    // 
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(wu.getStateConfig().getProperties()));
+    troubleshooter.start();
+
+    List<String> fileSourcePaths = workUnits.stream()
+        .map(workUnit -> describeAsCopyableFile(workUnit, 
wu.getWorkUnitPath()))
+        .collect(Collectors.toList());
+    log.info("WU [{}] - submitting {} workUnits for copying files: {}", 
wu.getCorrelator(),
+        workUnits.size(), fileSourcePaths);
+    log.debug("WU [{}] - (first) workUnit: {}", wu.getCorrelator(), 
workUnits.get(0).toJsonString());
+
+    try {
+      GobblinMultiTaskAttempt taskAttempt = 
GobblinMultiTaskAttempt.runWorkUnits(
+          jobState.getJobId(), containerId, jobState, workUnits,
+          taskStateTracker, taskExecutor, taskStateStore, 
multiTaskAttemptCommitPolicy,
+          resourcesBroker, troubleshooter.getIssueRepository(), 
createInterruptionPredicate(fs, jobState));
+      return taskAttempt.getNumTasksCreated();
+    } catch (TaskCreationException tce) { // derived type of `IOException` 
that ought not be caught!
+      throw tce;
+    } catch (IOException ioe) {
+      // presume execution already occurred, with `TaskState` written to 
reflect outcome
+      log.warn("WU [" + wu.getCorrelator() + "] - continuing on despite 
IOException:", ioe);
+      return 0;
+    }
+  }
+
+  /** Demonstration processing, to isolate debugging of WU loading and 
deserialization */
+  protected int countSumProperties(List<WorkUnit> workUnits, 
WorkUnitClaimCheck wu) {
+    int totalNumProps = workUnits.stream().mapToInt(workUnit -> 
workUnit.getPropertyNames().size()).sum();
+    log.info("opened WU [{}] to find {} properties total at '{}'", 
wu.getCorrelator(), totalNumProps, wu.getWorkUnitPath());
+    return totalNumProps;
+  }
+
+  protected TaskStateTracker 
createEssentializedTaskStateTracker(WorkUnitClaimCheck wu) {
+    return new AbstractTaskStateTracker(new Properties(), log) {
+      @Override
+      public void registerNewTask(Task task) {
+        // TODO: shall we schedule metrics update based on config?
+      }
+
+      @Override
+      public void onTaskRunCompletion(Task task) {
+        task.markTaskCompletion();
+      }
+
+      @Override
+      public void onTaskCommitCompletion(Task task) {
+        TaskState taskState = task.getTaskState();
+        // TODO: if metrics configured, report them now
+        log.info("WU [{} = {}] - finished commit after {}ms with state {}{}", 
wu.getCorrelator(), task.getTaskId(),
+            taskState.getTaskDuration(), taskState.getWorkingState(),
+            
taskState.getWorkingState().equals(WorkUnitState.WorkingState.SUCCESSFUL)
+                ? (" to: " + 
taskState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR)) : "");
+        log.debug("WU [{} = {}] - task state: {}", wu.getCorrelator(), 
task.getTaskId(),
+            taskState.toJsonString(shouldUseExtendedLogging(wu)));
+        getOptCopyableFile(taskState).ifPresent(copyableFile -> {
+          log.info("WU [{} = {}] - completed copyableFile: {}", 
wu.getCorrelator(), task.getTaskId(),
+              copyableFile.toJsonString(shouldUseExtendedLogging(wu)));
+        });
+      }
+    };
+  }
+
+  protected String describeAsCopyableFile(WorkUnit workUnit, String 
workUnitPath) {
+    return getOptFirstCopyableFile(Lists.newArrayList(workUnit), workUnitPath)
+        .map(copyableFile -> copyableFile.getOrigin().getPath().toString())
+        .orElse(
+            "<<not a CopyableFile("
+                + getOptCopyEntityClass(workUnit, workUnitPath)
+                .map(Class::getSimpleName)
+                .orElse("<<not a CopyEntity!>>")
+                + "): '" + workUnitPath + "'"
+        );
+  }
+
+  protected Optional<CopyableFile> getOptCopyableFile(TaskState taskState) {
+    return getOptCopyableFile(taskState, "taskState '" + taskState.getTaskId() 
+ "'");
+  }
+
+  protected Optional<CopyableFile> getOptFirstCopyableFile(List<WorkUnit> 
workUnits, String workUnitPath) {
+    return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus ->
+      getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'")
+    );
+  }
+
+  protected Optional<CopyableFile> getOptCopyableFile(State state, String 
logDesc) {
+    return getOptCopyEntityClass(state, logDesc).flatMap(copyEntityClass -> {
+      log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(), 
copyEntityClass.getName());
+      if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
+        String serialization = 
state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
+        if (serialization != null) {
+          return Optional.of((CopyableFile) 
CopyEntity.deserialize(serialization));
+        }
+      }
+      return Optional.empty();
+    });
+  }
+
+  protected Optional<Class<?>> getOptCopyEntityClass(State state, String 
logDesc) {
+    try {
+      return Optional.of(CopySource.getCopyEntityClass(state));
+    } catch (IOException ioe) {
+      log.warn(logDesc + " - failed getting copy entity class:", ioe);
+      return Optional.empty();
+    }
+  }
+
+  protected Predicate<GobblinMultiTaskAttempt> 
createInterruptionPredicate(FileSystem fs, JobState jobState) {
+    // TODO - decide whether to support... and if so, employ a useful path; 
otherwise, just evaluate predicate to always false
+    Path interruptionPath = new 
Path("/not/a/real/path/that/should/ever/exist!");
+    return createInterruptionPredicate(fs, interruptionPath);
+  }
+
+  protected Predicate<GobblinMultiTaskAttempt> 
createInterruptionPredicate(FileSystem fs, Path interruptionPath) {
+    return  (gmta) -> {
+      try {
+        return fs.exists(interruptionPath);
+      } catch (IOException ioe) {
+        return false;
+      }
+    };
+  }
+
+  protected boolean shouldUseExtendedLogging(WorkUnitClaimCheck wu) {
+    try {
+      return Long.parseLong(wu.getCorrelator()) % 
LOG_EXTENDED_PROPS_EVERY_WORK_UNITS_STRIDE == 0;
+    } catch (NumberFormatException nfe) {
+      log.warn("unexpected, non-numeric correlator: '{}'", wu.getCorrelator());
+      return false;
+    }
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
new file mode 100644
index 000000000..95425a643
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.launcher;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.typesafe.config.ConfigFactory;
+import io.temporal.client.WorkflowOptions;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
+import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
+import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
+import org.apache.gobblin.util.PropertiesUtils;
+
+import static 
org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX;
+
+
+/**
+ * A {@link JobLauncher} for the initial triggering of a Temporal workflow 
that executes {@link WorkUnit}s to fulfill
+ * the work they specify.  see: {@link ProcessWorkUnitsWorkflow}
+ *
+ * <p>
+ *   This class is instantiated by the {@link 
GobblinTemporalJobScheduler#buildJobLauncher(Properties)} on every job 
submission to launch the Gobblin job.
+ *   The actual task execution happens in the {@link 
GobblinTemporalTaskRunner}, usually in a different process.
+ * </p>
+ */
+@Slf4j
+public class ProcessWorkUnitsJobLauncher extends GobblinTemporalJobLauncher {
+  public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_NAME_NODE_URI = 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "name.node.uri";
+  public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_UNITS_DIR 
= GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "work.units.dir";
+
+  public static final String 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE = 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "work.max.branches.per.tree";
+  public static final String 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE = 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "work.max.sub.trees.per.tree";
+
+  public static final String WORKFLOW_ID_BASE = "ProcessWorkUnits";
+
+  public ProcessWorkUnitsJobLauncher(
+      Properties jobProps,
+      Path appWorkDir,
+      List<? extends Tag<?>> metadataTags,
+      ConcurrentHashMap<String, Boolean> runningMap
+  ) throws Exception {
+    super(jobProps, appWorkDir, metadataTags, runningMap);
+  }
+
+  @Override
+  public void submitJob(List<WorkUnit> workunits) {
+    try {
+      URI nameNodeUri = new URI(PropertiesUtils.getRequiredProp(this.jobProps, 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_NAME_NODE_URI));
+      // NOTE: `Path` is challenging for temporal to ser/de, but nonetheless 
do pre-construct as `Path`, to pre-validate this prop string's contents
+      Path workUnitsDir = new 
Path(PropertiesUtils.getRequiredProp(this.jobProps, 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_UNITS_DIR));
+      WUProcessingSpec wuSpec = new WUProcessingSpec(nameNodeUri, 
workUnitsDir.toString());
+      if 
(this.jobProps.containsKey(GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
 &&
+          
this.jobProps.containsKey(GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
+        int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(this.jobProps, 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+        int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(this.jobProps, 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+        wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
+      }
+      WorkflowOptions options = WorkflowOptions.newBuilder()
+          .setTaskQueue(this.queueName)
+          .setWorkflowId(Help.qualifyNamePerExec(WORKFLOW_ID_BASE, wuSpec, 
ConfigFactory.parseProperties(jobProps)))
+          .build();
+      ProcessWorkUnitsWorkflow workflow = 
this.client.newWorkflowStub(ProcessWorkUnitsWorkflow.class, options);
+      workflow.process(wuSpec);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
new file mode 100644
index 000000000..87d91e0c4
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.util;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+
+
+/**
+ * Utilities for applying {@link JobState} info to various ends:
+ * - creating a {@link SharedResourcesBroker}
+ * - obtaining a {@link StateStore<TaskState>}
+ */
+@Slf4j
+public class JobStateUtils {
+  private static final String OUTPUT_DIR_NAME = "output"; // following 
MRJobLauncher.OUTPUT_DIR_NAME
+
+  // reuse same handle among activities executed by the same worker
+  private static final transient Cache<Path, StateStore<TaskState>> 
taskStateStoreByPath = CacheBuilder.newBuilder().build();
+
+  private JobStateUtils() {}
+
+  public static StateStore<TaskState> openTaskStateStore(JobState jobState, 
FileSystem fs) {
+    try {
+      Path taskStateStorePath = JobStateUtils.getTaskStateStorePath(jobState, 
fs);
+      return taskStateStoreByPath.get(taskStateStorePath, () ->
+          openTaskStateStoreUncached(jobState, fs)
+      );
+    } catch (ExecutionException ee) {
+      throw new RuntimeException(ee);
+    }
+  }
+
+  public static StateStore<TaskState> openTaskStateStoreUncached(JobState 
jobState, FileSystem fs) {
+    Path taskStateStorePath = JobStateUtils.getTaskStateStorePath(jobState, 
fs);
+    log.info("opening FS task state store at path '{}'", taskStateStorePath);
+    return new FsStateStore<>(fs, taskStateStorePath.toUri().getPath(), 
TaskState.class);
+  }
+
+  /**
+   * ATTENTION: derives path according to {@link 
org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
+   * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
+   * @return path to {@link FsStateStore<TaskState>} backing dir
+   */
+  public static Path getTaskStateStorePath(JobState jobState, FileSystem fs) {
+    Properties jobProps = jobState.getProperties();
+    Path jobOutputPath = new Path(
+        new Path(
+            new Path(
+                jobProps.getProperty(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY),
+                JobState.getJobNameFromProps(jobProps)),
+            JobState.getJobIdFromProps(jobProps)),
+        OUTPUT_DIR_NAME);
+    return fs.makeQualified(jobOutputPath);
+  }
+
+  public static SharedResourcesBroker<GobblinScopeTypes> 
getSharedResourcesBroker(JobState jobState) {
+    SharedResourcesBroker<GobblinScopeTypes> globalBroker =
+        SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
+            ConfigFactory.parseProperties(jobState.getProperties()),
+            GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+    return globalBroker.newSubscopedBuilder(new 
JobScopeInstance(jobState.getJobName(), jobState.getJobId())).build();
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java
new file mode 100644
index 000000000..f6b6e05f1
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
+import org.apache.gobblin.temporal.util.nesting.work.SeqSliceBackedWorkSpan;
+import org.apache.gobblin.temporal.util.nesting.work.Workload;
+import org.apache.gobblin.util.HadoopUtils;
+
+
+/**
+ * {@link Workload} of `WORK_ITEM`s (as defined by derived class) that 
originates from the eagerly loaded contents of
+ * the directory `fsDir` within the {@link FileSystem} at `nameNodeUri`.
+ *
+ * IMPORTANT: to abide by Temporal's required determinism, a derived class 
must provide a {@link Comparator} for the
+ * *total ordering* of `WORK_ITEM`s.
+ */
+@lombok.NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@lombok.RequiredArgsConstructor
+@lombok.ToString(exclude = { "stateConfig", "cachedWorkItems" })
+@Slf4j
+public abstract class AbstractEagerFsDirBackedWorkload<WORK_ITEM> implements 
Workload<WORK_ITEM>, FileSystemApt {
+
+  @Getter
+  @NonNull private URI fileSystemUri;
+  // NOTE: use `String` rather than `Path` to avoid: 
com.fasterxml.jackson.databind.exc.MismatchedInputException:
+  //   Cannot construct instance of `org.apache.hadoop.fs.Path` (although at 
least one Creator exists):
+  //     cannot deserialize from Object value (no delegate- or property-based 
Creator)
+  @NonNull private String fsDir;
+  @Getter(AccessLevel.PROTECTED) @Setter(AccessLevel.PROTECTED)
+  private transient volatile WORK_ITEM[] cachedWorkItems = null;
+
+  @Override
+  public Optional<Workload.WorkSpan<WORK_ITEM>> getSpan(final int startIndex, 
final int numElements) {
+    WORK_ITEM[] workItems = getCachedWorkItems();
+    if (startIndex >= workItems.length || startIndex < 0) {
+      return Optional.empty();
+    } else {
+      return Optional.of(new SeqSliceBackedWorkSpan<>(workItems, startIndex, 
numElements));
+    }
+  }
+
+  @Override
+  public boolean isIndexKnownToExceed(final int index) {
+    return isDefiniteSize() && cachedWorkItems != null && index >= 
cachedWorkItems.length;
+  }
+
+  @Override
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  public boolean isDefiniteSize() {
+    return true;
+  }
+
+  protected abstract WORK_ITEM fromFileStatus(FileStatus fileStatus);
+
+  /**
+   *  IMPORTANT: to satisfy Temporal's required determinism, the `WORK_ITEM`s 
need a consistent total ordering
+   *  WARNING: this works so long as dir contents are unchanged in iterim
+   *  TODO: handle case of dir contents growing (e.g. use timestamp to filter 
out newer paths)... how could we handle the case of shrinking/deletion?
+   */
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  protected abstract Comparator<WORK_ITEM> getWorkItemComparator();
+
+  /** Hook for each `WORK_ITEM` to be associated with its final, post-sorting 
ordinal index */
+  protected void acknowledgeOrdering(int index, WORK_ITEM workItem) {
+    // no-op
+  }
+
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  protected PathFilter getPathFilter() {
+    return f -> true;
+  }
+
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  protected final synchronized WORK_ITEM[] getCachedWorkItems() {
+    if (cachedWorkItems != null) {
+      return cachedWorkItems;
+    }
+    try (FileSystem fs = loadFileSystem()) {
+      FileStatus[] fileStatuses = fs.listStatus(new Path(fsDir), 
this.getPathFilter());
+      log.info("loaded {} paths from '{}'", fileStatuses.length, fsDir);
+      WORK_ITEM[] workItems = 
(WORK_ITEM[])Stream.of(fileStatuses).map(this::fromFileStatus).toArray(Object[]::new);
+      sortWorkItems(workItems);
+      IntStream.range(0, workItems.length)
+          .forEach(i -> this.acknowledgeOrdering(i, workItems[i]));
+      cachedWorkItems = workItems;
+      return cachedWorkItems;
+    } catch (FileNotFoundException fnfe) {
+      throw new RuntimeException("directory not found: '" + fsDir + "'");
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  @Override
+  public State getFileSystemConfig() {
+    return new State(); // TODO - figure out how to truly set!
+  }
+
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  protected FileSystem loadFileSystem() throws IOException {
+    return HadoopUtils.getFileSystem(this.fileSystemUri, 
this.getFileSystemConfig());
+  }
+
+  private void sortWorkItems(WORK_ITEM[] workItems) {
+    Arrays.sort(workItems, getWorkItemComparator());
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
new file mode 100644
index 000000000..d2c193cb1
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.net.URI;
+import java.util.Comparator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.hadoop.fs.FileStatus;
+
+
+/**
+ * {@link AbstractEagerFsDirBackedWorkload} for {@link WorkUnitClaimCheck} `WORK_ITEM`s, which uses
+ * {@link WorkUnitClaimCheck#getWorkUnitPath()} for their total-ordering.
+ */
[email protected] // IMPORTANT: for jackson (de)serialization
[email protected](callSuper = true)
+public class EagerFsDirBackedWorkUnitClaimCheckWorkload extends AbstractEagerFsDirBackedWorkload<WorkUnitClaimCheck> {
+
+  /**
+   * @param fileSystemUri URI of the file system, passed through to the superclass
+   * @param hdfsDir directory, passed through to the superclass
+   */
+  public EagerFsDirBackedWorkUnitClaimCheckWorkload(URI fileSystemUri, String hdfsDir) {
+    super(fileSystemUri, hdfsDir);
+  }
+
+  /** @return a {@link WorkUnitClaimCheck} for the path of `fileStatus`, with an (initially) empty correlator */
+  @Override
+  protected WorkUnitClaimCheck fromFileStatus(FileStatus fileStatus) {
+    // begin by setting all correlators to empty
+    return new WorkUnitClaimCheck("", this.getFileSystemUri(), fileStatus.getPath().toString());
+  }
+
+  /** @return a comparator ordering by {@link WorkUnitClaimCheck#getWorkUnitPath()} */
+  @Override
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  protected Comparator<WorkUnitClaimCheck> getWorkItemComparator() {
+    return Comparator.comparing(WorkUnitClaimCheck::getWorkUnitPath);
+  }
+
+  /** Sets the correlator of `item` to the decimal string form of `index`. */
+  @Override
+  protected void acknowledgeOrdering(int index, WorkUnitClaimCheck item) {
+    // later, after the post-total-ordering indices are known, use each item's index as its correlator
+    item.setCorrelator(Integer.toString(index));
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
new file mode 100644
index 000000000..3b2597194
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.net.URI;
+import java.util.Optional;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.hadoop.fs.Path;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
+import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
+import org.apache.gobblin.temporal.util.nesting.work.Workload;
+
+
+/**
+ * Intended to reference multiple {@link org.apache.gobblin.source.workunit.WorkUnit}s to process, where
+ * `workUnitsDir` is resolved against the {@link org.apache.hadoop.fs.FileSystem} given by `fileSystemUri`.
+ */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class WUProcessingSpec implements FileSystemApt, FileSystemJobStateful {
+  @NonNull private URI fileSystemUri;
+  @NonNull private String workUnitsDir;
+  @NonNull private Tuning tuning = Tuning.DEFAULT;
+
+  /** @return a new, empty {@link State} on every call (no FS config is carried yet - see the TODO) */
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  @Override
+  public State getFileSystemConfig() {
+    return new State(); // TODO - figure out how to truly set!
+  }
+
+  /** @return the path named {@link AbstractJobLauncher#JOB_STATE_FILE_NAME} under the parent of `workUnitsDir` */
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  @Override
+  public Path getJobStatePath() {
+    // TODO: decide whether wise to hard-code... (per `MRJobLauncher` 
conventions, we expect job state file to be sibling of WU dir)
+    return new Path(new Path(workUnitsDir).getParent(), 
AbstractJobLauncher.JOB_STATE_FILE_NAME);
+  }
+
+  /**
+   * Configuration for {@link
+   * org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow#performWorkload(WorkflowAddr, Workload, int, int, int, Optional)}
+   */
+  @Data
+  @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+  @RequiredArgsConstructor
+  public static class Tuning {
+    public static final int DEFAULT_MAX_BRANCHES_PER_TREE = 900;
+    public static final int DEFAULT_SUB_TREES_PER_TREE = 30;
+
+    /** Tuning built from the two `DEFAULT_*` constants above */
+    public static final Tuning DEFAULT = new 
Tuning(DEFAULT_MAX_BRANCHES_PER_TREE, DEFAULT_SUB_TREES_PER_TREE);
+
+    // NOTE(review): `@NonNull` cannot guard a primitive against null; presumably it is present so lombok's
+    // `@RequiredArgsConstructor` includes these fields (as `DEFAULT` relies upon) - confirm before removing
+    @NonNull private int maxBranchesPerTree;
+    @NonNull private int maxSubTreesPerTree;
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
new file mode 100644
index 000000000..f03215664
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.net.URI;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.hadoop.fs.Path;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
+
+/**
+ * Conveys a {@link org.apache.gobblin.source.workunit.WorkUnit} by claim-check, where the `workUnitPath` is resolved
+ * against the {@link org.apache.hadoop.fs.FileSystem} given by `fileSystemUri`.
+ * @see <a href="https://learn.microsoft.com/en-us/azure/architecture/patterns/claim-check">Claim-Check Pattern</a>
+ */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class WorkUnitClaimCheck implements FileSystemApt, 
FileSystemJobStateful {
+  @NonNull private String correlator;
+  @NonNull private URI fileSystemUri;
+  @NonNull private String workUnitPath;
+
+  /** @return a new, empty {@link State} on every call (no FS config is carried yet - see the TODO) */
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  @Override
+  public State getFileSystemConfig() {
+    return new State(); // TODO - figure out how to truly set!
+  }
+
+  /**
+   * @return the path named {@link AbstractJobLauncher#JOB_STATE_FILE_NAME} under the grandparent (parent's parent)
+   *         directory of `workUnitPath`
+   */
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  @Override
+  public Path getJobStatePath() {
+    // TODO: decide whether wise to hard-code... (per `MRJobLauncher` 
conventions, we expect job state file to be sibling of WU dir)
+    return new Path(new Path(workUnitPath).getParent().getParent(), 
AbstractJobLauncher.JOB_STATE_FILE_NAME);
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
new file mode 100644
index 000000000..f382dcec1
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work.assistance;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ExecutionException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import com.typesafe.config.Config;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
+import org.apache.gobblin.temporal.ddm.work.styles.JobStateful;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+
+
+/** Various capabilities useful in implementing Distributed Data Movement 
(DDM) */
+@Slf4j
+public class Help {
+  public static final int MAX_DESERIALIZATION_FS_LOAD_ATTEMPTS = 5;
+  public static final int LOG_CACHE_STATS_EVERY_N_ACCESSES = 1000;
+  public static final String AZKABAN_FLOW_EXEC_ID_KEY = "azkaban.flow.execid";
+  public static final String USER_TO_PROXY_KEY = "user.to.proxy";
+
+  // treat `JobState` as immutable and cache, for reuse among activities 
executed by the same worker
+  private static final transient Cache<Path, JobState> jobStateByPath = 
CacheBuilder.newBuilder().recordStats().build();
+  private static final transient AtomicInteger jobStateAccessCount = new 
AtomicInteger(0);
+
+  private Help() {}
+
+  public static String qualifyNamePerExec(String name, FileSystemJobStateful 
f, Config workerConfig) {
+    return name + "_" + calcPerExecQualifier(f, workerConfig);
+  }
+
+  public static String qualifyNamePerExec(String name, Config workerConfig) {
+    return name + "_" + calcPerExecQualifier(workerConfig);
+  }
+
+  /**
+   * Calculates a qualifier for the present execution: the flow execution ID found in the {@link JobState} of `f`
+   * (followed by "_"), when that is both loadable and set, and then always the qualifier of
+   * {@link #calcPerExecQualifier(Config)}.
+   *
+   * Failure to load the {@link JobState} is logged and otherwise tolerated: the flow execution ID is merely omitted.
+   *
+   * @param f indicates the file system and path from which to load the {@link JobState}
+   * @param workerConfig the config consulted by {@link #calcPerExecQualifier(Config)}
+   * @return the qualifier
+   */
+  public static String calcPerExecQualifier(FileSystemJobStateful f, Config workerConfig) {
+    Optional<String> optFlowExecId = Optional.empty();
+    try {
+      // `getProp` is given a `null` default, so use `ofNullable`: `Optional.of(null)` would throw NPE when unset
+      optFlowExecId = Optional.ofNullable(loadJobState(f).getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
+    } catch (IOException e) {
+      log.warn("unable to loadJobState", e);
+    }
+    return optFlowExecId.map(x -> x + "_").orElse("") + calcPerExecQualifier(workerConfig);
+  }
+
+  /**
+   * @return the value at {@link #USER_TO_PROXY_KEY} (else ""), then "_", then the value at
+   *         {@link #AZKABAN_FLOW_EXEC_ID_KEY} (else a random {@link UUID})
+   */
+  public static String calcPerExecQualifier(Config workerConfig) {
+    final String proxyUser;
+    if (workerConfig.hasPath(USER_TO_PROXY_KEY)) {
+      proxyUser = workerConfig.getString(USER_TO_PROXY_KEY);
+    } else {
+      proxyUser = "";
+    }
+
+    final String execId;
+    if (workerConfig.hasPath(AZKABAN_FLOW_EXEC_ID_KEY)) {
+      execId = workerConfig.getString(AZKABAN_FLOW_EXEC_ID_KEY);
+    } else {
+      execId = UUID.randomUUID().toString();
+    }
+    return proxyUser + "_" + execId;
+  }
+
+  /** @return the {@link FileSystem} for the URI and config that `a` indicates */
+  public static FileSystem loadFileSystem(FileSystemApt a) throws IOException {
+    // NOTE: `FileSystem.get` appears to implement caching, which should facilitate sharing among activities executing on the same worker
+    URI fsUri = a.getFileSystemUri();
+    State fsConfig = a.getFileSystemConfig();
+    return loadFileSystemForUri(fsUri, fsConfig);
+  }
+
+  /** @return the result of {@link FileSystem#get} for `fsUri`, with the hadoop conf derived from `fsConfig` */
+  public static FileSystem loadFileSystemForUri(URI fsUri, State fsConfig) throws IOException {
+    // TODO - determine whether this works... unclear whether it led to "FS closed", or that had another cause...
+    // return HadoopUtils.getFileSystem(fsUri, fsConfig);
+    return FileSystem.get(fsUri, HadoopUtils.getConfFromState(fsConfig));
+  }
+
+  public static FileSystem loadFileSystemForce(FileSystemApt a) throws 
IOException {
+    return loadFileSystemForUriForce(a.getFileSystemUri(), 
a.getFileSystemConfig());
+  }
+
+  /**
+   * Like {@link #loadFileSystemForUri(URI, State)}, except first setting "fs.hdfs.impl.disable.cache" to `true` in the
+   * hadoop conf derived from `fsConfig`.
+   */
+  public static FileSystem loadFileSystemForUriForce(URI fsUri, State fsConfig) throws IOException {
+    // Background: many "FS closed" failures were encountered before HDFS caching was disabled (especially as the
+    // num WUs increased), for reasons still not fully understood.  Perhaps caching-facilitated reuse of the same FS
+    // across multiple WUs caused prior WU execs to leave the FS in a problematic state for subsequent execs.
+    // TODO - more investigation to sort out the true RC... and whether caching definitively is or is not possible for use here!
+    // return HadoopUtils.getFileSystem(fsUri, fsConfig);
+    Configuration uncachedConf = HadoopUtils.getConfFromState(fsConfig);
+    uncachedConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+    return FileSystem.get(fsUri, uncachedConf);
+  }
+
+  public static JobState loadJobState(FileSystemJobStateful f) throws 
IOException {
+    try (FileSystem fs = loadFileSystemForce(f)) {
+      return loadJobState(f, fs);
+    }
+  }
+
+  /**
+   * Returns the {@link JobState} cached for the path of `js`, on a cache miss loading it through `fs` with
+   * {@link #loadJobStateUncached(JobStateful, FileSystem)}.  Every call counts as one access for the cache-stats log.
+   *
+   * @throws IOException wrapping the {@link ExecutionException} raised by the cache when loading fails
+   */
+  public static JobState loadJobState(JobStateful js, FileSystem fs) throws IOException {
+    incrementJobStateAccess();
+    try {
+      return jobStateByPath.get(js.getJobStatePath(), () -> loadJobStateUncached(js, fs));
+    } catch (ExecutionException loadFailure) {
+      throw new IOException(loadFailure);
+    }
+  }
+
+  /** Deserializes (bypassing the cache) a new {@link JobState} from the path of `js` on `fs`, and logs it as JSON. */
+  public static JobState loadJobStateUncached(JobStateful js, FileSystem fs) throws IOException {
+    JobState loaded = new JobState();
+    SerializationUtils.deserializeState(fs, js.getJobStatePath(), loaded);
+    log.info(
+        "loaded jobState from '{}': {}",
+        js.getJobStatePath(),
+        loaded.toJsonString(true));
+    return loaded;
+  }
+
+  public static JobState loadJobStateWithRetries(FileSystemJobStateful f) 
throws IOException {
+    try (FileSystem fs = loadFileSystemForce(f)) {
+      return loadJobStateWithRetries(f, fs);
+    }
+  }
+
+  /**
+   * Returns the {@link JobState} cached for the path of `f`, on a cache miss loading it through `fs` with
+   * {@link #loadJobStateUncachedWithRetries(JobStateful, FileSystem, FileSystemApt)}.  Every call counts as one
+   * access for the cache-stats log.
+   *
+   * @throws IOException wrapping the {@link ExecutionException} raised by the cache when loading fails
+   */
+  public static JobState loadJobStateWithRetries(FileSystemJobStateful f, FileSystem fs) throws IOException {
+    incrementJobStateAccess();
+    try {
+      return jobStateByPath.get(f.getJobStatePath(), () -> loadJobStateUncachedWithRetries(f, fs, f));
+    } catch (ExecutionException loadFailure) {
+      throw new IOException(loadFailure);
+    }
+  }
+
+  /**
+   * Deserializes (bypassing the cache) a new {@link JobState} from the path of `js`, making up to
+   * {@link #MAX_DESERIALIZATION_FS_LOAD_ATTEMPTS} attempts, and logs it as JSON.
+   */
+  public static JobState loadJobStateUncachedWithRetries(JobStateful js, FileSystem fs, FileSystemApt fsApt) throws IOException {
+    JobState loaded = new JobState();
+    deserializeStateWithRetries(fs, js.getJobStatePath(), loaded, fsApt, MAX_DESERIALIZATION_FS_LOAD_ATTEMPTS);
+    log.info(
+        "loaded jobState from '{}': {}",
+        js.getJobStatePath(),
+        loaded.toJsonString(true));
+    return loaded;
+  }
+
+  /**
+   * Deserializes `path` into `state`, delegating to the five-arg overload with
+   * {@link #MAX_DESERIALIZATION_FS_LOAD_ATTEMPTS} as the maximum number of attempts.
+   */
+  public static <T extends State> void deserializeStateWithRetries(FileSystem 
fs, Path path, T state, FileSystemApt fsApt)
+      throws IOException {
+    deserializeStateWithRetries(fs, path, state, fsApt, 
MAX_DESERIALIZATION_FS_LOAD_ATTEMPTS);
+  }
+
+  // TODO: decide whether actually necessary...  it was added in a fit of 
debugging "FS closed" errors
+  public static <T extends State> void deserializeStateWithRetries(FileSystem 
fs, Path path, T state, FileSystemApt fsApt, int maxAttempts)
+      throws IOException {
+    for (int i = 0; i < maxAttempts; ++i) {
+      if (i > 0) {
+        log.info("reopening FS '{}' to retry ({}) deserialization (attempt 
{})", fsApt.getFileSystemUri(),
+            state.getClass().getSimpleName(), i);
+        fs = Help.loadFileSystem(fsApt);
+      }
+      try {
+        SerializationUtils.deserializeState(fs, path, state);
+        return;
+      } catch (IOException ioe) {
+        if (ioe.getMessage().equals("Filesystem closed") && i < maxAttempts - 
1) {
+          continue;
+        } else {
+          throw ioe;
+        }
+      }
+    }
+  }
+
+  /**
+   * Opens the {@link TaskState} store for the job that `f` indicates, using the {@link FileSystem} from
+   * {@link #loadFileSystem(FileSystemApt)} both to load the {@link JobState} and to open the store.
+   *
+   * NOTE(review): try-with-resources closes that `fs` upon return, although it was handed to
+   * `JobStateUtils.openTaskStateStore`; whether the returned store remains usable depends on code not visible
+   * here - confirm.
+   */
+  public static StateStore<TaskState> openTaskStateStore(FileSystemJobStateful 
f) throws IOException {
+    try (FileSystem fs = Help.loadFileSystem(f)) {
+      return JobStateUtils.openTaskStateStore(Help.loadJobState(f, fs), fs);
+    }
+  }
+
+  /**
+   * Opens the {@link TaskState} store on `fs` via `JobStateUtils.openTaskStateStoreUncached`.
+   *
+   * NOTE(review): the {@link JobState} comes from {@link #loadJobState(FileSystemJobStateful)}, which loads through
+   * a file system of its own rather than through `fs`; the commented-out lines below show the alternative that
+   * would use `fs` - confirm which is intended.
+   */
+  public static StateStore<TaskState> openTaskStateStore(FileSystemJobStateful 
js, FileSystem fs) throws IOException {
+    return JobStateUtils.openTaskStateStoreUncached(loadJobState(js), fs);
+    // public static StateStore<TaskState> openTaskStateStore(JobStateful js, 
FileSystem fs) throws IOException {
+    //   return JobStateUtils.openTaskStateStore(loadJobState(js, fs), fs);
+  }
+
+  private static void incrementJobStateAccess() {
+    int numAccesses = jobStateAccessCount.getAndIncrement();
+    if (numAccesses % LOG_CACHE_STATS_EVERY_N_ACCESSES == 0) {
+      log.info("JobState(numAccesses: {}) - {}", numAccesses, 
jobStateByPath.stats());
+    }
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemApt.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemApt.java
new file mode 100644
index 000000000..324aae04d
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemApt.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work.styles;
+
+import java.net.URI;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * Marks a type that can indicate a {@link org.apache.hadoop.fs.FileSystem} via its {@link URI} and configuration
+ */
+public interface FileSystemApt {
+
+  /** @return the {@link URI} of the file system indicated */
+  URI getFileSystemUri();
+
+  /** @return the configuration of the file system indicated (never serialized as a JSON property) */
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  State getFileSystemConfig();
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemJobStateful.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemJobStateful.java
new file mode 100644
index 000000000..d3b0accdb
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemJobStateful.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work.styles;
+
+
+/**
+ * Marks a type that can indicate both a {@link org.apache.hadoop.fs.FileSystem} and a
+ * {@link org.apache.gobblin.runtime.JobState}; it declares no members of its own, merely combining
+ * {@link JobStateful} with {@link FileSystemApt}
+ */
+public interface FileSystemJobStateful extends JobStateful, FileSystemApt {
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/JobStateful.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/JobStateful.java
new file mode 100644
index 000000000..3dc7aed32
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/JobStateful.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work.styles;
+
+import org.apache.hadoop.fs.Path;
+
+
+/** Marks a type that can indicate a {@link org.apache.gobblin.runtime.JobState} via its {@link Path} */
+public interface JobStateful {
+  /** @return the {@link Path} of the job state indicated */
+  Path getJobStatePath();
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
new file mode 100644
index 000000000..9af6995b5
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.worker;
+
+import java.util.concurrent.TimeUnit;
+
+import com.typesafe.config.Config;
+import io.temporal.client.WorkflowClient;
+import io.temporal.worker.WorkerOptions;
+
+import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
+import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
+import 
org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
+import 
org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
+
+
+/** Worker for the {@link ProcessWorkUnitsWorkflowImpl} super-workflow */
+public class WorkFulfillmentWorker extends AbstractTemporalWorker {
+    public static final long DEADLOCK_DETECTION_TIMEOUT_SECONDS = 120;
+    /** Limit applied alike to concurrent activity, local activity, and workflow task executions */
+    public static final int MAX_EXECUTION_CONCURRENCY = 3;
+
+    public WorkFulfillmentWorker(Config config, WorkflowClient workflowClient) {
+        super(config, workflowClient);
+    }
+
+    /** @return the workflow implementation classes of this worker */
+    @Override
+    protected Class<?>[] getWorkflowImplClasses() {
+        // (wildcard-parameterized array creation, rather than raw `Class[]`, to match the declared return type)
+        return new Class<?>[] { ProcessWorkUnitsWorkflowImpl.class, NestingExecOfProcessWorkUnitWorkflowImpl.class };
+    }
+
+    /** @return the activity implementation instances of this worker: a single, new {@link ProcessWorkUnitImpl} */
+    @Override
+    protected Object[] getActivityImplInstances() {
+        return new Object[] { new ProcessWorkUnitImpl() };
+    }
+
+    /** @return options that lengthen the deadlock detection timeout and cap every kind of concurrent execution */
+    @Override
+    protected WorkerOptions createWorkerOptions() {
+        return WorkerOptions.newBuilder()
+            // default is only 1s - WAY TOO SHORT for `o.a.hadoop.fs.FileSystem#listStatus`!
+            .setDefaultDeadlockDetectionTimeout(TimeUnit.SECONDS.toMillis(DEADLOCK_DETECTION_TIMEOUT_SECONDS))
+            .setMaxConcurrentActivityExecutionSize(MAX_EXECUTION_CONCURRENCY)
+            .setMaxConcurrentLocalActivityExecutionSize(MAX_EXECUTION_CONCURRENCY)
+            .setMaxConcurrentWorkflowTaskExecutionSize(MAX_EXECUTION_CONCURRENCY)
+            .build();
+    }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
new file mode 100644
index 000000000..ba2ccf99a
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow;
+
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+
+
+/** Temporal workflow for executing {@link WorkUnit}s to fulfill the work they specify. */
+@WorkflowInterface
+public interface ProcessWorkUnitsWorkflow {
+  /**
+   * @param wuSpec specifies the {@link WorkUnit}s to process
+   * @return the number of {@link WorkUnit}s cumulatively processed successfully
+   */
+  @WorkflowMethod
+  int process(WUProcessingSpec wuSpec);
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
new file mode 100644
index 000000000..074bb4609
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import io.temporal.workflow.Async;
+import io.temporal.workflow.Promise;
+import io.temporal.workflow.Workflow;
+import java.time.Duration;
+
+import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import 
org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl;
+
+
+/**
+ * {@link org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow} for {@link ProcessWorkUnit}, where
+ * each {@link WorkUnitClaimCheck} is processed by an asynchronously-launched activity yielding an {@link Integer}
+ */
+public class NestingExecOfProcessWorkUnitWorkflowImpl extends 
AbstractNestingExecWorkflowImpl<WorkUnitClaimCheck, Integer> {
+
+  // RetryOptions specify how to automatically handle retries when Activities 
fail.
+  // (as configured: at most 4 attempts; interval starting at 3s, with backoff coefficient 2, and capped at 100s)
+  private static final RetryOptions ACTIVITY_RETRY_OPTS = 
RetryOptions.newBuilder()
+      .setInitialInterval(Duration.ofSeconds(3))
+      .setMaximumInterval(Duration.ofSeconds(100))
+      .setBackoffCoefficient(2)
+      .setMaximumAttempts(4)
+      .build();
+
+  // (as configured: a start-to-close timeout of 999s, combined with the retry options above)
+  private static final ActivityOptions ACTIVITY_OPTS = 
ActivityOptions.newBuilder()
+      .setStartToCloseTimeout(Duration.ofSeconds(999))
+      .setRetryOptions(ACTIVITY_RETRY_OPTS)
+      .build();
+
+  private final ProcessWorkUnit activityStub = 
Workflow.newActivityStub(ProcessWorkUnit.class, ACTIVITY_OPTS);
+
+  /** @return the {@link Promise} for the result of `processWorkUnit`, invoked asynchronously for `wu` via the activity stub */
+  @Override
+  protected Promise<Integer> launchAsyncActivity(final WorkUnitClaimCheck wu) {
+    return Async.function(activityStub::processWorkUnit, wu);
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
new file mode 100644
index 000000000..eafc62409
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import java.util.Optional;
+
+import com.typesafe.config.ConfigFactory;
+
+import io.temporal.api.enums.v1.ParentClosePolicy;
+import io.temporal.workflow.ChildWorkflowOptions;
+import io.temporal.workflow.Workflow;
+
+import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
+import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
+import 
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
+import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
+import org.apache.gobblin.temporal.util.nesting.work.Workload;
+import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
+
+/**
+ * {@link ProcessWorkUnitsWorkflow} implementation that processes the work units described by a
+ * {@link WUProcessingSpec} by delegating to a child {@link NestingExecWorkflow}.
+ */
+public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
+  public static final String CHILD_WORKFLOW_ID_BASE = "NestingExecWorkUnits"; // base name from which the child workflow ID is derived
+  /**
+   * Build the {@link Workload} for `workSpec`, create the child {@link NestingExecWorkflow}, and have that child
+   * perform the workload, starting from {@link WorkflowAddr#ROOT} and using the branching limits of
+   * `workSpec.getTuning()`.
+   *
+   * @param workSpec identifies what to process and supplies the tuning (max branches and max sub-trees per tree)
+   * @return the value returned by {@link NestingExecWorkflow#performWorkload}
+   *         (NOTE(review): presumably the number of work units processed - confirm)
+   */
+  @Override
+  public int process(WUProcessingSpec workSpec) {
+    Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
+    NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = 
createProcessingWorkflow(workSpec);
+    return processingWorkflow.performWorkload(
+        WorkflowAddr.ROOT, workload, 0,
+        workSpec.getTuning().getMaxBranchesPerTree(), 
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()
+    );
+  }
+  /**
+   * @param workSpec supplies the file system URI and the work units dir
+   * @return an {@link EagerFsDirBackedWorkUnitClaimCheckWorkload} constructed from those two values
+   */
+  protected Workload<WorkUnitClaimCheck> createWorkload(WUProcessingSpec 
workSpec) {
+    return new 
EagerFsDirBackedWorkUnitClaimCheckWorkload(workSpec.getFileSystemUri(), 
workSpec.getWorkUnitsDir());
+  }
+  /**
+   * Create the stub for the child {@link NestingExecWorkflow}. The child uses
+   * {@link ParentClosePolicy#PARENT_CLOSE_POLICY_ABANDON}, and its workflow ID comes from
+   * {@link Help#qualifyNamePerExec}, given {@link #CHILD_WORKFLOW_ID_BASE}, `f`, and this worker's config
+   * (or else an empty config, when {@link WorkerConfig#of} finds none).
+   *
+   * @param f passed to {@link Help#qualifyNamePerExec} to qualify the child workflow ID
+   * @return an untyped-by-activity child workflow stub for {@link NestingExecWorkflow}
+   */
+  protected NestingExecWorkflow<WorkUnitClaimCheck> 
createProcessingWorkflow(FileSystemJobStateful f) {
+    ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
+        .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
+        .setWorkflowId(Help.qualifyNamePerExec(CHILD_WORKFLOW_ID_BASE, f, 
WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+        .build();
+    // TODO: to incorporate multiple different concrete `NestingExecWorkflow` 
sub-workflows in the same super-workflow... shall we use queues?!?!?
+    return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
+  }
+}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
index 3e020f6f8..012795dd1 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
@@ -1029,6 +1029,15 @@ public class HadoopUtils {
     return 
HadoopUtils.getOptionallyThrottledFileSystem(FileSystem.get(URI.create(uri), 
conf), state);
   }
 
+  /**
+   * Get a {@link FileSystem} for `fsUri`, using a Hadoop {@link Configuration} derived from `state` (via
+   * `getConfFromState`) and passing the result through `getOptionallyThrottledFileSystem`.
+   *
+   * @param fsUri the {@link URI} handed to {@link FileSystem#get(URI, Configuration)}
+   * @param state source of the {@link Configuration}; also forwarded to `getOptionallyThrottledFileSystem`
+   * @return the {@link FileSystem} returned by `getOptionallyThrottledFileSystem`
+   * @throws IOException propagated from the calls made here
+   *         (NOTE(review): presumably when the {@link FileSystem} cannot be obtained - confirm)
+   */
+  public static FileSystem getFileSystem(URI fsUri, State state) throws 
IOException {
+    Configuration conf = HadoopUtils.getConfFromState(state, 
Optional.absent());
+    return HadoopUtils.getOptionallyThrottledFileSystem(FileSystem.get(fsUri, 
conf), state);
+  }
+
   /**
    * Get a {@link FileSystem} object for the uri specified at {@link 
ConfigurationKeys#WRITER_FILE_SYSTEM_URI}.
    * @throws IOException
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
index 1f20b1b73..89bf4d4bd 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
@@ -130,12 +130,7 @@ public class JobLauncherUtils {
   public static List<WorkUnit> loadFlattenedWorkUnits(FileSystem fs, Path 
path) throws IOException {
     WorkUnit workUnit = JobLauncherUtils.createEmptyWorkUnitPerExtension(path);
     SerializationUtils.deserializeState(fs, path, workUnit);
-
-    if (workUnit.isMultiWorkUnit()) {
-      return JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit) 
workUnit).getWorkUnits());
-    } else {
-      return Lists.newArrayList(workUnit);
-    }
+    return JobLauncherUtils.flattenWorkUnits(Lists.newArrayList(workUnit)); // one call replaces the multi/non-multi branching; NOTE(review): presumably `flattenWorkUnits` itself expands a `MultiWorkUnit` element - confirm
   }
 
   /** @return an empty {@link WorkUnit}, potentially an empty {@link 
MultiWorkUnit}, based on the {@link Path} extension */
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index d6361b927..0aaed3729 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -72,9 +72,19 @@ public class PropertiesUtils {
 
   /** @throws {@link NullPointerException} when `key` not in `properties` */
   public static int getRequiredPropAsInt(Properties properties, String key) {
+    return Integer.parseInt(getRequiredPropRaw(properties, key, 
Optional.of("an integer"))); // the description only feeds the missing-key error message; a present but non-numeric value makes `Integer.parseInt` throw `NumberFormatException`
+  }
+
+  /**
+   * Get the value of a property that must be present, as a raw {@link String}.
+   *
+   * @param properties the {@link Properties} to read from
+   * @param key name of the required property
+   * @return the value returned by `getRequiredPropRaw` for `key`, called without a value description
+   * @throws NullPointerException when `key` not in `properties`
+   */
+  public static String getRequiredProp(Properties properties, String key) {
+    return getRequiredPropRaw(properties, key, Optional.absent());
+  }
+
+  /**
+   * Get the raw {@link String} value of a property that must be present.
+   *
+   * @param properties the {@link Properties} to read from
+   * @param key name of the required property
+   * @param desc optional description of the expected value; when present, the error message for a missing `key`
+   *        ends with {@code " (to DESC)"}, e.g. "'x' must be set (to an integer)"
+   * @return the non-null value of `key`
+   * @throws NullPointerException when `key` not in `properties`
+   */
+  public static String getRequiredPropRaw(Properties properties, String key, Optional<String> desc) {
     String value = properties.getProperty(key);
-    Preconditions.checkNotNull(value, "'" + key + "' must be set (to an integer)");
-    return Integer.parseInt(value);
+    // interpolate the description itself (`s`), NOT the enclosing `Optional`, whose `toString()` renders as "Optional.of(...)"
+    Preconditions.checkNotNull(value, "'" + key + "' must be set" + desc.transform(s -> " (to " + s + ")").or(""));
+    return value;
   }
 
   public static long getPropAsLong(Properties properties, String key, long 
defaultValue) {

Reply via email to