This is an automated email from the ASF dual-hosted git repository.
abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 09a3e21add [GOBBLIN-2191] Shutdown GobblinJobLauncher executor (#4094)
09a3e21add is described below
commit 09a3e21add10f3a42a3c89c531db9e7bc84d5f2e
Author: abhishekmjain <[email protected]>
AuthorDate: Wed Jan 29 22:59:40 2025 +0530
[GOBBLIN-2191] Shutdown GobblinJobLauncher executor (#4094)
Shutdown GobblinJobLauncher executor to fix AM graceful shutdown
---
.../apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java | 7 ++++++-
.../main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java | 6 ++++++
2 files changed, 12 insertions(+), 1 deletion(-)
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
index 13ecfd1ebc..c358b368a3 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
@@ -58,6 +59,7 @@ import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.runtime.util.StateStores;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
@@ -128,7 +130,8 @@ public abstract class GobblinJobLauncher extends AbstractJobLauncher {
this.taskStateCollectorService =
new TaskStateCollectorService(jobProps, this.jobContext.getJobState(),
this.eventBus, this.eventSubmitter,
this.stateStores.getTaskStateStore(), this.outputTaskStateDir,
this.getIssueRepository());
- this.executor = Executors.newSingleThreadExecutor();
+ this.executor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("GobblinJobLauncher")));
}
@Override
@@ -180,6 +183,8 @@ public abstract class GobblinJobLauncher extends AbstractJobLauncher {
waitJob(submitJobFuture);
log.info(String.format("Job %s completed", this.jobContext.getJobId()));
} finally {
+ ExecutorsUtils.shutdownExecutorService(executor, Optional.of(log));
+
// The last iteration of output TaskState collecting will run when the
collector service gets stopped
this.taskStateCollectorService.stopAsync().awaitTerminated();
cleanupWorkingDirectory();
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 031e295aa0..d2f2180fff 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -643,6 +643,12 @@ public class GobblinYarnAppLauncher {
LOGGER.info("Application Tracking URL: " +
applicationReport.getTrackingUrl());
LOGGER.info("Application User: " + applicationReport.getUser() + " Queue:
" + applicationReport.getQueue());
+ // Temporal workflow tracking url
+ String temporalWorkflowTrackingUrl = ConfigUtils.getString(config,
"gobblin.temporal.ui.server.url", "");
+ if (StringUtils.isNotBlank(temporalWorkflowTrackingUrl)) {
+ LOGGER.info("Temporal Workflow Tracking URL: " +
temporalWorkflowTrackingUrl);
+ }
+
return applicationId;
}