Copilot commented on code in PR #4166:
URL: https://github.com/apache/gobblin/pull/4166#discussion_r2829133521


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -132,15 +135,164 @@ public GobblinJobLauncher(Properties jobProps, Path 
appWorkDir,
             this.stateStores.getTaskStateStore(), this.outputTaskStateDir, 
this.getIssueRepository());
     this.executor = Executors.newSingleThreadScheduledExecutor(
         ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("GobblinJobLauncher")));
+
+    this.shutdownCleanupTimeoutSeconds = Long.parseLong(jobProps.getProperty(
+        GobblinTemporalConfigurationKeys.SHUTDOWN_CLEANUP_TIMEOUT_SECONDS,
+        
GobblinTemporalConfigurationKeys.DEFAULT_SHUTDOWN_CLEANUP_TIMEOUT_SECONDS));
+
+    registerCleanupShutdownHook();
   }
 
-  @Override
-  public void close() throws IOException {
+  /**
+   * Registers a JVM shutdown hook that loads JobState from file and runs 
cleanup methods in parallel.
+   */
+  private void registerCleanupShutdownHook() {
+    Runtime.getRuntime().addShutdownHook(
+        new Thread(this::performShutdownCleanup, "GobblinJobLauncher-Cleanup-" 
+ this.jobContext.getJobId()));
+  }
+
+  /**
+   * Performs cleanup operations during JVM shutdown.
+   * Creates a dedicated FileSystem instance, loads JobState, and executes 
cleanup tasks in parallel.
+   */
+  private void performShutdownCleanup() {
+    log.info("Shutdown hook: performing cleanup for job {}", 
this.jobContext.getJobId());
+
+    FileSystem shutdownFs = null;
     try {
-      cleanupWorkingDirectory();
+      shutdownFs = createNonCachedFileSystem();
+      JobState jobState = loadJobStateWithFallback(shutdownFs);
+      executeParallelCleanup(jobState);
+
+      log.info("Shutdown hook: cleanup completed for job {}", 
this.jobContext.getJobId());
+    } catch (Exception e) {
+      log.error("Shutdown hook: cleanup failed for job {}: {}", 
this.jobContext.getJobId(), e.getMessage(), e);
     } finally {
-      super.close();
+      closeFileSystemQuietly(shutdownFs);
+    }

Review Comment:
   The method catches every exception and logs it, with no retry or fallback. 
If filesystem creation fails early, the entire cleanup is skipped and the only 
trace is the error log. That may be acceptable for best-effort shutdown 
cleanup, but consider falling back to the in-memory JobState when filesystem 
creation fails: cleanupStagingDirectory creates its own writer filesystems and 
only needs a JobState, which does not have to be loaded from disk if it is 
already in memory.
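
   A minimal sketch of that fallback, using only the JDK. CleanupStateResolver 
and resolve are hypothetical names, not Gobblin APIs; the Callable stands in 
for "create the filesystem and load JobState from it", and the second argument 
stands in for this.jobContext.getJobState().

```java
import java.util.concurrent.Callable;

/** Hypothetical helper (not a Gobblin class): picks the best available state for cleanup. */
final class CleanupStateResolver {
  private CleanupStateResolver() {}

  /**
   * Tries the persisted loader first. If it throws or returns null, the in-memory
   * value is returned so that cleanup can still proceed.
   */
  static <T> T resolve(Callable<T> persistedLoader, T inMemoryState) {
    try {
      T loaded = persistedLoader.call();
      if (loaded != null) {
        return loaded;
      }
    } catch (Exception e) {
      // Best effort: a filesystem failure should not prevent cleanup from running.
      System.err.println("Could not load persisted state, using in-memory state: " + e);
    }
    return inMemoryState;
  }
}
```

   With this shape, a failure while creating the filesystem is handled the same 
way as a failure while reading the file, so the staging cleanup still runs.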



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:
##########
@@ -940,6 +951,83 @@ private AbstractTokenRefresher buildTokenRefreshManager() 
throws IOException {
     }
   }
 
+  /**
+   * Sends GRACEFUL_SHUTDOWN to the AM container via YarnClient, then polls 
for terminal state until
+   * configurable timeout. If the AM container is already gone (e.g. app 
already terminated), we skip
+   * signaling and return immediately.
+   */
+  @VisibleForTesting
+  void signalGracefulShutdownAndWaitForTerminal() {
+    int waitMinutes = ConfigUtils.getInt(this.config,
+        GobblinYarnConfigurationKeys.GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY,
+        
GobblinYarnConfigurationKeys.DEFAULT_GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES);
+    long timeoutMs = waitMinutes * 60L * 1000L;
+    try {
+      Optional<ContainerId> amContainerId = 
getAmContainerId(this.applicationId.get());
+      if (!amContainerId.isPresent()) {
+        LOGGER.warn("Could not resolve AM container for application {} (may 
already be terminated); skipping graceful shutdown wait", 
this.applicationId.get());
+        return;
+      }
+      sendGracefulShutdownSignal(amContainerId.get());
+      pollForApplicationCompletionUntil(timeoutMs);
+    } catch (YarnException | IOException e) {
+      LOGGER.warn("Could not signal AM for graceful shutdown; proceeding with 
stop", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.warn("Interrupted while waiting for graceful shutdown; proceeding 
with stop", e);
+    }
+  }
+
+  private Optional<ContainerId> getAmContainerId(ApplicationId appId) throws 
YarnException, IOException {
+    ApplicationReport appReport = this.yarnClient.getApplicationReport(appId);
+    ApplicationAttemptId attemptId = 
appReport.getCurrentApplicationAttemptId();
+    if (attemptId == null) {
+      return Optional.absent();
+    }
+    ApplicationAttemptReport attemptReport = 
this.yarnClient.getApplicationAttemptReport(attemptId);
+    ContainerId amContainerId = attemptReport.getAMContainerId();
+    return Optional.fromNullable(amContainerId);
+  }
+
+  private void sendGracefulShutdownSignal(ContainerId amContainerId) throws 
YarnException, IOException {
+    this.yarnClient.signalToContainer(amContainerId, 
SignalContainerCommand.GRACEFUL_SHUTDOWN);
+    LOGGER.info("Sent GRACEFUL_SHUTDOWN signal to AM container {}", 
amContainerId);
+  }
+
+  @VisibleForTesting
+  static boolean isApplicationCompleted(ApplicationReport report) {
+    YarnApplicationState state = report.getYarnApplicationState();
+    return state == YarnApplicationState.FINISHED
+        || state == YarnApplicationState.FAILED
+        || state == YarnApplicationState.KILLED;
+  }
+
+  private void pollForApplicationCompletionUntil(long timeoutMs) throws 
InterruptedException, YarnException, IOException {
+    int pollIntervalSec = ConfigUtils.getInt(this.config,
+        
GobblinYarnConfigurationKeys.GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS_KEY,
+        
GobblinYarnConfigurationKeys.DEFAULT_GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS);
+    long pollIntervalMs = pollIntervalSec * 1000L;
+    if (pollIntervalMs > timeoutMs) {
+      pollIntervalMs = timeoutMs;
+    }
+    long deadlineMs = System.currentTimeMillis() + timeoutMs;
+    ApplicationId appId = this.applicationId.get();
+    while (true) {
+      ApplicationReport report = this.yarnClient.getApplicationReport(appId);
+      if (isApplicationCompleted(report)) {
+        LOGGER.info("Application {} reached terminal state {}", appId, 
report.getYarnApplicationState());
+        return;
+      }
+      long nowMs = System.currentTimeMillis();
+      if (nowMs >= deadlineMs) {
+        LOGGER.info("Graceful shutdown wait timeout reached for application 
{}", appId);
+        return;
+      }
+      long remainingMs = deadlineMs - nowMs;
+      Thread.sleep(Math.min(pollIntervalMs, remainingMs));
+    }

Review Comment:
   The polling logic has an issue: if pollIntervalMs > timeoutMs, it sets 
pollIntervalMs = timeoutMs, so the first sleep spans essentially the whole 
timeout. The loop still checks the application state once before sleeping and 
once after, but nothing in between, so an application that terminates early in 
the window is not noticed until the deadline. This defeats the purpose of 
polling. Consider capping the poll interval at a fraction of the timeout or 
restructuring the logic so that several polls happen before the timeout.
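
   One possible restructuring, sketched with the JDK only. DeadlinePoller, 
Sleeper, and pollUntil are hypothetical names; the isDone supplier stands in 
for the getApplicationReport/isApplicationCompleted check, and the cap of 
timeoutMs / 4 is an arbitrary illustrative choice, not a value taken from the 
PR. The clock and sleeper are injected so the loop can be tested without 
real waiting.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

/** Hypothetical sketch of a deadline-bounded poll loop. */
final class DeadlinePoller {
  /** Hypothetical hook so tests can replace Thread.sleep. */
  interface Sleeper {
    void sleep(long millis) throws InterruptedException;
  }

  private DeadlinePoller() {}

  /** Returns true if isDone became true before the deadline, false on timeout. */
  static boolean pollUntil(BooleanSupplier isDone, long timeoutMs, long pollIntervalMs,
      LongSupplier clockMs, Sleeper sleeper) throws InterruptedException {
    // Cap the interval so that a misconfigured value still yields several polls,
    // and never use an interval below 1 ms.
    long intervalMs = Math.max(1L, Math.min(pollIntervalMs, timeoutMs / 4));
    long deadlineMs = clockMs.getAsLong() + timeoutMs;
    while (true) {
      if (isDone.getAsBoolean()) {
        return true;
      }
      long remainingMs = deadlineMs - clockMs.getAsLong();
      if (remainingMs <= 0) {
        return false; // the check above was the final poll at or after the deadline
      }
      sleeper.sleep(Math.min(intervalMs, remainingMs));
    }
  }
}
```

   In production code the last two arguments would be 
System::currentTimeMillis and Thread::sleep.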



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -265,6 +416,113 @@ void launchJobImpl(@Nullable JobListener jobListener) 
throws JobException {
     super.launchJob(jobListener);
   }
 
+  /**
+   * Delete writer staging and output directories using paths from JobState.
+   */
+  @VisibleForTesting
+  protected void cleanupStagingDirectory(JobState jobState) throws IOException 
{
+    if (!isCleanupEnabled(jobState)) {
+      return;
+    }
+
+    Set<String> pathsToDelete = getPathsToDelete(jobState);
+    if (pathsToDelete.isEmpty()) {
+      return;
+    }
+
+    log.info("Cleaning up {} work directories for job: {}", 
pathsToDelete.size(), this.jobContext.getJobId());
+
+    java.util.List<FileSystem> writerFileSystems = 
createWriterFileSystems(jobState);
+    try {
+      deletePathsUsingFileSystems(pathsToDelete, writerFileSystems);
+    } finally {
+      closeFileSystems(writerFileSystems);
+    }
+  }
+
+  /**
+   * Checks if work directory cleanup is enabled in JobState.
+   */
+  private boolean isCleanupEnabled(JobState jobState) {
+    return jobState.getPropAsBoolean(
+        
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
+        
Boolean.parseBoolean(GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED));
+  }
+
+  /**
+   * Retrieves the set of paths to delete from JobState.
+   */
+  private Set<String> getPathsToDelete(JobState jobState) {
+    if 
(!jobState.contains(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE)) 
{
+      log.info("No work dir paths to delete configured in JobState");
+      return java.util.Collections.emptySet();
+    }
+    return 
jobState.getPropAsSet(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE);
+  }
+
+  /**
+   * Creates FileSystems for each branch's writer destination with caching 
disabled.
+   */
+  private java.util.List<FileSystem> createWriterFileSystems(JobState 
jobState) throws IOException {
+    int numBranches = 
jobState.getPropAsInt(ConfigurationKeys.FORK_BRANCHES_KEY, 1);
+    java.util.List<FileSystem> writerFileSystems = new java.util.ArrayList<>();
+
+    for (int branchId = 0; branchId < numBranches; branchId++) {
+      String writerFsUri = jobState.getProp(
+          
ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_FILE_SYSTEM_URI,
 numBranches, branchId),
+          ConfigurationKeys.LOCAL_FS_URI);
+
+      FileSystem writerFs = createNonCachedFileSystem(URI.create(writerFsUri));
+      writerFileSystems.add(writerFs);
+    }
+
+    return writerFileSystems;
+  }
+
+  /**
+   * Deletes paths using the appropriate writer filesystem.
+   */
+  private void deletePathsUsingFileSystems(Set<String> pathsToDelete, 
java.util.List<FileSystem> writerFileSystems) {
+    for (String pathStr : pathsToDelete) {
+      try {
+        deleteSinglePath(new Path(pathStr), writerFileSystems);
+      } catch (Exception e) {
+        log.error("Failed to delete work directory {}: {}", pathStr, 
e.getMessage(), e);
+      }
+    }
+  }
+
+  /**
+   * Attempts to delete a single path by trying each filesystem until 
successful.
+   */
+  private void deleteSinglePath(Path path, java.util.List<FileSystem> 
writerFileSystems) {
+    for (FileSystem writerFs : writerFileSystems) {
+      try {
+        if (writerFs.exists(path)) {
+          log.info("Deleting work directory: {}", path);
+          writerFs.delete(path, true);
+          return;
+        }
+      } catch (Exception e) {
+        // Try next filesystem
+      }
+    }
+    log.info("Work directory does not exist or not accessible: {}", path);
+  }

Review Comment:
   The method silently swallows exceptions when trying different filesystems 
(lines 506-508). While this allows trying multiple filesystems, it makes 
debugging difficult when all filesystems fail to delete a path. The final log 
message "does not exist or not accessible" doesn't distinguish between a path 
that truly doesn't exist and a path that failed to delete due to permissions or 
other errors. Consider logging the exceptions at debug level or collecting them 
to provide more detailed error information.
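
   A sketch of the "collect them" option, JDK only. MultiTargetDeleter and its 
Target interface are hypothetical stand-ins for the list of Hadoop FileSystem 
instances; only the exists/delete pair used by deleteSinglePath is modelled.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: try several targets and keep every failure for diagnosis. */
final class MultiTargetDeleter {
  /** Hypothetical stand-in for the two FileSystem calls used by deleteSinglePath. */
  interface Target {
    boolean exists(String path) throws IOException;
    boolean delete(String path) throws IOException;
  }

  private MultiTargetDeleter() {}

  /**
   * Returns true once some target deletes the path. Returns false if no target
   * deleted it and none reported an error. Throws if nothing was deleted and at
   * least one target failed, with each failure attached as a suppressed exception.
   */
  static boolean deleteFromFirstMatch(String path, List<Target> targets) throws IOException {
    List<IOException> failures = new ArrayList<>();
    for (Target target : targets) {
      try {
        if (target.exists(path) && target.delete(path)) {
          return true;
        }
      } catch (IOException e) {
        failures.add(e); // remember the failure and try the next target
      }
    }
    if (failures.isEmpty()) {
      return false; // no errors: the path was absent, or delete reported false
    }
    IOException summary = new IOException(
        "Could not delete " + path + " on any of " + targets.size() + " targets");
    failures.forEach(summary::addSuppressed);
    throw summary;
  }
}
```

   The caller can then log "does not exist" for a false return and a full 
stack trace, including the per-filesystem causes, for the exception case.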



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -132,15 +135,164 @@ public GobblinJobLauncher(Properties jobProps, Path 
appWorkDir,
             this.stateStores.getTaskStateStore(), this.outputTaskStateDir, 
this.getIssueRepository());
     this.executor = Executors.newSingleThreadScheduledExecutor(
         ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("GobblinJobLauncher")));
+
+    this.shutdownCleanupTimeoutSeconds = Long.parseLong(jobProps.getProperty(
+        GobblinTemporalConfigurationKeys.SHUTDOWN_CLEANUP_TIMEOUT_SECONDS,
+        
GobblinTemporalConfigurationKeys.DEFAULT_SHUTDOWN_CLEANUP_TIMEOUT_SECONDS));
+
+    registerCleanupShutdownHook();
   }
 
-  @Override
-  public void close() throws IOException {
+  /**
+   * Registers a JVM shutdown hook that loads JobState from file and runs 
cleanup methods in parallel.
+   */
+  private void registerCleanupShutdownHook() {
+    Runtime.getRuntime().addShutdownHook(
+        new Thread(this::performShutdownCleanup, "GobblinJobLauncher-Cleanup-" 
+ this.jobContext.getJobId()));
+  }
+
+  /**
+   * Performs cleanup operations during JVM shutdown.
+   * Creates a dedicated FileSystem instance, loads JobState, and executes 
cleanup tasks in parallel.
+   */
+  private void performShutdownCleanup() {
+    log.info("Shutdown hook: performing cleanup for job {}", 
this.jobContext.getJobId());
+
+    FileSystem shutdownFs = null;
     try {
-      cleanupWorkingDirectory();
+      shutdownFs = createNonCachedFileSystem();
+      JobState jobState = loadJobStateWithFallback(shutdownFs);
+      executeParallelCleanup(jobState);
+
+      log.info("Shutdown hook: cleanup completed for job {}", 
this.jobContext.getJobId());
+    } catch (Exception e) {
+      log.error("Shutdown hook: cleanup failed for job {}: {}", 
this.jobContext.getJobId(), e.getMessage(), e);
     } finally {
-      super.close();
+      closeFileSystemQuietly(shutdownFs);
+    }
+  }
+
+  /**
+   * Creates a new FileSystem instance with caching disabled to avoid issues
+   * with shared filesystem instances being closed before shutdown hook 
executes.
+   */
+  private FileSystem createNonCachedFileSystem() throws IOException {
+    URI fsUri = 
URI.create(this.jobProps.getProperty(ConfigurationKeys.FS_URI_KEY, 
ConfigurationKeys.LOCAL_FS_URI));
+    return createNonCachedFileSystem(fsUri);
+  }
+
+  /**
+   * Creates a FileSystem instance for the given URI with caching disabled.
+   */
+  private FileSystem createNonCachedFileSystem(URI fsUri) throws IOException {
+    Configuration conf = createNonCachedConfiguration();
+    return FileSystem.get(fsUri, conf);
+  }
+
+  /**
+   * Creates a Hadoop Configuration with FileSystem caching disabled.
+   */
+  private Configuration createNonCachedConfiguration() {
+    Configuration conf = new Configuration();
+    conf.set("fs.hdfs.impl.disable.cache", "true");
+    return conf;
+  }
+
+  /**
+   * Loads JobState from persisted file with fallback to in-memory state.
+   * @param fs FileSystem to use for loading JobState
+   * @return JobState loaded from file, or in-memory JobState if file loading 
fails
+   */
+  private JobState loadJobStateWithFallback(FileSystem fs) {
+    JobState jobState = loadJobStateFromFile(fs);
+    if (jobState == null) {
+      log.warn("Shutdown hook: could not load JobState from file, trying using 
in-memory state");
+      jobState = this.jobContext.getJobState();
     }
+    return jobState;
+  }
+
+  /**
+   * Executes staging and working directory cleanup tasks in parallel.
+   * @param jobState JobState containing cleanup configuration and paths
+   * @throws Exception if cleanup tasks fail or timeout
+   */
+  private void executeParallelCleanup(JobState jobState) throws Exception {
+    java.util.concurrent.CompletableFuture<Void> stagingCleanup = 
createStagingCleanupTask(jobState);
+    java.util.concurrent.CompletableFuture<Void> workDirCleanup = 
createWorkDirCleanupTask();
+
+    java.util.concurrent.CompletableFuture.allOf(stagingCleanup, 
workDirCleanup)
+        .get(this.shutdownCleanupTimeoutSeconds, 
java.util.concurrent.TimeUnit.SECONDS);
+  }
+
+  /**
+   * Creates an asynchronous task for staging directory cleanup.
+   */
+  private java.util.concurrent.CompletableFuture<Void> 
createStagingCleanupTask(JobState jobState) {
+    return java.util.concurrent.CompletableFuture.runAsync(() -> {
+      try {
+        cleanupStagingDirectory(jobState);
+      } catch (Exception e) {
+        log.warn("Shutdown hook: staging cleanup failed: {}", e.getMessage(), 
e);
+      }
+    });

Review Comment:
   The CompletableFuture tasks use the common ForkJoinPool by default (via 
runAsync without explicit executor). During JVM shutdown, the ForkJoinPool may 
be shutting down or unavailable, which could cause the tasks to not execute. 
Consider using a dedicated executor that is guaranteed to be available during 
shutdown, or executing the cleanup tasks sequentially in the shutdown hook 
thread to avoid depending on the ForkJoinPool.
   ```suggestion
      * Creates a task for staging directory cleanup.
      * This implementation runs synchronously in the calling thread to avoid
      * depending on the common ForkJoinPool during JVM shutdown.
      */
     private java.util.concurrent.CompletableFuture<Void> 
createStagingCleanupTask(JobState jobState) {
       return java.util.concurrent.CompletableFuture
           .<Void>completedFuture(null)
           .thenRun(() -> {
             try {
               cleanupStagingDirectory(jobState);
             } catch (Exception e) {
               log.warn("Shutdown hook: staging cleanup failed: {}", 
e.getMessage(), e);
             }
           });
   ```
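
   If parallel execution is still wanted, the other option mentioned above (a 
dedicated executor) could look roughly like the following. This is a JDK-only 
sketch: ShutdownCleanupRunner and runAll are hypothetical names, and the 
Runnables stand in for the staging and work-directory cleanup calls.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: run cleanup tasks on a private pool instead of the common pool. */
final class ShutdownCleanupRunner {
  private ShutdownCleanupRunner() {}

  /** Returns true if every task finished within the timeout, false otherwise. */
  static boolean runAll(List<Runnable> tasks, long timeout, TimeUnit unit)
      throws InterruptedException {
    // The pool is created here, so it does not depend on ForkJoinPool.commonPool().
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
    try {
      CompletableFuture<?>[] futures = tasks.stream()
          .map(task -> CompletableFuture.runAsync(() -> {
            try {
              task.run();
            } catch (RuntimeException e) {
              // One failing task must not prevent the others from being awaited.
              System.err.println("Cleanup task failed: " + e);
            }
          }, pool))
          .toArray(CompletableFuture[]::new);
      CompletableFuture.allOf(futures).get(timeout, unit);
      return true;
    } catch (ExecutionException | TimeoutException e) {
      return false;
    } finally {
      pool.shutdownNow(); // interrupt stragglers so the hook thread can return
    }
  }
}
```

   Whether the extra threads are worth it depends on how long the two cleanups 
take; for short deletions, running them one after the other in the hook 
thread is simpler.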



##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java:
##########
@@ -203,4 +211,68 @@ public void testTerminateWorkflow() throws Exception {
 
     verify(mockStub, times(1)).terminate("Job cancel invoked");
   }
+
+  @Test
+  public void testCleanupStagingDirectoryWithPathsToDelete() throws Exception {
+    // Create temp directories to simulate work directories
+    File tmpDir = Files.createTempDir();
+    File stagingDir = new File(tmpDir, "staging");
+    File outputDir = new File(tmpDir, "output");
+    stagingDir.mkdirs();
+    outputDir.mkdirs();
+
+    // Set up job state with WORK_DIR_PATHS_TO_DELETE
+    JobState jobState = jobLauncher.getJobContext().getJobState();
+    String pathsToDelete = stagingDir.getAbsolutePath() + "," + 
outputDir.getAbsolutePath();
+    
jobState.setProp(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE, 
pathsToDelete);
+    
jobState.setProp(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
 "true");
+
+    // Verify directories exist before cleanup
+    assertTrue(stagingDir.exists(), "Staging directory should exist before 
cleanup");
+    assertTrue(outputDir.exists(), "Output directory should exist before 
cleanup");
+
+    // Execute cleanup
+    jobLauncher.cleanupStagingDirectory(jobState);
+
+    // Verify directories are deleted
+    assertFalse(stagingDir.exists(), "Staging directory should be deleted 
after cleanup");
+    assertFalse(outputDir.exists(), "Output directory should be deleted after 
cleanup");
+
+    // Cleanup test directory
+    tmpDir.delete();
+  }
+
+  @Test
+  public void testCleanupStagingDirectoryWithoutPaths() throws Exception {
+    // Set up job state WITHOUT WORK_DIR_PATHS_TO_DELETE
+    JobState jobState = jobLauncher.getJobContext().getJobState();
+    
jobState.setProp(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
 "true");
+
+    // Should not throw exception when no paths configured
+    jobLauncher.cleanupStagingDirectory(jobState);
+  }
+
+  @Test
+  public void testCleanupStagingDirectoryWithCleanupDisabled() throws 
Exception {
+    // Create temp directories
+    File tmpDir = Files.createTempDir();
+    File stagingDir = new File(tmpDir, "staging");
+    stagingDir.mkdirs();
+
+    // Set up job state with cleanup disabled
+    JobState jobState = jobLauncher.getJobContext().getJobState();
+    String pathsToDelete = stagingDir.getAbsolutePath();
+    
jobState.setProp(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE, 
pathsToDelete);
+    
jobState.setProp(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
 "false");
+
+    // Execute cleanup
+    jobLauncher.cleanupStagingDirectory(jobState);
+
+    // Verify directory still exists (cleanup was disabled)
+    assertTrue(stagingDir.exists(), "Staging directory should still exist when 
cleanup is disabled");
+
+    // Cleanup
+    stagingDir.delete();
+    tmpDir.delete();
+  }

Review Comment:
   The test cleanup relies on File.delete(), which is not recursive and returns 
false instead of throwing when it fails. If an assertion fails first, or if 
anything is left under tmpDir, the temp directories are leaked without any 
error. Use recursive deletion or FileUtils.deleteDirectory() to ensure proper 
cleanup.
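
   For reference, a recursive delete that needs only the JDK could be sketched 
as below. TempDirs and deleteRecursively are hypothetical names; the test 
class may equally use an existing utility such as commons-io.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical test helper: deletes a directory tree, children first. */
final class TempDirs {
  private TempDirs() {}

  static void deleteRecursively(Path root) throws IOException {
    if (!Files.exists(root)) {
      return; // nothing to do, so the helper is safe to call twice
    }
    try (Stream<Path> walk = Files.walk(root)) {
      // Reverse order visits children before their parent directory.
      walk.sorted(Comparator.reverseOrder()).forEach(path -> {
        try {
          Files.delete(path); // throws on failure, unlike File.delete()
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    }
  }
}
```

   Calling it from a finally block or an @AfterMethod method keeps the temp 
directory from leaking when an assertion fails.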



##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java:
##########
@@ -78,6 +81,11 @@ protected void submitJob(List<WorkUnit> workUnits)
         throws Exception {
       this.workflowId = "someWorkflowId";
     }
+
+    // Expose jobContext for testing
+    public org.apache.gobblin.runtime.JobContext getJobContext() {
+      return this.jobContext;
+    }
   }

Review Comment:
   The GobblinTemporalJobLauncherForTest exposes getJobContext() for testing, 
which is good. However, since each test now creates a launcher instance that 
registers a shutdown hook in the parent class constructor, these shutdown hooks 
will accumulate and all execute when the JVM terminates. This could cause 
issues in test execution. Consider adding a cleanup mechanism to remove the 
shutdown hook after each test, or implementing a way to disable shutdown hook 
registration during testing.



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:
##########
@@ -940,6 +951,83 @@ private AbstractTokenRefresher buildTokenRefreshManager() 
throws IOException {
     }
   }
 
+  /**
+   * Sends GRACEFUL_SHUTDOWN to the AM container via YarnClient, then polls 
for terminal state until
+   * configurable timeout. If the AM container is already gone (e.g. app 
already terminated), we skip
+   * signaling and return immediately.
+   */
+  @VisibleForTesting
+  void signalGracefulShutdownAndWaitForTerminal() {
+    int waitMinutes = ConfigUtils.getInt(this.config,
+        GobblinYarnConfigurationKeys.GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY,
+        
GobblinYarnConfigurationKeys.DEFAULT_GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES);
+    long timeoutMs = waitMinutes * 60L * 1000L;
+    try {
+      Optional<ContainerId> amContainerId = 
getAmContainerId(this.applicationId.get());
+      if (!amContainerId.isPresent()) {
+        LOGGER.warn("Could not resolve AM container for application {} (may 
already be terminated); skipping graceful shutdown wait", 
this.applicationId.get());
+        return;
+      }
+      sendGracefulShutdownSignal(amContainerId.get());
+      pollForApplicationCompletionUntil(timeoutMs);
+    } catch (YarnException | IOException e) {
+      LOGGER.warn("Could not signal AM for graceful shutdown; proceeding with 
stop", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.warn("Interrupted while waiting for graceful shutdown; proceeding 
with stop", e);
+    }

Review Comment:
   When InterruptedException is caught, the thread's interrupt status is 
restored and the method logs a warning before returning. This is correct, but 
the calling code in the stop method doesn't check for interruption. If the 
graceful shutdown wait is interrupted, the launcher continues with its normal 
stop sequence, which may not be the desired behavior. Consider documenting 
this behavior or propagating the interruption to the caller.
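
   One way to make the outcome visible to the caller without changing the 
method to throw is to return it. The sketch below is JDK only and uses 
hypothetical names (GracefulWait, Outcome, await); the CountDownLatch stands 
in for "application reached a terminal state".

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: report how a bounded wait ended instead of only logging it. */
final class GracefulWait {
  enum Outcome { COMPLETED, TIMED_OUT, INTERRUPTED }

  private GracefulWait() {}

  static Outcome await(CountDownLatch terminated, long timeout, TimeUnit unit) {
    try {
      return terminated.await(timeout, unit) ? Outcome.COMPLETED : Outcome.TIMED_OUT;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // keep the flag visible to callers
      return Outcome.INTERRUPTED;
    }
  }
}
```

   The stop method could then branch on the returned value, for example by 
skipping optional steps when the result is INTERRUPTED.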



##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java:
##########
@@ -203,4 +211,68 @@ public void testTerminateWorkflow() throws Exception {
 
     verify(mockStub, times(1)).terminate("Job cancel invoked");
   }
+
+  @Test
+  public void testCleanupStagingDirectoryWithPathsToDelete() throws Exception {
+    // Create temp directories to simulate work directories
+    File tmpDir = Files.createTempDir();
+    File stagingDir = new File(tmpDir, "staging");
+    File outputDir = new File(tmpDir, "output");
+    stagingDir.mkdirs();
+    outputDir.mkdirs();
+
+    // Set up job state with WORK_DIR_PATHS_TO_DELETE
+    JobState jobState = jobLauncher.getJobContext().getJobState();
+    String pathsToDelete = stagingDir.getAbsolutePath() + "," + 
outputDir.getAbsolutePath();
+    
jobState.setProp(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE, 
pathsToDelete);
+    
jobState.setProp(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
 "true");
+
+    // Verify directories exist before cleanup
+    assertTrue(stagingDir.exists(), "Staging directory should exist before 
cleanup");
+    assertTrue(outputDir.exists(), "Output directory should exist before 
cleanup");
+
+    // Execute cleanup
+    jobLauncher.cleanupStagingDirectory(jobState);
+
+    // Verify directories are deleted
+    assertFalse(stagingDir.exists(), "Staging directory should be deleted 
after cleanup");
+    assertFalse(outputDir.exists(), "Output directory should be deleted after 
cleanup");
+
+    // Cleanup test directory
+    tmpDir.delete();
+  }

Review Comment:
   The test creates temporary directories but doesn't ensure they are cleaned 
up in all scenarios. If assertions fail before the cleanup code at the end, the 
directories will remain. Use a try-finally block or TestNG's @AfterMethod to 
ensure cleanup always happens. Additionally, for 
testCleanupStagingDirectoryWithPathsToDelete, line 242 only deletes tmpDir 
itself, which will fail if it's not empty - this should use recursive deletion.



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java:
##########
@@ -181,4 +181,18 @@ public class GobblinYarnConfigurationKeys {
   //Config to control Heartbeat interval for Yarn AMRM client.
   public static final String AMRM_HEARTBEAT_INTERVAL_SECS = 
GOBBLIN_YARN_PREFIX + "amRmHeartbeatIntervalSecs";
   public static final Integer DEFAULT_AMRM_HEARTBEAT_INTERVAL_SECS = 15;
+
+  /**
+   * Max time (minutes) to wait for application to reach terminal state after 
sending graceful shutdown signal.
+   * Default 10. A longer value allows the AM more time to clean up but can 
delay launcher shutdown.
+   */
+  public static final String GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY = 
GOBBLIN_YARN_PREFIX + "graceful.shutdown.wait.time.minutes";
+  public static final int DEFAULT_GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES = 10;
+
+  /**
+   * Poll interval (seconds) when waiting for application completion after 
graceful shutdown signal.
+   * Default 5. Should be less than the total wait time (see {@link 
#GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY}).

Review Comment:
   The documentation comment states "Default 5" but the actual default value is 
60. This inconsistency could confuse users. The documentation should be updated 
to match the implementation.
   ```suggestion
      * Default 60. Should be less than the total wait time (see {@link 
#GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY}).
   ```



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -132,15 +135,164 @@ public GobblinJobLauncher(Properties jobProps, Path 
appWorkDir,
             this.stateStores.getTaskStateStore(), this.outputTaskStateDir, 
this.getIssueRepository());
     this.executor = Executors.newSingleThreadScheduledExecutor(
         ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("GobblinJobLauncher")));
+
+    this.shutdownCleanupTimeoutSeconds = Long.parseLong(jobProps.getProperty(
+        GobblinTemporalConfigurationKeys.SHUTDOWN_CLEANUP_TIMEOUT_SECONDS,
+        
GobblinTemporalConfigurationKeys.DEFAULT_SHUTDOWN_CLEANUP_TIMEOUT_SECONDS));
+
+    registerCleanupShutdownHook();
   }
 
-  @Override
-  public void close() throws IOException {
+  /**
+   * Registers a JVM shutdown hook that loads JobState from file and runs 
cleanup methods in parallel.
+   */
+  private void registerCleanupShutdownHook() {
+    Runtime.getRuntime().addShutdownHook(
+        new Thread(this::performShutdownCleanup, "GobblinJobLauncher-Cleanup-" 
+ this.jobContext.getJobId()));
+  }
+
+  /**
+   * Performs cleanup operations during JVM shutdown.
+   * Creates a dedicated FileSystem instance, loads JobState, and executes 
cleanup tasks in parallel.
+   */
+  private void performShutdownCleanup() {
+    log.info("Shutdown hook: performing cleanup for job {}", 
this.jobContext.getJobId());
+
+    FileSystem shutdownFs = null;
     try {
-      cleanupWorkingDirectory();
+      shutdownFs = createNonCachedFileSystem();
+      JobState jobState = loadJobStateWithFallback(shutdownFs);
+      executeParallelCleanup(jobState);
+
+      log.info("Shutdown hook: cleanup completed for job {}", 
this.jobContext.getJobId());
+    } catch (Exception e) {
+      log.error("Shutdown hook: cleanup failed for job {}: {}", 
this.jobContext.getJobId(), e.getMessage(), e);
     } finally {
-      super.close();
+      closeFileSystemQuietly(shutdownFs);
+    }
+  }
+
+  /**
+   * Creates a new FileSystem instance with caching disabled to avoid issues
+   * with shared filesystem instances being closed before shutdown hook 
executes.
+   */
+  private FileSystem createNonCachedFileSystem() throws IOException {
+    URI fsUri = 
URI.create(this.jobProps.getProperty(ConfigurationKeys.FS_URI_KEY, 
ConfigurationKeys.LOCAL_FS_URI));
+    return createNonCachedFileSystem(fsUri);
+  }
+
+  /**
+   * Creates a FileSystem instance for the given URI with caching disabled.
+   */
+  private FileSystem createNonCachedFileSystem(URI fsUri) throws IOException {
+    Configuration conf = createNonCachedConfiguration();
+    return FileSystem.get(fsUri, conf);
+  }
+
+  /**
+   * Creates a Hadoop Configuration with FileSystem caching disabled.
+   */
+  private Configuration createNonCachedConfiguration() {
+    Configuration conf = new Configuration();

Review Comment:
   The method creates a new Configuration() without initializing it from the 
existing configuration that the launcher was started with. This means important 
Hadoop configuration properties (like authentication, kerberos settings, custom 
filesystem implementations, etc.) may not be available when creating the 
FileSystem. Consider using the existing configuration or copying properties 
from this.jobProps to ensure the FileSystem is created with the correct 
settings.
   ```suggestion
      * Creates a Hadoop Configuration with FileSystem caching disabled.
      * The configuration is initialized from this.jobProps so that any
      * Hadoop-related settings used by the main job execution are honored
      * during shutdown cleanup as well.
      */
     private Configuration createNonCachedConfiguration() {
       Configuration conf = new Configuration();
   
       // Initialize configuration from job properties to preserve Hadoop 
settings
       if (this.jobProps != null) {
         for (String key : this.jobProps.stringPropertyNames()) {
           String value = this.jobProps.getProperty(key);
           if (value != null) {
             conf.set(key, value);
           }
         }
       }
   
       // Disable FileSystem caching to avoid issues in shutdown hook
   ```



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -132,15 +135,164 @@ public GobblinJobLauncher(Properties jobProps, Path 
appWorkDir,
             this.stateStores.getTaskStateStore(), this.outputTaskStateDir, 
this.getIssueRepository());
     this.executor = Executors.newSingleThreadScheduledExecutor(
         ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("GobblinJobLauncher")));
+
+    this.shutdownCleanupTimeoutSeconds = Long.parseLong(jobProps.getProperty(
+        GobblinTemporalConfigurationKeys.SHUTDOWN_CLEANUP_TIMEOUT_SECONDS,
+        
GobblinTemporalConfigurationKeys.DEFAULT_SHUTDOWN_CLEANUP_TIMEOUT_SECONDS));
+
+    registerCleanupShutdownHook();
   }
 
-  @Override
-  public void close() throws IOException {
+  /**
+   * Registers a JVM shutdown hook that loads JobState from file and runs 
cleanup methods in parallel.
+   */
+  private void registerCleanupShutdownHook() {
+    Runtime.getRuntime().addShutdownHook(
+        new Thread(this::performShutdownCleanup, "GobblinJobLauncher-Cleanup-" 
+ this.jobContext.getJobId()));
+  }

Review Comment:
   The shutdown hook thread is registered but no reference to it is kept, so 
it cannot be removed later. The hook therefore stays registered after the job 
finishes or the launcher is closed, which can cause problems in long-running 
JVMs and in tests. Consider storing the thread in a private field so it can be 
removed via Runtime.getRuntime().removeShutdownHook() in the close() method or 
after successful job completion.
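
   A minimal sketch of storing and removing the hook, using a hypothetical HookOwner class in place of GobblinJobLauncher. The class name, the cleanupRuns counter, and the cleanup body are illustrative assumptions, not Gobblin code. Runtime.removeShutdownHook throws IllegalStateException once JVM shutdown has begun, so that case is caught rather than propagated.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the launcher; not part of Gobblin. */
class HookOwner implements AutoCloseable {
  // Keep the reference so the hook can be deregistered later.
  private final Thread cleanupHook;
  // Counts cleanup invocations; present only to make the sketch observable.
  final AtomicInteger cleanupRuns = new AtomicInteger();

  HookOwner(String jobId) {
    this.cleanupHook = new Thread(this::cleanup, "Cleanup-" + jobId);
    Runtime.getRuntime().addShutdownHook(this.cleanupHook);
  }

  private void cleanup() {
    // Placeholder for the real cleanup work.
    cleanupRuns.incrementAndGet();
  }

  /** Returns true if the hook was registered and has now been removed. */
  boolean deregisterHook() {
    try {
      return Runtime.getRuntime().removeShutdownHook(this.cleanupHook);
    } catch (IllegalStateException e) {
      // The JVM is already shutting down; the hook can no longer be removed.
      return false;
    }
  }

  @Override
  public void close() {
    // Remove the hook first so cleanup is not repeated at JVM exit.
    deregisterHook();
    cleanup();
  }
}
```

   With this shape, a launcher closed on the normal path leaves no hook behind, while an abrupt JVM termination still triggers the hook.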



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -183,7 +335,6 @@ protected void runWorkUnits(List<WorkUnit> workUnits) 
throws Exception {
 
       // The last iteration of output TaskState collecting will run when the 
collector service gets stopped
       this.taskStateCollectorService.stopAsync().awaitTerminated();

Review Comment:
   With cleanupWorkingDirectory() removed from the normal execution flow, 
cleanup happens only through the shutdown hook. The hook runs only when the 
JVM is terminating, not when a job completes normally inside a long-running 
JVM process. Work directories could therefore accumulate in environments where 
the same JVM runs multiple jobs sequentially. Consider keeping the normal 
cleanup call here and making the shutdown hook cleanup idempotent.
   ```suggestion
         this.taskStateCollectorService.stopAsync().awaitTerminated();
   
         // Ensure working directory is cleaned up after each job run, even in long-lived JVMs
         try {
           cleanupWorkingDirectory();
         } catch (IOException e) {
           log.warn("Failed to clean up working directory for job {}", this.jobContext.getJobId(), e);
         }
   ```



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -132,15 +135,164 @@ public GobblinJobLauncher(Properties jobProps, Path 
appWorkDir,
             this.stateStores.getTaskStateStore(), this.outputTaskStateDir, 
this.getIssueRepository());
     this.executor = Executors.newSingleThreadScheduledExecutor(
         ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("GobblinJobLauncher")));
+
+    this.shutdownCleanupTimeoutSeconds = Long.parseLong(jobProps.getProperty(
+        GobblinTemporalConfigurationKeys.SHUTDOWN_CLEANUP_TIMEOUT_SECONDS,
+        
GobblinTemporalConfigurationKeys.DEFAULT_SHUTDOWN_CLEANUP_TIMEOUT_SECONDS));

Review Comment:
   The cleanup timeout is read from configuration and stored in a long without 
validation. A negative or extremely large value could cause unexpected 
behavior, and a non-numeric value makes Long.parseLong throw a 
NumberFormatException in the constructor. Consider validating that the timeout 
is within reasonable bounds (e.g., > 0 and < some maximum value like 3600 
seconds).
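
   One way the validation could look, as a sketch only. TimeoutConfig, parseTimeoutSeconds, and MAX_TIMEOUT_SECONDS are hypothetical names, and the 3600-second ceiling is just the example figure from this comment, treated here as an inclusive upper bound.

```java
import java.util.Properties;

/** Hypothetical helper; not part of Gobblin. */
final class TimeoutConfig {
  // Assumed upper bound, taken from the example value in the review comment.
  static final long MAX_TIMEOUT_SECONDS = 3600L;

  private TimeoutConfig() {}

  /** Parses a timeout in seconds and rejects values outside (0, MAX_TIMEOUT_SECONDS]. */
  static long parseTimeoutSeconds(Properties props, String key, String defaultValue) {
    String raw = props.getProperty(key, defaultValue);
    if (raw == null) {
      throw new IllegalArgumentException("No value or default for " + key);
    }
    long value;
    try {
      value = Long.parseLong(raw.trim());
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Timeout for " + key + " is not a number: " + raw, e);
    }
    if (value <= 0 || value > MAX_TIMEOUT_SECONDS) {
      throw new IllegalArgumentException(
          "Timeout for " + key + " must be in (0, " + MAX_TIMEOUT_SECONDS + "] seconds: " + value);
    }
    return value;
  }
}
```

   Failing fast in the constructor is one option; falling back to the default with a warning is another, depending on how strict the launcher should be about misconfiguration.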



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -132,15 +135,164 @@ public GobblinJobLauncher(Properties jobProps, Path 
appWorkDir,
             this.stateStores.getTaskStateStore(), this.outputTaskStateDir, 
this.getIssueRepository());
     this.executor = Executors.newSingleThreadScheduledExecutor(
         ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("GobblinJobLauncher")));
+
+    this.shutdownCleanupTimeoutSeconds = Long.parseLong(jobProps.getProperty(
+        GobblinTemporalConfigurationKeys.SHUTDOWN_CLEANUP_TIMEOUT_SECONDS,
+        
GobblinTemporalConfigurationKeys.DEFAULT_SHUTDOWN_CLEANUP_TIMEOUT_SECONDS));
+
+    registerCleanupShutdownHook();
   }
 
-  @Override
-  public void close() throws IOException {
+  /**
+   * Registers a JVM shutdown hook that loads JobState from file and runs 
cleanup methods in parallel.
+   */
+  private void registerCleanupShutdownHook() {
+    Runtime.getRuntime().addShutdownHook(
+        new Thread(this::performShutdownCleanup, "GobblinJobLauncher-Cleanup-" 
+ this.jobContext.getJobId()));
+  }
+
+  /**
+   * Performs cleanup operations during JVM shutdown.
+   * Creates a dedicated FileSystem instance, loads JobState, and executes 
cleanup tasks in parallel.
+   */
+  private void performShutdownCleanup() {
+    log.info("Shutdown hook: performing cleanup for job {}", 
this.jobContext.getJobId());
+
+    FileSystem shutdownFs = null;
     try {
-      cleanupWorkingDirectory();
+      shutdownFs = createNonCachedFileSystem();
+      JobState jobState = loadJobStateWithFallback(shutdownFs);
+      executeParallelCleanup(jobState);
+
+      log.info("Shutdown hook: cleanup completed for job {}", 
this.jobContext.getJobId());
+    } catch (Exception e) {
+      log.error("Shutdown hook: cleanup failed for job {}: {}", 
this.jobContext.getJobId(), e.getMessage(), e);
     } finally {
-      super.close();
+      closeFileSystemQuietly(shutdownFs);
+    }
+  }

Review Comment:
   The shutdown hook creates non-cached FileSystem instances to avoid closure 
race conditions, but the launcher also holds its own FileSystem instance 
(this.fs). During shutdown, the hook and other cleanup code paths could operate 
on the same underlying file paths concurrently. Non-cached instances help, but 
interference is still possible if the normal cleanup flow is running when 
shutdown is triggered. Consider adding synchronization or a flag to ensure 
cleanup happens only once.
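
   A sketch of the flag-based option, assuming a hypothetical OnceCleanup wrapper (not Gobblin code). An AtomicBoolean with compareAndSet lets whichever caller arrives first, the normal path or the shutdown hook, perform the cleanup while the other becomes a no-op.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical guard that runs a cleanup action at most once; not part of Gobblin. */
final class OnceCleanup {
  private final AtomicBoolean started = new AtomicBoolean(false);
  private final Runnable action;

  OnceCleanup(Runnable action) {
    this.action = action;
  }

  /** Runs the action if no caller has started it yet; returns whether this call ran it. */
  boolean runOnce() {
    // Only the first caller flips the flag from false to true.
    if (!started.compareAndSet(false, true)) {
      return false;
    }
    action.run();
    return true;
  }
}
```

   Note that the flag only prevents a second start. A caller that loses the race returns immediately and does not wait for the first cleanup to finish, so a latch or lock would be needed if the shutdown hook must block until cleanup is complete.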



##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java:
##########
@@ -203,4 +211,68 @@ public void testTerminateWorkflow() throws Exception {
 
     verify(mockStub, times(1)).terminate("Job cancel invoked");
   }
+
+  @Test
+  public void testCleanupStagingDirectoryWithPathsToDelete() throws Exception {
+    // Create temp directories to simulate work directories
+    File tmpDir = Files.createTempDir();
+    File stagingDir = new File(tmpDir, "staging");
+    File outputDir = new File(tmpDir, "output");
+    stagingDir.mkdirs();
+    outputDir.mkdirs();
+
+    // Set up job state with WORK_DIR_PATHS_TO_DELETE
+    JobState jobState = jobLauncher.getJobContext().getJobState();
+    String pathsToDelete = stagingDir.getAbsolutePath() + "," + 
outputDir.getAbsolutePath();
+    
jobState.setProp(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE, 
pathsToDelete);
+    
jobState.setProp(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
 "true");
+
+    // Verify directories exist before cleanup
+    assertTrue(stagingDir.exists(), "Staging directory should exist before 
cleanup");
+    assertTrue(outputDir.exists(), "Output directory should exist before 
cleanup");
+
+    // Execute cleanup
+    jobLauncher.cleanupStagingDirectory(jobState);
+
+    // Verify directories are deleted
+    assertFalse(stagingDir.exists(), "Staging directory should be deleted 
after cleanup");
+    assertFalse(outputDir.exists(), "Output directory should be deleted after 
cleanup");
+
+    // Cleanup test directory
+    tmpDir.delete();
+  }

Review Comment:
   The test creates temporary directories but does not clean them up in all 
cases. If the cleanup call or an assertion throws, stagingDir and outputDir may 
be left behind. Consider using a try-finally block or TestNG's @AfterMethod to 
guarantee cleanup, or better yet, a temporary-directory utility that cleans up 
automatically.
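
   A sketch of the try-finally variant using only java.nio.file. TempDirs, withTempDir, and DirConsumer are hypothetical helper names, not existing Gobblin or TestNG APIs. The directory tree is removed in the finally block, so it is deleted whether the body returns or throws.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical test helper; not part of Gobblin. */
final class TempDirs {
  /** Body of a test that needs a scratch directory. */
  interface DirConsumer {
    void accept(Path dir) throws Exception;
  }

  private TempDirs() {}

  /** Creates a temp directory, runs the body, and always deletes the directory afterwards. */
  static void withTempDir(String prefix, DirConsumer body) throws Exception {
    Path tmp = Files.createTempDirectory(prefix);
    try {
      body.accept(tmp);
    } finally {
      deleteRecursively(tmp);
    }
  }

  /** Deletes a directory tree, children before parents. */
  static void deleteRecursively(Path root) throws IOException {
    if (!Files.exists(root)) {
      return;
    }
    try (Stream<Path> paths = Files.walk(root)) {
      // Reverse order puts deeper paths first, so directories are empty when deleted.
      paths.sorted(Comparator.reverseOrder()).forEach(p -> {
        try {
          Files.deleteIfExists(p);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    }
  }
}
```

   In the test above, the staging and output directories would be created under the path passed to the body, and the assertions would run inside it.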



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -132,15 +135,164 @@ public GobblinJobLauncher(Properties jobProps, Path 
appWorkDir,
             this.stateStores.getTaskStateStore(), this.outputTaskStateDir, 
this.getIssueRepository());
     this.executor = Executors.newSingleThreadScheduledExecutor(
         ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("GobblinJobLauncher")));
+
+    this.shutdownCleanupTimeoutSeconds = Long.parseLong(jobProps.getProperty(
+        GobblinTemporalConfigurationKeys.SHUTDOWN_CLEANUP_TIMEOUT_SECONDS,
+        
GobblinTemporalConfigurationKeys.DEFAULT_SHUTDOWN_CLEANUP_TIMEOUT_SECONDS));
+
+    registerCleanupShutdownHook();
   }
 
-  @Override
-  public void close() throws IOException {
+  /**
+   * Registers a JVM shutdown hook that loads JobState from file and runs 
cleanup methods in parallel.
+   */
+  private void registerCleanupShutdownHook() {
+    Runtime.getRuntime().addShutdownHook(
+        new Thread(this::performShutdownCleanup, "GobblinJobLauncher-Cleanup-" 
+ this.jobContext.getJobId()));

Review Comment:
   The shutdown hook is registered in the constructor and runs on every JVM 
shutdown, but nothing removes it when the job completes successfully. The hook 
will therefore run even when cleanup has already been performed in the normal 
flow, which can cause redundant cleanup attempts or errors when deleting 
already-deleted directories. Consider storing a reference to the shutdown hook 
thread and removing it in the close method or after successful job completion 
to prevent duplicate cleanup.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

