Repository: incubator-gobblin
Updated Branches:
  refs/heads/master e3f9de1a4 -> a34a81a42


[GOBBLIN-309] Disabled rewrite and enabled retry for adding jar file on HDFS

Closes #2163 from autumnust/jarcopyfix


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a34a81a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a34a81a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a34a81a4

Branch: refs/heads/master
Commit: a34a81a42a73705558e32f38af54835fcea47325
Parents: e3f9de1
Author: Lei Sun <[email protected]>
Authored: Mon Nov 13 12:21:01 2017 -0800
Committer: Hung Tran <[email protected]>
Committed: Mon Nov 13 12:21:01 2017 -0800

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |   2 +
 .../runtime/mapreduce/MRJobLauncher.java        | 142 ++++++++++++-------
 2 files changed, 91 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a34a81a4/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index c8de615..a563b43 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -182,6 +182,8 @@ public class ConfigurationKeys {
   public static final String CLEANUP_STAGING_DATA_BY_INITIALIZER = 
"cleanup.staging.data.by.initializer";
   public static final String CLEANUP_OLD_JOBS_DATA = "cleanup.old.job.data";
   public static final boolean DEFAULT_CLEANUP_OLD_JOBS_DATA = false;
+  public static final String MAXIMUM_JAR_COPY_RETRY_TIMES_KEY = 
JOB_JAR_FILES_KEY + ".uploading.retry.maximum";
+
 
   public static final String QUEUED_TASK_TIME_MAX_SIZE = 
"taskexecutor.queued_task_time.history.max_size";
   public static final int DEFAULT_QUEUED_TASK_TIME_MAX_SIZE = 2048;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a34a81a4/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index dcb6a14..9f17db1 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -21,8 +21,10 @@ import java.io.DataOutputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -56,9 +58,9 @@ import com.google.common.io.Closer;
 import com.google.common.util.concurrent.ServiceManager;
 import com.typesafe.config.ConfigFactory;
 
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
-import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -114,6 +116,11 @@ public class MRJobLauncher extends AbstractJobLauncher {
   private static final String OUTPUT_DIR_NAME = "output";
   private static final String WORK_UNIT_LIST_FILE_EXTENSION = ".wulist";
 
+  // Configuration that makes uploading of jar files more reliable,
+  // since multiple Gobblin jobs are sharing the same jar directory.
+  private static final int MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT = 5;
+  private static final int WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;
+
   private static final Splitter SPLITTER = 
Splitter.on(',').omitEmptyStrings().trimResults();
 
   private final Configuration conf;
@@ -134,13 +141,13 @@ public class MRJobLauncher extends AbstractJobLauncher {
 
   private final StateStore<TaskState> taskStateStore;
 
-  public MRJobLauncher(Properties jobProps)
-      throws Exception {
+  private final int jarFileMaximumRetry;
+
+  public MRJobLauncher(Properties jobProps) throws Exception {
     this(jobProps, null);
   }
 
-  public MRJobLauncher(Properties jobProps, 
SharedResourcesBroker<GobblinScopeTypes> instanceBroker)
-      throws Exception {
+  public MRJobLauncher(Properties jobProps, 
SharedResourcesBroker<GobblinScopeTypes> instanceBroker) throws Exception {
     this(jobProps, new Configuration(), instanceBroker);
   }
 
@@ -193,12 +200,16 @@ public class MRJobLauncher extends AbstractJobLauncher {
         new TaskStateCollectorService(jobProps, this.jobContext.getJobState(), 
this.eventBus, taskStateStore,
             outputTaskStateDir);
 
+    this.jarFileMaximumRetry =
+        
jobProps.containsKey(ConfigurationKeys.MAXIMUM_JAR_COPY_RETRY_TIMES_KEY) ? 
Integer.parseInt(
+            
jobProps.getProperty(ConfigurationKeys.MAXIMUM_JAR_COPY_RETRY_TIMES_KEY))
+            : MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT;
+
     startCancellationExecutor();
   }
 
   @Override
-  public void close()
-      throws IOException {
+  public void close() throws IOException {
     try {
       if (this.hadoopJobSubmitted && !this.job.isComplete()) {
         LOG.info("Killing the Hadoop MR job for job " + 
this.jobContext.getJobId());
@@ -215,8 +226,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
   }
 
   @Override
-  protected void runWorkUnits(List<WorkUnit> workUnits)
-      throws Exception {
+  protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
     String jobName = this.jobContext.getJobName();
     JobState jobState = this.jobContext.getJobState();
 
@@ -278,8 +288,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
   /**
    * Add dependent jars and files.
    */
-  private void addDependencies(Configuration conf)
-      throws IOException {
+  private void addDependencies(Configuration conf) throws IOException {
     TimingEvent distributedCacheSetupTimer =
         
this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_DISTRIBUTED_CACHE_SETUP);
 
@@ -317,8 +326,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
   /**
    * Prepare the Hadoop MR job, including configuring the job and setting up 
the input/output paths.
    */
-  private void prepareHadoopJob(List<WorkUnit> workUnits)
-      throws IOException {
+  private void prepareHadoopJob(List<WorkUnit> workUnits) throws IOException {
     TimingEvent mrJobSetupTimer = 
this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_JOB_SETUP);
 
     // Add dependent jars/files
@@ -386,34 +394,66 @@ public class MRJobLauncher extends AbstractJobLauncher {
    * so the mappers can use them.
    */
   @SuppressWarnings("deprecation")
-  private void addJars(Path jarFileDir, String jarFileList, Configuration conf)
-      throws IOException {
+  private void addJars(Path jarFileDir, String jarFileList, Configuration 
conf) throws IOException {
     LocalFileSystem lfs = FileSystem.getLocal(conf);
     for (String jarFile : SPLITTER.split(jarFileList)) {
       Path srcJarFile = new Path(jarFile);
       FileStatus[] fileStatusList = lfs.globStatus(srcJarFile);
+
       for (FileStatus status : fileStatusList) {
-        // SNAPSHOT jars should not be shared, as different jobs may be using 
different versions of it
-        Path baseDir = status.getPath().getName().contains("SNAPSHOT") ? 
this.unsharedJarsDir : jarFileDir;
-        // DistributedCache requires absolute path, so we need to use 
makeQualified.
-        Path destJarFile = new Path(this.fs.makeQualified(baseDir), 
status.getPath().getName());
-        if (!this.fs.exists(destJarFile)) {
-          // Copy the jar file from local file system to HDFS
-          this.fs.copyFromLocalFile(status.getPath(), destJarFile);
+        // For each FileStatus, the first copy attempt may fail because the file already exists
+        // or a copy is in progress from another job instance, since all Gobblin jobs share the same jar file directory.
+        // The retryCount avoids cases (if any) where retries go on too long and cause the job to hang.
+        int retryCount = 0;
+        boolean shouldFileBeAddedIntoDC = true;
+        Path destJarFile = calculateDestJarFile(status, jarFileDir);
+        // Keep adding destJarFile to HDFS until it exists and the size of the file on the target path matches the one on the local path.
+        while (!this.fs.exists(destJarFile) || 
fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
+          try {
+            if (this.fs.exists(destJarFile) && 
fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
+              Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);
+              throw new IOException("Waiting for file to complete on uploading 
... ");
+            }
+            // Set the first parameter as false for not deleting sourceFile
+            // Set the second parameter as false for not overwriting existing 
file on the target, by default it is true.
+            // If the file pre-exists but the overwrite flag is set to false, then an IOException is thrown.
+            this.fs.copyFromLocalFile(false, false, status.getPath(), 
destJarFile);
+          } catch (IOException | InterruptedException e) {
+            LOG.warn("Path:" + destJarFile + " is not copied successfully. 
Will require retry.");
+            retryCount += 1;
+            if (retryCount >= this.jarFileMaximumRetry) {
+              LOG.error("The jar file:" + destJarFile + "failed in being 
copied into hdfs", e);
+              // If retry reaches upper limit, skip copying this file.
+              shouldFileBeAddedIntoDC = false;
+              break;
+            }
+          }
+        }
+        if (shouldFileBeAddedIntoDC) {
+          // Then add the jar file on HDFS to the classpath
+          LOG.info(String.format("Adding %s to classpath", destJarFile));
+          DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
         }
-        // Then add the jar file on HDFS to the classpath
-        LOG.info(String.format("Adding %s to classpath", destJarFile));
-        DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
       }
     }
   }
 
   /**
+   * Calculate the target filePath of the jar file to be copied on HDFS,
+   * given the {@link FileStatus} of a jarFile and the path of the directory that contains the jar.
+   */
+  private Path calculateDestJarFile(FileStatus status, Path jarFileDir) {
+    // SNAPSHOT jars should not be shared, as different jobs may be using 
different versions of it
+    Path baseDir = status.getPath().getName().contains("SNAPSHOT") ? 
this.unsharedJarsDir : jarFileDir;
+    // DistributedCache requires absolute path, so we need to use 
makeQualified.
+    return new Path(this.fs.makeQualified(baseDir), 
status.getPath().getName());
+  }
+
+  /**
    * Add local non-jar files the job depends on to DistributedCache.
    */
   @SuppressWarnings("deprecation")
-  private void addLocalFiles(Path jobFileDir, String jobFileList, 
Configuration conf)
-      throws IOException {
+  private void addLocalFiles(Path jobFileDir, String jobFileList, 
Configuration conf) throws IOException {
     DistributedCache.createSymlink(conf);
     for (String jobFile : SPLITTER.split(jobFileList)) {
       Path srcJobFile = new Path(jobFile);
@@ -446,8 +486,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
     }
   }
 
-  private void addHdfsJars(String hdfsJarFileList, Configuration conf)
-      throws IOException {
+  private void addHdfsJars(String hdfsJarFileList, Configuration conf) throws 
IOException {
     for (String jarFile : SPLITTER.split(hdfsJarFileList)) {
       FileStatus[] status = this.fs.listStatus(new Path(jarFile));
       for (FileStatus fileStatus : status) {
@@ -464,8 +503,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
    * Prepare the job input.
    * @throws IOException
    */
-  private void prepareJobInput(List<WorkUnit> workUnits)
-      throws IOException {
+  private void prepareJobInput(List<WorkUnit> workUnits) throws IOException {
     Closer closer = Closer.create();
     try {
       ParallelRunner parallelRunner = closer.register(new 
ParallelRunner(this.parallelRunnerThreads, this.fs));
@@ -513,8 +551,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
    * Create a {@link org.apache.gobblin.metrics.GobblinMetrics} instance for 
this job run from the Hadoop counters.
    */
   @VisibleForTesting
-  void countersToMetrics(GobblinMetrics metrics)
-      throws IOException {
+  void countersToMetrics(GobblinMetrics metrics) throws IOException {
     Optional<Counters> counters = 
Optional.fromNullable(this.job.getCounters());
 
     if (counters.isPresent()) {
@@ -532,8 +569,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
     }
   }
 
-  private static FileSystem buildFileSystem(Properties jobProps, Configuration 
configuration)
-      throws IOException {
+  private static FileSystem buildFileSystem(Properties jobProps, Configuration 
configuration) throws IOException {
     URI fsUri = URI.create(jobProps.getProperty(ConfigurationKeys.FS_URI_KEY, 
ConfigurationKeys.LOCAL_FS_URI));
     return FileSystem.newInstance(fsUri, configuration);
   }
@@ -577,9 +613,8 @@ public class MRJobLauncher extends AbstractJobLauncher {
         boolean foundStateFile = false;
         for (Path dcPath : 
DistributedCache.getLocalCacheFiles(context.getConfiguration())) {
           if (dcPath.getName().equals(jobStateFileName)) {
-            SerializationUtils
-                .deserializeStateFromInputStream(closer.register(new 
FileInputStream(dcPath.toUri().getPath())),
-                    this.jobState);
+            SerializationUtils.deserializeStateFromInputStream(
+                closer.register(new 
FileInputStream(dcPath.toUri().getPath())), this.jobState);
             foundStateFile = true;
             break;
           }
@@ -607,14 +642,14 @@ public class MRJobLauncher extends AbstractJobLauncher {
       if (Boolean.valueOf(
           configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY, 
ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {
         this.jobMetrics = Optional.of(JobMetrics.get(this.jobState));
-        
this.jobMetrics.get().startMetricReportingWithFileSuffix(HadoopUtils.getStateFromConf(configuration),
-            context.getTaskAttemptID().toString());
+        this.jobMetrics.get()
+            
.startMetricReportingWithFileSuffix(HadoopUtils.getStateFromConf(configuration),
+                context.getTaskAttemptID().toString());
       }
     }
 
     @Override
-    public void run(Context context)
-        throws IOException, InterruptedException {
+    public void run(Context context) throws IOException, InterruptedException {
       this.setup(context);
       GobblinMultiTaskAttempt gobblinMultiTaskAttempt = null;
       try {
@@ -626,15 +661,19 @@ public class MRJobLauncher extends AbstractJobLauncher {
             isSpeculativeEnabled ? 
GobblinMultiTaskAttempt.CommitPolicy.CUSTOMIZED
                 : GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE;
 
-        SharedResourcesBroker<GobblinScopeTypes> globalBroker = 
SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
-            ConfigFactory.parseProperties(this.jobState.getProperties()), 
GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+        SharedResourcesBroker<GobblinScopeTypes> globalBroker =
+            SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
+                ConfigFactory.parseProperties(this.jobState.getProperties()),
+                GobblinScopeTypes.GLOBAL.defaultScopeInstance());
         SharedResourcesBroker<GobblinScopeTypes> jobBroker =
-            globalBroker.newSubscopedBuilder(new 
JobScopeInstance(this.jobState.getJobName(), this.jobState.getJobId())).build();
+            globalBroker.newSubscopedBuilder(new 
JobScopeInstance(this.jobState.getJobName(), this.jobState.getJobId()))
+                .build();
 
         // Actually run the list of WorkUnits
         gobblinMultiTaskAttempt =
-            GobblinMultiTaskAttempt.runWorkUnits(this.jobState.getJobId(), 
context.getTaskAttemptID().toString(), this.jobState, this.workUnits,
-                this.taskStateTracker, this.taskExecutor, this.taskStateStore, 
multiTaskAttemptCommitPolicy, jobBroker);
+            GobblinMultiTaskAttempt.runWorkUnits(this.jobState.getJobId(), 
context.getTaskAttemptID().toString(),
+                this.jobState, this.workUnits, this.taskStateTracker, 
this.taskExecutor, this.taskStateStore,
+                multiTaskAttemptCommitPolicy, jobBroker);
 
         if (this.isSpeculativeEnabled) {
           LOG.info("will not commit in task attempt");
@@ -646,14 +685,12 @@ public class MRJobLauncher extends AbstractJobLauncher {
         CommitStep cleanUpCommitStep = new CommitStep() {
 
           @Override
-          public boolean isCompleted()
-              throws IOException {
+          public boolean isCompleted() throws IOException {
             return !serviceManager.isHealthy();
           }
 
           @Override
-          public void execute()
-              throws IOException {
+          public void execute() throws IOException {
             LOG.info("Starting the clean-up steps.");
             try {
               serviceManager.stopAsync().awaitStopped(5, TimeUnit.SECONDS);
@@ -682,8 +719,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
     }
 
     @Override
-    public void map(LongWritable key, Text value, Context context)
-        throws IOException, InterruptedException {
+    public void map(LongWritable key, Text value, Context context) throws 
IOException, InterruptedException {
       WorkUnit workUnit = 
(value.toString().endsWith(MULTI_WORK_UNIT_FILE_EXTENSION) ? 
MultiWorkUnit.createEmpty()
           : WorkUnit.createEmpty());
       SerializationUtils.deserializeState(this.fs, new Path(value.toString()), 
workUnit);

Reply via email to