Repository: incubator-gobblin Updated Branches: refs/heads/master af141db59 -> fbf7c9bbd
[GOBBLIN-363] Clean up the job-level subdir in the _taskstate directory in Gobblin Cluster after a job is done Closes #2234 from htran1/cluster_task_state_cleanup Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/fbf7c9bb Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/fbf7c9bb Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/fbf7c9bb Branch: refs/heads/master Commit: fbf7c9bbd23ef310f3107bcc13dc16a1b37234be Parents: af141db Author: Hung Tran <[email protected]> Authored: Tue Jan 9 13:56:25 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Tue Jan 9 13:56:25 2018 -0800 ---------------------------------------------------------------------- .../gobblin/cluster/GobblinHelixJobLauncher.java | 4 ++++ .../gobblin/cluster/GobblinHelixJobLauncherTest.java | 14 +++++++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fbf7c9bb/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index fb8b579..73e5330 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -387,6 +387,10 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { private void cleanupWorkingDirectory() throws IOException { LOGGER.info("Deleting persisted work units for job " + this.jobContext.getJobId()); stateStores.wuStateStore.delete(this.jobContext.getJobId()); + + // delete the directory that stores the task state files + stateStores.taskStateStore.delete(outputTaskStateDir.getName()); + LOGGER.info("Deleting job state file for job " + this.jobContext.getJobId()); Path jobStateFilePath = new Path(this.appWorkDir, this.jobContext.getJobId() + "." + JOB_STATE_FILE_NAME); this.fs.delete(jobStateFilePath, false); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fbf7c9bb/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java index 64be11b..243c652 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java @@ -276,7 +276,7 @@ public class GobblinHelixJobLauncherTest { Assert.assertEquals(testListener.getCompletes().get() == 1, true); } - public void testJobContextCleanup() throws Exception { + public void testJobCleanup() throws Exception { final ConcurrentHashMap<String, Boolean> runningMap = new ConcurrentHashMap<>(); final Properties properties = generateJobProperties(this.baseConfig, "3", "_1504201348473"); @@ -302,6 +302,18 @@ public class GobblinHelixJobLauncherTest { // job context should have been deleted Assert.assertNull(jobContext); + + // check that workunit and taskstate directory for the job are cleaned up + final File workunitsDir = + new File(this.appWorkDir + File.separator + GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME + + File.separator + jobIdKey); + + final File taskstatesDir = + new File(this.appWorkDir + File.separator + GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME + + File.separator + jobIdKey); + + Assert.assertFalse(workunitsDir.exists()); + Assert.assertFalse(taskstatesDir.exists()); } @AfterClass
