This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f6eeb533ad [GOBBLIN-2152] Cleanup activity gobblin temporal (#4047)
f6eeb533ad is described below

commit f6eeb533ad757aaa34bfb8e69a833732956f3775
Author: William Lo <[email protected]>
AuthorDate: Wed Sep 11 13:52:38 2024 -0400

    [GOBBLIN-2152] Cleanup activity gobblin temporal (#4047)
    
    Adds cleanup activity and job scoped directories in Temporal
---
 ...eWorkUnits.java => DeleteWorkDirsActivity.java} |  18 ++--
 .../temporal/ddm/activity/GenerateWorkUnits.java   |   3 +-
 .../ddm/activity/impl/CommitActivityImpl.java      |   8 +-
 .../activity/impl/DeleteWorkDirsActivityImpl.java  | 103 +++++++++++++++++++++
 .../ddm/activity/impl/GenerateWorkUnitsImpl.java   |  53 +++++++++--
 .../ddm/launcher/ExecuteGobblinJobLauncher.java    |  31 +++++--
 .../ddm/launcher/GenerateWorkUnitsJobLauncher.java |  13 ++-
 .../DirDeletionResult.java}                        |  26 +++---
 .../GenerateWorkUnitsResult.java}                  |  30 +++---
 .../gobblin/temporal/ddm/work/assistance/Help.java |  18 ++--
 .../temporal/ddm/worker/WorkFulfillmentWorker.java |   5 +-
 .../ddm/workflow/GenerateWorkUnitsWorkflow.java    |   3 +-
 .../workflow/impl/ExecuteGobblinWorkflowImpl.java  |  93 ++++++++++++++++++-
 .../impl/GenerateWorkUnitsWorkflowImpl.java        |   3 +-
 .../impl/ProcessWorkUnitsWorkflowImpl.java         |   7 +-
 .../activity/impl/GenerateWorkUnitsImplTest.java   |  97 +++++++++++++++++++
 .../impl/ExecuteGobblinWorkflowImplTest.java       |  62 +++++++++++++
 17 files changed, 495 insertions(+), 78 deletions(-)

diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/DeleteWorkDirsActivity.java
similarity index 66%
copy from 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
copy to 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/DeleteWorkDirsActivity.java
index 5f3f0c6391..16d0333c11 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/DeleteWorkDirsActivity.java
@@ -14,22 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.gobblin.temporal.ddm.activity;
 
-import java.util.Properties;
+import java.util.Set;
 
 import io.temporal.activity.ActivityInterface;
 import io.temporal.activity.ActivityMethod;
 
-import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.DirDeletionResult;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 
 
-/** Activity for generating {@link WorkUnit}s and persisting them to the 
{@link org.apache.hadoop.fs.FileSystem}, per "job properties" */
+/** Activity for deleting a list of temporary work directories */
 @ActivityInterface
-public interface GenerateWorkUnits {
-  /** @return the number of {@link WorkUnit}s generated and persisted */
+public interface DeleteWorkDirsActivity {
+  /**
+   * Clean the list of resources specified in the input
+   * TODO: Generalize the input to support multiple platforms outside of just 
HDFS
+   */
   @ActivityMethod
-  int generateWorkUnits(Properties jobProps, EventSubmitterContext 
eventSubmitterContext);
+  DirDeletionResult delete(WUProcessingSpec workSpec, EventSubmitterContext 
eventSubmitterContext, Set<String> workDirPaths);
 }
+
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
index 5f3f0c6391..862b46c40f 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
@@ -23,6 +23,7 @@ import io.temporal.activity.ActivityInterface;
 import io.temporal.activity.ActivityMethod;
 
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 
 
@@ -31,5 +32,5 @@ import 
org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 public interface GenerateWorkUnits {
   /** @return the number of {@link WorkUnit}s generated and persisted */
   @ActivityMethod
-  int generateWorkUnits(Properties jobProps, EventSubmitterContext 
eventSubmitterContext);
+  GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, 
EventSubmitterContext eventSubmitterContext);
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index e8490d7141..97699f2ce2 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -153,7 +153,7 @@ public class CommitActivityImpl implements CommitActivity {
               }).iterator(), numCommitThreads,
           // TODO: Rewrite executorUtils to use java util optional
           
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log), 
com.google.common.base.Optional.of("Commit-thread-%d")))
-              .executeAndGetResults();
+          .executeAndGetResults();
 
       IteratorExecutor.logFailures(result, null, 10);
 
@@ -192,8 +192,8 @@ public class CommitActivityImpl implements CommitActivity {
                 // `TaskState extends WorkUnit` serialization will include its 
constituent `WorkUnit`, but not the constituent `JobState`.
                 // given some `JobState` props may be essential for 
commit/publish, deserialization must re-associate each `TaskState` w/ `JobState`
                 taskState.setJobState(jobState)
-                // TODO - decide whether something akin necessary to 
streamline cumulative in-memory size of all issues: 
consumeTaskIssues(taskState);
-            ).collect(Collectors.toList())
+            // TODO - decide whether something akin necessary to streamline 
cumulative in-memory size of all issues: consumeTaskIssues(taskState);
+        ).collect(Collectors.toList())
     ).orElseGet(() -> {
       log.error("TaskStateStore successfully opened, but no task states found 
under (name) '{}'", jobIdPathName);
       return Lists.newArrayList();
@@ -235,4 +235,4 @@ public class CommitActivityImpl implements CommitActivity {
   private static String calcCommitId(WUProcessingSpec workSpec) {
     return new Path(workSpec.getWorkUnitsDir()).getParent().getName();
   }
-}
+}
\ No newline at end of file
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/DeleteWorkDirsActivityImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/DeleteWorkDirsActivityImpl.java
new file mode 100644
index 0000000000..6b14a70156
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/DeleteWorkDirsActivityImpl.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import io.temporal.failure.ApplicationFailure;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity;
+import org.apache.gobblin.temporal.ddm.work.DirDeletionResult;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.WriterUtils;
+
+
+@Slf4j
+public class DeleteWorkDirsActivityImpl implements DeleteWorkDirsActivity {
+  static String UNDEFINED_JOB_NAME = "<job_name_stub>";
+
+  @Override
+  public DirDeletionResult delete(WUProcessingSpec workSpec, 
EventSubmitterContext eventSubmitterContext, Set<String> workDirPaths) {
+    // Exit early when there are no work dirs to delete, e.g. for non-HDFS 
writers, which rely on a different cleanup process; can consider consolidation in the future
+    // through an abstracted cleanup method implemented at the writer level
+    if (workDirPaths.isEmpty()) {
+      return new DirDeletionResult();
+    }
+    //TODO: Emit timers to measure length of cleanup step
+    Optional<String> optJobName = Optional.empty();
+    try {
+      FileSystem fs = Help.loadFileSystem(workSpec);
+      JobState jobState = Help.loadJobState(workSpec, fs);
+      optJobName = Optional.ofNullable(jobState.getJobName());
+
+      Map<String, Boolean> attemptedCleanedDirectories = 
jobState.getPropAsBoolean(ConfigurationKeys.CLEANUP_STAGING_DATA_PER_TASK, 
ConfigurationKeys.DEFAULT_CLEANUP_STAGING_DATA_PER_TASK) ?
+          cleanupStagingDataPerTask(jobState, workDirPaths) : 
cleanupStagingDataForEntireJob(jobState, workDirPaths);
+
+      return new DirDeletionResult(attemptedCleanedDirectories);
+    } catch (Exception e) {
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          String.format("Failed to cleanup temporary folders for job %s", 
optJobName.orElse(UNDEFINED_JOB_NAME)),
+          IOException.class.toString(),
+          new IOException(e)
+      );
+    }
+  }
+
+  //TODO: Support task level deletes if necessary, currently it is deemed 
redundant due to collecting temp dirs during generate work unit step
+  private static Map<String, Boolean> cleanupStagingDataPerTask(JobState 
jobState, Set<String> workDirPaths) throws IOException {
+    throw new IOException("Clean up staging data by task is not supported. 
Please set " + ConfigurationKeys.CLEANUP_STAGING_DATA_PER_TASK + " to false.");
+  }
+
+  private static Map<String, Boolean> cleanupStagingDataForEntireJob(JobState 
state, Set<String> workDirPaths) throws IOException {
+
+    String writerFsUri = 
state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, 
ConfigurationKeys.LOCAL_FS_URI);
+    FileSystem fs = JobLauncherUtils.getFsWithProxy(state, writerFsUri, 
WriterUtils.getFsConfiguration(state));
+    Map<String, Boolean> attemptedCleanedDirectories = new HashMap<>();
+
+    for (String resource : workDirPaths) {
+      Path pathToClean = new Path(resource);
+      log.info("Deleting resource directory " + pathToClean);
+      try {
+        HadoopUtils.deletePath(fs, pathToClean, true);
+        attemptedCleanedDirectories.put(resource, true);
+      } catch (IOException e) {
+        boolean doesExist = fs.exists(pathToClean);
+        // Only record failure to clean if the directory still exists, if it 
does not then we can assume it was already cleaned by another process
+        if (doesExist) {
+          log.error("Failed to delete resource directory " + pathToClean, e);
+          attemptedCleanedDirectories.put(resource, false);
+        }
+      }
+    }
+    return attemptedCleanedDirectories;
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 242b2c69e5..8344bae6f9 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -18,8 +18,11 @@
 package org.apache.gobblin.temporal.ddm.activity.impl;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,6 +33,7 @@ import com.google.common.io.Closer;
 import io.temporal.failure.ApplicationFailure;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
 import org.apache.gobblin.destination.DestinationDatasetHandlerService;
 import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -40,10 +44,12 @@ import 
org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
 import org.apache.gobblin.source.Source;
 import org.apache.gobblin.source.WorkUnitStreamSource;
 import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnitStream;
 import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
 import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
@@ -53,7 +59,7 @@ import 
org.apache.gobblin.writer.initializer.WriterInitializerFactory;
 public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
 
   @Override
-  public int generateWorkUnits(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
+  public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, 
EventSubmitterContext eventSubmitterContext) {
     // TODO: decide whether to acquire a job lock (as MR did)!
     // TODO: provide for job cancellation (unless handling at the 
temporal-level of parent workflows)!
     JobState jobState = new JobState(jobProps);
@@ -74,12 +80,12 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
       FileSystem fs = JobStateUtils.openFileSystem(jobState);
       fs.mkdirs(workDirRoot);
 
-      List<WorkUnit> workUnits = generateWorkUnitsForJobState(jobState, 
eventSubmitterContext, closer);
-
+      Set<String> resourcesToCleanUp = new HashSet<>();
+      List<WorkUnit> workUnits = 
generateWorkUnitsForJobStateAndCollectCleanupPaths(jobState, 
eventSubmitterContext, closer, resourcesToCleanUp);
       JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
       JobStateUtils.writeJobState(jobState, workDirRoot, fs);
 
-      return jobState.getTaskCount();
+      return new GenerateWorkUnitsResult(jobState.getTaskCount(), 
resourcesToCleanUp);
     } catch (ReflectiveOperationException roe) {
       String errMsg = "Unable to construct a source for generating workunits 
for job " + jobState.getJobId();
       log.error(errMsg, roe);
@@ -94,7 +100,8 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
     }
   }
 
-  protected static List<WorkUnit> generateWorkUnitsForJobState(JobState 
jobState, EventSubmitterContext eventSubmitterContext, Closer closer)
+  protected List<WorkUnit> 
generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState, 
EventSubmitterContext eventSubmitterContext, Closer closer,
+      Set<String> resourcesToCleanUp)
       throws ReflectiveOperationException {
     Source<?, ?> source = JobStateUtils.createSource(jobState);
     WorkUnitStream workUnitStream = source instanceof WorkUnitStreamSource
@@ -120,7 +127,7 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
     DestinationDatasetHandlerService datasetHandlerService = closer.register(
         new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs, 
eventSubmitterContext.create()));
     WorkUnitStream handledWorkUnitStream = 
datasetHandlerService.executeHandlers(workUnitStream);
-
+    
resourcesToCleanUp.addAll(calculateWorkDirsToCleanup(handledWorkUnitStream));
     // initialize writer and converter(s)
     // TODO: determine whether registration here is effective, or the 
lifecycle of this activity is too brief (as is likely!)
     closer.register(WriterInitializerFactory.newInstace(jobState, 
handledWorkUnitStream)).initialize();
@@ -142,4 +149,38 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
 
     return AbstractJobLauncher.materializeWorkUnitList(trackedWorkUnitStream);
   }
+
+  protected static Set<String> calculateWorkDirsToCleanup(WorkUnitStream 
workUnitStream) {
+    Set<String> resourcesToCleanUp = new HashSet<>();
+    // Check every workunit for the temp dir props, since some workunits may 
be commit steps
+    Iterator<WorkUnit> workUnitIterator = workUnitStream.getWorkUnits();
+    while (workUnitIterator.hasNext()) {
+      WorkUnit workUnit = workUnitIterator.next();
+      if (workUnit.isMultiWorkUnit()) {
+        List<WorkUnit> workUnitList = ((MultiWorkUnit) 
workUnit).getWorkUnits();
+        for (WorkUnit wu : workUnitList) {
+          
resourcesToCleanUp.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(wu));
+        }
+      } else {
+        
resourcesToCleanUp.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(workUnit));
+      }
+    }
+    return resourcesToCleanUp;
+  }
+
+
+  private static Set<String> 
collectTaskStagingAndOutputDirsFromWorkUnit(WorkUnit workUnit) {
+    Set<String> resourcesToCleanUp = new HashSet<>();
+    if (workUnit.contains(ConfigurationKeys.WRITER_STAGING_DIR)) {
+      
resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.WRITER_STAGING_DIR));
+    }
+    if (workUnit.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)) {
+      
resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
+    }
+    if (workUnit.getPropAsBoolean(ConfigurationKeys.CLEAN_ERR_DIR, 
ConfigurationKeys.DEFAULT_CLEAN_ERR_DIR)
+        && workUnit.contains(ConfigurationKeys.ROW_LEVEL_ERR_FILE)) {
+      
resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.ROW_LEVEL_ERR_FILE));
+    }
+    return resourcesToCleanUp;
+  }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
index 6950e6a678..6efb93d128 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
@@ -24,14 +24,19 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.eventbus.EventBus;
-import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 import io.temporal.client.WorkflowOptions;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.JobContext;
 import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
 import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
@@ -41,6 +46,8 @@ import 
org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
 import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.PropertiesUtils;
 
 
 /**
@@ -74,22 +81,32 @@ public class ExecuteGobblinJobLauncher extends 
GobblinTemporalJobLauncher {
   @Override
   public void submitJob(List<WorkUnit> workunits) {
     try {
+      Properties finalProps = adjustJobProperties(this.jobProps);
       WorkflowOptions options = WorkflowOptions.newBuilder()
           .setTaskQueue(this.queueName)
-          
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, 
ConfigFactory.parseProperties(jobProps)))
+          
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, 
ConfigFactory.parseProperties(finalProps)))
           .build();
       ExecuteGobblinWorkflow workflow = 
this.client.newWorkflowStub(ExecuteGobblinWorkflow.class, options);
 
-      Config jobConfigWithOverrides = 
applyJobLauncherOverrides(ConfigUtils.propertiesToConfig(this.jobProps));
-
-      Help.propagateGaaSFlowExecutionContext(this.jobProps);
+      Help.propagateGaaSFlowExecutionContext(finalProps);
       EventSubmitterContext eventSubmitterContext = new 
EventSubmitterContext.Builder(eventSubmitter)
-          .withGaaSJobProps(this.jobProps)
+          .withGaaSJobProps(finalProps)
           .build();
-      ExecGobblinStats execGobblinStats = 
workflow.execute(ConfigUtils.configToProperties(jobConfigWithOverrides), 
eventSubmitterContext);
+      ExecGobblinStats execGobblinStats = workflow.execute(finalProps, 
eventSubmitterContext);
       log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", 
execGobblinStats);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
+
+  // Generate properties such as Job ID, modifying task staging dirs and 
output dirs
+  protected Properties adjustJobProperties(Properties inputJobProps) throws 
Exception {
+    SharedResourcesBroker<GobblinScopeTypes> instanceBroker = 
SharedResourcesBrokerFactory.createDefaultTopLevelBroker(ConfigFactory.parseProperties(inputJobProps),
+        GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+    Properties configOverridesProp = 
ConfigUtils.configToProperties(applyJobLauncherOverrides(ConfigUtils.propertiesToConfig(inputJobProps)));
+    configOverridesProp.setProperty(ConfigurationKeys.JOB_ID_KEY, 
JobLauncherUtils.newJobId(JobState.getJobNameFromProps(configOverridesProp),
+        PropertiesUtils.getPropAsLong(configOverridesProp, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis())));
+    JobContext jobContext = new JobContext(configOverridesProp, log, 
instanceBroker, null);
+    return jobContext.getJobState().getProperties();
+  }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
index 6ddc96a977..68b7910bcd 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
@@ -17,24 +17,24 @@
 
 package org.apache.gobblin.temporal.ddm.launcher;
 
-import com.typesafe.config.Config;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
-import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.Path;
 
 import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 import io.temporal.client.WorkflowOptions;
-
-import org.apache.hadoop.fs.Path;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.JobLauncher;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.ddm.workflow.GenerateWorkUnitsWorkflow;
 import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
@@ -80,9 +80,8 @@ public class GenerateWorkUnitsJobLauncher extends 
GobblinTemporalJobLauncher {
 
       Help.propagateGaaSFlowExecutionContext(this.jobProps);
       EventSubmitterContext eventSubmitterContext = new 
EventSubmitterContext.Builder(this.eventSubmitter).build();
-
-      int numWorkUnits = 
workflow.generate(ConfigUtils.configToProperties(jobConfigWithOverrides), 
eventSubmitterContext);
-      log.info("FINISHED - GenerateWorkUnitsWorkflow.generate = {}", 
numWorkUnits);
+      GenerateWorkUnitsResult generateWorkUnitStats = 
workflow.generate(ConfigUtils.configToProperties(jobConfigWithOverrides), 
eventSubmitterContext);
+      log.info("FINISHED - GenerateWorkUnitsWorkflow.generate = {}", 
generateWorkUnitStats.getGeneratedWuCount());
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DirDeletionResult.java
similarity index 54%
copy from 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
copy to 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DirDeletionResult.java
index 5f3f0c6391..257e5b5772 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DirDeletionResult.java
@@ -15,21 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.temporal.ddm.activity;
+package org.apache.gobblin.temporal.ddm.work;
 
-import java.util.Properties;
+import java.util.Map;
 
-import io.temporal.activity.ActivityInterface;
-import io.temporal.activity.ActivityMethod;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
 
-import org.apache.gobblin.source.workunit.WorkUnit;
-import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 
+/**
+ * Data structure representing the stats for cleaned-up work directories: a 
map from each directory path to whether its cleanup succeeded
+ */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class DirDeletionResult {
 
-/** Activity for generating {@link WorkUnit}s and persisting them to the 
{@link org.apache.hadoop.fs.FileSystem}, per "job properties" */
-@ActivityInterface
-public interface GenerateWorkUnits {
-  /** @return the number of {@link WorkUnit}s generated and persisted */
-  @ActivityMethod
-  int generateWorkUnits(Properties jobProps, EventSubmitterContext 
eventSubmitterContext);
+  @NonNull private Map<String, Boolean> successesByDirPath;
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
similarity index 52%
copy from 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
copy to 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
index 5f3f0c6391..5f798055e2 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
@@ -15,21 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.temporal.ddm.activity;
+package org.apache.gobblin.temporal.ddm.work;
 
-import java.util.Properties;
+import java.util.Set;
 
-import io.temporal.activity.ActivityInterface;
-import io.temporal.activity.ActivityMethod;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
 
-import org.apache.gobblin.source.workunit.WorkUnit;
-import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 
-
-/** Activity for generating {@link WorkUnit}s and persisting them to the 
{@link org.apache.hadoop.fs.FileSystem}, per "job properties" */
-@ActivityInterface
-public interface GenerateWorkUnits {
-  /** @return the number of {@link WorkUnit}s generated and persisted */
-  @ActivityMethod
-  int generateWorkUnits(Properties jobProps, EventSubmitterContext 
eventSubmitterContext);
+/**
+ * Data structure representing the result of generating work units: the 
number of generated work units and
+ * the folders that should be cleaned up after the full job execution is completed
+ */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class GenerateWorkUnitsResult {
+  @NonNull private int generatedWuCount;
+  // Resources that the Temporal Job Launcher should clean up for Gobblin 
temporary work directory paths in writers
+  @NonNull private Set<String> workDirPathsToDelete;
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index 462e784228..e2011d7a9c 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -22,22 +22,20 @@ import java.net.URI;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.MDC;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-
 import com.typesafe.config.Config;
 
-import org.slf4j.Logger;
-import org.slf4j.MDC;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
@@ -48,8 +46,8 @@ import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
 import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
 import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
-import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
 import org.apache.gobblin.temporal.ddm.work.styles.JobStateful;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.HadoopUtils;
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index d4b8b3a911..74737af598 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -20,11 +20,13 @@ package org.apache.gobblin.temporal.ddm.worker;
 import java.util.concurrent.TimeUnit;
 
 import com.typesafe.config.Config;
+
 import io.temporal.client.WorkflowClient;
 import io.temporal.worker.WorkerOptions;
 
 import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
 import org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl;
+import 
org.apache.gobblin.temporal.ddm.activity.impl.DeleteWorkDirsActivityImpl;
 import org.apache.gobblin.temporal.ddm.activity.impl.GenerateWorkUnitsImpl;
 import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
 import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl;
@@ -52,7 +54,8 @@ public class WorkFulfillmentWorker extends 
AbstractTemporalWorker {
 
     @Override
     protected Object[] getActivityImplInstances() {
-        return new Object[] { new CommitActivityImpl(), new 
GenerateWorkUnitsImpl(), new ProcessWorkUnitImpl(), new SubmitGTEActivityImpl() 
};
+        return new Object[] { new CommitActivityImpl(), new 
DeleteWorkDirsActivityImpl(),new GenerateWorkUnitsImpl(),
+            new ProcessWorkUnitImpl(), new SubmitGTEActivityImpl()};
     }
 
     @Override
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/GenerateWorkUnitsWorkflow.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/GenerateWorkUnitsWorkflow.java
index 7c843e1afe..6865c765f5 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/GenerateWorkUnitsWorkflow.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/GenerateWorkUnitsWorkflow.java
@@ -23,6 +23,7 @@ import io.temporal.workflow.WorkflowInterface;
 import io.temporal.workflow.WorkflowMethod;
 
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 
 
@@ -31,5 +32,5 @@ import 
org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 public interface GenerateWorkUnitsWorkflow {
   /** @return the number of {@link WorkUnit}s generated and persisted */
   @WorkflowMethod
-  int generate(Properties props, EventSubmitterContext eventSubmitterContext);
+  GenerateWorkUnitsResult generate(Properties props, EventSubmitterContext 
eventSubmitterContext);
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index f2723bef97..8a04e6e519 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -17,10 +17,15 @@
 
 package org.apache.gobblin.temporal.ddm.workflow.impl;
 
+import java.io.IOException;
 import java.net.URI;
 import java.time.Duration;
+import java.util.HashSet;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.typesafe.config.ConfigFactory;
@@ -36,11 +41,14 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity;
 import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
 import org.apache.gobblin.temporal.ddm.launcher.ProcessWorkUnitsJobLauncher;
 import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
 import org.apache.gobblin.temporal.ddm.work.CommitStats;
+import org.apache.gobblin.temporal.ddm.work.DirDeletionResult;
 import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
+import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
@@ -72,23 +80,41 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
   private final GenerateWorkUnits genWUsActivityStub = 
Workflow.newActivityStub(GenerateWorkUnits.class,
       GEN_WUS_ACTIVITY_OPTS);
 
+  private static final RetryOptions DELETE_WORK_DIRS_RETRY_OPTS = 
RetryOptions.newBuilder()
+      .setInitialInterval(Duration.ofSeconds(3))
+      .setMaximumInterval(Duration.ofSeconds(100))
+      .setBackoffCoefficient(2)
+      .setMaximumAttempts(4)
+      .build();
+
+  private static final ActivityOptions DELETE_WORK_DIRS_ACTIVITY_OPTS = 
ActivityOptions.newBuilder()
+      .setStartToCloseTimeout(Duration.ofHours(1))
+      .setRetryOptions(DELETE_WORK_DIRS_RETRY_OPTS)
+      .build();
+  private final DeleteWorkDirsActivity deleteWorkDirsActivityStub = 
Workflow.newActivityStub(DeleteWorkDirsActivity.class, 
DELETE_WORK_DIRS_ACTIVITY_OPTS);
+
   @Override
   public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
     TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
     timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit();
     EventTimer timer = timerFactory.createJobTimer();
+    Optional<GenerateWorkUnitsResult> generateWorkUnitResultsOpt = 
Optional.empty();
+    WUProcessingSpec wuSpec = createProcessingSpec(jobProps, 
eventSubmitterContext);
+    boolean isSuccessful = false;
     try {
-      int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
+      generateWorkUnitResultsOpt = 
Optional.of(genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext));
+      int numWUsGenerated = 
generateWorkUnitResultsOpt.get().getGeneratedWuCount();
       int numWUsCommitted = 0;
       CommitStats commitStats = CommitStats.createEmpty();
       if (numWUsGenerated > 0) {
-        WUProcessingSpec wuSpec = createProcessingSpec(jobProps, 
eventSubmitterContext);
         ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow(jobProps);
         commitStats = processWUsWorkflow.process(wuSpec);
         numWUsCommitted = commitStats.getNumCommittedWorkUnits();
       }
       timer.stop();
-      return new ExecGobblinStats(numWUsGenerated, numWUsCommitted, 
jobProps.getProperty(Help.USER_TO_PROXY_KEY), commitStats.getDatasetStats());
+      isSuccessful = true;
+      return new ExecGobblinStats(numWUsGenerated, numWUsCommitted, 
jobProps.getProperty(Help.USER_TO_PROXY_KEY),
+          commitStats.getDatasetStats());
     } catch (Exception e) {
       // Emit a failed GobblinTrackingEvent to record job failures
       timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit();
@@ -98,6 +124,27 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
           e,
           null
       );
+    } finally {
+      // TODO: Clean up WorkUnit/TaskState directories for jobs cancelled 
mid-flight
+      try {
+        log.info("Cleaning up work dirs for job {}", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+        if (generateWorkUnitResultsOpt.isPresent()) {
+          cleanupWorkDirs(wuSpec, eventSubmitterContext, 
generateWorkUnitResultsOpt.get().getWorkDirPathsToDelete());
+        } else {
+          log.warn("Skipping cleanup of work dirs for job due to no output 
from GenerateWorkUnits");
+        }
+      } catch (IOException e) {
+        // Only fail the job with a new failure if the job was successful, 
otherwise keep the original error
+        if (isSuccessful) {
+          throw ApplicationFailure.newNonRetryableFailureWithCause(
+              String.format("Failed cleaning Gobblin job %s", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
+              e.getClass().getName(),
+              e,
+              null
+          );
+        }
+        log.error("Failed to cleanup work dirs", e);
+      }
     }
   }
 
@@ -123,4 +170,44 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
     }
     return wuSpec;
   }
+
+  private void cleanupWorkDirs(WUProcessingSpec workSpec, 
EventSubmitterContext eventSubmitterContext, Set<String> directoriesToClean)
+      throws IOException {
+    // TODO: Add configuration to support cleaning up historical work dirs 
from same job name
+    FileSystem fs = Help.loadFileSystem(workSpec);
+    JobState jobState = Help.loadJobState(workSpec, fs);
+    // TODO: Avoid cleaning up if work is being checkpointed e.g. midway of a 
commit for EXACTLY_ONCE
+
+    if (PropertiesUtils.getPropAsBoolean(jobState.getProperties(), 
ConfigurationKeys.CLEANUP_STAGING_DATA_BY_INITIALIZER, "false")) {
+      log.info("Skipping cleanup of work dirs for job due to initializer 
handling the cleanup");
+      return;
+    }
+
+    DirDeletionResult dirDeletionResult = 
deleteWorkDirsActivityStub.delete(workSpec, eventSubmitterContext,
+        calculateWorkDirsToDelete(jobState.getJobId(), directoriesToClean));
+
+    for (String dir : directoriesToClean) {
+      if (!dirDeletionResult.getSuccessesByDirPath().get(dir)) {
+        throw new IOException("Unable to delete one of more directories in the 
DeleteWorkDirsActivity. Please clean up manually.");
+      }
+    }
+  }
+
+  protected static Set<String> calculateWorkDirsToDelete(String jobId, 
Set<String> workDirsToClean) throws IOException {
+    // Only delete directories that are associated with the current job; 
otherwise fail with an IOException that lists the unrelated directories
+    Set<String> resultSet = new HashSet<>();
+    Set<String> nonJobDirs = new HashSet<>();
+    for (String dir : workDirsToClean) {
+      if (dir.contains(jobId)) {
+        resultSet.add(dir);
+      } else {
+        log.warn("Skipping deletion of directory {} as it is not associated 
with job {}", dir, jobId);
+        nonJobDirs.add(dir);
+      }
+    }
+    if (!nonJobDirs.isEmpty()) {
+      throw new IOException("Found directories set to delete not associated 
with job " + jobId + ": " + nonJobDirs + ". Please validate staging and output 
directories");
+    }
+    return resultSet;
+  }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/GenerateWorkUnitsWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/GenerateWorkUnitsWorkflowImpl.java
index 5fff3fe9b2..ad68fb518f 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/GenerateWorkUnitsWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/GenerateWorkUnitsWorkflowImpl.java
@@ -27,6 +27,7 @@ import io.temporal.workflow.Workflow;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
+import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
 import org.apache.gobblin.temporal.ddm.workflow.GenerateWorkUnitsWorkflow;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 
@@ -50,7 +51,7 @@ public class GenerateWorkUnitsWorkflowImpl implements 
GenerateWorkUnitsWorkflow
   private final GenerateWorkUnits activityStub = 
Workflow.newActivityStub(GenerateWorkUnits.class, ACTIVITY_OPTS);
 
   @Override
-  public int generate(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
+  public GenerateWorkUnitsResult generate(Properties jobProps, 
EventSubmitterContext eventSubmitterContext) {
     return activityStub.generateWorkUnits(jobProps, eventSubmitterContext);
   }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index c8afbce25a..975b9f6043 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -23,7 +23,6 @@ import com.typesafe.config.ConfigFactory;
 import io.temporal.api.enums.v1.ParentClosePolicy;
 import io.temporal.workflow.ChildWorkflowOptions;
 import io.temporal.workflow.Workflow;
-
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.temporal.cluster.WorkerConfig;
@@ -59,10 +58,8 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
   private CommitStats performWork(WUProcessingSpec workSpec) {
     Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = 
createProcessingWorkflow(workSpec);
-    int workunitsProcessed = processingWorkflow.performWorkload(
-        WorkflowAddr.ROOT, workload, 0,
-        workSpec.getTuning().getMaxBranchesPerTree(), 
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()
-    );
+    int workunitsProcessed = 
processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+        workSpec.getTuning().getMaxBranchesPerTree(), 
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
     if (workunitsProcessed > 0) {
       CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
       CommitStats result = commitWorkflow.commit(workSpec);
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
new file mode 100644
index 0000000000..86c5ac12de
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
+
+
+public class GenerateWorkUnitsImplTest {
+
+  @Test
+  public void testFetchesWorkDirsFromWorkUnits() {
+    List<WorkUnit> workUnits = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      WorkUnit workUnit = new WorkUnit();
+      workUnit.setProp("writer.staging.dir", "/tmp/jobId/task-staging/" + i);
+      workUnit.setProp("writer.output.dir", "/tmp/jobId/task-output/" + i);
+      workUnit.setProp("qualitychecker.row.err.file", 
"/tmp/jobId/row-err/file");
+      workUnit.setProp("qualitychecker.clean.err.dir", "true");
+      workUnits.add(workUnit);
+    }
+    WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits)
+        .setFiniteStream(true)
+        .build();
+    Set<String> output = 
GenerateWorkUnitsImpl.calculateWorkDirsToCleanup(workUnitStream);
+    Assert.assertEquals(output.size(), 11);
+  }
+
+  @Test
+  public void testFetchesWorkDirsFromMultiWorkUnits() {
+    List<WorkUnit> workUnits = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      MultiWorkUnit multiWorkUnit = new MultiWorkUnit();
+      for (int j = 0; j < 3; j++) {
+        WorkUnit workUnit = new WorkUnit();
+        workUnit.setProp("writer.staging.dir", "/tmp/jobId/task-staging/");
+        workUnit.setProp("writer.output.dir", "/tmp/jobId/task-output/");
+        workUnit.setProp("qualitychecker.row.err.file", 
"/tmp/jobId/row-err/file");
+        workUnit.setProp("qualitychecker.clean.err.dir", "true");
+        multiWorkUnit.addWorkUnit(workUnit);
+      }
+      workUnits.add(multiWorkUnit);
+    }
+    WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits)
+        .setFiniteStream(true)
+        .build();
+    Set<String> output = 
GenerateWorkUnitsImpl.calculateWorkDirsToCleanup(workUnitStream);
+    Assert.assertEquals(output.size(), 3);
+  }
+
+  @Test
+  public void testFetchesUniqueWorkDirsFromMultiWorkUnits() {
+    List<WorkUnit> workUnits = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      MultiWorkUnit multiWorkUnit = new MultiWorkUnit();
+      for (int j = 0; j < 3; j++) {
+        WorkUnit workUnit = new WorkUnit();
+        // Each MWU will have its own staging and output dir
+        workUnit.setProp("writer.staging.dir", "/tmp/jobId/" + i + 
"/task-staging/");
+        workUnit.setProp("writer.output.dir", "/tmp/jobId/" + i + 
"task-output/");
+        workUnit.setProp("qualitychecker.row.err.file", 
"/tmp/jobId/row-err/file");
+        workUnit.setProp("qualitychecker.clean.err.dir", "true");
+        multiWorkUnit.addWorkUnit(workUnit);
+      }
+      workUnits.add(multiWorkUnit);
+    }
+    WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits)
+        .setFiniteStream(true)
+        .build();
+    Set<String> output = 
GenerateWorkUnitsImpl.calculateWorkDirsToCleanup(workUnitStream);
+    Assert.assertEquals(output.size(), 11);
+  }
+}
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImplTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImplTest.java
new file mode 100644
index 0000000000..8d8ad982a6
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImplTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class ExecuteGobblinWorkflowImplTest {
+
+  @Test
+  public void testCalculateWorkDirsDeletion() throws Exception {
+    String jobId = "jobId";
+    Set<String> dirsToDelete = new HashSet<>();
+    dirsToDelete.add("/tmp/jobId/task-staging/");
+    dirsToDelete.add("/tmp/jobId/task-output/");
+    dirsToDelete.add("/tmp/jobId/task-output/file");
+    dirsToDelete.add("/tmp/jobId/otherDir");
+    Set<String> result = 
ExecuteGobblinWorkflowImpl.calculateWorkDirsToDelete(jobId, dirsToDelete);
+    Assert.assertEquals(result.size(), 4);
+    Assert.assertEquals(result, dirsToDelete);
+  }
+
+  @Test
+  public void testThrowsIfNonJobDirInWorkDirs() throws Exception {
+    String jobId = "jobId";
+    Set<String> dirsToDelete = new HashSet<>();
+    dirsToDelete.add("/tmp/jobId/task-staging/");
+    dirsToDelete.add("/tmp/jobId/task-output/");
+    dirsToDelete.add("/tmp/jobId/task-output/file");
+    dirsToDelete.add("/tmp/jobId");
+    // Add a non-job dir that should blow up the job
+    dirsToDelete.add("/tmp");
+    dirsToDelete.add("/sharedDir");
+
+    Assert.assertThrows(IOException.class, () -> 
ExecuteGobblinWorkflowImpl.calculateWorkDirsToDelete(jobId, dirsToDelete));
+    try {
+      ExecuteGobblinWorkflowImpl.calculateWorkDirsToDelete(jobId, 
dirsToDelete);
+    } catch (IOException e) {
+      Assert.assertEquals(e.getMessage(),
+          "Found directories set to delete not associated with job jobId: 
[/tmp, /sharedDir]. Please validate staging and output directories");
+    }
+  }
+}

Reply via email to