This is an automated email from the ASF dual-hosted git repository.
abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5e128c909b [GOBBLIN-2247] Add cleanup logic for Staging directories in
temporal flow (#4166)
5e128c909b is described below
commit 5e128c909bfc095f07b58077d12f9e4f83b229cf
Author: pratapaditya04 <[email protected]>
AuthorDate: Fri Feb 27 11:00:26 2026 +0530
[GOBBLIN-2247] Add cleanup logic for Staging directories in temporal flow
(#4166)
Implements graceful shutdown support across Gobblin Yarn and Temporal by:
- Yarn launcher: sends a GRACEFUL_SHUTDOWN signal to the AM container and
  waits for the application to reach a terminal state.
- Temporal job launcher: executes a shutdown hook that cleans up the staging
  and working directories.
This ensures proper cleanup during job termination while giving the
Application Master time to finish its cleanup tasks.
---
.../opentelemetry/GobblinOpenTelemetryMetrics.java | 14 +-
.../GobblinOpenTelemetryMetricsConstants.java | 1 +
.../temporal/GobblinTemporalConfigurationKeys.java | 3 +
.../ddm/activity/impl/GenerateWorkUnitsImpl.java | 6 +
.../temporal/joblauncher/GobblinJobLauncher.java | 281 +++++++++++++++++++--
.../activity/impl/GenerateWorkUnitsImplTest.java | 58 +++++
.../GobblinTemporalJobLauncherTest.java | 123 ++++++++-
.../gobblin/yarn/GobblinYarnAppLauncher.java | 88 +++++++
.../gobblin/yarn/GobblinYarnConfigurationKeys.java | 14 +
.../gobblin/yarn/GobblinYarnAppLauncherTest.java | 78 ++++++
10 files changed, 641 insertions(+), 25 deletions(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetrics.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetrics.java
index c54f213e24..1ff8334d8d 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetrics.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetrics.java
@@ -36,7 +36,19 @@ public enum GobblinOpenTelemetryMetrics {
* Metric to track the latency of each Gobblin Job state (GenerateWorkUnit,
ProcessWorkUnit, CommitStep).
* Metric Unit: seconds (s) represents the time taken for each state.
* */
- GOBBLIN_JOB_STATE_LATENCY("gobblin.job.state.latency", "Gobblin job state
latency", "s", OpenTelemetryMetricType.DOUBLE_HISTOGRAM);
+ GOBBLIN_JOB_STATE_LATENCY("gobblin.job.state.latency", "Gobblin job state
latency", "s", OpenTelemetryMetricType.DOUBLE_HISTOGRAM),
+
+ /**
+ * Metric to track the time taken to delete staging directories during
cleanup, broken down by filesystem scheme.
+ * Metric Unit: seconds (s).
+ * */
+ GOBBLIN_STAGING_CLEANUP_LATENCY("gobblin.staging.cleanup.latency", "Gobblin
staging directory cleanup latency", "s",
OpenTelemetryMetricType.DOUBLE_HISTOGRAM),
+
+ /**
+ * Metric to track the time taken to delete working directories (work units,
task states, job state) during cleanup, broken down by filesystem scheme.
+ * Metric Unit: seconds (s).
+ * */
+
GOBBLIN_WORK_DIRECTORY_CLEANUP_LATENCY("gobblin.work.directory.cleanup.latency",
"Gobblin working directory cleanup latency", "s",
OpenTelemetryMetricType.DOUBLE_HISTOGRAM);
private final String metricName;
private final String metricDescription;
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetricsConstants.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetricsConstants.java
index d998380a54..7c1ff493e1 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetricsConstants.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/opentelemetry/GobblinOpenTelemetryMetricsConstants.java
@@ -22,6 +22,7 @@ public class GobblinOpenTelemetryMetricsConstants {
public static class DimensionKeys {
public static final String STATE = "state";
public static final String CURR_STATE = "currState";
+ public static final String FS_SCHEME = "fsScheme";
}
public static class DimensionValues {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index bc87fe1f55..982b4af7f3 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -52,6 +52,9 @@ public interface GobblinTemporalConfigurationKeys {
String GOBBLIN_TEMPORAL_JOB_LAUNCHER_CONFIG_OVERRIDES =
GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "config.overrides";
String GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED = PREFIX +
"work.dir.cleanup.enabled";
String DEFAULT_GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED = "true";
+ String WORK_DIR_PATHS_TO_DELETE = PREFIX + "work.dir.paths.to.delete";
+ String SHUTDOWN_CLEANUP_TIMEOUT_SECONDS = PREFIX +
"shutdown.cleanup.timeout.seconds";
+ String DEFAULT_SHUTDOWN_CLEANUP_TIMEOUT_SECONDS = "600";
String GOBBLIN_TEMPORAL_CONTAINER_METRICS_PREFIX = PREFIX +
"container.metrics.";
String GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME =
GOBBLIN_TEMPORAL_CONTAINER_METRICS_PREFIX + "application.name";
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 9b35921158..6f14384add 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -76,6 +76,7 @@ import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
@@ -196,6 +197,11 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
jobState.setProp(ConfigurationKeys.CONVERTER_INITIALIZERS_SERIALIZED_MEMENTOS_KEY,
Initializer.AfterInitializeMemento.serialize(memento))
);
+ // Store the work dir paths to delete in JobState for reuse during
cleanup
+ if (!genWUsInsights.getPathsToCleanUp().isEmpty()) {
+
jobState.setProp(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE,
+ String.join(",", genWUsInsights.getPathsToCleanUp()));
+ }
JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
JobStateUtils.writeJobState(jobState, workDirRoot, fs); // ATTENTION:
the writing of `JobState` after all WUs signifies WU gen+serialization now
complete
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
index ef0e9de0bc..1bf04dfe39 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
@@ -19,14 +19,19 @@ package org.apache.gobblin.temporal.joblauncher;
import java.io.IOException;
import java.net.URI;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import org.apache.gobblin.util.WriterUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -61,6 +66,11 @@ import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.ParallelRunner;
+import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
+import
org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetricsConstants;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryDoubleHistogram;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryHelper;
+import org.apache.gobblin.metrics.opentelemetry.OpenTelemetryInstrumentation;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
@@ -96,6 +106,8 @@ public abstract class GobblinJobLauncher extends
AbstractJobLauncher {
protected JobListener jobListener;
protected volatile boolean jobSubmitted = false;
private final ExecutorService executor;
+ private final long shutdownCleanupTimeoutSeconds;
+ private final AtomicBoolean cleanupExecuted = new AtomicBoolean(false);
public GobblinJobLauncher(Properties jobProps, Path appWorkDir,
List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean>
runningMap, EventBus eventbus)
@@ -132,17 +144,151 @@ public abstract class GobblinJobLauncher extends
AbstractJobLauncher {
this.stateStores.getTaskStateStore(), this.outputTaskStateDir,
this.getIssueRepository());
this.executor = Executors.newSingleThreadScheduledExecutor(
ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("GobblinJobLauncher")));
+
+ this.shutdownCleanupTimeoutSeconds = Long.parseLong(jobProps.getProperty(
+ GobblinTemporalConfigurationKeys.SHUTDOWN_CLEANUP_TIMEOUT_SECONDS,
+
GobblinTemporalConfigurationKeys.DEFAULT_SHUTDOWN_CLEANUP_TIMEOUT_SECONDS));
+
+ registerCleanupShutdownHook();
}
- @Override
- public void close() throws IOException {
+ /**
+ * Registers a JVM shutdown hook that loads JobState from file and runs
cleanup methods in parallel.
+ */
+ private void registerCleanupShutdownHook() {
+ Runtime.getRuntime().addShutdownHook(
+ new Thread(this::performCleanup, "GobblinJobLauncher-Cleanup-" +
this.jobContext.getJobId()));
+ }
+
+ /**
+ * Performs cleanup operations during JVM shutdown.
+ * Creates a dedicated FileSystem instance, loads JobState, and executes
cleanup tasks in parallel.
+ */
+ private void performCleanup() {
+ if (!cleanupExecuted.compareAndSet(false, true)) {
+ log.info("Cleanup already executed for job {}, skipping",
this.jobContext.getJobId());
+ return;
+ }
+ log.info("Performing cleanup for job {}", this.jobContext.getJobId());
+
+ String fsUriStr = this.jobProps.getProperty(ConfigurationKeys.FS_URI_KEY,
ConfigurationKeys.LOCAL_FS_URI);
+ FileSystem shutdownFs = null;
try {
- cleanupWorkingDirectory();
+ shutdownFs = FileSystem.newInstance(URI.create(fsUriStr), new
Configuration());
+ JobState jobState = loadJobStateWithFallback(shutdownFs);
+ executeParallelCleanup(jobState, shutdownFs);
+
+ log.info("Shutdown hook: cleanup completed for job {}",
this.jobContext.getJobId());
+ } catch (Exception e) {
+ log.error("Shutdown hook: cleanup failed for job {}: {}",
this.jobContext.getJobId(), e.getMessage(), e);
} finally {
- super.close();
+ closeFileSystemQuietly(shutdownFs);
+ }
+ }
+
+ /**
+ * Loads JobState from persisted file with fallback to in-memory state.
+ * @param fs FileSystem to use for loading JobState
+ * @return JobState loaded from file, or in-memory JobState if file loading
fails
+ */
+ private JobState loadJobStateWithFallback(FileSystem fs) {
+ JobState jobState = loadJobStateFromFile(fs);
+ if (jobState == null) {
+ log.warn("Shutdown hook: could not load JobState from file, trying using
in-memory state");
+ jobState = this.jobContext.getJobState();
+ }
+ return jobState;
+ }
+
+ /**
+ * Executes staging and working directory cleanup tasks in parallel.
+ * @param jobState JobState containing cleanup configuration and paths
+ * @throws Exception if cleanup tasks fail or timeout
+ */
+ private void executeParallelCleanup(JobState jobState, FileSystem fs) throws
Exception {
+ java.util.concurrent.CompletableFuture<Void> stagingCleanup =
createStagingCleanupTask(jobState);
+ java.util.concurrent.CompletableFuture<Void> workDirCleanup =
createWorkDirCleanupTask(fs);
+
+ java.util.concurrent.CompletableFuture.allOf(stagingCleanup,
workDirCleanup)
+ .get(this.shutdownCleanupTimeoutSeconds,
java.util.concurrent.TimeUnit.SECONDS);
+ }
+
+ /**
+ * Creates an asynchronous task for staging directory cleanup.
+ */
+ private java.util.concurrent.CompletableFuture<Void>
createStagingCleanupTask(JobState jobState) {
+ return java.util.concurrent.CompletableFuture.runAsync(() -> {
+ try {
+ cleanupStagingDirectory(jobState);
+ } catch (Exception e) {
+ log.warn("Shutdown hook: staging cleanup failed: {}", e.getMessage(),
e);
+ }
+ });
+ }
+
+ /**
+ * Creates an asynchronous task for working directory cleanup.
+ */
+ private java.util.concurrent.CompletableFuture<Void>
createWorkDirCleanupTask(FileSystem fs) {
+ return java.util.concurrent.CompletableFuture.runAsync(() -> {
+ try {
+ cleanupWorkingDirectory(fs);
+ } catch (Exception e) {
+ log.warn("Shutdown hook: working directory cleanup failed: {}",
e.getMessage(), e);
+ }
+ });
+ }
+
+ /**
+ * Closes the FileSystem quietly, logging any errors
+ */
+ private void closeFileSystemQuietly(FileSystem fs) {
+ if (fs != null) {
+ try {
+ fs.close();
+ } catch (Exception e) {
+ log.error("Failed to close shutdown FileSystem: {}", e.getMessage());
+ }
}
}
+ /**
+ * Loads JobState from the persisted file written by GenerateWorkUnits.
+ * Returns null if JobState cannot be loaded.
+ */
+ private JobState loadJobStateFromFile(FileSystem fs) {
+ try {
+ Path workDirRoot =
JobStateUtils.getWorkDirRoot(this.jobContext.getJobState());
+ Path jobStateFile = new Path(workDirRoot, "job.state");
+
+ if (!fs.exists(jobStateFile)) {
+ log.error("JobState file does not exist at {}", jobStateFile);
+ return null;
+ }
+
+ JobState jobState = new JobState();
+ try (java.io.DataInputStream dis = new
java.io.DataInputStream(fs.open(jobStateFile))) {
+ jobState.readFields(dis);
+ log.info("Successfully loaded JobState from {}", jobStateFile);
+ return jobState;
+ }
+ } catch (Exception e) {
+ log.error("Failed to load JobState from file: ", e);
+ return null;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ performCleanup();
+ super.close();
+ }
+
+ @VisibleForTesting
+ void triggerCleanupForTest() {
+ performCleanup();
+ }
+
public String getJobId() {
return this.jobContext.getJobId();
}
@@ -183,7 +329,6 @@ public abstract class GobblinJobLauncher extends
AbstractJobLauncher {
// The last iteration of output TaskState collecting will run when the
collector service gets stopped
this.taskStateCollectorService.stopAsync().awaitTerminated();
- cleanupWorkingDirectory();
}
}
@@ -266,30 +411,120 @@ public abstract class GobblinJobLauncher extends
AbstractJobLauncher {
}
/**
- * Delete persisted {@link WorkUnit}s and {@link JobState} upon job
completion.
+ * Delete writer staging and output directories using paths from JobState.
*/
- protected void cleanupWorkingDirectory() throws IOException {
- log.info("Deleting persisted work units for job " +
this.jobContext.getJobId());
- stateStores.getWuStateStore().delete(this.jobContext.getJobId());
+ @VisibleForTesting
+ protected void cleanupStagingDirectory(JobState jobState) throws IOException
{
+ if (!isCleanupEnabled(jobState)) {
+ return;
+ }
+
+ Set<String> pathsToDelete = getPathsToDelete(jobState);
+ if (pathsToDelete.isEmpty()) {
+ return;
+ }
+
+ log.info("Cleaning up {} work directories for job: {}",
pathsToDelete.size(), this.jobContext.getJobId());
- // delete the directory that stores the task state files
- stateStores.getTaskStateStore().delete(outputTaskStateDir.getName());
+ String writerFsUri =
jobState.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI,
ConfigurationKeys.LOCAL_FS_URI);
+ String fsScheme = URI.create(writerFsUri).getScheme();
+ FileSystem writerFs = FileSystem.newInstance(URI.create(writerFsUri),
WriterUtils.getFsConfiguration(jobState));
+ long startTimeMs = System.currentTimeMillis();
+ try {
+ for (String pathStr : pathsToDelete) {
+ Path path = new Path(pathStr);
+ try {
+ if (writerFs.exists(path)) {
+ log.info("Deleting work directory: {}", path);
+ writerFs.delete(path, true);
+ } else {
+ log.info("Work directory does not exist or not accessible: {}",
path);
+ }
+ } catch (Exception e) {
+ log.error("Failed to delete work directory {}: {}", pathStr,
e.getMessage(), e);
+ }
+ }
+ } finally {
+ closeFileSystemQuietly(writerFs);
+ emitStagingCleanupLatency(startTimeMs, fsScheme);
+ }
+ }
- log.info("Deleting job state file for job " + this.jobContext.getJobId());
+ private void emitStagingCleanupLatency(long startTimeMs, String fsScheme) {
+
emitLatencyMetric(GobblinOpenTelemetryMetrics.GOBBLIN_STAGING_CLEANUP_LATENCY,
startTimeMs, fsScheme);
+ }
- if (this.stateStores.haveJobStateStore()) {
- this.stateStores.getJobStateStore().delete(this.jobContext.getJobId());
- } else {
- Path jobStateFilePath =
- GobblinClusterUtils.getJobStateFilePath(false, this.appWorkDir,
this.jobContext.getJobId());
- this.fs.delete(jobStateFilePath, false);
+ private void emitLatencyMetric(GobblinOpenTelemetryMetrics metric, long
startTimeMs, String fsScheme) {
+ try {
+ double durationSeconds = (System.currentTimeMillis() - startTimeMs) /
1000.0;
+ Map<String, String> attributes =
Collections.singletonMap(GobblinOpenTelemetryMetricsConstants.DimensionKeys.FS_SCHEME,
fsScheme);
+ OpenTelemetryDoubleHistogram histogram =
OpenTelemetryInstrumentation.getInstance(this.jobProps)
+ .getOrCreate(metric);
+ histogram.record(durationSeconds,
OpenTelemetryHelper.toOpenTelemetryAttributes(attributes));
+ } catch (Exception e) {
+ log.warn("Failed to emit metric {}: {}", metric.getMetricName(),
e.getMessage());
}
+ }
- if
(Boolean.parseBoolean(this.jobProps.getProperty(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
-
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED)))
{
- Path workDirRootPath =
JobStateUtils.getWorkDirRoot(this.jobContext.getJobState());
- log.info("Cleaning up work directory : {} for job : {}",
workDirRootPath, this.jobContext.getJobId());
- this.fs.delete(workDirRootPath, true);
+ /**
+ * Checks if work directory cleanup is enabled in JobState.
+ */
+ private boolean isCleanupEnabled(JobState jobState) {
+ return jobState.getPropAsBoolean(
+
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
+
Boolean.parseBoolean(GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED));
+ }
+
+ /**
+ * Retrieves the set of paths to delete from JobState.
+ */
+ private Set<String> getPathsToDelete(JobState jobState) {
+ if
(!jobState.contains(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE))
{
+ log.info("No work dir paths to delete configured in JobState");
+ return java.util.Collections.emptySet();
+ }
+ return
jobState.getPropAsSet(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE);
+ }
+
+ /**
+ * Delete persisted {@link WorkUnit}s and {@link JobState} upon job
completion.
+ */
+ protected void cleanupWorkingDirectory(FileSystem fs) throws IOException {
+ long startTimeMs = System.currentTimeMillis();
+ try {
+ cleanupStateStore(fs);
+ if
(Boolean.parseBoolean(this.jobProps.getProperty(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
+
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED)))
{
+ Path workDirRootPath =
JobStateUtils.getWorkDirRoot(this.jobContext.getJobState());
+ log.info("Cleaning up work directory : {} for job : {}",
workDirRootPath, this.jobContext.getJobId());
+ fs.delete(workDirRootPath, true);
+ }
+ } catch (Exception e){
+ log.error("Failed to cleanup working directory for job {}: {}",
this.jobContext.getJobId(), e.getMessage(), e);
+ }
+ finally {
+
emitLatencyMetric(GobblinOpenTelemetryMetrics.GOBBLIN_WORK_DIRECTORY_CLEANUP_LATENCY,
startTimeMs, fs.getUri().getScheme());
+ }
+ }
+
+ private void cleanupStateStore(FileSystem fs) {
+ try {
+ log.info("Deleting persisted work units for job " +
this.jobContext.getJobId());
+ stateStores.getWuStateStore().delete(this.jobContext.getJobId());
+
+ // delete the directory that stores the task state files
+ stateStores.getTaskStateStore().delete(outputTaskStateDir.getName());
+
+ log.info("Deleting job state file for job " +
this.jobContext.getJobId());
+
+ if (this.stateStores.haveJobStateStore()) {
+ this.stateStores.getJobStateStore().delete(this.jobContext.getJobId());
+ } else {
+ Path jobStateFilePath = GobblinClusterUtils.getJobStateFilePath(false,
this.appWorkDir, this.jobContext.getJobId());
+ fs.delete(jobStateFilePath, false);
+ }
+ } catch (Exception e){
+ log.error("Failed to cleanup state store for job {}: {}",
this.jobContext.getJobId(), e.getMessage(), e);
}
}
}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
index 5c457e6945..3f1e6d0dc4 100644
---
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
@@ -329,6 +329,64 @@ public class GenerateWorkUnitsImplTest {
}
}
+ @Test
+ public void testWorkDirPathsToDeleteIsStoredInJobState() throws IOException {
+ // Arrange
+ Properties jobProps = new Properties();
+ jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, "test-job");
+ jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, "test-job-id");
+
+ JobState jobState = new JobState(jobProps);
+
+ List<WorkUnit> workUnits = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ WorkUnit workUnit = WorkUnit.createEmpty();
+ workUnit.setProp("writer.staging.dir", "/tmp/jobId/task-staging/" + i);
+ workUnit.setProp("writer.output.dir", "/tmp/jobId/task-output/" + i);
+ workUnits.add(workUnit);
+ }
+ WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits)
+ .setFiniteStream(true)
+ .build();
+ Set<String> pathsToCleanUp =
GenerateWorkUnitsImpl.calculateWorkDirsToCleanup(workUnitStream);
+
+ // Act - simulate what GenerateWorkUnitsImpl does
+ if (!pathsToCleanUp.isEmpty()) {
+
jobState.setProp(org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE,
+ String.join(",", pathsToCleanUp));
+ }
+
+ // Assert
+
Assert.assertTrue(jobState.contains(org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE),
+ "JobState should contain WORK_DIR_PATHS_TO_DELETE property");
+ Set<String> retrievedPaths = jobState.getPropAsSet(
+
org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE);
+ Assert.assertEquals(retrievedPaths.size(), 6, "Should have 6 paths (3
staging + 3 output)");
+ Assert.assertTrue(retrievedPaths.containsAll(pathsToCleanUp),
+ "Retrieved paths should match the original paths");
+ }
+
+ @Test
+ public void testWorkDirPathsToDeleteNotSetWhenEmpty() {
+ // Arrange
+ Properties jobProps = new Properties();
+ jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, "test-job");
+ jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, "test-job-id");
+
+ JobState jobState = new JobState(jobProps);
+ Set<String> pathsToCleanUp = new java.util.HashSet<>();
+
+ // Act - simulate what GenerateWorkUnitsImpl does with empty paths
+ if (!pathsToCleanUp.isEmpty()) {
+
jobState.setProp(org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE,
+ String.join(",", pathsToCleanUp));
+ }
+
+ // Assert
+
Assert.assertFalse(jobState.contains(org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE),
+ "JobState should not contain WORK_DIR_PATHS_TO_DELETE when paths are
empty");
+ }
+
public static WorkUnit createWorkUnitOfSize(long size) {
WorkUnit workUnit = WorkUnit.createEmpty();
workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, size);
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
index 9a6446d95c..b359a4ba2c 100644
---
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.temporal.joblauncher;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@@ -45,6 +46,7 @@ import io.temporal.serviceclient.WorkflowServiceStubs;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.example.simplejson.SimpleJsonSource;
+import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.locks.FileBasedJobLock;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
@@ -56,11 +58,14 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
public class GobblinTemporalJobLauncherTest {
- private GobblinTemporalJobLauncher jobLauncher;
+ private GobblinTemporalJobLauncherForTest jobLauncher;
private MockedStatic<TemporalWorkflowClientFactory>
mockWorkflowClientFactory;
private WorkflowServiceStubs mockServiceStubs;
private WorkflowClient mockClient;
@@ -69,6 +74,8 @@ public class GobblinTemporalJobLauncherTest {
private Properties jobProperties;
class GobblinTemporalJobLauncherForTest extends GobblinTemporalJobLauncher {
+ int cleanupStagingDirectoryCallCount = 0;
+
public GobblinTemporalJobLauncherForTest(Properties jobProperties, Path
appWorkDir) throws Exception {
super(jobProperties, appWorkDir, new ArrayList<>(), new
ConcurrentHashMap<>(), null);
}
@@ -78,6 +85,17 @@ public class GobblinTemporalJobLauncherTest {
throws Exception {
this.workflowId = "someWorkflowId";
}
+
+ @Override
+ protected void cleanupStagingDirectory(JobState jobState) throws
IOException {
+ cleanupStagingDirectoryCallCount++;
+ super.cleanupStagingDirectory(jobState);
+ }
+
+ // Expose jobContext for testing
+ public org.apache.gobblin.runtime.JobContext getJobContext() {
+ return this.jobContext;
+ }
}
@@ -203,4 +221,107 @@ public class GobblinTemporalJobLauncherTest {
verify(mockStub, times(1)).terminate("Job cancel invoked");
}
+
+ @Test
+ public void testCleanupStagingDirectoryWithPathsToDelete() throws Exception {
+ // Create temp directories to simulate work directories
+ File tmpDir = Files.createTempDir();
+ File stagingDir = new File(tmpDir, "staging");
+ File outputDir = new File(tmpDir, "output");
+ stagingDir.mkdirs();
+ outputDir.mkdirs();
+
+ // Set up job state with WORK_DIR_PATHS_TO_DELETE
+ JobState jobState = jobLauncher.getJobContext().getJobState();
+ String pathsToDelete = stagingDir.getAbsolutePath() + "," +
outputDir.getAbsolutePath();
+
jobState.setProp(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE,
pathsToDelete);
+
jobState.setProp(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
"true");
+
+ // Verify directories exist before cleanup
+ assertTrue(stagingDir.exists(), "Staging directory should exist before
cleanup");
+ assertTrue(outputDir.exists(), "Output directory should exist before
cleanup");
+
+ // Execute cleanup
+ jobLauncher.cleanupStagingDirectory(jobState);
+
+ // Verify directories are deleted
+ assertFalse(stagingDir.exists(), "Staging directory should be deleted
after cleanup");
+ assertFalse(outputDir.exists(), "Output directory should be deleted after
cleanup");
+
+ // Cleanup test directory
+ tmpDir.delete();
+ }
+
+ @Test
+ public void testCleanupStagingDirectoryWithoutPaths() throws Exception {
+ // Set up job state WITHOUT WORK_DIR_PATHS_TO_DELETE
+ JobState jobState = jobLauncher.getJobContext().getJobState();
+
jobState.setProp(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
"true");
+
+ // Should not throw exception when no paths configured
+ jobLauncher.cleanupStagingDirectory(jobState);
+ }
+
+ @Test
+ public void testCleanupStagingDirectoryWithCleanupDisabled() throws
Exception {
+ // Create temp directories
+ File tmpDir = Files.createTempDir();
+ File stagingDir = new File(tmpDir, "staging");
+ stagingDir.mkdirs();
+
+ // Set up job state with cleanup disabled
+ JobState jobState = jobLauncher.getJobContext().getJobState();
+ String pathsToDelete = stagingDir.getAbsolutePath();
+
jobState.setProp(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE,
pathsToDelete);
+
jobState.setProp(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
"false");
+
+ // Execute cleanup
+ jobLauncher.cleanupStagingDirectory(jobState);
+
+ // Verify directory still exists (cleanup was disabled)
+ assertTrue(stagingDir.exists(), "Staging directory should still exist when
cleanup is disabled");
+
+ // Cleanup
+ stagingDir.delete();
+ tmpDir.delete();
+ }
+
+ @Test
+ public void testCloseTriggersCleanup() throws Exception {
+ File tmpDir = Files.createTempDir();
+ File stagingDir = new File(tmpDir, "staging");
+ stagingDir.mkdirs();
+
+ JobState jobState = jobLauncher.getJobContext().getJobState();
+
jobState.setProp(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE,
stagingDir.getAbsolutePath());
+
jobState.setProp(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
"true");
+
+ assertTrue(stagingDir.exists(), "Staging directory should exist before
close()");
+
+ jobLauncher.close();
+
+ assertFalse(stagingDir.exists(), "close() should trigger cleanup and
delete the staging directory");
+ tmpDir.delete();
+ }
+
+ @Test
+ public void testCleanupRunsOnlyOnce() throws Exception {
+ File tmpDir = Files.createTempDir();
+ File stagingDir = new File(tmpDir, "staging");
+ stagingDir.mkdirs();
+
+ JobState jobState = jobLauncher.getJobContext().getJobState();
+
jobState.setProp(GobblinTemporalConfigurationKeys.WORK_DIR_PATHS_TO_DELETE,
stagingDir.getAbsolutePath());
+
jobState.setProp(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
"true");
+
+ // First cleanup - triggered by close()
+ jobLauncher.close();
+ assertEquals(jobLauncher.cleanupStagingDirectoryCallCount, 1, "Cleanup
should run exactly once after close()");
+
+ // Second trigger - simulates the shutdown hook firing after close()
already ran
+ jobLauncher.triggerCleanupForTest();
+ assertEquals(jobLauncher.cleanupStagingDirectoryCallCount, 1, "Cleanup
should not run a second time");
+
+ tmpDir.delete();
+ }
}
\ No newline at end of file
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 019410957c..5ef1b54c25 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -53,16 +53,20 @@ import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
@@ -422,6 +426,8 @@ public class GobblinYarnAppLauncher {
/**
* Stop this {@link GobblinYarnAppLauncher} instance.
+ * When not detaching on exit, sends a graceful shutdown signal to the AM
container and polls until
+ * the application reaches a terminal state (or timeout) so the AM can clean
up before the launcher exits.
*
* @throws IOException if this {@link GobblinYarnAppLauncher} instance fails
to clean up its working directory.
*/
@@ -433,6 +439,11 @@ public class GobblinYarnAppLauncher {
LOGGER.info("Stopping the " +
GobblinYarnAppLauncher.class.getSimpleName());
try {
+ // Only signal and wait when we are staying attached: if detachOnExit is
enabled, we leave the app running.
+ if (this.applicationId.isPresent() && !this.detachOnExitEnabled) {
+ signalGracefulShutdownAndWaitForTerminal();
+ }
+
if (this.serviceManager.isPresent()) {
this.serviceManager.get().stopAsync().awaitStopped(5,
TimeUnit.MINUTES);
}
@@ -940,6 +951,83 @@ public class GobblinYarnAppLauncher {
}
}
+ /**
+ * Sends GRACEFUL_SHUTDOWN to the AM container via YarnClient, then polls for a terminal
+ * application state until a configurable timeout
+ * ({@link GobblinYarnConfigurationKeys#GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY}).
+ * If the AM container id cannot be resolved (no current application attempt, or the attempt
+ * report carries no AM container id), signaling is skipped and the method returns immediately.
+ *
+ * <p>Best-effort: YarnException/IOException from resolving, signaling or polling are logged and
+ * swallowed; an interrupt is logged and the thread's interrupt flag is restored. Runtime
+ * exceptions are not caught here and propagate to the caller.
+ *
+ * <p>Requires {@code applicationId} to be present (the stop path checks this before calling).
+ *
+ * <p>NOTE(review): an application that is already terminal but still reports an AM container id
+ * is not short-circuited; the signal is still attempted and any resulting YarnException/IOException
+ * is only logged.
+ */
+ @VisibleForTesting
+ void signalGracefulShutdownAndWaitForTerminal() {
+ int waitMinutes = ConfigUtils.getInt(this.config,
+ GobblinYarnConfigurationKeys.GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY,
+
GobblinYarnConfigurationKeys.DEFAULT_GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES);
+ // The 60L literal promotes the arithmetic to long, so the minutes-to-ms conversion cannot overflow int.
+ long timeoutMs = waitMinutes * 60L * 1000L;
+ try {
+ Optional<ContainerId> amContainerId =
getAmContainerId(this.applicationId.get());
+ if (!amContainerId.isPresent()) {
+ LOGGER.warn("Could not resolve AM container for application {} (may
already be terminated); skipping graceful shutdown wait",
this.applicationId.get());
+ return;
+ }
+ sendGracefulShutdownSignal(amContainerId.get());
+ pollForApplicationCompletionUntil(timeoutMs);
+ } catch (YarnException | IOException e) {
+ // Also covers failures while polling, not only while signaling.
+ LOGGER.warn("Could not signal AM for graceful shutdown; proceeding with
stop", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Interrupted while waiting for graceful shutdown; proceeding
with stop", e);
+ }
+ }
+
+ /**
+ * Resolves the AM container of the application's current attempt.
+ *
+ * @param appId the YARN application to inspect
+ * @return the AM container id, or absent if the application report has no current attempt
+ * or the attempt report has no AM container id
+ * @throws YarnException if a YarnClient report call fails
+ * @throws IOException if a YarnClient report call fails
+ */
+ private Optional<ContainerId> getAmContainerId(ApplicationId appId) throws
YarnException, IOException {
+ ApplicationReport appReport = this.yarnClient.getApplicationReport(appId);
+ ApplicationAttemptId attemptId =
appReport.getCurrentApplicationAttemptId();
+ if (attemptId == null) {
+ return Optional.absent();
+ }
+ ApplicationAttemptReport attemptReport =
this.yarnClient.getApplicationAttemptReport(attemptId);
+ ContainerId amContainerId = attemptReport.getAMContainerId();
+ return Optional.fromNullable(amContainerId);
+ }
+
+ /**
+ * Asks YARN to deliver {@link SignalContainerCommand#GRACEFUL_SHUTDOWN} to the given container
+ * and logs that the request was sent. This method itself does not wait for the container to react.
+ *
+ * @param amContainerId the AM container to signal
+ * @throws YarnException if the signal request fails
+ * @throws IOException if the signal request fails
+ */
+ private void sendGracefulShutdownSignal(ContainerId amContainerId) throws
YarnException, IOException {
+ this.yarnClient.signalToContainer(amContainerId,
SignalContainerCommand.GRACEFUL_SHUTDOWN);
+ LOGGER.info("Sent GRACEFUL_SHUTDOWN signal to AM container {}",
amContainerId);
+ }
+
+ /**
+ * Returns whether the report's {@link YarnApplicationState} is terminal.
+ *
+ * @param report the application report to inspect
+ * @return true for FINISHED, FAILED or KILLED; false for any other state
+ */
+ @VisibleForTesting
+ static boolean isApplicationCompleted(ApplicationReport report) {
+ YarnApplicationState state = report.getYarnApplicationState();
+ return state == YarnApplicationState.FINISHED
+ || state == YarnApplicationState.FAILED
+ || state == YarnApplicationState.KILLED;
+ }
+
+ /**
+ * Polls the ResourceManager until the application reaches a terminal state or {@code timeoutMs} elapses.
+ *
+ * <p>The poll interval comes from {@link GobblinYarnConfigurationKeys#GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS_KEY}.
+ * A non-positive configured interval falls back to the default. A zero interval would busy-poll the RM.
+ * A negative interval would make {@link Thread#sleep(long)} throw IllegalArgumentException, which escapes
+ * the caller's catch clauses. Sleeps are capped by the remaining wait time. A non-positive {@code timeoutMs}
+ * results in a single status check.
+ *
+ * @param timeoutMs maximum time to wait, in milliseconds
+ * @throws InterruptedException if interrupted while sleeping between polls
+ * @throws YarnException if fetching the application report fails
+ * @throws IOException if fetching the application report fails
+ */
+ private void pollForApplicationCompletionUntil(long timeoutMs) throws InterruptedException, YarnException, IOException {
+ int pollIntervalSec = ConfigUtils.getInt(this.config,
+ GobblinYarnConfigurationKeys.GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS_KEY,
+ GobblinYarnConfigurationKeys.DEFAULT_GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS);
+ if (pollIntervalSec <= 0) {
+ LOGGER.warn("Invalid {}={}; falling back to default of {} seconds",
+ GobblinYarnConfigurationKeys.GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS_KEY, pollIntervalSec,
+ GobblinYarnConfigurationKeys.DEFAULT_GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS);
+ pollIntervalSec = GobblinYarnConfigurationKeys.DEFAULT_GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS;
+ }
+ long pollIntervalMs = pollIntervalSec * 1000L;
+ // Use the monotonic clock so wall-clock adjustments cannot stretch or cut the wait.
+ long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
+ ApplicationId appId = this.applicationId.get();
+ while (true) {
+ ApplicationReport report = this.yarnClient.getApplicationReport(appId);
+ if (isApplicationCompleted(report)) {
+ LOGGER.info("Application {} reached terminal state {}", appId, report.getYarnApplicationState());
+ return;
+ }
+ // Difference of nanoTime values is overflow-safe, unlike comparing absolute values.
+ long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
+ if (remainingMs <= 0) {
+ LOGGER.info("Graceful shutdown wait timeout reached for application {}", appId);
+ return;
+ }
+ // Both operands are positive here; never sleep past the deadline.
+ Thread.sleep(Math.min(pollIntervalMs, remainingMs));
+ }
+ }
+
@VisibleForTesting
void cleanUpAppWorkDirectory(ApplicationId applicationId) throws IOException
{
// Create a new filesystem as this.fs may have been closed by the Yarn
Application, and FS.get() will return a cached instance of the closed FS
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 00af259a22..167f211b20 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -181,4 +181,18 @@ public class GobblinYarnConfigurationKeys {
//Config to control Heartbeat interval for Yarn AMRM client.
public static final String AMRM_HEARTBEAT_INTERVAL_SECS =
GOBBLIN_YARN_PREFIX + "amRmHeartbeatIntervalSecs";
public static final Integer DEFAULT_AMRM_HEARTBEAT_INTERVAL_SECS = 15;
+
+ /**
+ * Max time (minutes) to wait for application to reach terminal state after sending graceful shutdown signal.
+ * Default 10 (see {@link #DEFAULT_GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES}). A longer value allows the AM more
+ * time to clean up but can delay launcher shutdown.
+ */
+ public static final String GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY =
GOBBLIN_YARN_PREFIX + "graceful.shutdown.wait.time.minutes";
+ public static final int DEFAULT_GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES = 10;
+
+ /**
+ * Poll interval (seconds) when waiting for application completion after graceful shutdown signal.
+ * Default 60 (see {@link #DEFAULT_GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS}). Should be less than the
+ * total wait time (see {@link #GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY}).
+ * NOTE(review): confirm 60 seconds is the intended default poll interval.
+ */
+ public static final String GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS_KEY =
GOBBLIN_YARN_PREFIX + "graceful.shutdown.poll.interval.seconds";
+ public static final int DEFAULT_GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS = 60;
}
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index 622c6d5647..4a802877f8 100644
---
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -39,8 +39,11 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -89,7 +92,10 @@ import
org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.gobblin.yarn.helix.HelixMessageSubTypes;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -495,6 +501,78 @@ public class GobblinYarnAppLauncherTest implements
HelixMessageTestBase {
}
}
+ /**
+ * Test that {@link GobblinYarnAppLauncher#isApplicationCompleted(ApplicationReport)} returns true for
+ * FINISHED, FAILED, KILLED and false for RUNNING and NEW.
+ * NOTE(review): the other non-terminal states (e.g. NEW_SAVING, SUBMITTED, ACCEPTED) are not exercised.
+ */
+ @Test
+ public void testIsApplicationCompleted() throws Exception {
+ ApplicationReport reportFinished = Mockito.mock(ApplicationReport.class);
+
when(reportFinished.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED);
+
Assert.assertTrue(GobblinYarnAppLauncher.isApplicationCompleted(reportFinished));
+
+ ApplicationReport reportFailed = Mockito.mock(ApplicationReport.class);
+
when(reportFailed.getYarnApplicationState()).thenReturn(YarnApplicationState.FAILED);
+
Assert.assertTrue(GobblinYarnAppLauncher.isApplicationCompleted(reportFailed));
+
+ ApplicationReport reportKilled = Mockito.mock(ApplicationReport.class);
+
when(reportKilled.getYarnApplicationState()).thenReturn(YarnApplicationState.KILLED);
+
Assert.assertTrue(GobblinYarnAppLauncher.isApplicationCompleted(reportKilled));
+
+ ApplicationReport reportRunning = Mockito.mock(ApplicationReport.class);
+
when(reportRunning.getYarnApplicationState()).thenReturn(YarnApplicationState.RUNNING);
+
Assert.assertFalse(GobblinYarnAppLauncher.isApplicationCompleted(reportRunning));
+
+ ApplicationReport reportNew = Mockito.mock(ApplicationReport.class);
+
when(reportNew.getYarnApplicationState()).thenReturn(YarnApplicationState.NEW);
+
Assert.assertFalse(GobblinYarnAppLauncher.isApplicationCompleted(reportNew));
+ }
+
+ /**
+ * Test that {@link GobblinYarnAppLauncher#signalGracefulShutdownAndWaitForTerminal()} sends
+ * GRACEFUL_SHUTDOWN to the AM container and then polls for a terminal state. A mock YarnClient and
+ * the application id are injected into the launcher's private fields via reflection; the method
+ * itself is called directly (it is package-private), not through stop().
+ */
+ @Test(dependsOnMethods = "testCreateHelixCluster")
+ public void testGracefulShutdownSendsSignalAndPolls() throws Exception {
+ Config gracefulConfig = this.config
+
.withValue(GobblinYarnConfigurationKeys.GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY,
ConfigValueFactory.fromAnyRef(1))
+
.withValue(GobblinYarnConfigurationKeys.GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS_KEY,
ConfigValueFactory.fromAnyRef(1));
+ GobblinYarnAppLauncher launcher = new
GobblinYarnAppLauncher(gracefulConfig, clusterConf);
+ // NOTE(review): the yarnClient field is overwritten with a mock below without the previous
+ // instance being stopped; confirm whether initializeYarnClients(...) leaves anything to close.
+ launcher.initializeYarnClients(gracefulConfig);
+
+ ApplicationId appId = ApplicationId.newInstance(0, 1);
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId,
1);
+ ContainerId containerId = ContainerId.newInstance(attemptId, 0);
+
+ ApplicationReport appReport = Mockito.mock(ApplicationReport.class);
+ when(appReport.getApplicationId()).thenReturn(appId);
+ when(appReport.getCurrentApplicationAttemptId()).thenReturn(attemptId);
+ // FINISHED makes the first poll return immediately, so the test never sleeps.
+
when(appReport.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED);
+
+ ApplicationAttemptReport attemptReport =
Mockito.mock(ApplicationAttemptReport.class);
+ when(attemptReport.getAMContainerId()).thenReturn(containerId);
+
+ YarnClient mockYarnClient = Mockito.mock(YarnClient.class);
+
when(mockYarnClient.getApplicationReport(any(ApplicationId.class))).thenReturn(appReport);
+
when(mockYarnClient.getApplicationAttemptReport(any(ApplicationAttemptId.class))).thenReturn(attemptReport);
+
+ Field yarnClientField =
GobblinYarnAppLauncher.class.getDeclaredField("yarnClient");
+ yarnClientField.setAccessible(true);
+ yarnClientField.set(launcher, mockYarnClient);
+
+ Field applicationIdField =
GobblinYarnAppLauncher.class.getDeclaredField("applicationId");
+ applicationIdField.setAccessible(true);
+ applicationIdField.set(launcher,
com.google.common.base.Optional.of(appId));
+
+ launcher.signalGracefulShutdownAndWaitForTerminal();
+
+ verify(mockYarnClient, times(1)).signalToContainer(eq(containerId),
eq(SignalContainerCommand.GRACEFUL_SHUTDOWN));
+ // getApplicationReport: once in getAmContainerId(), once in pollForApplicationCompletionUntil() when checking terminal state
+ verify(mockYarnClient, times(2)).getApplicationReport(appId);
+ verify(mockYarnClient, times(1)).getApplicationAttemptReport(attemptId);
+ }
+
@Test
public void testAddMetricReportingDynamicConfig()
throws IOException {