HDFS-11786. Add support to make copyFromLocal multi threaded. Contributed by 
Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/02b141ac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/02b141ac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/02b141ac

Branch: refs/heads/HDFS-7240
Commit: 02b141ac6059323ec43e472ca36dc570fdca386f
Parents: b778887
Author: Anu Engineer <aengin...@apache.org>
Authored: Sun Jul 16 10:59:34 2017 -0700
Committer: Anu Engineer <aengin...@apache.org>
Committed: Sun Jul 16 10:59:34 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/fs/shell/CopyCommands.java    | 112 +++++++++++-
 .../apache/hadoop/fs/shell/MoveCommands.java    |   4 +-
 .../hadoop/fs/shell/TestCopyFromLocal.java      | 173 +++++++++++++++++++
 .../hadoop/fs/shell/TestCopyPreserveFlag.java   |  19 ++
 .../src/test/resources/testConf.xml             |  44 ++++-
 5 files changed, 346 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b141ac/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
index e2fad75..7b3c53e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
@@ -26,7 +26,11 @@ import java.net.URISyntaxException;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -288,9 +292,113 @@ class CopyCommands {
   }
 
   public static class CopyFromLocal extends Put {
+    /** Pool that runs the copy tasks; built in processArguments(). */
+    private ThreadPoolExecutor executor;
+    /** Thread count selected through -t; 1 keeps the copy on the caller. */
+    private int numThreads = 1;
+
+    /** Largest thread count accepted: two threads per available processor. */
+    private static final int MAX_THREADS =
+        2 * Runtime.getRuntime().availableProcessors();
     public static final String NAME = "copyFromLocal";
-    public static final String USAGE = Put.USAGE;
-    public static final String DESCRIPTION = "Identical to the -put command.";
+    public static final String USAGE =
+        "[-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>";
+    public static final String DESCRIPTION =
+        "Copy files from the local file system " +
+        "into fs. Copying fails if the file already " +
+        "exists, unless the -f flag is given.\n" +
+        "Flags:\n" +
+        "  -p : Preserves access and modification times, ownership and the" +
+        " mode.\n" +
+        "  -f : Overwrites the destination if it already exists.\n" +
+        "  -t <thread count> : Number of threads to be used, default is 1.\n" +
+        "  -l : Allow DataNode to lazily persist the file to disk. Forces" +
+        " replication factor of 1. This flag will result in reduced" +
+        " durability. Use with care.\n" +
+        "  -d : Skip creation of temporary file(<dst>._COPYING_).\n";
+
+    /**
+     * Sets the worker thread count from the raw value of the -t option.
+     * A missing value, or one that parses to 1 or less, selects a single
+     * thread; a value above MAX_THREADS is capped at MAX_THREADS.
+     *
+     * @param numberThreadsString value supplied with -t, or null if absent
+     * @throws NumberFormatException if the value is not a parsable integer
+     */
+    private void setNumberThreads(String numberThreadsString) {
+      if (numberThreadsString == null) {
+        numThreads = 1;
+        return;
+      }
+      int requested = Integer.parseInt(numberThreadsString);
+      // MAX_THREADS is always >= 2, so the lower bound of 1 never exceeds
+      // the cap and the clamp below is well defined.
+      numThreads = Math.min(MAX_THREADS, Math.max(1, requested));
+    }
+
+    /**
+     * Parses the flags -f, -p, -l, -d and the valued option -t, applies them
+     * to this command and then resolves the remote destination.
+     *
+     * @param args command-line arguments for this invocation
+     * @throws IOException propagated from option or destination handling
+     */
+    @Override
+    protected void processOptions(LinkedList<String> args) throws IOException {
+      CommandFormat format =
+          new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d");
+      format.addOptionWithValue("t");
+      format.parse(args);
+
+      String threadCount = format.getOptValue("t");
+      setNumberThreads(threadCount);
+
+      boolean overwrite = format.getOpt("f");
+      setOverwrite(overwrite);
+      boolean preserve = format.getOpt("p");
+      setPreserve(preserve);
+      boolean lazyPersist = format.getOpt("l");
+      setLazyPersist(lazyPersist);
+      boolean directWrite = format.getOpt("d");
+      setDirectWrite(directWrite);
+
+      getRemoteDestination(args);
+      // should have a -r option
+      setRecursive(true);
+    }
+
+    /**
+     * Copies one source to the target through the parent implementation,
+     * rejecting any source that isPathRecursable() reports as recursable.
+     *
+     * @param src source to copy
+     * @param target destination of the copy
+     * @throws IOException if the source is recursable
+     *     (PathIsDirectoryException) or the parent copy fails
+     */
+    private void copyFile(PathData src, PathData target) throws IOException {
+      boolean recursable = isPathRecursable(src);
+      if (!recursable) {
+        super.copyFileToTarget(src, target);
+        return;
+      }
+      throw new PathIsDirectoryException(src.toString());
+    }
+
+    /**
+     * Copies a single file: inline when one thread is configured, otherwise
+     * as a task handed to the executor.
+     *
+     * @param src file to copy
+     * @param target destination of the copy
+     * @throws IOException if the inline (single-threaded) copy fails; a
+     *     failure inside a queued task is reported through displayError()
+     */
+    @Override
+    protected void copyFileToTarget(PathData src, PathData target)
+        throws IOException {
+      // if number of thread is 1, mimic put and avoid threading overhead
+      if (numThreads == 1) {
+        copyFile(src, target);
+        return;
+      }
+
+      Runnable task = () -> {
+        try {
+          copyFile(src, target);
+        } catch (IOException | RuntimeException e) {
+          // The Future returned by submit() is discarded, so an unchecked
+          // exception would otherwise be captured there and never reported.
+          displayError(e);
+        }
+      };
+      executor.submit(task);
+    }
+
+    /**
+     * Creates the worker pool, lets the parent walk the arguments (which
+     * calls copyFileToTarget for each file) and then waits for all queued
+     * copies to finish.
+     *
+     * @param args sources to process
+     * @throws IOException if the parent argument processing fails
+     */
+    @Override
+    protected void processArguments(LinkedList<PathData> args)
+        throws IOException {
+      executor = new ThreadPoolExecutor(numThreads, numThreads, 1,
+          TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024),
+          new ThreadPoolExecutor.CallerRunsPolicy());
+      try {
+        super.processArguments(args);
+      } finally {
+        // Shut the pool down even when the parent throws; otherwise the
+        // already-started worker threads would stay alive indefinitely.
+        executor.shutdown();
+        try {
+          // wait for every submitted copy to finish
+          executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
+        } catch (InterruptedException e) {
+          executor.shutdownNow();
+          displayError(e);
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    /**
+     * @return the thread count currently configured for this command
+     */
+    @VisibleForTesting
+    public int getNumThreads() {
+      return this.numThreads;
+    }
+
+    /**
+     * @return the executor created by processArguments(), or null if
+     *     processArguments() has not run yet
+     */
+    @VisibleForTesting
+    public ThreadPoolExecutor getExecutor() {
+      return this.executor;
+    }
   }
  
   public static class CopyToLocal extends Get {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b141ac/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java
index d359282..5ef4277 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.PathExistsException;
-import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal;
+import org.apache.hadoop.fs.shell.CopyCommands.Put;
 
 /** Various commands for moving files */
 @InterfaceAudience.Private
@@ -41,7 +41,7 @@ class MoveCommands {
   /**
    *  Move local files to a remote filesystem
    */
-  public static class MoveFromLocal extends CopyFromLocal {
+  public static class MoveFromLocal extends Put {
     public static final String NAME = "moveFromLocal";
     public static final String USAGE = "<localsrc> ... <dst>";
     public static final String DESCRIPTION = 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b141ac/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyFromLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyFromLocal.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyFromLocal.java
new file mode 100644
index 0000000..8d354b4
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyFromLocal.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.shell;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for copyFromLocal.
+ */
+public class TestCopyFromLocal {
+  private static final String FROM_DIR_NAME = "fromDir";
+  private static final String TO_DIR_NAME = "toDir";
+
+  private static FileSystem fs;
+  private static Path testDir;
+  private static Configuration conf;
+
+  /**
+   * Builds the fixture under {@code dir}: an empty target directory and a
+   * source directory holding RandomUtils.nextInt(5) sub-directories, each
+   * with RandomUtils.nextInt(10) small files.
+   *
+   * @param dir root directory of the fixture
+   * @return total number of files created under the source directory
+   * @throws Exception if any file system operation fails
+   */
+  public static int initialize(Path dir) throws Exception {
+    fs.mkdirs(dir);
+    Path fromDirPath = new Path(dir, FROM_DIR_NAME);
+    fs.mkdirs(fromDirPath);
+    Path toDirPath = new Path(dir, TO_DIR_NAME);
+    fs.mkdirs(toDirPath);
+
+    int numTotalFiles = 0;
+    int numDirs = RandomUtils.nextInt(5);
+    for (int dirCount = 0; dirCount < numDirs; ++dirCount) {
+      Path subDirPath = new Path(fromDirPath, "subdir" + dirCount);
+      fs.mkdirs(subDirPath);
+      int numFiles = RandomUtils.nextInt(10);
+      for (int fileCount = 0; fileCount < numFiles; ++fileCount) {
+        numTotalFiles++;
+        Path subFile = new Path(subDirPath, "file" + fileCount);
+        // create(..., true) creates the file itself, so no separate
+        // createNewFile() call is needed; try-with-resources closes the
+        // stream even when a write fails.
+        try (FSDataOutputStream output = fs.create(subFile, true)) {
+          for (int i = 0; i < 100; ++i) {
+            output.writeInt(i);
+            output.writeChar('\n');
+          }
+        }
+      }
+    }
+
+    return numTotalFiles;
+  }
+
+  /**
+   * Sets up a local file system, makes it the default file system of the
+   * shared configuration and uses the test root as its working directory.
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new Configuration(false);
+    conf.set("fs.file.impl", LocalFileSystem.class.getName());
+    fs = FileSystem.getLocal(conf);
+
+    Path rootPath = new FileSystemTestHelper().getTestRootPath(fs);
+    // don't want scheme on the path, just an absolute path
+    String absolutePath = fs.makeQualified(rootPath).toUri().getPath();
+    testDir = new Path(absolutePath);
+
+    FileSystem.setDefaultUri(conf, fs.getUri());
+    fs.setWorkingDirectory(testDir);
+  }
+
+  /** Deletes the test directory tree and closes the file system. */
+  @AfterClass
+  public static void cleanup() throws Exception {
+    final boolean recursive = true;
+    fs.delete(testDir, recursive);
+    fs.close();
+  }
+
+  /**
+   * Runs the command with the shared configuration and asserts that it
+   * returns exit code 0.
+   */
+  private void run(CommandWithDestination cmd, String... args) {
+    cmd.setConf(conf);
+    int exitCode = cmd.run(args);
+    assertEquals(0, exitCode);
+  }
+
+  /** Default invocation: one thread, so no task reaches the executor. */
+  @Test(timeout = 10000)
+  public void testCopyFromLocal() throws Exception {
+    Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
+    initialize(dir);
+    String source = new Path(dir, FROM_DIR_NAME).toString();
+    String destination = new Path(dir, TO_DIR_NAME).toString();
+    run(new TestMultiThreadedCopy(1, 0), source, destination);
+  }
+
+  /**
+   * Copies with a random thread count between 2 and the command's maximum
+   * and expects one completed executor task per source file.
+   */
+  @Test(timeout = 10000)
+  public void testCopyFromLocalWithThreads() throws Exception {
+    Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
+    int numFiles = TestCopyFromLocal.initialize(dir);
+    int maxThreads = Runtime.getRuntime().availableProcessors() * 2;
+    // With a single thread the command copies inline and the executor never
+    // completes a task, so the expected task count only holds for two or
+    // more threads. maxThreads is always at least 2.
+    int numActualThreads = 2 + RandomUtils.nextInt(maxThreads - 1);
+    String numThreads = Integer.toString(numActualThreads);
+    run(new TestMultiThreadedCopy(numActualThreads, numFiles),
+        "-t", numThreads,
+        new Path(dir, FROM_DIR_NAME).toString(),
+        new Path(dir, TO_DIR_NAME).toString());
+  }
+
+  /** Requests twice the maximum and expects the count to be capped. */
+  @Test(timeout = 10000)
+  public void testCopyFromLocalWithThreadWrong() throws Exception {
+    Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
+    int numFiles = initialize(dir);
+    int maxThreads = Runtime.getRuntime().availableProcessors() * 2;
+    String requestedThreads = Integer.toString(maxThreads * 2);
+    String source = new Path(dir, FROM_DIR_NAME).toString();
+    String destination = new Path(dir, TO_DIR_NAME).toString();
+    run(new TestMultiThreadedCopy(maxThreads, numFiles),
+        "-t", requestedThreads, source, destination);
+  }
+
+  /** Passes "-t 0" and expects the command to fall back to one thread. */
+  @Test(timeout = 10000)
+  public void testCopyFromLocalWithZeroThreads() throws Exception {
+    Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
+    initialize(dir);
+    String source = new Path(dir, FROM_DIR_NAME).toString();
+    String destination = new Path(dir, TO_DIR_NAME).toString();
+    run(new TestMultiThreadedCopy(1, 0), "-t", "0", source, destination);
+  }
+
+  /**
+   * CopyFromLocal variant that checks the configured thread count before
+   * the copy and the executor state after it.
+   */
+  private static class TestMultiThreadedCopy extends CopyFromLocal {
+    private final int expectedThreads;
+    private final int expectedCompletedTaskCount;
+
+    /**
+     * @param expectedThreads thread count the command should settle on
+     * @param expectedCompletedTaskCount tasks the executor should complete
+     */
+    TestMultiThreadedCopy(int expectedThreads,
+                          int expectedCompletedTaskCount) {
+      this.expectedThreads = expectedThreads;
+      this.expectedCompletedTaskCount = expectedCompletedTaskCount;
+    }
+
+    @Override
+    protected void processArguments(LinkedList<PathData> args)
+        throws IOException {
+      // Check if the correct number of threads are spawned
+      Assert.assertEquals(expectedThreads, getNumThreads());
+      super.processArguments(args);
+      // Once the copy is complete, check following
+      // 1) number of completed tasks are same as expected
+      // 2) There are no active tasks in the executor
+      // 3) Executor has shutdown correctly
+      ThreadPoolExecutor executor = getExecutor();
+      // expected value first, so a failure message labels the values right
+      Assert.assertEquals(expectedCompletedTaskCount,
+          executor.getCompletedTaskCount());
+      Assert.assertEquals(0, executor.getActiveCount());
+      Assert.assertTrue(executor.isTerminated());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b141ac/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java
index 47dc601..8dd09e5 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.shell.CopyCommands.Cp;
 import org.apache.hadoop.fs.shell.CopyCommands.Get;
 import org.apache.hadoop.fs.shell.CopyCommands.Put;
+import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -120,6 +121,24 @@ public class TestCopyPreserveFlag {
   }
 
   @Test(timeout = 10000)
+  public void testCopyFromLocal() throws Exception {
+    // no flags: checked with assertAttributesChanged
+    CopyFromLocal command = new CopyFromLocal();
+    run(command, FROM.toString(), TO.toString());
+    assertAttributesChanged(TO);
+  }
+
+  @Test(timeout = 10000)
+  public void testCopyFromLocalWithThreads() throws Exception {
+    // "-t 10" without -p: checked with assertAttributesChanged
+    CopyFromLocal command = new CopyFromLocal();
+    run(command, "-t", "10", FROM.toString(), TO.toString());
+    assertAttributesChanged(TO);
+  }
+
+  @Test(timeout = 10000)
+  public void testCopyFromLocalWithThreadsPreserve() throws Exception {
+    // "-p" together with "-t 10": checked with assertAttributesPreserved
+    CopyFromLocal command = new CopyFromLocal();
+    run(command, "-p", "-t", "10", FROM.toString(), TO.toString());
+    assertAttributesPreserved(TO);
+  }
+
+  @Test(timeout = 10000)
   public void testGetWithP() throws Exception {
     run(new Get(), "-p", FROM.toString(), TO.toString());
     assertAttributesPreserved(TO);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b141ac/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
index 342b17c..64677f8 100644
--- a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
+++ b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
@@ -547,11 +547,51 @@
       <comparators>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
+          <expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^\s*Identical to the -put command\.\s*</expected-output>
+          <expected-output>^\s*Copy files from the local file system into fs.( )*Copying fails if the file already( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*exists, unless the -f flag is given.( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*Flags:( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*-p                 Preserves access and modification times, ownership and the( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*mode.( )*</expected-output>
+        </comparator>
+        <comparator>
+           <type>RegexpComparator</type>
+           <expected-output>^\s*-f                 Overwrites the destination if it already exists.( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*-t &lt;thread count&gt;  Number of threads to be used, default is 1.( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*-l                 Allow DataNode to lazily persist the file to disk. Forces( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*replication factor of 1. This flag will result in reduced( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*durability. Use with care.( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*-d                 Skip creation of temporary file\(&lt;dst&gt;\._COPYING_\).( )*</expected-output>
         </comparator>
       </comparators>
     </test>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to