abhishekmjain commented on code in PR #4166:
URL: https://github.com/apache/gobblin/pull/4166#discussion_r2851336660


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -132,15 +135,164 @@ public GobblinJobLauncher(Properties jobProps, Path 
appWorkDir,
             this.stateStores.getTaskStateStore(), this.outputTaskStateDir, 
this.getIssueRepository());
     this.executor = Executors.newSingleThreadScheduledExecutor(
         ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("GobblinJobLauncher")));
+
+    this.shutdownCleanupTimeoutSeconds = Long.parseLong(jobProps.getProperty(
+        GobblinTemporalConfigurationKeys.SHUTDOWN_CLEANUP_TIMEOUT_SECONDS,
+        
GobblinTemporalConfigurationKeys.DEFAULT_SHUTDOWN_CLEANUP_TIMEOUT_SECONDS));
+
+    registerCleanupShutdownHook();
   }
 
-  @Override
-  public void close() throws IOException {
+  /**
+   * Registers a JVM shutdown hook that loads JobState from file and runs 
cleanup methods in parallel.
+   */
+  private void registerCleanupShutdownHook() {
+    Runtime.getRuntime().addShutdownHook(
+        new Thread(this::performShutdownCleanup, "GobblinJobLauncher-Cleanup-" 
+ this.jobContext.getJobId()));
+  }
+
+  /**
+   * Performs cleanup operations during JVM shutdown.
+   * Creates a dedicated FileSystem instance, loads JobState, and executes 
cleanup tasks in parallel.
+   */
+  private void performShutdownCleanup() {
+    log.info("Shutdown hook: performing cleanup for job {}", 
this.jobContext.getJobId());
+
+    FileSystem shutdownFs = null;
     try {
-      cleanupWorkingDirectory();
+      shutdownFs = createNonCachedFileSystem();
+      JobState jobState = loadJobStateWithFallback(shutdownFs);
+      executeParallelCleanup(jobState, shutdownFs);
+
+      log.info("Shutdown hook: cleanup completed for job {}", 
this.jobContext.getJobId());
+    } catch (Exception e) {
+      log.error("Shutdown hook: cleanup failed for job {}: {}", 
this.jobContext.getJobId(), e.getMessage(), e);
     } finally {
-      super.close();
+      closeFileSystemQuietly(shutdownFs);
+    }
+  }
+
+  /**
+   * Creates a new FileSystem instance with caching disabled to avoid issues
+   * with shared filesystem instances being closed before shutdown hook 
executes.
+   */
+  private FileSystem createNonCachedFileSystem() throws IOException {

Review Comment:
   Let's use `FileSystem.newInstance(...)` here if the intention is to get a
   new (non-cached) FileSystem object.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to