Repository: incubator-gobblin Updated Branches: refs/heads/master e3f9de1a4 -> a34a81a42
[GOBBLIN-309] Disabled rewrite and enabled retry for adding jar file on HDFS Closes #2163 from autumnust/jarcopyfix Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a34a81a4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a34a81a4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a34a81a4 Branch: refs/heads/master Commit: a34a81a42a73705558e32f38af54835fcea47325 Parents: e3f9de1 Author: Lei Sun <[email protected]> Authored: Mon Nov 13 12:21:01 2017 -0800 Committer: Hung Tran <[email protected]> Committed: Mon Nov 13 12:21:01 2017 -0800 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 2 + .../runtime/mapreduce/MRJobLauncher.java | 142 ++++++++++++------- 2 files changed, 91 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a34a81a4/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index c8de615..a563b43 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -182,6 +182,8 @@ public class ConfigurationKeys { public static final String CLEANUP_STAGING_DATA_BY_INITIALIZER = "cleanup.staging.data.by.initializer"; public static final String CLEANUP_OLD_JOBS_DATA = "cleanup.old.job.data"; public static final boolean DEFAULT_CLEANUP_OLD_JOBS_DATA = false; + public static final String MAXIMUM_JAR_COPY_RETRY_TIMES_KEY = JOB_JAR_FILES_KEY + 
".uploading.retry.maximum"; + public static final String QUEUED_TASK_TIME_MAX_SIZE = "taskexecutor.queued_task_time.history.max_size"; public static final int DEFAULT_QUEUED_TASK_TIME_MAX_SIZE = 2048; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a34a81a4/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java index dcb6a14..9f17db1 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java @@ -21,8 +21,10 @@ import java.io.DataOutputStream; import java.io.FileInputStream; import java.io.IOException; import java.net.URI; +import java.util.HashSet; import java.util.List; import java.util.Properties; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -56,9 +58,9 @@ import com.google.common.io.Closer; import com.google.common.util.concurrent.ServiceManager; import com.typesafe.config.ConfigFactory; +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance; -import org.apache.gobblin.broker.SharedResourcesBrokerFactory; import org.apache.gobblin.broker.iface.SharedResourcesBroker; import org.apache.gobblin.commit.CommitStep; import org.apache.gobblin.configuration.ConfigurationKeys; @@ -114,6 +116,11 @@ public class MRJobLauncher extends AbstractJobLauncher { private static final String OUTPUT_DIR_NAME = "output"; private static final String WORK_UNIT_LIST_FILE_EXTENSION = ".wulist"; + // Configuration that make uploading of jar files more 
reliable, + // since multiple Gobblin Jobs are sharing the same jar directory. + private static final int MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT = 5; + private static final int WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000; + private static final Splitter SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults(); private final Configuration conf; @@ -134,13 +141,13 @@ public class MRJobLauncher extends AbstractJobLauncher { private final StateStore<TaskState> taskStateStore; - public MRJobLauncher(Properties jobProps) - throws Exception { + private final int jarFileMaximumRetry; + + public MRJobLauncher(Properties jobProps) throws Exception { this(jobProps, null); } - public MRJobLauncher(Properties jobProps, SharedResourcesBroker<GobblinScopeTypes> instanceBroker) - throws Exception { + public MRJobLauncher(Properties jobProps, SharedResourcesBroker<GobblinScopeTypes> instanceBroker) throws Exception { this(jobProps, new Configuration(), instanceBroker); } @@ -193,12 +200,16 @@ public class MRJobLauncher extends AbstractJobLauncher { new TaskStateCollectorService(jobProps, this.jobContext.getJobState(), this.eventBus, taskStateStore, outputTaskStateDir); + this.jarFileMaximumRetry = + jobProps.containsKey(ConfigurationKeys.MAXIMUM_JAR_COPY_RETRY_TIMES_KEY) ? 
Integer.parseInt( + jobProps.getProperty(ConfigurationKeys.MAXIMUM_JAR_COPY_RETRY_TIMES_KEY)) + : MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT; + startCancellationExecutor(); } @Override - public void close() - throws IOException { + public void close() throws IOException { try { if (this.hadoopJobSubmitted && !this.job.isComplete()) { LOG.info("Killing the Hadoop MR job for job " + this.jobContext.getJobId()); @@ -215,8 +226,7 @@ public class MRJobLauncher extends AbstractJobLauncher { } @Override - protected void runWorkUnits(List<WorkUnit> workUnits) - throws Exception { + protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception { String jobName = this.jobContext.getJobName(); JobState jobState = this.jobContext.getJobState(); @@ -278,8 +288,7 @@ public class MRJobLauncher extends AbstractJobLauncher { /** * Add dependent jars and files. */ - private void addDependencies(Configuration conf) - throws IOException { + private void addDependencies(Configuration conf) throws IOException { TimingEvent distributedCacheSetupTimer = this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_DISTRIBUTED_CACHE_SETUP); @@ -317,8 +326,7 @@ public class MRJobLauncher extends AbstractJobLauncher { /** * Prepare the Hadoop MR job, including configuring the job and setting up the input/output paths. */ - private void prepareHadoopJob(List<WorkUnit> workUnits) - throws IOException { + private void prepareHadoopJob(List<WorkUnit> workUnits) throws IOException { TimingEvent mrJobSetupTimer = this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_JOB_SETUP); // Add dependent jars/files @@ -386,34 +394,66 @@ public class MRJobLauncher extends AbstractJobLauncher { * so the mappers can use them. 
*/ @SuppressWarnings("deprecation") - private void addJars(Path jarFileDir, String jarFileList, Configuration conf) - throws IOException { + private void addJars(Path jarFileDir, String jarFileList, Configuration conf) throws IOException { LocalFileSystem lfs = FileSystem.getLocal(conf); for (String jarFile : SPLITTER.split(jarFileList)) { Path srcJarFile = new Path(jarFile); FileStatus[] fileStatusList = lfs.globStatus(srcJarFile); + for (FileStatus status : fileStatusList) { - // SNAPSHOT jars should not be shared, as different jobs may be using different versions of it - Path baseDir = status.getPath().getName().contains("SNAPSHOT") ? this.unsharedJarsDir : jarFileDir; - // DistributedCache requires absolute path, so we need to use makeQualified. - Path destJarFile = new Path(this.fs.makeQualified(baseDir), status.getPath().getName()); - if (!this.fs.exists(destJarFile)) { - // Copy the jar file from local file system to HDFS - this.fs.copyFromLocalFile(status.getPath(), destJarFile); + // For each FileStatus, the first copy attempt may fail because the file already exists or because another job instance + // is still copying it, since all Gobblin jobs share the same jar file directory. + // retryCount bounds the number of attempts so that a copy that never completes cannot make the job hang. + int retryCount = 0; + boolean shouldFileBeAddedIntoDC = true; + Path destJarFile = calculateDestJarFile(status, jarFileDir); + // Keep copying destJarFile into HDFS until it exists and its size on the target path matches the local file's size. + while (!this.fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != status.getLen()) { + try { + if (this.fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != status.getLen()) { + Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD); + throw new IOException("Waiting for file to complete on uploading ... 
"); + } + // Set the first parameter to false so that the source file is not deleted. + // Set the second parameter to false so that an existing file on the target is not overwritten (the default is true). + // If the file already exists and the overwrite flag is false, an IOException is thrown. + this.fs.copyFromLocalFile(false, false, status.getPath(), destJarFile); + } catch (IOException | InterruptedException e) { + LOG.warn("Path:" + destJarFile + " is not copied successfully. Will require retry."); + retryCount += 1; + if (retryCount >= this.jarFileMaximumRetry) { + LOG.error("The jar file:" + destJarFile + "failed in being copied into hdfs", e); + // If the retry limit is reached, skip copying this file. + shouldFileBeAddedIntoDC = false; + break; + } + } + } + if (shouldFileBeAddedIntoDC) { + // Then add the jar file on HDFS to the classpath + LOG.info(String.format("Adding %s to classpath", destJarFile)); + DistributedCache.addFileToClassPath(destJarFile, conf, this.fs); } - // Then add the jar file on HDFS to the classpath - LOG.info(String.format("Adding %s to classpath", destJarFile)); - DistributedCache.addFileToClassPath(destJarFile, conf, this.fs); } } } /** + * Calculate the target path on HDFS of a jar file to be copied, + * given the {@link FileStatus} of the jar file and the path of the directory that should contain it. + */ + private Path calculateDestJarFile(FileStatus status, Path jarFileDir) { + // SNAPSHOT jars should not be shared, as different jobs may be using different versions of it + Path baseDir = status.getPath().getName().contains("SNAPSHOT") ? this.unsharedJarsDir : jarFileDir; + // DistributedCache requires absolute path, so we need to use makeQualified. + return new Path(this.fs.makeQualified(baseDir), status.getPath().getName()); + } + + /** * Add local non-jar files the job depends on to DistributedCache.
*/ @SuppressWarnings("deprecation") - private void addLocalFiles(Path jobFileDir, String jobFileList, Configuration conf) - throws IOException { + private void addLocalFiles(Path jobFileDir, String jobFileList, Configuration conf) throws IOException { DistributedCache.createSymlink(conf); for (String jobFile : SPLITTER.split(jobFileList)) { Path srcJobFile = new Path(jobFile); @@ -446,8 +486,7 @@ public class MRJobLauncher extends AbstractJobLauncher { } } - private void addHdfsJars(String hdfsJarFileList, Configuration conf) - throws IOException { + private void addHdfsJars(String hdfsJarFileList, Configuration conf) throws IOException { for (String jarFile : SPLITTER.split(hdfsJarFileList)) { FileStatus[] status = this.fs.listStatus(new Path(jarFile)); for (FileStatus fileStatus : status) { @@ -464,8 +503,7 @@ public class MRJobLauncher extends AbstractJobLauncher { * Prepare the job input. * @throws IOException */ - private void prepareJobInput(List<WorkUnit> workUnits) - throws IOException { + private void prepareJobInput(List<WorkUnit> workUnits) throws IOException { Closer closer = Closer.create(); try { ParallelRunner parallelRunner = closer.register(new ParallelRunner(this.parallelRunnerThreads, this.fs)); @@ -513,8 +551,7 @@ public class MRJobLauncher extends AbstractJobLauncher { * Create a {@link org.apache.gobblin.metrics.GobblinMetrics} instance for this job run from the Hadoop counters. 
*/ @VisibleForTesting - void countersToMetrics(GobblinMetrics metrics) - throws IOException { + void countersToMetrics(GobblinMetrics metrics) throws IOException { Optional<Counters> counters = Optional.fromNullable(this.job.getCounters()); if (counters.isPresent()) { @@ -532,8 +569,7 @@ public class MRJobLauncher extends AbstractJobLauncher { } } - private static FileSystem buildFileSystem(Properties jobProps, Configuration configuration) - throws IOException { + private static FileSystem buildFileSystem(Properties jobProps, Configuration configuration) throws IOException { URI fsUri = URI.create(jobProps.getProperty(ConfigurationKeys.FS_URI_KEY, ConfigurationKeys.LOCAL_FS_URI)); return FileSystem.newInstance(fsUri, configuration); } @@ -577,9 +613,8 @@ public class MRJobLauncher extends AbstractJobLauncher { boolean foundStateFile = false; for (Path dcPath : DistributedCache.getLocalCacheFiles(context.getConfiguration())) { if (dcPath.getName().equals(jobStateFileName)) { - SerializationUtils - .deserializeStateFromInputStream(closer.register(new FileInputStream(dcPath.toUri().getPath())), - this.jobState); + SerializationUtils.deserializeStateFromInputStream( + closer.register(new FileInputStream(dcPath.toUri().getPath())), this.jobState); foundStateFile = true; break; } @@ -607,14 +642,14 @@ public class MRJobLauncher extends AbstractJobLauncher { if (Boolean.valueOf( configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY, ConfigurationKeys.DEFAULT_METRICS_ENABLED))) { this.jobMetrics = Optional.of(JobMetrics.get(this.jobState)); - this.jobMetrics.get().startMetricReportingWithFileSuffix(HadoopUtils.getStateFromConf(configuration), - context.getTaskAttemptID().toString()); + this.jobMetrics.get() + .startMetricReportingWithFileSuffix(HadoopUtils.getStateFromConf(configuration), + context.getTaskAttemptID().toString()); } } @Override - public void run(Context context) - throws IOException, InterruptedException { + public void run(Context context) throws 
IOException, InterruptedException { this.setup(context); GobblinMultiTaskAttempt gobblinMultiTaskAttempt = null; try { @@ -626,15 +661,19 @@ public class MRJobLauncher extends AbstractJobLauncher { isSpeculativeEnabled ? GobblinMultiTaskAttempt.CommitPolicy.CUSTOMIZED : GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE; - SharedResourcesBroker<GobblinScopeTypes> globalBroker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker( - ConfigFactory.parseProperties(this.jobState.getProperties()), GobblinScopeTypes.GLOBAL.defaultScopeInstance()); + SharedResourcesBroker<GobblinScopeTypes> globalBroker = + SharedResourcesBrokerFactory.createDefaultTopLevelBroker( + ConfigFactory.parseProperties(this.jobState.getProperties()), + GobblinScopeTypes.GLOBAL.defaultScopeInstance()); SharedResourcesBroker<GobblinScopeTypes> jobBroker = - globalBroker.newSubscopedBuilder(new JobScopeInstance(this.jobState.getJobName(), this.jobState.getJobId())).build(); + globalBroker.newSubscopedBuilder(new JobScopeInstance(this.jobState.getJobName(), this.jobState.getJobId())) + .build(); // Actually run the list of WorkUnits gobblinMultiTaskAttempt = - GobblinMultiTaskAttempt.runWorkUnits(this.jobState.getJobId(), context.getTaskAttemptID().toString(), this.jobState, this.workUnits, - this.taskStateTracker, this.taskExecutor, this.taskStateStore, multiTaskAttemptCommitPolicy, jobBroker); + GobblinMultiTaskAttempt.runWorkUnits(this.jobState.getJobId(), context.getTaskAttemptID().toString(), + this.jobState, this.workUnits, this.taskStateTracker, this.taskExecutor, this.taskStateStore, + multiTaskAttemptCommitPolicy, jobBroker); if (this.isSpeculativeEnabled) { LOG.info("will not commit in task attempt"); @@ -646,14 +685,12 @@ public class MRJobLauncher extends AbstractJobLauncher { CommitStep cleanUpCommitStep = new CommitStep() { @Override - public boolean isCompleted() - throws IOException { + public boolean isCompleted() throws IOException { return !serviceManager.isHealthy(); } @Override 
- public void execute() - throws IOException { + public void execute() throws IOException { LOG.info("Starting the clean-up steps."); try { serviceManager.stopAsync().awaitStopped(5, TimeUnit.SECONDS); @@ -682,8 +719,7 @@ public class MRJobLauncher extends AbstractJobLauncher { } @Override - public void map(LongWritable key, Text value, Context context) - throws IOException, InterruptedException { + public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { WorkUnit workUnit = (value.toString().endsWith(MULTI_WORK_UNIT_FILE_EXTENSION) ? MultiWorkUnit.createEmpty() : WorkUnit.createEmpty()); SerializationUtils.deserializeState(this.fs, new Path(value.toString()), workUnit);
