Author: szetszwo
Date: Tue May 6 10:25:10 2014
New Revision: 1592704
URL: http://svn.apache.org/r1592704
Log:
svn merge -c 1592703 from trunk for MAPREDUCE-5402. In DynamicInputFormat,
change MAX_CHUNKS_TOLERABLE, MAX_CHUNKS_IDEAL, MIN_RECORDS_PER_CHUNK and
SPLIT_RATIO to be configurable.
Modified:
hadoop/common/branches/branch-2/ (props changed)
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
Propchange: hadoop/common/branches/branch-2/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk:r1592703
Modified:
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java?rev=1592704&r1=1592703&r2=1592704&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
Tue May 6 10:25:10 2014
@@ -51,7 +51,16 @@ public class DistCpConstants {
public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc";
public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite";
public static final String CONF_LABEL_BANDWIDTH_MB =
"distcp.map.bandwidth.mb";
-
+
+ public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE =
+ "distcp.dynamic.max.chunks.tolerable";
+ public static final String CONF_LABEL_MAX_CHUNKS_IDEAL =
+ "distcp.dynamic.max.chunks.ideal";
+ public static final String CONF_LABEL_MIN_RECORDS_PER_CHUNK =
+ "distcp.dynamic.min.records_per_chunk";
+ public static final String CONF_LABEL_SPLIT_RATIO =
+ "distcp.dynamic.split.ratio";
+
/* Total bytes to be copied. Updated by copylisting. Unfiltered count */
public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED =
"mapred.total.bytes.expected";
@@ -107,4 +116,13 @@ public class DistCpConstants {
public static final int INVALID_ARGUMENT = -1;
public static final int DUPLICATE_INPUT = -2;
public static final int UNKNOWN_ERROR = -999;
+
+ /**
+ * Default values for the configurable DistCp settings used by
+ * DynamicInputFormat.
+ */
+ public static final int MAX_CHUNKS_TOLERABLE_DEFAULT = 400;
+ public static final int MAX_CHUNKS_IDEAL_DEFAULT = 100;
+ public static final int MIN_RECORDS_PER_CHUNK_DEFAULT = 5;
+ public static final int SPLIT_RATIO_DEFAULT = 2;
+
}
Modified:
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java?rev=1592704&r1=1592703&r2=1592704&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
Tue May 6 10:25:10 2014
@@ -57,7 +57,7 @@ public class DynamicInputFormat<K, V> ex
= "mapred.num.splits";
private static final String CONF_LABEL_NUM_ENTRIES_PER_CHUNK
= "mapred.num.entries.per.chunk";
-
+
/**
* Implementation of InputFormat::getSplits(). This method splits up the
* copy-listing file into chunks, and assigns the first batch to different
@@ -91,7 +91,7 @@ public class DynamicInputFormat<K, V> ex
// Setting non-zero length for FileSplit size, to avoid a possible
// future when 0-sized file-splits are considered "empty" and skipped
// over.
- MIN_RECORDS_PER_CHUNK,
+ getMinRecordsPerChunk(jobContext.getConfiguration()),
null));
}
DistCpUtils.publish(jobContext.getConfiguration(),
@@ -107,9 +107,11 @@ public class DynamicInputFormat<K, V> ex
final Configuration configuration = context.getConfiguration();
int numRecords = getNumberOfRecords(configuration);
int numMaps = getNumMapTasks(configuration);
+ int maxChunksTolerable = getMaxChunksTolerable(configuration);
+
// Number of chunks each map will process, on average.
int splitRatio = getListingSplitRatio(configuration, numMaps, numRecords);
- validateNumChunksUsing(splitRatio, numMaps);
+ validateNumChunksUsing(splitRatio, numMaps, maxChunksTolerable);
int numEntriesPerChunk = (int)Math.ceil((float)numRecords
/(splitRatio * numMaps));
@@ -168,9 +170,9 @@ public class DynamicInputFormat<K, V> ex
return chunksFinal;
}
- private static void validateNumChunksUsing(int splitRatio, int numMaps)
- throws IOException {
- if (splitRatio * numMaps > MAX_CHUNKS_TOLERABLE)
+ private static void validateNumChunksUsing(int splitRatio, int numMaps,
+ int maxChunksTolerable) throws IOException {
+ if (splitRatio * numMaps > maxChunksTolerable)
throw new IOException("Too many chunks created with splitRatio:"
+ splitRatio + ", numMaps:" + numMaps
+ ". Reduce numMaps or decrease split-ratio to proceed.");
@@ -238,14 +240,61 @@ public class DynamicInputFormat<K, V> ex
int numMaps, int numPaths) {
return configuration.getInt(
CONF_LABEL_LISTING_SPLIT_RATIO,
- getSplitRatio(numMaps, numPaths));
+ getSplitRatio(numMaps, numPaths, configuration));
+ }
+
+ private static int getMaxChunksTolerable(Configuration conf) {
+ int maxChunksTolerable = conf.getInt(
+ DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE,
+ DistCpConstants.MAX_CHUNKS_TOLERABLE_DEFAULT);
+ if (maxChunksTolerable <= 0) {
+ LOG.warn(DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE +
+ " should be positive. Fall back to default value: "
+ + DistCpConstants.MAX_CHUNKS_TOLERABLE_DEFAULT);
+ maxChunksTolerable = DistCpConstants.MAX_CHUNKS_TOLERABLE_DEFAULT;
+ }
+ return maxChunksTolerable;
+ }
+
+ private static int getMaxChunksIdeal(Configuration conf) {
+ int maxChunksIdeal = conf.getInt(
+ DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL,
+ DistCpConstants.MAX_CHUNKS_IDEAL_DEFAULT);
+ if (maxChunksIdeal <= 0) {
+ LOG.warn(DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL +
+ " should be positive. Fall back to default value: "
+ + DistCpConstants.MAX_CHUNKS_IDEAL_DEFAULT);
+ maxChunksIdeal = DistCpConstants.MAX_CHUNKS_IDEAL_DEFAULT;
+ }
+ return maxChunksIdeal;
+ }
+
+ private static int getMinRecordsPerChunk(Configuration conf) {
+ int minRecordsPerChunk = conf.getInt(
+ DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK,
+ DistCpConstants.MIN_RECORDS_PER_CHUNK_DEFAULT);
+ if (minRecordsPerChunk <= 0) {
+ LOG.warn(DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK +
+ " should be positive. Fall back to default value: "
+ + DistCpConstants.MIN_RECORDS_PER_CHUNK_DEFAULT);
+ minRecordsPerChunk = DistCpConstants.MIN_RECORDS_PER_CHUNK_DEFAULT;
+ }
+ return minRecordsPerChunk;
}
- private static final int MAX_CHUNKS_TOLERABLE = 400;
- private static final int MAX_CHUNKS_IDEAL = 100;
- private static final int MIN_RECORDS_PER_CHUNK = 5;
- private static final int SPLIT_RATIO_DEFAULT = 2;
-
+ private static int getSplitRatio(Configuration conf) {
+ int splitRatio = conf.getInt(
+ DistCpConstants.CONF_LABEL_SPLIT_RATIO,
+ DistCpConstants.SPLIT_RATIO_DEFAULT);
+ if (splitRatio <= 0) {
+ LOG.warn(DistCpConstants.CONF_LABEL_SPLIT_RATIO +
+ " should be positive. Fall back to default value: "
+ + DistCpConstants.SPLIT_RATIO_DEFAULT);
+ splitRatio = DistCpConstants.SPLIT_RATIO_DEFAULT;
+ }
+ return splitRatio;
+ }
+
/**
* Package private, for testability.
* @param nMaps The number of maps requested.
@@ -253,19 +302,34 @@ public class DynamicInputFormat<K, V> ex
* @return The number of splits each map should handle, ideally.
*/
static int getSplitRatio(int nMaps, int nRecords) {
+ return getSplitRatio(nMaps, nRecords,new Configuration());
+ }
+
+ /**
+ * Package private, for testability.
+ * @param nMaps The number of maps requested.
+ * @param nRecords The number of records to be copied.
+ * @param conf The configuration set by users.
+ * @return The number of splits each map should handle, ideally.
+ */
+ static int getSplitRatio(int nMaps, int nRecords, Configuration conf) {
+ int maxChunksIdeal = getMaxChunksIdeal(conf);
+ int minRecordsPerChunk = getMinRecordsPerChunk(conf);
+ int splitRatio = getSplitRatio(conf);
+
if (nMaps == 1) {
LOG.warn("nMaps == 1. Why use DynamicInputFormat?");
return 1;
}
- if (nMaps > MAX_CHUNKS_IDEAL)
- return SPLIT_RATIO_DEFAULT;
+ if (nMaps > maxChunksIdeal)
+ return splitRatio;
- int nPickups = (int)Math.ceil((float)MAX_CHUNKS_IDEAL/nMaps);
+ int nPickups = (int)Math.ceil((float)maxChunksIdeal/nMaps);
int nRecordsPerChunk = (int)Math.ceil((float)nRecords/(nMaps*nPickups));
- return nRecordsPerChunk < MIN_RECORDS_PER_CHUNK ?
- SPLIT_RATIO_DEFAULT : nPickups;
+ return nRecordsPerChunk < minRecordsPerChunk ?
+ splitRatio : nPickups;
}
static int getNumEntriesPerChunk(Configuration configuration) {
Modified:
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java?rev=1592704&r1=1592703&r2=1592704&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
Tue May 6 10:25:10 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.tools.mapred.lib;
+import org.apache.hadoop.tools.DistCpConstants;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -160,5 +161,25 @@ public class TestDynamicInputFormat {
Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(11000000, 10));
Assert.assertEquals(4, DynamicInputFormat.getSplitRatio(30, 700));
Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(30, 200));
+
+ // Tests with negative (invalid) configuration values, which fall back to
+ // the defaults
+ Configuration conf = new Configuration();
+ conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE, -1);
+ conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL, -1);
+ conf.setInt(DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK, -1);
+ conf.setInt(DistCpConstants.CONF_LABEL_SPLIT_RATIO, -1);
+ Assert.assertEquals(1,
+ DynamicInputFormat.getSplitRatio(1, 1000000000, conf));
+ Assert.assertEquals(2,
+ DynamicInputFormat.getSplitRatio(11000000, 10, conf));
+ Assert.assertEquals(4, DynamicInputFormat.getSplitRatio(30, 700, conf));
+ Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(30, 200, conf));
+
+ // Tests with valid configuration
+ conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE, 100);
+ conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL, 30);
+ conf.setInt(DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK, 10);
+ conf.setInt(DistCpConstants.CONF_LABEL_SPLIT_RATIO, 53);
+ Assert.assertEquals(53, DynamicInputFormat.getSplitRatio(3, 200, conf));
}
}