This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a78ee5473 [GOBBLIN-2007] Implement Distributed Data Movement (DDM) Gobblin-on-Temporal `WorkUnit` generation (for arbitrary `Source`) (#3880)
a78ee5473 is described below

commit a78ee5473dc9f24bf9acbf8432c9d838d25655c7
Author: Kip Kohn <[email protected]>
AuthorDate: Mon Feb 26 12:31:36 2024 -0800

    [GOBBLIN-2007] Implement Distributed Data Movement (DDM) Gobblin-on-Temporal `WorkUnit` generation (for arbitrary `Source`) (#3880)
    
    * Implement Distributed Data Movement (DDM) Gobblin-on-Temporal `WorkUnit` generation (for arbitrary `Source`)
    
    * Update in response to review comments
---
 .../gobblin/cluster/GobblinHelixJobLauncher.java   |   2 +-
 .../gobblin/runtime/AbstractJobLauncher.java       |   6 +-
 .../temporal/GobblinTemporalConfigurationKeys.java |   1 +
 .../temporal/cluster/AbstractTemporalWorker.java   |   2 +-
 .../cluster/GobblinTemporalTaskRunner.java         |   6 +-
 .../temporal/ddm/activity/GenerateWorkUnits.java   |  35 +++++
 .../ddm/activity/impl/GenerateWorkUnitsImpl.java   | 154 +++++++++++++++++++++
 .../ddm/launcher/GenerateWorkUnitsJobLauncher.java |  90 ++++++++++++
 .../ddm/launcher/ProcessWorkUnitsJobLauncher.java  |   2 +-
 .../gobblin/temporal/ddm/util/JobStateUtils.java   | 101 ++++++++++++--
 .../gobblin/temporal/ddm/work/assistance/Help.java |  50 +++++--
 .../temporal/ddm/worker/WorkFulfillmentWorker.java |   6 +-
 .../ddm/workflow/GenerateWorkUnitsWorkflow.java    |  35 +++++
 .../impl/GenerateWorkUnitsWorkflowImpl.java        |  56 ++++++++
 .../impl/ProcessWorkUnitsWorkflowImpl.java         |   4 +-
 .../temporal/joblauncher/GobblinJobLauncher.java   |   2 +-
 .../joblauncher/GobblinTemporalJobLauncher.java    |  29 +++-
 .../org/apache/gobblin/util/JobLauncherUtils.java  |  14 +-
 18 files changed, 542 insertions(+), 53 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 15fff870b..33b9e0aaf 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -301,7 +301,7 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     WorkUnitStream workUnitStream = new 
BasicWorkUnitStream.Builder(workUnits).build();
     // For streaming use case, this might be a necessary step to find dataset 
specific namespace so that each workUnit
     // can create staging and temp directories with the correct determination 
of shard-path
-    workUnitStream = this.executeHandlers(workUnitStream, 
this.destDatasetHandlerService);
+    workUnitStream = 
this.destDatasetHandlerService.executeHandlers(workUnitStream);
     workUnitStream = this.processWorkUnitStream(workUnitStream, jobState);
     workUnits = materializeWorkUnitList(workUnitStream);
     try {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 578696f62..e0078803c 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -516,7 +516,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
         this.canCleanUpStagingData = 
this.canCleanStagingData(this.jobContext.getJobState());
         this.destDatasetHandlerService = new 
DestinationDatasetHandlerService(jobState, canCleanUpStagingData, 
this.eventSubmitter);
         closer.register(this.destDatasetHandlerService);
-        workUnitStream = this.executeHandlers(workUnitStream, 
this.destDatasetHandlerService);
+        workUnitStream = 
this.destDatasetHandlerService.executeHandlers(workUnitStream);
 
         //Initialize writer and converter(s)
         closer.register(WriterInitializerFactory.newInstace(jobState, 
workUnitStream)).initialize();
@@ -698,10 +698,6 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
     }
   }
 
-  protected WorkUnitStream executeHandlers(WorkUnitStream workUnitStream, 
DestinationDatasetHandlerService datasetHandlerService) {
-    return datasetHandlerService.executeHandlers(workUnitStream);
-  }
-
   protected WorkUnitStream processWorkUnitStream(WorkUnitStream 
workUnitStream, JobState jobState) {
     // Add task ids
     workUnitStream = prepareWorkUnits(workUnitStream, jobState);
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index 182ce7daa..40223e093 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -42,6 +42,7 @@ public interface GobblinTemporalConfigurationKeys {
   String DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS = 
HelloWorldJobLauncher.class.getName();
 
   String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX = 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "arg.";
+  String GOBBLIN_TEMPORAL_JOB_LAUNCHER_CONFIG_OVERRIDES = 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "config.overrides";
 
   /**
    * Suffix for metrics emitted by GobblinTemporalJobLauncher for preventing 
collisions with prod jobs
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
index 0ed6652eb..8ab428c41 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
@@ -78,7 +78,7 @@ public abstract class AbstractTemporalWorker implements 
TemporalWorker {
     protected abstract Object[] getActivityImplInstances();
 
     private final void stashWorkerConfig(Config cfg) {
-        // stash in association with...
+        // stash to associate with...
         WorkerConfig.forWorker(this.getClass(), cfg); // the worker itself
         Arrays.stream(getWorkflowImplClasses()).forEach(clazz -> 
WorkerConfig.withImpl(clazz, cfg)); // its workflow impls
         Arrays.stream(getActivityImplInstances()).forEach(obj -> 
WorkerConfig.withImpl(obj.getClass(), cfg)); // its activity impls
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
index 61545f6ed..4e55c9777 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
@@ -246,10 +246,12 @@ public class GobblinTemporalTaskRunner implements 
StandardMetricsBridge {
 
     String workerClassName = ConfigUtils.getString(clusterConfig,
         GobblinTemporalConfigurationKeys.WORKER_CLASS, 
GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
+    logger.info("Creating worker - class: '{}'", workerClassName);
+    Config workerConfig = clusterConfig;
     TemporalWorker worker = GobblinConstructorUtils.invokeLongestConstructor(
-        (Class<TemporalWorker>)Class.forName(workerClassName), clusterConfig, 
client);
+        (Class<TemporalWorker>)Class.forName(workerClassName), workerConfig, 
client);
     worker.start();
-    logger.info("A new worker is started.");
+    logger.info("Finished starting worker - class: '{}'", workerClassName);
     return worker;
   }
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
new file mode 100644
index 000000000..5f3f0c639
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.util.Properties;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
+
+/** Activity for generating {@link WorkUnit}s and persisting them to the 
{@link org.apache.hadoop.fs.FileSystem}, per "job properties" */
+@ActivityInterface
+public interface GenerateWorkUnits {
+  /** @return the number of {@link WorkUnit}s generated and persisted */
+  @ActivityMethod
+  int generateWorkUnits(Properties jobProps, EventSubmitterContext 
eventSubmitterContext);
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
new file mode 100644
index 000000000..797ba7951
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import com.google.api.client.util.Lists;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import io.temporal.failure.ApplicationFailure;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
+import org.apache.gobblin.destination.DestinationDatasetHandlerService;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.source.Source;
+import org.apache.gobblin.source.WorkUnitStreamSource;
+import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
+import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
+
+
+@Slf4j
+public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
+
+  @Override
+  public int generateWorkUnits(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
+    // TODO: decide whether to acquire a job lock (as MR did)!
+    // TODO: provide for job cancellation (unless handling at the 
temporal-level of parent workflows)!
+    JobState jobState = new JobState(jobProps);
+    log.info("Created jobState: {}", jobState.toJsonString(true));
+    Optional<Config> thisClassConfig = WorkerConfig.of(this);
+    log.info("Obtained class config: {}", thisClassConfig.isPresent() ? 
thisClassConfig.get() : "NO WORKER CONFIG: ERROR!");
+
+    Path workDirRoot = JobStateUtils.getWorkDirRoot(jobState);
+    log.info("Using work dir root path for job '{}' - '{}'", 
jobState.getJobId(), workDirRoot);
+
+    // TODO: determine whether these are actually necessary to do (as 
MR/AbstractJobLauncher did)!
+    // SharedResourcesBroker<GobblinScopeTypes> jobBroker = 
JobStateUtils.getSharedResourcesBroker(jobState);
+    // jobState.setBroker(jobBroker);
+    // jobState.setWorkUnitAndDatasetStateFunctional(new 
CombinedWorkUnitAndDatasetStateGenerator(this.datasetStateStore, this.jobName));
+
+    try (Closer closer = Closer.create()) {
+      // before embarking on (potentially expensive) WU creation, first 
pre-check that the FS is available
+      FileSystem fs = JobStateUtils.openFileSystem(jobState);
+      fs.mkdirs(workDirRoot);
+
+      List<WorkUnit> workUnits = generateWorkUnitsForJobState(jobState, 
eventSubmitterContext, closer);
+
+      JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
+      JobStateUtils.writeJobState(jobState, workDirRoot, fs);
+
+      return workUnits.size();
+    } catch (ReflectiveOperationException roe) {
+      String errMsg = "Unable to construct a source for generating workunits 
for job " + jobState.getJobId();
+      log.error(errMsg, roe);
+      throw ApplicationFailure.newNonRetryableFailureWithCause(errMsg, 
"Failure: new Source()", roe);
+    } catch (IOException ioe) {
+      String errMsg = "Failed to generate workunits for job " + 
jobState.getJobId();
+      log.error(errMsg, ioe);
+      throw ApplicationFailure.newFailureWithCause(errMsg, "Failure: 
generating/writing workunits", ioe);
+    } finally {
+      // TODO: implement Troubleshooter integration!
+    }
+  }
+
+  protected static List<WorkUnit> generateWorkUnitsForJobState(JobState 
jobState, EventSubmitterContext eventSubmitterContext, Closer closer)
+      throws ReflectiveOperationException {
+    Source<?, ?> source = JobStateUtils.createSource(jobState);
+    WorkUnitStream workUnitStream = source instanceof WorkUnitStreamSource
+        ? ((WorkUnitStreamSource) source).getWorkunitStream(jobState)
+        : new 
BasicWorkUnitStream.Builder(source.getWorkunits(jobState)).build();
+
+    // TODO: report (timer) metrics for workunits creation
+    if (workUnitStream == null || workUnitStream.getWorkUnits() == null) { // 
indicates a problem getting the WUs
+      String errMsg = "Failure in getting work units for job " + 
jobState.getJobId();
+      log.error(errMsg);
+      // TODO: decide whether a non-retryable failure is too severe... (in 
most circumstances, it's likely what we want)
+      throw ApplicationFailure.newNonRetryableFailure(errMsg, "Failure: 
Source.getWorkUnits()");
+    }
+
+    if (!workUnitStream.getWorkUnits().hasNext()) { // no work unit to run: 
entirely normal result (not a failure)
+      log.warn("No work units created for job " + jobState.getJobId());
+      return Lists.newArrayList();
+    }
+
+    // TODO: count total bytes for progress tracking!
+
+    boolean canCleanUp = canCleanStagingData(jobState);
+    DestinationDatasetHandlerService datasetHandlerService = closer.register(
+        new DestinationDatasetHandlerService(jobState, canCleanUp, 
eventSubmitterContext.create()));
+    WorkUnitStream handledWorkUnitStream = 
datasetHandlerService.executeHandlers(workUnitStream);
+
+    // initialize writer and converter(s)
+    // TODO: determine whether registration here is effective, or the 
lifecycle of this activity is too brief (as is likely!)
+    closer.register(WriterInitializerFactory.newInstace(jobState, 
handledWorkUnitStream)).initialize();
+    closer.register(ConverterInitializerFactory.newInstance(jobState, 
handledWorkUnitStream)).initialize();
+
+    // update jobState before it gets serialized
+    long startTime = System.currentTimeMillis();
+    jobState.setStartTime(startTime);
+    jobState.setState(JobState.RunningState.RUNNING);
+
+    log.info("Starting job " + jobState.getJobId());
+    // TODO: report (timer) metrics for workunits preparation
+    WorkUnitStream preparedWorkUnitStream = 
AbstractJobLauncher.prepareWorkUnits(handledWorkUnitStream, jobState);
+
+    // TODO: 
gobblinJobMetricsReporter.reportWorkUnitCountMetrics(this.jobContext.getJobState().getPropAsInt(NUM_WORKUNITS),
 jobState);
+
+    // dump the work unit if tracking logs are enabled (post any 
materialization for counting)
+    WorkUnitStream trackedWorkUnitStream = 
AbstractJobLauncher.addWorkUnitTrackingPerConfig(preparedWorkUnitStream, 
jobState, log);
+
+    return AbstractJobLauncher.materializeWorkUnitList(trackedWorkUnitStream);
+  }
+
+  protected static boolean canCleanStagingData(JobState jobState) {
+    if 
(DeliverySemantics.EXACTLY_ONCE.equals(DeliverySemantics.parse(jobState))) {
+      String errMsg = "DeliverySemantics.EXACTLY_ONCE NOT currently supported; 
job " + jobState.getJobId();
+      log.error(errMsg);
+      throw ApplicationFailure.newNonRetryableFailure(errMsg, "Unsupported: 
DeliverySemantics.EXACTLY_ONCE");
+    }
+    return true;
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
new file mode 100644
index 000000000..b08835aa6
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.launcher;
+
+import com.typesafe.config.Config;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.ConfigFactory;
+
+import io.temporal.client.WorkflowOptions;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.ddm.workflow.GenerateWorkUnitsWorkflow;
+import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
+import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A {@link JobLauncher} for the initial triggering of a Temporal workflow 
that generates {@link WorkUnit}s per an arbitrary
+ * {@link org.apache.gobblin.source.Source}; see: {@link 
GenerateWorkUnitsWorkflow}
+ *
+ * <p>
+ *   This class is instantiated by the {@link 
GobblinTemporalJobScheduler#buildJobLauncher(Properties)} on every job 
submission to launch the Gobblin job.
+ *   The actual task execution happens in the {@link 
GobblinTemporalTaskRunner}, usually in a different process.
+ * </p>
+ */
+@Slf4j
+public class GenerateWorkUnitsJobLauncher extends GobblinTemporalJobLauncher {
+
+  public static final String WORKFLOW_ID_BASE = "GenerateWorkUnits";
+
+  public GenerateWorkUnitsJobLauncher(
+      Properties jobProps,
+      Path appWorkDir,
+      List<? extends Tag<?>> metadataTags,
+      ConcurrentHashMap<String, Boolean> runningMap,
+      EventBus eventBus
+  ) throws Exception {
+    super(jobProps, appWorkDir, metadataTags, runningMap, eventBus);
+  }
+
+  @Override
+  public void submitJob(List<WorkUnit> workunits) {
+    try {
+      WorkflowOptions options = WorkflowOptions.newBuilder()
+          .setTaskQueue(this.queueName)
+          
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, 
ConfigFactory.parseProperties(jobProps)))
+          .build();
+      GenerateWorkUnitsWorkflow workflow = 
this.client.newWorkflowStub(GenerateWorkUnitsWorkflow.class, options);
+
+      Config jobConfigWithOverrides = 
applyJobLauncherOverrides(ConfigUtils.propertiesToConfig(this.jobProps));
+
+      Help.propagateGaaSFlowExecutionContext(this.jobProps);
+      EventSubmitterContext eventSubmitterContext = new 
EventSubmitterContext(this.eventSubmitter);
+
+      int numWorkUnits = 
workflow.generate(ConfigUtils.configToProperties(jobConfigWithOverrides), 
eventSubmitterContext);
+      log.info("FINISHED - GenerateWorkUnitsWorkflow.generate = {}", 
numWorkUnits);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index ee6fe7f80..006572dc0 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -98,7 +98,7 @@ public class ProcessWorkUnitsJobLauncher extends 
GobblinTemporalJobLauncher {
 
       WorkflowOptions options = WorkflowOptions.newBuilder()
           .setTaskQueue(this.queueName)
-          .setWorkflowId(Help.qualifyNamePerExec(WORKFLOW_ID_BASE, wuSpec, 
ConfigFactory.parseProperties(jobProps)))
+          
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, wuSpec, 
ConfigFactory.parseProperties(jobProps)))
           .build();
       ProcessWorkUnitsWorkflow workflow = 
this.client.newWorkflowStub(ProcessWorkUnitsWorkflow.class, options);
       workflow.process(wuSpec);
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
index 87d91e0c4..ae6a8b711 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
@@ -17,13 +17,17 @@
 
 package org.apache.gobblin.temporal.ddm.util;
 
-import java.util.Properties;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 
 import lombok.extern.slf4j.Slf4j;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.io.Closer;
 import com.typesafe.config.ConfigFactory;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -36,24 +40,53 @@ import 
org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.FsStateStore;
 import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SourceDecorator;
 import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.source.Source;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.ParallelRunner;
 
 
 /**
  * Utilities for applying {@link JobState} info to various ends:
+ * - opening the {@link FileSystem}
+ * - creating the {@link Source}
  * - creating a {@link SharedResourcesBroker}
- * - obtaining a {@link StateStore<TaskState>}
+ * - opening the {@link StateStore<TaskState>}
+ * - writing serialized {@link WorkUnit}s to the {@link FileSystem}
+ * - writing serialized {@link JobState} to the {@link FileSystem}
  */
 @Slf4j
 public class JobStateUtils {
-  private static final String OUTPUT_DIR_NAME = "output"; // following 
MRJobLauncher.OUTPUT_DIR_NAME
+  public static final String INPUT_DIR_NAME = "input"; // following 
MRJobLauncher.INPUT_DIR_NAME
+  public static final String OUTPUT_DIR_NAME = "output"; // following 
MRJobLauncher.OUTPUT_DIR_NAME
+  public static final boolean DEFAULT_WRITE_PREVIOUS_WORKUNIT_STATES = true;
 
   // reuse same handle among activities executed by the same worker
   private static final transient Cache<Path, StateStore<TaskState>> 
taskStateStoreByPath = CacheBuilder.newBuilder().build();
 
   private JobStateUtils() {}
 
+  /** @return the {@link FileSystem} indicated by {@link 
ConfigurationKeys#FS_URI_KEY} */
+  public static FileSystem openFileSystem(JobState jobState) throws 
IOException {
+    URI fsUri = URI.create(jobState.getProp(ConfigurationKeys.FS_URI_KEY, 
ConfigurationKeys.LOCAL_FS_URI));
+    return Help.loadFileSystemForUriForce(fsUri, jobState);
+  }
+
+  /** @return a new instance of {@link Source} identified by {@link 
ConfigurationKeys#SOURCE_CLASS_KEY} */
+  public static Source<?, ?> createSource(JobState jobState) throws 
ReflectiveOperationException {
+    Class<?> sourceClass = 
Class.forName(jobState.getProp(ConfigurationKeys.SOURCE_CLASS_KEY));
+    log.info("Creating source: '{}'", sourceClass.getName());
+    Source<?, ?> source = new SourceDecorator<>(
+        Source.class.cast(sourceClass.newInstance()),
+        jobState.getJobId(), log);
+    return source;
+  }
+
   public static StateStore<TaskState> openTaskStateStore(JobState jobState, 
FileSystem fs) {
     try {
       Path taskStateStorePath = JobStateUtils.getTaskStateStorePath(jobState, 
fs);
@@ -71,23 +104,69 @@ public class JobStateUtils {
     return new FsStateStore<>(fs, taskStateStorePath.toUri().getPath(), 
TaskState.class);
   }
 
+  /**
+   * ATTENTION: derives path according to {@link 
org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
+   * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
+   * @return "base" dir root path for work dir (parent of inputs, output task 
states, etc.)
+   */
+  public static Path getWorkDirRoot(JobState jobState) {
+    return new Path(
+        new Path(jobState.getProp(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY), 
jobState.getJobName()),
+        jobState.getJobId());
+  }
+
   /**
    * ATTENTION: derives path according to {@link 
org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
    * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
    * @return path to {@link FsStateStore<TaskState>} backing dir
    */
   public static Path getTaskStateStorePath(JobState jobState, FileSystem fs) {
-    Properties jobProps = jobState.getProperties();
-    Path jobOutputPath = new Path(
-        new Path(
-            new Path(
-                jobProps.getProperty(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY),
-                JobState.getJobNameFromProps(jobProps)),
-            JobState.getJobIdFromProps(jobProps)),
-        OUTPUT_DIR_NAME);
+    Path jobOutputPath = new Path(getWorkDirRoot(jobState), OUTPUT_DIR_NAME);
     return fs.makeQualified(jobOutputPath);
   }
 
+  /** write serialized {@link WorkUnit}s in parallel into files named after 
the jobID and task IDs */
+  public static void writeWorkUnits(List<WorkUnit> workUnits, Path 
workDirRootPath, JobState jobState, FileSystem fs)
+      throws IOException {
+    String jobId = jobState.getJobId();
+    Path targetDirPath = new Path(workDirRootPath, INPUT_DIR_NAME);
+
+    int numThreads = 
ParallelRunner.getNumThreadsConfig(jobState.getProperties());
+    Closer closer = Closer.create(); // (NOTE: try-with-resources syntax 
wouldn't allow `catch { closer.rethrow(t) }`)
+    try {
+      ParallelRunner parallelRunner = closer.register(new 
ParallelRunner(numThreads, fs));
+
+      JobLauncherUtils.WorkUnitPathCalculator pathCalculator = new 
JobLauncherUtils.WorkUnitPathCalculator();
+      int i = 0;
+      for (WorkUnit workUnit : workUnits) {
+        Path workUnitFile = pathCalculator.calcNextPath(workUnit, jobId, 
targetDirPath);
+        if (i++ == 0) {
+          log.info("Writing work unit file [first of {}]: '{}'", 
workUnits.size(), workUnitFile);
+        }
+        parallelRunner.serializeToFile(workUnit, workUnitFile);
+      }
+    } catch (Throwable t) {
+      throw closer.rethrow(t);
+    } finally {
+      closer.close();
+    }
+  }
+
+  /** write serialized `jobState` beneath `workDirRootPath` of `fs`, per 
{@link JobStateUtils#DEFAULT_WRITE_PREVIOUS_WORKUNIT_STATES} */
+  public static void writeJobState(JobState jobState, Path workDirRootPath, 
FileSystem fs) throws IOException {
+    writeJobState(jobState, workDirRootPath, fs, 
DEFAULT_WRITE_PREVIOUS_WORKUNIT_STATES);
+  }
+
+  /** write serialized `jobState` beneath `workDirRootPath` of `fs`, per 
whether to `writePreviousWorkUnitStates` */
+  public static void writeJobState(JobState jobState, Path workDirRootPath, 
FileSystem fs, boolean writePreviousWorkUnitStates) throws IOException {
+    Path targetPath = new Path(workDirRootPath, 
AbstractJobLauncher.JOB_STATE_FILE_NAME);
+    try (DataOutputStream dataOutputStream = new 
DataOutputStream(fs.create(targetPath))) {
+      log.info("Writing serialized jobState to '{}'", targetPath);
+      jobState.write(dataOutputStream, false, writePreviousWorkUnitStates);
+      log.info("Finished writing jobState to '{}'", targetPath);
+    }
+  }
+
   public static SharedResourcesBroker<GobblinScopeTypes> 
getSharedResourcesBroker(JobState jobState) {
     SharedResourcesBroker<GobblinScopeTypes> globalBroker =
         SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index dd47f590f..09da984a6 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.temporal.ddm.work.assistance;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.ExecutionException;
@@ -31,6 +32,8 @@ import com.google.common.cache.CacheBuilder;
 
 import com.typesafe.config.Config;
 
+import org.slf4j.MDC;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -44,9 +47,9 @@ import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
 import org.apache.gobblin.temporal.ddm.work.styles.JobStateful;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.SerializationUtils;
-import org.slf4j.MDC;
 
 
 /** Various capabilities useful in implementing Distributed Data Movement 
(DDM) */
@@ -63,22 +66,35 @@ public class Help {
 
   private Help() {}
 
-  public static String qualifyNamePerExec(String name, FileSystemJobStateful 
f, Config workerConfig) {
-    return name + "_" + calcPerExecQualifier(f, workerConfig);
+  /** @return execution-specific name, incorporating any {@link 
ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from {@link JobState} */
+  public static String qualifyNamePerExecWithFlowExecId(String name, 
FileSystemJobStateful f, Config workerConfig) {
+    return name + "_" + calcPerExecQualifierWithOptFlowExecId(f, workerConfig);
   }
 
-  public static String qualifyNamePerExec(String name, Config workerConfig) {
+  /** @return execution-specific name, NOT incorporating {@link 
ConfigurationKeys#FLOW_EXECUTION_ID_KEY} */
+  public static String qualifyNamePerExecWithoutFlowExecId(String name, Config 
workerConfig) {
     return name + "_" + calcPerExecQualifier(workerConfig);
   }
 
-  public static String calcPerExecQualifier(FileSystemJobStateful f, Config 
workerConfig) {
+  /** @return execution-specific name, incorporating any {@link 
ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from `workerConfig` */
+  public static String qualifyNamePerExecWithFlowExecId(String name, Config 
workerConfig) {
+    Optional<String> optFlowExecId = 
Optional.ofNullable(ConfigUtils.getString(workerConfig, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
+    return name + "_" + calcPerExecQualifierWithOptFlowExecId(optFlowExecId, 
workerConfig);
+  }
+
+  public static String calcPerExecQualifierWithOptFlowExecId(Optional<String> 
optFlowExecId, Config workerConfig) {
+    return optFlowExecId.map(x -> x + "_").orElse("") + 
calcPerExecQualifier(workerConfig);
+  }
+
+  public static String 
calcPerExecQualifierWithOptFlowExecId(FileSystemJobStateful f, Config 
workerConfig) {
     Optional<String> optFlowExecId = Optional.empty();
     try {
-      optFlowExecId = 
Optional.of(loadJobState(f).getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
null));
+      // TODO: determine whether the same could be obtained from 
`workerConfig` (likely much more efficiently)
+      optFlowExecId = 
Optional.ofNullable(loadJobState(f).getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
 null)); /* not `Optional.of`: the prop may be unset (null), which would throw 
NPE rather than fall through to the unqualified name */
     } catch (IOException e) {
       log.warn("unable to loadJobState", e);
     }
-    return optFlowExecId.map(x -> x + "_").orElse("") + 
calcPerExecQualifier(workerConfig);
+    return calcPerExecQualifierWithOptFlowExecId(optFlowExecId, workerConfig);
   }
 
   public static String calcPerExecQualifier(Config workerConfig) {
@@ -210,9 +226,25 @@ public class Help {
   }
 
   public static void propagateGaaSFlowExecutionContext(JobState jobState) {
+    doGaaSFlowExecutionContextPropagation(
+        jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY, "<<NOT SET>>"),
+        jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY, "<<NOT SET>>"),
+        jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "<<NOT 
SET>>")
+    );
+  }
+
+  public static void propagateGaaSFlowExecutionContext(Properties jobProps) {
+    doGaaSFlowExecutionContextPropagation(
+        jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "<<NOT SET>>"),
+        jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY, "<<NOT SET>>"),
+        jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "<<NOT 
SET>>")
+    );
+  }
+
+  protected static void doGaaSFlowExecutionContextPropagation(String 
flowGroup, String flowName, String flowExecId) {
     // TODO: log4j2 has better syntax around conditional logging such that the 
key does not need to be included in the value
-    MDC.put(ConfigurationKeys.FLOW_NAME_KEY, 
String.format("%s:%s",ConfigurationKeys.FLOW_NAME_KEY, 
jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY, "<<NOT SET>>")));
-    MDC.put(ConfigurationKeys.FLOW_GROUP_KEY, 
String.format("%s:%s",ConfigurationKeys.FLOW_GROUP_KEY, 
jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY, "<<NOT SET>>")));
-    MDC.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
String.format("%s:%s",ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "<<NOT SET>>")));
+    MDC.put(ConfigurationKeys.FLOW_GROUP_KEY, 
String.format("%s:%s",ConfigurationKeys.FLOW_GROUP_KEY, flowGroup));
+    MDC.put(ConfigurationKeys.FLOW_NAME_KEY, 
String.format("%s:%s",ConfigurationKeys.FLOW_NAME_KEY, flowName));
+    MDC.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
String.format("%s:%s",ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecId));
   }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index 99c4d0536..02631719c 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -25,8 +25,10 @@ import io.temporal.worker.WorkerOptions;
 
 import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
 import org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl;
+import org.apache.gobblin.temporal.ddm.activity.impl.GenerateWorkUnitsImpl;
 import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
 import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl;
+import 
org.apache.gobblin.temporal.ddm.workflow.impl.GenerateWorkUnitsWorkflowImpl;
 import 
org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
 import 
org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
 import org.apache.gobblin.temporal.workflows.metrics.SubmitGTEActivityImpl;
@@ -43,12 +45,12 @@ public class WorkFulfillmentWorker extends 
AbstractTemporalWorker {
 
     @Override
     protected Class<?>[] getWorkflowImplClasses() {
-        return new Class[] { ProcessWorkUnitsWorkflowImpl.class, 
NestingExecOfProcessWorkUnitWorkflowImpl.class, CommitStepWorkflowImpl.class };
+        return new Class[] { CommitStepWorkflowImpl.class, 
GenerateWorkUnitsWorkflowImpl.class, 
NestingExecOfProcessWorkUnitWorkflowImpl.class, 
ProcessWorkUnitsWorkflowImpl.class };
     }
 
     @Override
     protected Object[] getActivityImplInstances() {
-        return new Object[] { new ProcessWorkUnitImpl(), new 
CommitActivityImpl(), new SubmitGTEActivityImpl() };
+        return new Object[] { new CommitActivityImpl(), new 
GenerateWorkUnitsImpl(), new ProcessWorkUnitImpl(), new SubmitGTEActivityImpl() 
};
     }
 
     @Override
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/GenerateWorkUnitsWorkflow.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/GenerateWorkUnitsWorkflow.java
new file mode 100644
index 000000000..7c843e1af
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/GenerateWorkUnitsWorkflow.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow;
+
+import java.util.Properties;
+
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
+
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
+
+/** Workflow simply to generate {@link WorkUnit}s from a {@link 
org.apache.gobblin.source.Source} (and persist them for subsequent processing) 
*/
+@WorkflowInterface
+public interface GenerateWorkUnitsWorkflow {
+  /** @return the number of {@link WorkUnit}s generated and persisted */
+  @WorkflowMethod
+  int generate(Properties props, EventSubmitterContext eventSubmitterContext);
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/GenerateWorkUnitsWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/GenerateWorkUnitsWorkflowImpl.java
new file mode 100644
index 000000000..5fff3fe9b
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/GenerateWorkUnitsWorkflowImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import io.temporal.workflow.Workflow;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
+import org.apache.gobblin.temporal.ddm.workflow.GenerateWorkUnitsWorkflow;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
+
+@Slf4j
+public class GenerateWorkUnitsWorkflowImpl implements 
GenerateWorkUnitsWorkflow {
+  public static final Duration startToCloseTimeout = Duration.ofMinutes(90); 
// TODO: make configurable
+
+  private static final RetryOptions ACTIVITY_RETRY_OPTS = 
RetryOptions.newBuilder()
+      .setInitialInterval(Duration.ofSeconds(3))
+      .setMaximumInterval(Duration.ofSeconds(100))
+      .setBackoffCoefficient(2)
+      .setMaximumAttempts(4)
+      .build();
+
+  private static final ActivityOptions ACTIVITY_OPTS = 
ActivityOptions.newBuilder()
+      .setStartToCloseTimeout(startToCloseTimeout)
+      .setRetryOptions(ACTIVITY_RETRY_OPTS)
+      .build();
+
+  private final GenerateWorkUnits activityStub = 
Workflow.newActivityStub(GenerateWorkUnits.class, ACTIVITY_OPTS);
+
+  @Override
+  public int generate(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
+    return activityStub.generateWorkUnits(jobProps, eventSubmitterContext);
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 531a018c7..679a39dae 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -117,7 +117,7 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
   protected NestingExecWorkflow<WorkUnitClaimCheck> 
createProcessingWorkflow(FileSystemJobStateful f) {
     ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
         .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
-        .setWorkflowId(Help.qualifyNamePerExec(CHILD_WORKFLOW_ID_BASE, f, 
WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+        
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f, 
WorkerConfig.of(this).orElse(ConfigFactory.empty())))
         .build();
     // TODO: to incorporate multiple different concrete `NestingExecWorkflow` 
sub-workflows in the same super-workflow... shall we use queues?!?!?
     return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
@@ -126,7 +126,7 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
   protected CommitStepWorkflow createCommitStepWorkflow() {
     ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
         .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
-        .setWorkflowId(Help.qualifyNamePerExec(COMMIT_STEP_WORKFLOW_ID_BASE, 
WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+        
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE,
 WorkerConfig.of(this).orElse(ConfigFactory.empty())))
         .build();
 
     return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts);
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
index c61d8c72a..12d8861ec 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
@@ -93,7 +93,7 @@ public abstract class GobblinJobLauncher extends 
AbstractJobLauncher {
       List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> 
runningMap, EventBus eventbus)
       throws Exception {
     super(jobProps, HelixUtils.initBaseEventTags(jobProps, metadataTags));
-    log.debug("GobblinJobLauncher: jobProps {}, appWorkDir {}", jobProps, 
appWorkDir);
+    log.debug("GobblinJobLauncher: appWorkDir {}; jobProps {}", appWorkDir, 
jobProps);
     this.runningMap = runningMap;
     this.appWorkDir = appWorkDir;
     this.inputWorkUnitDir = new Path(appWorkDir, 
GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME);
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
index 914b3aa90..82aeb8b20 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
@@ -21,16 +21,18 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-
 import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
 import io.temporal.client.WorkflowClient;
 import io.temporal.serviceclient.WorkflowServiceStubs;
 import io.temporal.workflow.Workflow;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
@@ -38,6 +40,8 @@ import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.JobLauncher;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.ConfigUtils;
 
 import static org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.*;
 import static 
org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory.createClientInstance;
@@ -70,7 +74,7 @@ public abstract class GobblinTemporalJobLauncher extends 
GobblinJobLauncher {
                                     List<? extends Tag<?>> metadataTags, 
ConcurrentHashMap<String, Boolean> runningMap, EventBus eventBus)
           throws Exception {
     super(jobProps, appWorkDir, metadataTags, runningMap, eventBus);
-    log.debug("GobblinTemporalJobLauncher: jobProps {}, appWorkDir {}", 
jobProps, appWorkDir);
+    log.info("GobblinTemporalJobLauncher: appWorkDir {}; jobProps {}", 
appWorkDir, jobProps);
 
     String connectionUri = jobProps.getProperty(TEMPORAL_CONNECTION_STRING);
     this.workflowServiceStubs = createServiceInstance(connectionUri);
@@ -83,6 +87,14 @@ public abstract class GobblinTemporalJobLauncher extends 
GobblinJobLauncher {
     startCancellationExecutor();
   }
 
+  /** @return `config` with every setting found beneath {@link 
GobblinTemporalConfigurationKeys#GOBBLIN_TEMPORAL_JOB_LAUNCHER_CONFIG_OVERRIDES}
 layered on top as an override; all other settings fall back to `config` (when 
no overrides are present, an empty {@link Config} is layered on instead) */
+  protected Config applyJobLauncherOverrides(Config config) {
+    Config configOverrides = ConfigUtils.getConfig(config,
+        
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_CONFIG_OVERRIDES,
 ConfigFactory.empty());
+    log.info("applying config overrides: {}", configOverrides);
+    return configOverrides.withFallback(config);
+  }
+
   @Override
   protected void handleLaunchFinalization() {
     // NOTE: This code only makes sense when there is 1 source / workflow 
being launched per application for Temporal. This is a stop-gap
@@ -104,12 +116,15 @@ public abstract class GobblinTemporalJobLauncher extends 
GobblinJobLauncher {
     log.info("Cancel temporal workflow");
   }
 
+  /** No-op: merely logs a warning, since not expected to be invoked */
   @Override
   protected void removeTasksFromCurrentJob(List<String> workUnitIdsToRemove) {
-    log.info("Temporal removeTasksFromCurrentJob");
+    log.warn("NOT IMPLEMENTED: Temporal removeTasksFromCurrentJob");
   }
 
+  /** No-op: merely logs a warning, since not expected to be invoked */
+  @Override
   protected void addTasksToCurrentJob(List<WorkUnit> workUnitsToAdd) {
-    log.info("Temporal addTasksToCurrentJob");
+    log.warn("NOT IMPLEMENTED: Temporal addTasksToCurrentJob");
   }
 }
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
index 6439f41b2..f70f11f75 100644
--- 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
@@ -26,8 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-
-import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -41,7 +40,6 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
 
-import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -65,19 +63,13 @@ public class JobLauncherUtils {
   private static Cache<String, FileSystem> fileSystemCacheByOwners = 
CacheBuilder.newBuilder().build();
 
   /** Calculate monotonically-increasing paths for multi-WU files */
-  @AllArgsConstructor
-  @NotThreadSafe
   public static class WorkUnitPathCalculator {
-    private int nextMultiWorkUnitTaskId;
-
-    public WorkUnitPathCalculator() {
-      this(0);
-    }
+    private final AtomicInteger nextMultiWorkUnitTaskId = new AtomicInteger(0);
 
     // Serialize each work unit into a file named after the task ID
     public Path calcNextPath(WorkUnit workUnit, String jobId, Path basePath) {
       String workUnitFileName = workUnit.isMultiWorkUnit()
-          ? JobLauncherUtils.newMultiTaskId(jobId, nextMultiWorkUnitTaskId++) 
+ JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION
+          ? JobLauncherUtils.newMultiTaskId(jobId, 
nextMultiWorkUnitTaskId.getAndIncrement()) + 
JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION
           : workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) + 
JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
       return new Path(basePath, workUnitFileName);
     }

Reply via email to