This is an automated email from the ASF dual-hosted git repository.

abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e128c909b [GOBBLIN-2247] Add cleanup logic for Staging directories in 
temporal flow (#4166)
5e128c909b is described below

commit 5e128c909bfc095f07b58077d12f9e4f83b229cf
Author: pratapaditya04 <[email protected]>
AuthorDate: Fri Feb 27 11:00:26 2026 +0530

    [GOBBLIN-2247] Add cleanup logic for Staging directories in temporal flow 
(#4166)
    
    Implements graceful shutdown support across Gobblin Yarn and Temporal:
    
    - Yarn Launcher: sends a GRACEFUL_SHUTDOWN signal to the AM container and
      waits for the application to reach a terminal state.
    - Temporal Job Launcher: executes a shutdown hook to clean up staging and
      working directories.
    
    This ensures proper cleanup during job termination while allowing the
    Application Master time to finish its cleanup tasks.
---
 .../opentelemetry/GobblinOpenTelemetryMetrics.java |  14 +-
 .../GobblinOpenTelemetryMetricsConstants.java      |   1 +
 .../temporal/GobblinTemporalConfigurationKeys.java |   3 +
 .../ddm/activity/impl/GenerateWorkUnitsImpl.java   |   6 +
 .../temporal/joblauncher/GobblinJobLauncher.java   | 281 +++++++++++++++++++--
 .../activity/impl/GenerateWorkUnitsImplTest.java   |  58 +++++
 .../GobblinTemporalJobLauncherTest.java            | 123 ++++++++-
 .../gobblin/yarn/GobblinYarnAppLauncher.java       |  88 +++++++
 .../gobblin/yarn/GobblinYarnConfigurationKeys.java |  14 +
 .../gobblin/yarn/GobblinYarnAppLauncherTest.java   |  78 ++++++
 10 files changed, 641 insertions(+), 25 deletions(-)

diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetrics.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetrics.java
index c54f213e24..1ff8334d8d 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetrics.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetrics.java
@@ -36,7 +36,19 @@ public enum GobblinOpenTelemetryMetrics {
    * Metric to track the latency of each Gobblin Job state (GenerateWorkUnit, 
ProcessWorkUnit, CommitStep).
    * Metric Unit: seconds (s) represents the time taken for each state.
    * */
-  GOBBLIN_JOB_STATE_LATENCY("gobblin.job.state.latency", "Gobblin job state 
latency", "s", OpenTelemetryMetricType.DOUBLE_HISTOGRAM);
+  GOBBLIN_JOB_STATE_LATENCY("gobblin.job.state.latency", "Gobblin job state 
latency", "s", OpenTelemetryMetricType.DOUBLE_HISTOGRAM),
+
+  /**
+   * Metric to track the time taken to delete staging directories during 
cleanup, broken down by filesystem scheme.
+   * Metric Unit: seconds (s).
+   * */
+  GOBBLIN_STAGING_CLEANUP_LATENCY("gobblin.staging.cleanup.latency", "Gobblin 
staging directory cleanup latency", "s", 
OpenTelemetryMetricType.DOUBLE_HISTOGRAM),
+
+  /**
+   * Metric to track the time taken to delete working directories (work units, 
task states, job state) during cleanup, broken down by filesystem scheme.
+   * Metric Unit: seconds (s).
+   * */
+  
GOBBLIN_WORK_DIRECTORY_CLEANUP_LATENCY("gobblin.work.directory.cleanup.latency",
 "Gobblin working directory cleanup latency", "s", 
OpenTelemetryMetricType.DOUBLE_HISTOGRAM);
 
   private final String metricName;
   private final String metricDescription;
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetricsConstants.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetricsConstants.java
index d998380a54..7c1ff493e1 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetricsConstants.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetricsConstants.java
@@ -22,6 +22,7 @@ public class GobblinOpenTelemetryMetricsConstants {
   public static class DimensionKeys {
     public static final String STATE = "state";
     public static final String CURR_STATE = "currState";
+    public static final String FS_SCHEME = "fsScheme";
   }
 
   public static class DimensionValues {
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index bc87fe1f55..982b4af7f3 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -52,6 +52,9 @@ public interface GobblinTemporalConfigurationKeys {
   String GOBBLIN_TEMPORAL_JOB_LAUNCHER_CONFIG_OVERRIDES = 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "config.overrides";
   String GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED = PREFIX + 
"work.dir.cleanup.enabled";
   String DEFAULT_GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED = "true";
+  String WORK_DIR_PATHS_TO_DELETE = PREFIX + "work.dir.paths.to.delete";
+  String SHUTDOWN_CLEANUP_TIMEOUT_SECONDS = PREFIX + 
"shutdown.cleanup.timeout.seconds";
+  String DEFAULT_SHUTDOWN_CLEANUP_TIMEOUT_SECONDS = "600";
 
   String GOBBLIN_TEMPORAL_CONTAINER_METRICS_PREFIX = PREFIX + 
"container.metrics.";
   String GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME = 
GOBBLIN_TEMPORAL_CONTAINER_METRICS_PREFIX + "application.name";
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 9b35921158..6f14384add 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -76,6 +76,7 @@ import org.apache.gobblin.source.workunit.WorkUnitStream;
 import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
 import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
 import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
@@ -196,6 +197,11 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
           
jobState.setProp(ConfigurationKeys.CONVERTER_INITIALIZERS_SERIALIZED_MEMENTOS_KEY,
               Initializer.AfterInitializeMemento.serialize(memento))
       );
+      // Store the work dir paths to delete in JobState for reuse during 
cleanup
+      if (!genWUsInsights.getPathsToCleanUp().isEmpty()) {
+        
jobState.setProp(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE,
+            String.join(",", genWUsInsights.getPathsToCleanUp()));
+      }
       JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
       JobStateUtils.writeJobState(jobState, workDirRoot, fs); // ATTENTION: 
the writing of `JobState` after all WUs signifies WU gen+serialization now 
complete
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
index ef0e9de0bc..1bf04dfe39 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
@@ -19,14 +19,19 @@ package org.apache.gobblin.temporal.joblauncher;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.apache.gobblin.util.WriterUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -61,6 +66,11 @@ import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.ParallelRunner;
+import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
+import 
org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryDoubleHistogram;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryHelper;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryInstrumentation;
 import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
 import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 
@@ -96,6 +106,8 @@ public abstract class GobblinJobLauncher extends 
AbstractJobLauncher {
   protected JobListener jobListener;
   protected volatile boolean jobSubmitted = false;
   private final ExecutorService executor;
+  private final long shutdownCleanupTimeoutSeconds;
+  private final AtomicBoolean cleanupExecuted = new AtomicBoolean(false);
 
   public GobblinJobLauncher(Properties jobProps, Path appWorkDir,
       List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> 
runningMap, EventBus eventbus)
@@ -132,17 +144,151 @@ public abstract class GobblinJobLauncher extends 
AbstractJobLauncher {
             this.stateStores.getTaskStateStore(), this.outputTaskStateDir, 
this.getIssueRepository());
     this.executor = Executors.newSingleThreadScheduledExecutor(
         ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("GobblinJobLauncher")));
+
+    this.shutdownCleanupTimeoutSeconds = Long.parseLong(jobProps.getProperty(
+        GobblinTemporalConfigurationKeys.SHUTDOWN_CLEANUP_TIMEOUT_SECONDS,
+        
GobblinTemporalConfigurationKeys.DEFAULT_SHUTDOWN_CLEANUP_TIMEOUT_SECONDS));
+
+    registerCleanupShutdownHook();
   }
 
-  @Override
-  public void close() throws IOException {
+  /**
+   * Registers a JVM shutdown hook that loads JobState from file and runs 
cleanup methods in parallel.
+   */
+  private void registerCleanupShutdownHook() {
+    Runtime.getRuntime().addShutdownHook(
+        new Thread(this::performCleanup, "GobblinJobLauncher-Cleanup-" + 
this.jobContext.getJobId()));
+  }
+
+  /**
+   * Performs cleanup operations during JVM shutdown.
+   * Creates a dedicated FileSystem instance, loads JobState, and executes 
cleanup tasks in parallel.
+   */
+  private void performCleanup() {
+    if (!cleanupExecuted.compareAndSet(false, true)) {
+      log.info("Cleanup already executed for job {}, skipping", 
this.jobContext.getJobId());
+      return;
+    }
+    log.info("Performing cleanup for job {}", this.jobContext.getJobId());
+
+    String fsUriStr = this.jobProps.getProperty(ConfigurationKeys.FS_URI_KEY, 
ConfigurationKeys.LOCAL_FS_URI);
+    FileSystem shutdownFs = null;
     try {
-      cleanupWorkingDirectory();
+      shutdownFs = FileSystem.newInstance(URI.create(fsUriStr), new 
Configuration());
+      JobState jobState = loadJobStateWithFallback(shutdownFs);
+      executeParallelCleanup(jobState, shutdownFs);
+
+      log.info("Shutdown hook: cleanup completed for job {}", 
this.jobContext.getJobId());
+    } catch (Exception e) {
+      log.error("Shutdown hook: cleanup failed for job {}: {}", 
this.jobContext.getJobId(), e.getMessage(), e);
     } finally {
-      super.close();
+      closeFileSystemQuietly(shutdownFs);
+    }
+  }
+
+  /**
+   * Loads JobState from persisted file with fallback to in-memory state.
+   * @param fs FileSystem to use for loading JobState
+   * @return JobState loaded from file, or in-memory JobState if file loading 
fails
+   */
+  private JobState loadJobStateWithFallback(FileSystem fs) {
+    JobState jobState = loadJobStateFromFile(fs);
+    if (jobState == null) {
+      log.warn("Shutdown hook: could not load JobState from file, trying using 
in-memory state");
+      jobState = this.jobContext.getJobState();
+    }
+    return jobState;
+  }
+
+  /**
+   * Executes staging and working directory cleanup tasks in parallel.
+   * @param jobState JobState containing cleanup configuration and paths
+   * @throws Exception if cleanup tasks fail or timeout
+   */
+  private void executeParallelCleanup(JobState jobState, FileSystem fs) throws 
Exception {
+    java.util.concurrent.CompletableFuture<Void> stagingCleanup = 
createStagingCleanupTask(jobState);
+    java.util.concurrent.CompletableFuture<Void> workDirCleanup = 
createWorkDirCleanupTask(fs);
+
+    java.util.concurrent.CompletableFuture.allOf(stagingCleanup, 
workDirCleanup)
+        .get(this.shutdownCleanupTimeoutSeconds, 
java.util.concurrent.TimeUnit.SECONDS);
+  }
+
+  /**
+   * Creates an asynchronous task for staging directory cleanup.
+   */
+  private java.util.concurrent.CompletableFuture<Void> 
createStagingCleanupTask(JobState jobState) {
+    return java.util.concurrent.CompletableFuture.runAsync(() -> {
+      try {
+        cleanupStagingDirectory(jobState);
+      } catch (Exception e) {
+        log.warn("Shutdown hook: staging cleanup failed: {}", e.getMessage(), 
e);
+      }
+    });
+  }
+
+  /**
+   * Creates an asynchronous task for working directory cleanup.
+   */
+  private java.util.concurrent.CompletableFuture<Void> 
createWorkDirCleanupTask(FileSystem fs) {
+    return java.util.concurrent.CompletableFuture.runAsync(() -> {
+      try {
+        cleanupWorkingDirectory(fs);
+      } catch (Exception e) {
+        log.warn("Shutdown hook: working directory cleanup failed: {}", 
e.getMessage(), e);
+      }
+    });
+  }
+
+  /**
+   * Closes the FileSystem quietly, logging any errors
+   */
+  private void closeFileSystemQuietly(FileSystem fs) {
+    if (fs != null) {
+      try {
+        fs.close();
+      } catch (Exception e) {
+        log.error("Failed to close shutdown FileSystem: {}", e.getMessage());
+      }
     }
   }
 
+  /**
+   * Loads JobState from the persisted file written by GenerateWorkUnits.
+   * Returns null if JobState cannot be loaded.
+   */
+  private JobState loadJobStateFromFile(FileSystem fs) {
+    try {
+      Path workDirRoot = 
JobStateUtils.getWorkDirRoot(this.jobContext.getJobState());
+      Path jobStateFile = new Path(workDirRoot, "job.state");
+
+      if (!fs.exists(jobStateFile)) {
+        log.error("JobState file does not exist at {}", jobStateFile);
+        return null;
+      }
+
+      JobState jobState = new JobState();
+      try (java.io.DataInputStream dis = new 
java.io.DataInputStream(fs.open(jobStateFile))) {
+        jobState.readFields(dis);
+        log.info("Successfully loaded JobState from {}", jobStateFile);
+        return jobState;
+      }
+    } catch (Exception e) {
+      log.error("Failed to load JobState from file: ", e);
+      return null;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    performCleanup();
+    super.close();
+  }
+
+  @VisibleForTesting
+  void triggerCleanupForTest() {
+    performCleanup();
+  }
+
   public String getJobId() {
     return this.jobContext.getJobId();
   }
@@ -183,7 +329,6 @@ public abstract class GobblinJobLauncher extends 
AbstractJobLauncher {
 
       // The last iteration of output TaskState collecting will run when the 
collector service gets stopped
       this.taskStateCollectorService.stopAsync().awaitTerminated();
-      cleanupWorkingDirectory();
     }
   }
 
@@ -266,30 +411,120 @@ public abstract class GobblinJobLauncher extends 
AbstractJobLauncher {
   }
 
   /**
-   * Delete persisted {@link WorkUnit}s and {@link JobState} upon job 
completion.
+   * Delete writer staging and output directories using paths from JobState.
    */
-  protected void cleanupWorkingDirectory() throws IOException {
-    log.info("Deleting persisted work units for job " + 
this.jobContext.getJobId());
-    stateStores.getWuStateStore().delete(this.jobContext.getJobId());
+  @VisibleForTesting
+  protected void cleanupStagingDirectory(JobState jobState) throws IOException 
{
+    if (!isCleanupEnabled(jobState)) {
+      return;
+    }
+
+    Set<String> pathsToDelete = getPathsToDelete(jobState);
+    if (pathsToDelete.isEmpty()) {
+      return;
+    }
+
+    log.info("Cleaning up {} work directories for job: {}", 
pathsToDelete.size(), this.jobContext.getJobId());
 
-    // delete the directory that stores the task state files
-    stateStores.getTaskStateStore().delete(outputTaskStateDir.getName());
+    String writerFsUri = 
jobState.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, 
ConfigurationKeys.LOCAL_FS_URI);
+    String fsScheme = URI.create(writerFsUri).getScheme();
+    FileSystem writerFs = FileSystem.newInstance(URI.create(writerFsUri), 
WriterUtils.getFsConfiguration(jobState));
+    long startTimeMs = System.currentTimeMillis();
+    try {
+      for (String pathStr : pathsToDelete) {
+        Path path = new Path(pathStr);
+        try {
+          if (writerFs.exists(path)) {
+            log.info("Deleting work directory: {}", path);
+            writerFs.delete(path, true);
+          } else {
+            log.info("Work directory does not exist or not accessible: {}", 
path);
+          }
+        } catch (Exception e) {
+          log.error("Failed to delete work directory {}: {}", pathStr, 
e.getMessage(), e);
+        }
+      }
+    } finally {
+      closeFileSystemQuietly(writerFs);
+      emitStagingCleanupLatency(startTimeMs, fsScheme);
+    }
+  }
 
-    log.info("Deleting job state file for job " + this.jobContext.getJobId());
+  private void emitStagingCleanupLatency(long startTimeMs, String fsScheme) {
+    
emitLatencyMetric(GobblinOpenTelemetryMetrics.GOBBLIN_STAGING_CLEANUP_LATENCY, 
startTimeMs, fsScheme);
+  }
 
-    if (this.stateStores.haveJobStateStore()) {
-      this.stateStores.getJobStateStore().delete(this.jobContext.getJobId());
-    } else {
-      Path jobStateFilePath =
-          GobblinClusterUtils.getJobStateFilePath(false, this.appWorkDir, 
this.jobContext.getJobId());
-      this.fs.delete(jobStateFilePath, false);
+  private void emitLatencyMetric(GobblinOpenTelemetryMetrics metric, long 
startTimeMs, String fsScheme) {
+    try {
+      double durationSeconds = (System.currentTimeMillis() - startTimeMs) / 
1000.0;
+      Map<String, String> attributes = 
Collections.singletonMap(GobblinOpenTelemetryMetricsConstants.DimensionKeys.FS_SCHEME,
 fsScheme);
+      OpenTelemetryDoubleHistogram histogram = 
OpenTelemetryInstrumentation.getInstance(this.jobProps)
+          .getOrCreate(metric);
+      histogram.record(durationSeconds, 
OpenTelemetryHelper.toOpenTelemetryAttributes(attributes));
+    } catch (Exception e) {
+      log.warn("Failed to emit metric {}: {}", metric.getMetricName(), 
e.getMessage());
     }
+  }
 
-    if 
(Boolean.parseBoolean(this.jobProps.getProperty(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
-        
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED)))
 {
-      Path workDirRootPath = 
JobStateUtils.getWorkDirRoot(this.jobContext.getJobState());
-      log.info("Cleaning up work directory : {} for job : {}", 
workDirRootPath, this.jobContext.getJobId());
-      this.fs.delete(workDirRootPath, true);
+  /**
+   * Checks if work directory cleanup is enabled in JobState.
+   */
+  private boolean isCleanupEnabled(JobState jobState) {
+    return jobState.getPropAsBoolean(
+        
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
+        
Boolean.parseBoolean(GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED));
+  }
+
+  /**
+   * Retrieves the set of paths to delete from JobState.
+   */
+  private Set<String> getPathsToDelete(JobState jobState) {
+    if 
(!jobState.contains(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE)) 
{
+      log.info("No work dir paths to delete configured in JobState");
+      return java.util.Collections.emptySet();
+    }
+    return 
jobState.getPropAsSet(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE);
+  }
+
+  /**
+   * Delete persisted {@link WorkUnit}s and {@link JobState} upon job 
completion.
+   */
+  protected void cleanupWorkingDirectory(FileSystem fs) throws IOException {
+    long startTimeMs = System.currentTimeMillis();
+    try {
+      cleanupStateStore(fs);
+      if 
(Boolean.parseBoolean(this.jobProps.getProperty(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
+          
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED)))
 {
+        Path workDirRootPath = 
JobStateUtils.getWorkDirRoot(this.jobContext.getJobState());
+        log.info("Cleaning up work directory : {} for job : {}", 
workDirRootPath, this.jobContext.getJobId());
+        fs.delete(workDirRootPath, true);
+      }
+    } catch (Exception e){
+      log.error("Failed to cleanup working directory for job {}: {}", 
this.jobContext.getJobId(), e.getMessage(), e);
+    }
+    finally {
+      
emitLatencyMetric(GobblinOpenTelemetryMetrics.GOBBLIN_WORK_DIRECTORY_CLEANUP_LATENCY,
 startTimeMs, fs.getUri().getScheme());
+    }
+  }
+
+  private void cleanupStateStore(FileSystem fs) {
+    try {
+      log.info("Deleting persisted work units for job " + 
this.jobContext.getJobId());
+      stateStores.getWuStateStore().delete(this.jobContext.getJobId());
+
+      // delete the directory that stores the task state files
+      stateStores.getTaskStateStore().delete(outputTaskStateDir.getName());
+
+      log.info("Deleting job state file for job " + 
this.jobContext.getJobId());
+
+      if (this.stateStores.haveJobStateStore()) {
+        this.stateStores.getJobStateStore().delete(this.jobContext.getJobId());
+      } else {
+        Path jobStateFilePath = GobblinClusterUtils.getJobStateFilePath(false, 
this.appWorkDir, this.jobContext.getJobId());
+        fs.delete(jobStateFilePath, false);
+      }
+    } catch (Exception e){
+      log.error("Failed to cleanup state store for job {}: {}", 
this.jobContext.getJobId(), e.getMessage(), e);
     }
   }
 }
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
index 5c457e6945..3f1e6d0dc4 100644
--- 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
@@ -329,6 +329,64 @@ public class GenerateWorkUnitsImplTest {
     }
   }
 
+  @Test
+  public void testWorkDirPathsToDeleteIsStoredInJobState() throws IOException {
+    // Arrange
+    Properties jobProps = new Properties();
+    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, "test-job");
+    jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, "test-job-id");
+
+    JobState jobState = new JobState(jobProps);
+
+    List<WorkUnit> workUnits = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      WorkUnit workUnit = WorkUnit.createEmpty();
+      workUnit.setProp("writer.staging.dir", "/tmp/jobId/task-staging/" + i);
+      workUnit.setProp("writer.output.dir", "/tmp/jobId/task-output/" + i);
+      workUnits.add(workUnit);
+    }
+    WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits)
+        .setFiniteStream(true)
+        .build();
+    Set<String> pathsToCleanUp = 
GenerateWorkUnitsImpl.calculateWorkDirsToCleanup(workUnitStream);
+
+    // Act - simulate what GenerateWorkUnitsImpl does
+    if (!pathsToCleanUp.isEmpty()) {
+      
jobState.setProp(org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE,
+          String.join(",", pathsToCleanUp));
+    }
+
+    // Assert
+    
Assert.assertTrue(jobState.contains(org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE),
+        "JobState should contain WORK_DIR_PATHS_TO_DELETE property");
+    Set<String> retrievedPaths = jobState.getPropAsSet(
+        
org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE);
+    Assert.assertEquals(retrievedPaths.size(), 6, "Should have 6 paths (3 
staging + 3 output)");
+    Assert.assertTrue(retrievedPaths.containsAll(pathsToCleanUp),
+        "Retrieved paths should match the original paths");
+  }
+
+  @Test
+  public void testWorkDirPathsToDeleteNotSetWhenEmpty() {
+    // Arrange
+    Properties jobProps = new Properties();
+    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, "test-job");
+    jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, "test-job-id");
+
+    JobState jobState = new JobState(jobProps);
+    Set<String> pathsToCleanUp = new java.util.HashSet<>();
+
+    // Act - simulate what GenerateWorkUnitsImpl does with empty paths
+    if (!pathsToCleanUp.isEmpty()) {
+      
jobState.setProp(org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE,
+          String.join(",", pathsToCleanUp));
+    }
+
+    // Assert
+    
Assert.assertFalse(jobState.contains(org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE),
+        "JobState should not contain WORK_DIR_PATHS_TO_DELETE when paths are 
empty");
+  }
+
   public static WorkUnit createWorkUnitOfSize(long size) {
     WorkUnit workUnit = WorkUnit.createEmpty();
     workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, size);
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
index 9a6446d95c..b359a4ba2c 100644
--- 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.temporal.joblauncher;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
@@ -45,6 +46,7 @@ import io.temporal.serviceclient.WorkflowServiceStubs;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.example.simplejson.SimpleJsonSource;
+import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.locks.FileBasedJobLock;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
@@ -56,11 +58,14 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 
 
 public class GobblinTemporalJobLauncherTest {
 
-  private GobblinTemporalJobLauncher jobLauncher;
+  private GobblinTemporalJobLauncherForTest jobLauncher;
   private MockedStatic<TemporalWorkflowClientFactory> 
mockWorkflowClientFactory;
   private WorkflowServiceStubs mockServiceStubs;
   private WorkflowClient mockClient;
@@ -69,6 +74,8 @@ public class GobblinTemporalJobLauncherTest {
   private Properties jobProperties;
 
   class GobblinTemporalJobLauncherForTest extends GobblinTemporalJobLauncher {
+    int cleanupStagingDirectoryCallCount = 0;
+
     public GobblinTemporalJobLauncherForTest(Properties jobProperties, Path 
appWorkDir) throws Exception {
       super(jobProperties, appWorkDir, new ArrayList<>(), new 
ConcurrentHashMap<>(), null);
     }
@@ -78,6 +85,17 @@ public class GobblinTemporalJobLauncherTest {
         throws Exception {
       this.workflowId = "someWorkflowId";
     }
+
+    @Override
+    protected void cleanupStagingDirectory(JobState jobState) throws 
IOException {
+      cleanupStagingDirectoryCallCount++;
+      super.cleanupStagingDirectory(jobState);
+    }
+
+    // Expose jobContext for testing
+    public org.apache.gobblin.runtime.JobContext getJobContext() {
+      return this.jobContext;
+    }
   }
 
 
@@ -203,4 +221,107 @@ public class GobblinTemporalJobLauncherTest {
 
     verify(mockStub, times(1)).terminate("Job cancel invoked");
   }
+
+  @Test
+  public void testCleanupStagingDirectoryWithPathsToDelete() throws Exception {
+    // Create temp directories to simulate work directories
+    File tmpDir = Files.createTempDir();
+    File stagingDir = new File(tmpDir, "staging");
+    File outputDir = new File(tmpDir, "output");
+    stagingDir.mkdirs();
+    outputDir.mkdirs();
+
+    // Set up job state with WORK_DIR_PATHS_TO_DELETE
+    JobState jobState = jobLauncher.getJobContext().getJobState();
+    String pathsToDelete = stagingDir.getAbsolutePath() + "," + 
outputDir.getAbsolutePath();
+    
jobState.setProp(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE, 
pathsToDelete);
+    
jobState.setProp(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
 "true");
+
+    // Verify directories exist before cleanup
+    assertTrue(stagingDir.exists(), "Staging directory should exist before 
cleanup");
+    assertTrue(outputDir.exists(), "Output directory should exist before 
cleanup");
+
+    // Execute cleanup
+    jobLauncher.cleanupStagingDirectory(jobState);
+
+    // Verify directories are deleted
+    assertFalse(stagingDir.exists(), "Staging directory should be deleted 
after cleanup");
+    assertFalse(outputDir.exists(), "Output directory should be deleted after 
cleanup");
+
+    // Cleanup test directory
+    tmpDir.delete();
+  }
+
+  @Test
+  public void testCleanupStagingDirectoryWithoutPaths() throws Exception {
+    // Set up job state WITHOUT WORK_DIR_PATHS_TO_DELETE
+    JobState jobState = jobLauncher.getJobContext().getJobState();
+    
jobState.setProp(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
 "true");
+
+    // Should not throw exception when no paths configured
+    jobLauncher.cleanupStagingDirectory(jobState);
+  }
+
+  @Test
+  public void testCleanupStagingDirectoryWithCleanupDisabled() throws 
Exception {
+    // Create temp directories
+    File tmpDir = Files.createTempDir();
+    File stagingDir = new File(tmpDir, "staging");
+    stagingDir.mkdirs();
+
+    // Set up job state with cleanup disabled
+    JobState jobState = jobLauncher.getJobContext().getJobState();
+    String pathsToDelete = stagingDir.getAbsolutePath();
+    
jobState.setProp(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE, 
pathsToDelete);
+    
jobState.setProp(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
 "false");
+
+    // Execute cleanup
+    jobLauncher.cleanupStagingDirectory(jobState);
+
+    // Verify directory still exists (cleanup was disabled)
+    assertTrue(stagingDir.exists(), "Staging directory should still exist when 
cleanup is disabled");
+
+    // Cleanup
+    stagingDir.delete();
+    tmpDir.delete();
+  }
+
+  @Test
+  public void testCloseTriggersCleanup() throws Exception {
+    File tmpDir = Files.createTempDir();
+    File stagingDir = new File(tmpDir, "staging");
+    stagingDir.mkdirs();
+
+    JobState jobState = jobLauncher.getJobContext().getJobState();
+    
jobState.setProp(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE, 
stagingDir.getAbsolutePath());
+    
jobState.setProp(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
 "true");
+
+    assertTrue(stagingDir.exists(), "Staging directory should exist before 
close()");
+
+    jobLauncher.close();
+
+    assertFalse(stagingDir.exists(), "close() should trigger cleanup and 
delete the staging directory");
+    tmpDir.delete();
+  }
+
+  @Test
+  public void testCleanupRunsOnlyOnce() throws Exception {
+    File tmpDir = Files.createTempDir();
+    File stagingDir = new File(tmpDir, "staging");
+    stagingDir.mkdirs();
+
+    JobState jobState = jobLauncher.getJobContext().getJobState();
+    
jobState.setProp(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE, 
stagingDir.getAbsolutePath());
+    
jobState.setProp(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
 "true");
+
+    // First cleanup - triggered by close()
+    jobLauncher.close();
+    assertEquals(jobLauncher.cleanupStagingDirectoryCallCount, 1, "Cleanup 
should run exactly once after close()");
+
+    // Second trigger - simulates the shutdown hook firing after close() 
already ran
+    jobLauncher.triggerCleanupForTest();
+    assertEquals(jobLauncher.cleanupStagingDirectoryCallCount, 1, "Cleanup 
should not run a second time");
+
+    tmpDir.delete();
+  }
 }
\ No newline at end of file
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 019410957c..5ef1b54c25 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -53,16 +53,20 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
@@ -422,6 +426,8 @@ public class GobblinYarnAppLauncher {
 
   /**
    * Stop this {@link GobblinYarnAppLauncher} instance.
+   * When not detaching on exit, sends a graceful shutdown signal to the AM 
container and polls until
+   * the application reaches a terminal state (or timeout) so the AM can clean 
up before the launcher exits.
    *
    * @throws IOException if this {@link GobblinYarnAppLauncher} instance fails 
to clean up its working directory.
    */
@@ -433,6 +439,11 @@ public class GobblinYarnAppLauncher {
     LOGGER.info("Stopping the " + 
GobblinYarnAppLauncher.class.getSimpleName());
 
     try {
+      // Only signal and wait when we are staying attached: if detachOnExit is 
enabled, we leave the app running.
+      if (this.applicationId.isPresent() && !this.detachOnExitEnabled) {
+        signalGracefulShutdownAndWaitForTerminal();
+      }
+
       if (this.serviceManager.isPresent()) {
         this.serviceManager.get().stopAsync().awaitStopped(5, 
TimeUnit.MINUTES);
       }
@@ -940,6 +951,83 @@ public class GobblinYarnAppLauncher {
     }
   }
 
+  /**
+   * Sends GRACEFUL_SHUTDOWN to the AM container via YarnClient, then polls for a terminal
+   * application state until a configurable timeout elapses. The timeout is read in minutes from
+   * {@link GobblinYarnConfigurationKeys#GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY}.
+   *
+   * <p>If the AM container cannot be resolved (e.g. app already terminated), signaling and waiting
+   * are skipped and the method returns immediately.
+   *
+   * <p>This method is best-effort: {@link YarnException} and {@link IOException} are logged at WARN
+   * and not rethrown, and on {@link InterruptedException} the thread's interrupt flag is restored
+   * before returning.
+   */
+  @VisibleForTesting
+  void signalGracefulShutdownAndWaitForTerminal() {
+    int waitMinutes = ConfigUtils.getInt(this.config,
+        GobblinYarnConfigurationKeys.GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY,
+        
GobblinYarnConfigurationKeys.DEFAULT_GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES);
+    long timeoutMs = waitMinutes * 60L * 1000L;
+    try {
+      Optional<ContainerId> amContainerId = 
getAmContainerId(this.applicationId.get());
+      if (!amContainerId.isPresent()) {
+        LOGGER.warn("Could not resolve AM container for application {} (may 
already be terminated); skipping graceful shutdown wait", 
this.applicationId.get());
+        return;
+      }
+      sendGracefulShutdownSignal(amContainerId.get());
+      pollForApplicationCompletionUntil(timeoutMs);
+    } catch (YarnException | IOException e) {
+      LOGGER.warn("Could not signal AM for graceful shutdown; proceeding with 
stop", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.warn("Interrupted while waiting for graceful shutdown; proceeding 
with stop", e);
+    }
+  }
+
+  /**
+   * Resolves the AM container of the application's current attempt.
+   *
+   * @param appId the application to look up
+   * @return the AM container id, or absent when the application report has no current attempt
+   *         or the attempt report carries no AM container id
+   * @throws YarnException if a YarnClient report lookup fails
+   * @throws IOException if a YarnClient report lookup fails
+   */
+  private Optional<ContainerId> getAmContainerId(ApplicationId appId) throws 
YarnException, IOException {
+    ApplicationReport appReport = this.yarnClient.getApplicationReport(appId);
+    ApplicationAttemptId attemptId = 
appReport.getCurrentApplicationAttemptId();
+    if (attemptId == null) {
+      return Optional.absent();
+    }
+    ApplicationAttemptReport attemptReport = 
this.yarnClient.getApplicationAttemptReport(attemptId);
+    ContainerId amContainerId = attemptReport.getAMContainerId();
+    return Optional.fromNullable(amContainerId);
+  }
+
+  /**
+   * Sends {@link SignalContainerCommand#GRACEFUL_SHUTDOWN} to the given container through the
+   * YarnClient and logs that the signal was sent.
+   *
+   * @param amContainerId the AM container to signal
+   * @throws YarnException if the signal request fails
+   * @throws IOException if the signal request fails
+   */
+  private void sendGracefulShutdownSignal(ContainerId amContainerId) throws 
YarnException, IOException {
+    this.yarnClient.signalToContainer(amContainerId, 
SignalContainerCommand.GRACEFUL_SHUTDOWN);
+    LOGGER.info("Sent GRACEFUL_SHUTDOWN signal to AM container {}", 
amContainerId);
+  }
+
+  /**
+   * Tells whether the report shows a terminal YARN application state.
+   *
+   * @param report the application report to inspect
+   * @return {@code true} if the state is FINISHED, FAILED or KILLED; {@code false} otherwise
+   */
+  @VisibleForTesting
+  static boolean isApplicationCompleted(ApplicationReport report) {
+    YarnApplicationState state = report.getYarnApplicationState();
+    return state == YarnApplicationState.FINISHED
+        || state == YarnApplicationState.FAILED
+        || state == YarnApplicationState.KILLED;
+  }
+
+  /**
+   * Polls the application report until the application reaches a terminal state (see
+   * {@link #isApplicationCompleted(ApplicationReport)}) or {@code timeoutMs} has elapsed.
+   * The report is always checked at least once, even when the timeout is zero or negative.
+   *
+   * <p>The poll interval is read in seconds from
+   * {@link GobblinYarnConfigurationKeys#GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS_KEY}. A non-positive
+   * value falls back to the default, because {@link Thread#sleep(long)} rejects negative values
+   * and a zero interval would poll the ResourceManager in a tight loop.
+   *
+   * @param timeoutMs maximum time to wait, in milliseconds
+   * @throws InterruptedException if interrupted while sleeping between polls
+   * @throws YarnException if fetching the application report fails
+   * @throws IOException if fetching the application report fails
+   */
+  private void pollForApplicationCompletionUntil(long timeoutMs)
+      throws InterruptedException, YarnException, IOException {
+    int pollIntervalSec = ConfigUtils.getInt(this.config,
+        GobblinYarnConfigurationKeys.GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS_KEY,
+        GobblinYarnConfigurationKeys.DEFAULT_GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS);
+    if (pollIntervalSec <= 0) {
+      LOGGER.warn("Invalid graceful shutdown poll interval {}s; falling back to default {}s",
+          pollIntervalSec, GobblinYarnConfigurationKeys.DEFAULT_GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS);
+      pollIntervalSec = GobblinYarnConfigurationKeys.DEFAULT_GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS;
+    }
+    long pollIntervalMs = pollIntervalSec * 1000L;
+    long deadlineMs = System.currentTimeMillis() + timeoutMs;
+    ApplicationId appId = this.applicationId.get();
+    while (true) {
+      ApplicationReport report = this.yarnClient.getApplicationReport(appId);
+      if (isApplicationCompleted(report)) {
+        LOGGER.info("Application {} reached terminal state {}", appId, report.getYarnApplicationState());
+        return;
+      }
+      long remainingMs = deadlineMs - System.currentTimeMillis();
+      if (remainingMs <= 0) {
+        LOGGER.info("Graceful shutdown wait timeout reached for application {}", appId);
+        return;
+      }
+      // Never sleep past the deadline; remainingMs is strictly positive here.
+      Thread.sleep(Math.min(pollIntervalMs, remainingMs));
+    }
+  }
+
   @VisibleForTesting
   void cleanUpAppWorkDirectory(ApplicationId applicationId) throws IOException 
{
     // Create a new filesystem as this.fs may have been closed by the Yarn 
Application, and FS.get() will return a cached instance of the closed FS
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 00af259a22..167f211b20 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -181,4 +181,18 @@ public class GobblinYarnConfigurationKeys {
   //Config to control Heartbeat interval for Yarn AMRM client.
   public static final String AMRM_HEARTBEAT_INTERVAL_SECS = 
GOBBLIN_YARN_PREFIX + "amRmHeartbeatIntervalSecs";
   public static final Integer DEFAULT_AMRM_HEARTBEAT_INTERVAL_SECS = 15;
+
+  /**
+   * Max time (minutes) to wait for application to reach terminal state after 
sending graceful shutdown signal.
+   * Default 10. A longer value allows the AM more time to clean up but can 
delay launcher shutdown.
+   */
+  public static final String GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY = 
GOBBLIN_YARN_PREFIX + "graceful.shutdown.wait.time.minutes";
+  public static final int DEFAULT_GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES = 10;
+
+  /**
+   * Poll interval (seconds) when waiting for application completion after 
graceful shutdown signal.
+   * Default 60. Should be less than the total wait time (see {@link 
#GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY}).
+   */
+  public static final String GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS_KEY = 
GOBBLIN_YARN_PREFIX + "graceful.shutdown.poll.interval.seconds";
+  public static final int DEFAULT_GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS = 60;
 }
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index 622c6d5647..4a802877f8 100644
--- 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -39,8 +39,11 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -89,7 +92,10 @@ import 
org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.testing.AssertWithBackoff;
 import org.apache.gobblin.yarn.helix.HelixMessageSubTypes;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 
@@ -495,6 +501,78 @@ public class GobblinYarnAppLauncherTest implements 
HelixMessageTestBase {
     }
   }
 
+  /**
+   * Test that {@link 
GobblinYarnAppLauncher#isApplicationCompleted(ApplicationReport)} returns true 
for
+   * FINISHED, FAILED, KILLED and false for RUNNING and NEW.
+   */
+  @Test
+  public void testIsApplicationCompleted() throws Exception {
+    ApplicationReport reportFinished = Mockito.mock(ApplicationReport.class);
+    
when(reportFinished.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED);
+    
Assert.assertTrue(GobblinYarnAppLauncher.isApplicationCompleted(reportFinished));
+
+    ApplicationReport reportFailed = Mockito.mock(ApplicationReport.class);
+    
when(reportFailed.getYarnApplicationState()).thenReturn(YarnApplicationState.FAILED);
+    
Assert.assertTrue(GobblinYarnAppLauncher.isApplicationCompleted(reportFailed));
+
+    ApplicationReport reportKilled = Mockito.mock(ApplicationReport.class);
+    
when(reportKilled.getYarnApplicationState()).thenReturn(YarnApplicationState.KILLED);
+    
Assert.assertTrue(GobblinYarnAppLauncher.isApplicationCompleted(reportKilled));
+
+    ApplicationReport reportRunning = Mockito.mock(ApplicationReport.class);
+    
when(reportRunning.getYarnApplicationState()).thenReturn(YarnApplicationState.RUNNING);
+    
Assert.assertFalse(GobblinYarnAppLauncher.isApplicationCompleted(reportRunning));
+
+    ApplicationReport reportNew = Mockito.mock(ApplicationReport.class);
+    
when(reportNew.getYarnApplicationState()).thenReturn(YarnApplicationState.NEW);
+    
Assert.assertFalse(GobblinYarnAppLauncher.isApplicationCompleted(reportNew));
+  }
+
+  /**
+   * Test that signalGracefulShutdownAndWaitForTerminal() sends GRACEFUL_SHUTDOWN to the AM
+   * container and polls until the application reaches a terminal state. Uses reflection to inject
+   * a mock YarnClient and an application id into the launcher, then invokes the method directly.
+   */
+  @Test(dependsOnMethods = "testCreateHelixCluster")
+  public void testGracefulShutdownSendsSignalAndPolls() throws Exception {
+    Config gracefulConfig = this.config
+        
.withValue(GobblinYarnConfigurationKeys.GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY,
 ConfigValueFactory.fromAnyRef(1))
+        
.withValue(GobblinYarnConfigurationKeys.GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS_KEY,
 ConfigValueFactory.fromAnyRef(1));
+    GobblinYarnAppLauncher launcher = new 
GobblinYarnAppLauncher(gracefulConfig, clusterConf);
+    launcher.initializeYarnClients(gracefulConfig);
+
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 
1);
+    ContainerId containerId = ContainerId.newInstance(attemptId, 0);
+
+    ApplicationReport appReport = Mockito.mock(ApplicationReport.class);
+    when(appReport.getApplicationId()).thenReturn(appId);
+    when(appReport.getCurrentApplicationAttemptId()).thenReturn(attemptId);
+    
when(appReport.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED);
+
+    ApplicationAttemptReport attemptReport = 
Mockito.mock(ApplicationAttemptReport.class);
+    when(attemptReport.getAMContainerId()).thenReturn(containerId);
+
+    YarnClient mockYarnClient = Mockito.mock(YarnClient.class);
+    
when(mockYarnClient.getApplicationReport(any(ApplicationId.class))).thenReturn(appReport);
+    
when(mockYarnClient.getApplicationAttemptReport(any(ApplicationAttemptId.class))).thenReturn(attemptReport);
+
+    Field yarnClientField = 
GobblinYarnAppLauncher.class.getDeclaredField("yarnClient");
+    yarnClientField.setAccessible(true);
+    yarnClientField.set(launcher, mockYarnClient);
+
+    Field applicationIdField = 
GobblinYarnAppLauncher.class.getDeclaredField("applicationId");
+    applicationIdField.setAccessible(true);
+    applicationIdField.set(launcher, 
com.google.common.base.Optional.of(appId));
+
+    launcher.signalGracefulShutdownAndWaitForTerminal();
+
+    verify(mockYarnClient, times(1)).signalToContainer(eq(containerId), 
eq(SignalContainerCommand.GRACEFUL_SHUTDOWN));
+    // getApplicationReport: once in getAmContainerId(), once in 
pollForApplicationCompletionUntil() when checking terminal state
+    verify(mockYarnClient, times(2)).getApplicationReport(appId);
+    verify(mockYarnClient, times(1)).getApplicationAttemptReport(attemptId);
+  }
+
   @Test
   public void testAddMetricReportingDynamicConfig()
       throws IOException {


Reply via email to