MAPREDUCE-6954. Disable erasure coding for files that are uploaded to the MR staging area (pbacsko via rkanter)

(cherry picked from commit 0adc0471d0c06f66a31060f270dcb50a7b4ffafa)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/917e875e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/917e875e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/917e875e

Branch: refs/heads/branch-3.0
Commit: 917e875e511589c297971f6a14fd85de7925409f
Parents: 3847b8c
Author: Robert Kanter <rkan...@apache.org>
Authored: Mon Sep 18 10:40:06 2017 -0700
Committer: Robert Kanter <rkan...@apache.org>
Committed: Mon Sep 18 10:42:01 2017 -0700

----------------------------------------------------------------------
 .../hadoop-mapreduce-client-core/pom.xml        |  4 ++
 .../hadoop/mapreduce/JobResourceUploader.java   | 17 ++++++++
 .../apache/hadoop/mapreduce/MRJobConfig.java    |  5 +++
 .../src/main/resources/mapred-default.xml       |  9 ++++
 .../mapreduce/TestJobResourceUploader.java      | 46 ++++++++++++++++++++
 5 files changed, 81 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/917e875e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
index a23827d..ff88bcc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
@@ -44,6 +44,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs</artifactId>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/917e875e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
index f1cad57..d9bf988 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 
@@ -94,6 +96,11 @@ class JobResourceUploader {
         new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
     mkdirs(jtFs, submitJobDir, mapredSysPerms);
 
+    // Unless the job enables EC for its staging dir, turn EC off there.
+    if (!conf.getBoolean(MRJobConfig.MR_AM_STAGING_DIR_ERASURECODING_ENABLED,
+        MRJobConfig.DEFAULT_MR_AM_STAGING_ERASURECODING_ENABLED)) {
+      disableErasureCodingForPath(jtFs, submitJobDir);
+    }
     Collection<String> files = conf.getStringCollection("tmpfiles");
     Collection<String> libjars = conf.getStringCollection("tmpjars");
     Collection<String> archives = conf.getStringCollection("tmparchives");
@@ -575,4 +582,14 @@ class JobResourceUploader {
     }
     return finalPath;
   }
+
+  /** Forces the replication policy on path; no-op unless fs is HDFS. */
+  private void disableErasureCodingForPath(FileSystem fs, Path path)
+      throws IOException {
+    if (fs instanceof DistributedFileSystem) {
+      LOG.info("Disabling Erasure Coding for path: " + path);
+      ((DistributedFileSystem) fs).setErasureCodingPolicy(path,
+          SystemErasureCodingPolicies.getReplicationPolicy().getName());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/917e875e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 2023ba3..86abb42 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -1037,4 +1037,9 @@ public interface MRJobConfig {
   String FINISH_JOB_WHEN_REDUCERS_DONE =
       "mapreduce.job.finish-when-all-reducers-done";
   boolean DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE = true;
+
+  /** When false, EC is turned off for the job submission directory. */
+  String MR_AM_STAGING_DIR_ERASURECODING_ENABLED =
+      MR_AM_STAGING_DIR + ".erasurecoding.enabled";
+  boolean DEFAULT_MR_AM_STAGING_ERASURECODING_ENABLED = false;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/917e875e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index ee9b906..6b6faf2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1261,6 +1261,15 @@
 </property>
 
 <property>
+  <name>yarn.app.mapreduce.am.staging-dir.erasurecoding.enabled</name>
+  <value>false</value>
+  <description>Whether Erasure Coding may be used for files that are
+  copied to the MR staging area. If false, the replication (non-EC) policy
+  is set on the job submission directory on HDFS. This is a job-level setting.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.am.max-attempts</name>
   <value>2</value>
   <description>The maximum number of application attempts. It is a

http://git-wip-us.apache.org/repos/asf/hadoop/blob/917e875e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
index 20b7b7d..d0d7a34 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
@@ -20,6 +20,11 @@ package org.apache.hadoop.mapreduce;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 
 import java.io.IOException;
 import java.net.URI;
@@ -36,9 +41,12 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.verification.VerificationMode;
 
 /**
  * A class for unit testing JobResourceUploader.
@@ -357,6 +365,40 @@ public class TestJobResourceUploader {
         expectedArchivesWithFrags, expectedJobJar);
   }
 
+  @Test
+  public void testErasureCodingDefault() throws IOException {
+    testErasureCodingSetting(true);
+  }
+
+  @Test
+  public void testErasureCodingEnabled() throws IOException {
+    testErasureCodingSetting(false);
+  }
+
+  private void testErasureCodingSetting(boolean defaultBehavior)
+      throws IOException {
+    JobConf jConf = new JobConf();
+    // Default case: leave the key unset so the built-in default is used.
+    if (!defaultBehavior) {
+      jConf.setBoolean(MRJobConfig.MR_AM_STAGING_DIR_ERASURECODING_ENABLED,
+          true);
+    }
+
+    DistributedFileSystem fs = mock(DistributedFileSystem.class);
+    Path path = new Path("/");
+    when(fs.makeQualified(any(Path.class))).thenReturn(path);
+    JobResourceUploader uploader = new StubedUploader(fs, true);
+    Job job = Job.getInstance(jConf);
+    uploader.uploadResources(job, new Path("/test"));
+
+    // The replication (non-EC) policy is applied only in the default case.
+    String replicationPolicyName = SystemErasureCodingPolicies
+        .getReplicationPolicy().getName();
+    VerificationMode mode = defaultBehavior ? times(1) : never();
+    verify(fs, mode).setErasureCodingPolicy(eq(path),
+        eq(replicationPolicyName));
+  }
+
   private void runTmpResourcePathTest(JobResourceUploader uploader,
       ResourceConf rConf, JobConf jConf, String[] expectedFiles,
       String[] expectedArchives, String expectedJobJar) throws IOException {
@@ -698,6 +740,10 @@ public class TestJobResourceUploader {
       super(FileSystem.getLocal(conf), useWildcard);
     }
 
+    StubedUploader(FileSystem fs, boolean useWildcard) throws IOException {
+      super(fs, useWildcard); // e.g. a mocked DistributedFileSystem
+    }
+
     @Override
     FileStatus getFileStatus(Map<URI, FileStatus> statCache, Configuration job,
         Path p) throws IOException {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to