This is an automated email from the ASF dual-hosted git repository.
abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e584e5b49d [GOBBLIN-2182] Cleanup workdir in Gobblin-on-Temporal
execution (#4085)
e584e5b49d is described below
commit e584e5b49d24aaed398d082cf1e07bac8c13f453
Author: Vivek Rai <[email protected]>
AuthorDate: Mon Jan 20 09:52:45 2025 +0530
[GOBBLIN-2182] Cleanup workdir in Gobblin-on-Temporal execution (#4085)
* Added work directory cleanup for Gobblin-on-Temporal (GoT), controlled by a config flag
---
.../temporal/GobblinTemporalConfigurationKeys.java | 2 ++
.../gobblin/temporal/joblauncher/GobblinJobLauncher.java | 15 ++++++++++++++-
2 files changed, 16 insertions(+), 1 deletion(-)
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index e90e901a56..0201cda0fb 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -43,6 +43,8 @@ public interface GobblinTemporalConfigurationKeys {
String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX =
GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "arg.";
String GOBBLIN_TEMPORAL_JOB_LAUNCHER_CONFIG_OVERRIDES =
GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "config.overrides";
+ String GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED = PREFIX +
"work.dir.cleanup.enabled";
+ String DEFAULT_GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED = "true";
/**
* Suffix for metrics emitted by GobblinTemporalJobLauncher for preventing
collisions with prod jobs
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
index ea2c2ce7c2..13ecfd1ebc 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
@@ -59,6 +59,8 @@ import org.apache.gobblin.runtime.util.StateStores;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ParallelRunner;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
/**
* An implementation of {@link JobLauncher} that launches a Gobblin job using
the Temporal task framework.
@@ -134,7 +136,11 @@ public abstract class GobblinJobLauncher extends
AbstractJobLauncher {
try {
executeCancellation();
} finally {
- super.close();
+ try {
+ cleanupWorkingDirectory();
+ } finally {
+ super.close();
+ }
}
}
@@ -277,6 +283,13 @@ public abstract class GobblinJobLauncher extends
AbstractJobLauncher {
GobblinClusterUtils.getJobStateFilePath(false, this.appWorkDir,
this.jobContext.getJobId());
this.fs.delete(jobStateFilePath, false);
}
+
+ if
(Boolean.parseBoolean(this.jobProps.getProperty(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
+
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED)))
{
+ Path workDirRootPath =
JobStateUtils.getWorkDirRoot(this.jobContext.getJobState());
+ log.info("Cleaning up work directory : {} for job : {}",
workDirRootPath, this.jobContext.getJobId());
+ this.fs.delete(workDirRootPath, true);
+ }
}
}