Repository: hadoop
Updated Branches:
  refs/heads/trunk 58bae3544 -> 501a77856


MAPREDUCE-6774. Add support for HDFS erasure code policy to TestDFSIO.
Contributed by Sammi Chen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/501a7785
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/501a7785
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/501a7785

Branch: refs/heads/trunk
Commit: 501a77856d6b6edfb261547117e719da7a9cd221
Parents: 58bae35
Author: Kai Zheng <kai.zh...@intel.com>
Authored: Sun Sep 18 09:03:15 2016 +0800
Committer: Kai Zheng <kai.zh...@intel.com>
Committed: Sun Sep 18 09:03:15 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/hadoop/fs/TestDFSIO.java    | 159 +++++++++++++++----
 1 file changed, 124 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/501a7785/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
index 05d4d77..d218169 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
@@ -106,9 +107,14 @@ public class TestDFSIO implements Tool {
                     " [-nrFiles N]" +
                     " [-size Size[B|KB|MB|GB|TB]]" +
                     " [-resFile resultFileName] [-bufferSize Bytes]" +
-                    " [-storagePolicy storagePolicyName]";
+                    " [-storagePolicy storagePolicyName]" +
+                    " [-erasureCodePolicy erasureCodePolicyName]";
 
   private Configuration config;
+  private static final String STORAGE_POLICY_NAME_KEY =
+      "test.io.block.storage.policy";
+  private static final String ERASURE_CODE_POLICY_NAME_KEY =
+      "test.io.erasure.code.policy";
 
   static{
     Configuration.addDefaultResource("hdfs-default.xml");
@@ -211,9 +217,9 @@ public class TestDFSIO implements Tool {
     bench = new TestDFSIO();
     bench.getConf().setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
     cluster = new MiniDFSCluster.Builder(bench.getConf())
-                                .numDataNodes(2)
-                                .format(true)
-                                .build();
+        .numDataNodes(2)
+        .format(true)
+        .build();
     FileSystem fs = cluster.getFileSystem();
     bench.createControlFile(fs, DEFAULT_NR_BYTES, DEFAULT_NR_FILES);
 
@@ -356,7 +362,7 @@ public class TestDFSIO implements Tool {
             ReflectionUtils.newInstance(codec, getConf());
       }
 
-      blockStoragePolicy = getConf().get("test.io.block.storage.policy", null);
+      blockStoragePolicy = getConf().get(STORAGE_POLICY_NAME_KEY, null);
     }
 
     @Override // IOMapperBase
@@ -388,9 +394,10 @@ public class TestDFSIO implements Tool {
    */
   public static class WriteMapper extends IOStatMapper {
 
-    public WriteMapper() { 
-      for(int i=0; i < bufferSize; i++)
-        buffer[i] = (byte)('0' + i % 50);
+    public WriteMapper() {
+      for (int i = 0; i < bufferSize; i++) {
+        buffer[i] = (byte) ('0' + i % 50);
+      }
     }
 
     @Override // IOMapperBase
@@ -431,6 +438,9 @@ public class TestDFSIO implements Tool {
     fs.delete(getDataDir(config), true);
     fs.delete(writeDir, true);
     long tStart = System.currentTimeMillis();
+    if (isECEnabled()) {
+      createAndEnableECOnPath(fs, getDataDir(config));
+    }
     runIOTest(WriteMapper.class, writeDir);
     long execTime = System.currentTimeMillis() - tStart;
     return execTime;
@@ -734,6 +744,7 @@ public class TestDFSIO implements Tool {
     TestType testType = null;
     int bufferSize = DEFAULT_BUFFER_SIZE;
     long nrBytes = 1*MEGA;
+    String erasureCodePolicyName = null;
     int nrFiles = 1;
     long skipSize = 0;
     String resFileName = DEFAULT_RES_FILE_NAME;
@@ -785,26 +796,31 @@ public class TestDFSIO implements Tool {
         resFileName = args[++i];
       } else if (args[i].equalsIgnoreCase("-storagePolicy")) {
         storagePolicy = args[++i];
+      } else if (args[i].equalsIgnoreCase("-erasureCodePolicy")) {
+        erasureCodePolicyName = args[++i];
       } else {
         System.err.println("Illegal argument: " + args[i]);
         return -1;
       }
     }
-    if(testType == null)
+    if (testType == null) {
       return -1;
-    if(testType == TestType.TEST_TYPE_READ_BACKWARD)
+    }
+    if (testType == TestType.TEST_TYPE_READ_BACKWARD) {
       skipSize = -bufferSize;
-    else if(testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0)
+    } else if (testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0) {
       skipSize = bufferSize;
+    }
 
     LOG.info("nrFiles = " + nrFiles);
     LOG.info("nrBytes (MB) = " + toMB(nrBytes));
     LOG.info("bufferSize = " + bufferSize);
-    if(skipSize > 0)
+    if (skipSize > 0) {
       LOG.info("skipSize = " + skipSize);
+    }
     LOG.info("baseDir = " + getBaseDir(config));
     
-    if(compressionClass != null) {
+    if (compressionClass != null) {
       config.set("test.io.compression.class", compressionClass);
       LOG.info("compressionClass = " + compressionClass);
     }
@@ -813,31 +829,16 @@ public class TestDFSIO implements Tool {
     config.setLong("test.io.skip.size", skipSize);
     FileSystem fs = FileSystem.get(config);
 
-    if (storagePolicy != null) {
-      boolean isValid = false;
-      Collection<BlockStoragePolicy> storagePolicies =
-          ((DistributedFileSystem) fs).getAllStoragePolicies();
-      try {
-        for (BlockStoragePolicy policy : storagePolicies) {
-          if (policy.getName().equals(storagePolicy)) {
-            isValid = true;
-            break;
-          }
-        }
-      } catch (Exception e) {
-        throw new IOException("Get block storage policies error: ", e);
-      }
-      if (!isValid) {
-        System.out.println("Invalid block storage policy: " + storagePolicy);
-        System.out.println("Current supported storage policy list: ");
-        for (BlockStoragePolicy policy : storagePolicies) {
-          System.out.println(policy.getName());
-        }
+    if (erasureCodePolicyName != null) {
+      if (!checkErasureCodePolicy(erasureCodePolicyName, fs, testType)) {
         return -1;
       }
+    }
 
-      config.set("test.io.block.storage.policy", storagePolicy);
-      LOG.info("storagePolicy = " + storagePolicy);
+    if (storagePolicy != null) {
+      if (!checkStoragePolicy(storagePolicy, fs)) {
+        return -1;
+      }
     }
 
     if (isSequential) {
@@ -908,6 +909,94 @@ public class TestDFSIO implements Tool {
     return ((float)bytes)/MEGA;
   }
 
+  private boolean checkErasureCodePolicy(String erasureCodePolicyName,
+      FileSystem fs, TestType testType) throws IOException {
+    Collection<ErasureCodingPolicy> list =
+        ((DistributedFileSystem) fs).getAllErasureCodingPolicies();
+    boolean isValid = false;
+    for (ErasureCodingPolicy ec : list) {
+      if (erasureCodePolicyName.equals(ec.getName())) {
+        isValid = true;
+        break;
+      }
+    }
+
+    if (!isValid) {
+      System.out.println("Invalid erasure code policy: " +
+          erasureCodePolicyName);
+      System.out.println("Current supported erasure code policy list: ");
+      for (ErasureCodingPolicy ec : list) {
+        System.out.println(ec.getName());
+      }
+      return false;
+    }
+
+    if (testType == TestType.TEST_TYPE_APPEND ||
+        testType == TestType.TEST_TYPE_TRUNCATE) {
+      System.out.println("Append and truncate operations do not" +
+          " support erasureCodePolicy yet");
+      return false;
+    }
+
+    config.set(ERASURE_CODE_POLICY_NAME_KEY, erasureCodePolicyName);
+    LOG.info("erasureCodePolicy = " + erasureCodePolicyName);
+    return true;
+  }
+
+  private boolean checkStoragePolicy(String storagePolicy, FileSystem fs)
+      throws IOException {
+    boolean isValid = false;
+    Collection<BlockStoragePolicy> storagePolicies =
+        ((DistributedFileSystem) fs).getAllStoragePolicies();
+    try {
+      for (BlockStoragePolicy policy : storagePolicies) {
+        if (policy.getName().equals(storagePolicy)) {
+          isValid = true;
+          break;
+        }
+      }
+    } catch (Exception e) {
+      throw new IOException("Get block storage policies error: ", e);
+    }
+
+    if (!isValid) {
+      System.out.println("Invalid block storage policy: " + storagePolicy);
+      System.out.println("Current supported storage policy list: ");
+      for (BlockStoragePolicy policy : storagePolicies) {
+        System.out.println(policy.getName());
+      }
+      return false;
+    }
+
+    config.set(STORAGE_POLICY_NAME_KEY, storagePolicy);
+    LOG.info("storagePolicy = " + storagePolicy);
+    return true;
+  }
+
+  private boolean isECEnabled() {
+    String erasureCodePolicyName =
+        getConf().get(ERASURE_CODE_POLICY_NAME_KEY, null);
+    return erasureCodePolicyName != null;
+  }
+
+  void createAndEnableECOnPath(FileSystem fs, Path path)
+      throws IOException {
+    String erasureCodePolicyName =
+        getConf().get(ERASURE_CODE_POLICY_NAME_KEY, null);
+
+    fs.mkdirs(path);
+    Collection<ErasureCodingPolicy> list =
+        ((DistributedFileSystem) fs).getAllErasureCodingPolicies();
+    for (ErasureCodingPolicy ec : list) {
+      if (erasureCodePolicyName.equals(ec.getName())) {
+        ((DistributedFileSystem) fs).setErasureCodingPolicy(path, ec);
+        LOG.info("enable erasureCodePolicy = " + erasureCodePolicyName +
+            " on " + path.toString());
+        break;
+      }
+    }
+  }
+
   private void analyzeResult( FileSystem fs,
                               TestType testType,
                               long execTime,
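----------------------------------------------------------------------
For readers who want to see the new option in isolation: the patch stores the name passed via -erasureCodePolicy under the test.io.erasure.code.policy key, and when the write test starts, createAndEnableECOnPath() looks the policy up and applies it to the benchmark data directory. Below is a minimal standalone sketch of that same lookup-and-apply step; the class name, directory, and policy name are illustrative placeholders, not part of the commit.

import java.io.IOException;
import java.util.Collection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;

/**
 * Minimal sketch of the lookup-and-apply step used by the patch.
 * The directory and policy name are illustrative placeholders.
 */
public class EnableECPolicySketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    if (!(fs instanceof DistributedFileSystem)) {
      throw new IOException("Erasure coding policies require HDFS");
    }
    DistributedFileSystem dfs = (DistributedFileSystem) fs;

    Path dataDir = new Path("/benchmarks/TestDFSIO/io_data"); // illustrative
    String policyName = args.length > 0 ? args[0] : "RS-6-3-64k"; // illustrative

    dfs.mkdirs(dataDir);
    // Validate the name against the policies the NameNode actually knows about.
    Collection<ErasureCodingPolicy> policies = dfs.getAllErasureCodingPolicies();
    for (ErasureCodingPolicy ec : policies) {
      if (policyName.equals(ec.getName())) {
        // Files created under dataDir from now on are written with this policy.
        dfs.setErasureCodingPolicy(dataDir, ec);
        System.out.println("Enabled " + policyName + " on " + dataDir);
        return;
      }
    }
    System.err.println("Unknown erasure coding policy: " + policyName);
  }
}

With the patch applied, the equivalent effect is reached from the command line with something along the lines of: hadoop jar hadoop-mapreduce-client-jobclient-tests.jar TestDFSIO -write -nrFiles 10 -size 128MB -erasureCodePolicy <policyName> (jar name and sizes here are only for illustration).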

