Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 5fa983268 -> 6e4a2cecb
[GOBBLIN-257] Remove jobs' previous run data

Closes #2109 from arjun4084346/master

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6e4a2cec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6e4a2cec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6e4a2cec

Branch: refs/heads/master
Commit: 6e4a2cecbec7bf8a16cc9d5e460c5365fd0bd856
Parents: 5fa9832
Author: Arjun <[email protected]>
Authored: Fri Sep 22 12:55:36 2017 -0700
Committer: Abhishek Tiwari <[email protected]>
Committed: Fri Sep 22 12:55:36 2017 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java | 2 +
 .../gobblin/runtime/AbstractJobLauncher.java | 15 +++++-
 .../org/apache/gobblin/runtime/JobContext.java | 26 ++++++++--
 .../apache/gobblin/runtime/JobContextTest.java | 50 ++++++++++++++++++++
 .../org/apache/gobblin/util/HadoopUtils.java | 16 +++++++
 .../apache/gobblin/util/JobLauncherUtils.java | 25 ++++++++++
 6 files changed, 129 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e4a2cec/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 303ad15..0ef8416 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -180,6 +180,8 @@ public class ConfigurationKeys {
 public static final String CLEANUP_STAGING_DATA_PER_TASK = "cleanup.staging.data.per.task"; public static final boolean
DEFAULT_CLEANUP_STAGING_DATA_PER_TASK = true; public static final String CLEANUP_STAGING_DATA_BY_INITIALIZER = "cleanup.staging.data.by.initializer"; + public static final String CLEANUP_OLD_JOBS_DATA = "cleanup.old.job.data"; + public static final boolean DEFAULT_CLEANUP_OLD_JOBS_DATA = false; public static final String QUEUED_TASK_TIME_MAX_SIZE = "taskexecutor.queued_task_time.history.max_size"; public static final int DEFAULT_QUEUED_TASK_TIME_MAX_SIZE = 2048; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e4a2cec/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java index 49fa98e..d59e097 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java @@ -18,6 +18,8 @@ package org.apache.gobblin.runtime; import java.io.IOException; +import java.net.URI; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; @@ -26,6 +28,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.lang.StringUtils; +import org.apache.gobblin.util.HadoopUtils; +import org.apache.gobblin.util.WriterUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -48,7 +54,6 @@ import org.apache.gobblin.source.workunit.BasicWorkUnitStream; import org.apache.gobblin.source.workunit.WorkUnitStream; import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; import org.apache.gobblin.broker.SharedResourcesBrokerFactory; -import 
org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; import org.apache.gobblin.broker.iface.SharedResourcesBroker; import org.apache.gobblin.commit.CommitSequence; import org.apache.gobblin.commit.CommitSequenceStore; @@ -830,6 +835,9 @@ public abstract class AbstractJobLauncher implements JobLauncher { throw new RuntimeException("Work unit streams do not support cleaning staging data per task."); } } else { + if (jobState.getPropAsBoolean(ConfigurationKeys.CLEANUP_OLD_JOBS_DATA, ConfigurationKeys.DEFAULT_CLEANUP_OLD_JOBS_DATA)) { + JobLauncherUtils.cleanUpOldJobData(jobState, LOG, jobContext.getStagingDirProvided(), jobContext.getOutputDirProvided()); + } JobLauncherUtils.cleanJobStagingData(jobState, LOG); } } catch (Throwable t) { @@ -838,6 +846,11 @@ public abstract class AbstractJobLauncher implements JobLauncher { } } + + + private static String getJobIdPrefix(String jobId) { + return jobId.substring(0, jobId.lastIndexOf(Id.Job.SEPARATOR) + 1); + } /** * Cleanup the job's task staging data. 
This is not doing anything in case job succeeds * and data is successfully committed because the staging data has already been moved http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e4a2cec/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java index 33c5701..0debcac 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java @@ -109,6 +109,12 @@ public class JobContext implements Closeable { private final boolean parallelizeCommit; private final int parallelCommits; + // Were WRITER_STAGING_DIR and WRITER_OUTPUT_DIR provided in the job file + @Getter + protected final Boolean stagingDirProvided; + @Getter + protected final Boolean outputDirProvided; + @Getter private final DeliverySemantics semantics; @@ -147,6 +153,9 @@ public class JobContext implements Closeable { new JobState(jobPropsState, this.datasetStateStore.getLatestDatasetStatesByUrns(this.jobName), this.jobName, this.jobId); + stagingDirProvided = this.jobState.contains(ConfigurationKeys.WRITER_STAGING_DIR); + outputDirProvided = this.jobState.contains(ConfigurationKeys.WRITER_OUTPUT_DIR); + setTaskStagingAndOutputDirs(); if (GobblinMetrics.isEnabled(jobProps)) { @@ -287,26 +296,35 @@ public class JobContext implements Closeable { return this.source; } - protected void setTaskStagingAndOutputDirs() { + /** + * Appends two paths + * @param dir1 + * @param dir2 + * @return appended path + */ + protected static Path getJobDir(String dir1, String dir2) { + return new Path(dir1, dir2); + } + protected void setTaskStagingAndOutputDirs() { // Add jobId to writer staging dir if (this.jobState.contains(ConfigurationKeys.WRITER_STAGING_DIR)) { String 
writerStagingDirWithJobId = - new Path(this.jobState.getProp(ConfigurationKeys.WRITER_STAGING_DIR), this.jobId).toString(); + new Path(getJobDir(this.jobState.getProp(ConfigurationKeys.WRITER_STAGING_DIR), this.getJobName()), this.jobId).toString(); this.jobState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, writerStagingDirWithJobId); } // Add jobId to writer output dir if (this.jobState.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)) { String writerOutputDirWithJobId = - new Path(this.jobState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR), this.jobId).toString(); + new Path(getJobDir(this.jobState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR), this.getJobName()), this.jobId).toString(); this.jobState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, writerOutputDirWithJobId); } // Add jobId to task data root dir if (this.jobState.contains(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY)) { String taskDataRootDirWithJobId = - new Path(this.jobState.getProp(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY), this.jobId).toString(); + new Path(getJobDir(this.jobState.getProp(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY), this.getJobName()), this.jobId).toString(); this.jobState.setProp(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY, taskDataRootDirWithJobId); setTaskStagingDir(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e4a2cec/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobContextTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobContextTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobContextTest.java index d3dba5b..a21a0d8 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobContextTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobContextTest.java @@ -29,18 +29,27 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import 
org.apache.gobblin.util.JobLauncherUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import org.slf4j.Logger; import org.testng.Assert; import org.testng.annotations.Test; +import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Queues; +import com.google.common.io.Files; import org.apache.gobblin.commit.DeliverySemantics; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.util.Either; +import org.apache.gobblin.util.Id; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; @@ -214,6 +223,47 @@ public class JobContextTest { Assert.assertEquals(jobContext.getJobState().getState(), JobState.RunningState.FAILED); } + @Test + public void testCleanUpOldJobData() throws Exception { + String rootPath = Files.createTempDir().getAbsolutePath(); + final String JOB_PREFIX = Id.Job.PREFIX; + final String JOB_NAME1 = "GobblinKafka"; + final String JOB_NAME2 = "GobblinBrooklin"; + final long timestamp1 = 1505774129247L; + final long timestamp2 = 1505774129248L; + final Joiner JOINER = Joiner.on(Id.SEPARATOR).skipNulls(); + Object[] oldJob1 = new Object[]{JOB_PREFIX, JOB_NAME1, timestamp1}; + Object[] oldJob2 = new Object[]{JOB_PREFIX, JOB_NAME2, timestamp1}; + Object[] currentJob = new Object[]{JOB_PREFIX, JOB_NAME1, timestamp2}; + + Path currentJobPath = new Path(JobContext.getJobDir(rootPath, JOB_NAME1), JOINER.join(currentJob)); + Path oldJobPath1 = new Path(JobContext.getJobDir(rootPath, JOB_NAME1), JOINER.join(oldJob1)); + Path oldJobPath2 = new Path(JobContext.getJobDir(rootPath, JOB_NAME2), JOINER.join(oldJob2)); + Path stagingPath = new Path(currentJobPath, "task-staging"); + Path outputPath = new Path(currentJobPath, "task-output"); 
+ + FileSystem fs = FileSystem.getLocal(new Configuration()); + fs.mkdirs(currentJobPath); + fs.mkdirs(oldJobPath1); + fs.mkdirs(oldJobPath2); + log.info("Created : {} {} {}", currentJobPath, oldJobPath1, oldJobPath2); + + gobblin.runtime.JobState jobState = new gobblin.runtime.JobState(); + jobState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingPath.toString()); + jobState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputPath.toString()); + + JobContext jobContext = mock(JobContext.class); + when(jobContext.getStagingDirProvided()).thenReturn(false); + when(jobContext.getOutputDirProvided()).thenReturn(false); + when(jobContext.getJobId()).thenReturn(currentJobPath.getName().toString()); + + JobLauncherUtils.cleanUpOldJobData(jobState, log, jobContext.getStagingDirProvided(), jobContext.getOutputDirProvided()); + + Assert.assertFalse(fs.exists(oldJobPath1)); + Assert.assertTrue(fs.exists(oldJobPath2)); + Assert.assertFalse(fs.exists(currentJobPath)); + } + /** * A {@link Callable} that blocks until a different thread calls {@link #unblock()}. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e4a2cec/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java index 27ec5cd..4b01ab4 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java @@ -45,6 +45,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.Trash; import org.apache.hadoop.fs.permission.FsPermission; @@ -185,6 +186,21 @@ public class HadoopUtils { } /** + * Delete files according to the regular expression provided + * @param fs Filesystem object + * @param path base path + * @param regex regular expression to select files to delete + * @throws IOException + */ + public static void deletePathByRegex(FileSystem fs, final Path path, final String regex) throws IOException { + FileStatus[] statusList = fs.listStatus(path, path1 -> path1.getName().matches(regex)); + + for (final FileStatus oldJobFile : statusList) { + HadoopUtils.deletePath(fs, oldJobFile.getPath(), true); + } + } + + /** * Moves the object to the filesystem trash according to the file system policy. * @param fs FileSystem object * @param path Path to the object to be moved to trash. 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e4a2cec/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java index df4d73a..45eb82d 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java @@ -20,8 +20,10 @@ package org.apache.gobblin.util; import java.io.IOException; import java.net.URI; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -236,6 +238,29 @@ public class JobLauncherUtils { } } + public static void cleanUpOldJobData(State state, Logger logger, boolean stagingDirProvided, boolean outputDirProvided) throws IOException { + Set<Path> jobPaths = new HashSet<>(); + String writerFsUri = state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, ConfigurationKeys.LOCAL_FS_URI); + FileSystem fs = FileSystem.get(URI.create(writerFsUri), WriterUtils.getFsConfiguration(state)); + + Path jobPath; + if (stagingDirProvided) { + jobPath = new Path(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR)).getParent(); + } else { + jobPath = new Path(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR)).getParent().getParent(); + } + jobPaths.add(jobPath); + if (outputDirProvided) { + jobPath = new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR)).getParent(); + } else { + jobPath = new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR)).getParent().getParent(); + } + jobPaths.add(jobPath); + for (Path jobPathToDelete : jobPaths) { + logger.info("Cleaning up old job directory " + jobPathToDelete); + 
HadoopUtils.deletePath(fs, jobPathToDelete, true); + } + } /** * @param state * @param fsUri
