This is an automated email from the ASF dual-hosted git repository.

abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 09a3e21add [GOBBLIN-2191] Shutdown GobblinJobLauncher executor (#4094)
09a3e21add is described below

commit 09a3e21add10f3a42a3c89c531db9e7bc84d5f2e
Author: abhishekmjain <[email protected]>
AuthorDate: Wed Jan 29 22:59:40 2025 +0530

    [GOBBLIN-2191] Shutdown GobblinJobLauncher executor (#4094)
    
    Shutdown GobblinJobLauncher executor to fix AM graceful shutdown
---
 .../apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java    | 7 ++++++-
 .../main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java  | 6 ++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
index 13ecfd1ebc..c358b368a3 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.eventbus.EventBus;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
@@ -58,6 +59,7 @@ import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.runtime.util.StateStores;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.ParallelRunner;
 import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
 import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
@@ -128,7 +130,8 @@ public abstract class GobblinJobLauncher extends AbstractJobLauncher {
     this.taskStateCollectorService =
         new TaskStateCollectorService(jobProps, this.jobContext.getJobState(), this.eventBus, this.eventSubmitter,
             this.stateStores.getTaskStateStore(), this.outputTaskStateDir, this.getIssueRepository());
-    this.executor = Executors.newSingleThreadExecutor();
+    this.executor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("GobblinJobLauncher")));
   }
 
   @Override
@@ -180,6 +183,8 @@ public abstract class GobblinJobLauncher extends AbstractJobLauncher {
       waitJob(submitJobFuture);
       log.info(String.format("Job %s completed", this.jobContext.getJobId()));
     } finally {
+      ExecutorsUtils.shutdownExecutorService(executor, Optional.of(log));
+
       // The last iteration of output TaskState collecting will run when the collector service gets stopped
       this.taskStateCollectorService.stopAsync().awaitTerminated();
       cleanupWorkingDirectory();
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 031e295aa0..d2f2180fff 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -643,6 +643,12 @@ public class GobblinYarnAppLauncher {
     LOGGER.info("Application Tracking URL: " + applicationReport.getTrackingUrl());
     LOGGER.info("Application User: " + applicationReport.getUser() + " Queue: " + applicationReport.getQueue());
 
+    // Temporal workflow tracking url
+    String temporalWorkflowTrackingUrl = ConfigUtils.getString(config, "gobblin.temporal.ui.server.url", "");
+    if (StringUtils.isNotBlank(temporalWorkflowTrackingUrl)) {
+      LOGGER.info("Temporal Workflow Tracking URL: " + temporalWorkflowTrackingUrl);
+    }
+
     return applicationId;
   }
 

Reply via email to