HADOOP-11203. Allow distcp to accept bandwidth in fractional megabytes. Contributed by Raju Bairishetti
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/04fdf610 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/04fdf610 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/04fdf610 Branch: refs/heads/YARN-2928 Commit: 04fdf61066ca6be3eeea8eb9431d67923399fb90 Parents: 711141d Author: Amareshwari Sriramadasu <amareshw...@apache.org> Authored: Fri Jun 26 09:52:06 2015 +0530 Committer: Zhijie Shen <zjs...@apache.org> Committed: Mon Jun 29 10:28:26 2015 -0700 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 +++ .../org/apache/hadoop/tools/DistCpConstants.java | 2 +- .../apache/hadoop/tools/DistCpOptionSwitch.java | 5 +++-- .../org/apache/hadoop/tools/DistCpOptions.java | 6 +++--- .../org/apache/hadoop/tools/OptionsParser.java | 2 +- .../org/apache/hadoop/tools/mapred/CopyMapper.java | 17 ++++++++++++++++- .../tools/mapred/RetriableFileCopyCommand.java | 2 +- .../hadoop/tools/util/ThrottledInputStream.java | 6 +++--- .../org/apache/hadoop/tools/TestOptionsParser.java | 16 +++++++++------- 9 files changed, 40 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/04fdf610/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index b2975dc..5901794 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -53,6 +53,9 @@ Trunk (Unreleased) IMPROVEMENTS + HADOOP-11203. Allow ditscp to accept bandwitdh in fraction MegaBytes + (Raju Bairishetti via amareshwari) + HADOOP-8017. 
Configure hadoop-main pom to get rid of M2E plugin execution not covered (Eric Charles via bobby) http://git-wip-us.apache.org/repos/asf/hadoop/blob/04fdf610/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index 21dca62..93d6a62 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -30,7 +30,7 @@ public class DistCpConstants { public static final int DEFAULT_MAPS = 20; /* Default bandwidth if none specified */ - public static final int DEFAULT_BANDWIDTH_MB = 100; + public static final float DEFAULT_BANDWIDTH_MB = 100; /* Default strategy for copying. Implementation looked up from distcp-default.xml http://git-wip-us.apache.org/repos/asf/hadoop/blob/04fdf610/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index ed4a0b2..f16a5d2 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -174,10 +174,11 @@ public enum DistCpOptionSwitch { "copied to <= n bytes")), /** - * Specify bandwidth per map in MB + * Specify bandwidth per map in MB, accepts bandwidth as a fraction */ BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB, - new Option("bandwidth", true, "Specify bandwidth per map in MB")), + new 
Option("bandwidth", true, "Specify bandwidth per map in MB," + + " accepts bandwidth as a fraction.")), /** * Path containing a list of strings, which when found in the path of http://git-wip-us.apache.org/repos/asf/hadoop/blob/04fdf610/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index 302b626..5b4ccf9 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -47,7 +47,7 @@ public class DistCpOptions { public static final int maxNumListstatusThreads = 40; private int numListstatusThreads = 0; // Indicates that flag is not set. private int maxMaps = DistCpConstants.DEFAULT_MAPS; - private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB; + private float mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB; private String sslConfigurationFile; @@ -366,7 +366,7 @@ public class DistCpOptions { * * @return Bandwidth in MB */ - public int getMapBandwidth() { + public float getMapBandwidth() { return mapBandwidth; } @@ -375,7 +375,7 @@ public class DistCpOptions { * * @param mapBandwidth - per map bandwidth */ - public void setMapBandwidth(int mapBandwidth) { + public void setMapBandwidth(float mapBandwidth) { assert mapBandwidth > 0 : "Bandwidth " + mapBandwidth + " is invalid (should be > 0)"; this.mapBandwidth = mapBandwidth; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/04fdf610/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java 
b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java index 37add1e..b414513 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java @@ -293,7 +293,7 @@ public class OptionsParser { DistCpOptions option) { if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) { try { - Integer mapBandwidth = Integer.parseInt( + Float mapBandwidth = Float.parseFloat( getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim()); if (mapBandwidth <= 0) { throw new IllegalArgumentException("Bandwidth specified is not " + http://git-wip-us.apache.org/repos/asf/hadoop/blob/04fdf610/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index cca36df..f75fe76 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -62,6 +62,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text> BYTESEXPECTED,// Number of bytes expected to be copied. BYTESFAILED, // Number of bytes that failed to be copied. BYTESSKIPPED, // Number of bytes that were skipped from copy. + SLEEP_TIME_MS, // Time map slept while trying to honor bandwidth cap. + BANDWIDTH_IN_BYTES, // Effective transfer rate in B/s. 
} /** @@ -85,7 +87,9 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text> private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class); private FileSystem targetFS = null; - private Path targetWorkPath = null; + private Path targetWorkPath = null; + private long startEpoch; + private long totalBytesCopied = 0; /** * Implementation of the Mapper::setup() method. This extracts the DistCp- @@ -118,6 +122,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text> if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) { initializeSSLConf(context); } + startEpoch = System.currentTimeMillis(); } /** @@ -288,6 +293,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text> incrementCounter(context, Counter.BYTESEXPECTED, sourceFileStatus.getLen()); incrementCounter(context, Counter.BYTESCOPIED, bytesCopied); incrementCounter(context, Counter.COPY, 1); + totalBytesCopied += bytesCopied; } private void createTargetDirsWithRetry(String description, @@ -373,4 +379,13 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text> return false; } } + + @Override + protected void cleanup(Context context) + throws IOException, InterruptedException { + super.cleanup(context); + long secs = (System.currentTimeMillis() - startEpoch) / 1000; + incrementCounter(context, Counter.BANDWIDTH_IN_BYTES, + totalBytesCopied / ((secs == 0 ? 
1 : secs))); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/04fdf610/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index 65d644b..6b5078c 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -293,7 +293,7 @@ public class RetriableFileCopyCommand extends RetriableCommand { Configuration conf) throws IOException { try { FileSystem fs = path.getFileSystem(conf); - long bandwidthMB = conf.getInt(DistCpConstants.CONF_LABEL_BANDWIDTH_MB, + float bandwidthMB = conf.getFloat(DistCpConstants.CONF_LABEL_BANDWIDTH_MB, DistCpConstants.DEFAULT_BANDWIDTH_MB); FSDataInputStream in = fs.open(path); return new ThrottledInputStream(in, bandwidthMB * 1024 * 1024); http://git-wip-us.apache.org/repos/asf/hadoop/blob/04fdf610/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java index 9e435d9..2be8ef0 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java @@ -39,7 +39,7 @@ import com.google.common.base.Preconditions; public class ThrottledInputStream extends InputStream { private final InputStream 
rawStream; - private final long maxBytesPerSec; + private final float maxBytesPerSec; private final long startTime = System.currentTimeMillis(); private long bytesRead = 0; @@ -51,8 +51,8 @@ public class ThrottledInputStream extends InputStream { this(rawStream, Long.MAX_VALUE); } - public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) { - assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid"; + public ThrottledInputStream(InputStream rawStream, float maxBytesPerSec) { + assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid"; this.rawStream = rawStream; this.maxBytesPerSec = maxBytesPerSec; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/04fdf610/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java index b9d9ada..616872b 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java @@ -32,6 +32,8 @@ import java.util.NoSuchElementException; public class TestOptionsParser { + private static final float DELTA = 0.001f; + @Test public void testParseIgnoreFailure() { DistCpOptions options = OptionsParser.parse(new String[] { @@ -104,14 +106,14 @@ public class TestOptionsParser { DistCpOptions options = OptionsParser.parse(new String[] { "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/"}); - Assert.assertEquals(options.getMapBandwidth(), DistCpConstants.DEFAULT_BANDWIDTH_MB); + Assert.assertEquals(options.getMapBandwidth(), DistCpConstants.DEFAULT_BANDWIDTH_MB, DELTA); options = OptionsParser.parse(new String[] { "-bandwidth", - "11", + "11.2", 
"hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/"}); - Assert.assertEquals(options.getMapBandwidth(), 11); + Assert.assertEquals(options.getMapBandwidth(), 11.2, DELTA); } @Test(expected=IllegalArgumentException.class) @@ -585,8 +587,8 @@ public class TestOptionsParser { options.appendToConf(conf); Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false)); Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false)); - Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), - DistCpConstants.DEFAULT_BANDWIDTH_MB); + Assert.assertEquals(conf.getFloat(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), + DistCpConstants.DEFAULT_BANDWIDTH_MB, DELTA); conf = new Configuration(); Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false)); @@ -597,14 +599,14 @@ public class TestOptionsParser { "-delete", "-pu", "-bandwidth", - "11", + "11.2", "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/"}); options.appendToConf(conf); Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false)); Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DELETE_MISSING.getConfigLabel(), false)); Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), "U"); - Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11); + Assert.assertEquals(conf.getFloat(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11.2, DELTA); } @Test