Repository: sqoop Updated Branches: refs/heads/trunk 1ffda71a2 -> e21529ac6
SQOOP-2334: Sqoop Volume Per Mapper (Rakesh Sharma via Venkat Ranganathan) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/e21529ac Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/e21529ac Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/e21529ac Branch: refs/heads/trunk Commit: e21529ac6aad03bdcb572c61420e258be2d823fe Parents: 1ffda71 Author: Venkat Ranganathan <[email protected]> Authored: Mon Jun 29 20:32:31 2015 -0700 Committer: Venkat Ranganathan <[email protected]> Committed: Mon Jun 29 20:33:13 2015 -0700 ---------------------------------------------------------------------- src/docs/man/import-args.txt | 4 +++ src/docs/user/import.txt | 14 ++++++++ src/java/org/apache/sqoop/SqoopOptions.java | 9 +++++ .../sqoop/config/ConfigurationConstants.java | 5 +++ .../sqoop/config/ConfigurationHelper.java | 19 ++++++++++ .../sqoop/mapreduce/DataDrivenImportJob.java | 5 +++ .../mapreduce/db/DataDrivenDBInputFormat.java | 33 +++++++++++++++-- .../apache/sqoop/mapreduce/db/DateSplitter.java | 9 ++++- .../sqoop/mapreduce/db/IntegerSplitter.java | 27 ++++++++++++-- .../org/apache/sqoop/tool/BaseSqoopTool.java | 1 + src/java/org/apache/sqoop/tool/ImportTool.java | 12 +++++++ .../sqoop/mapreduce/db/TestIntegerSplitter.java | 38 ++++++++++++++++---- 12 files changed, 164 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/e21529ac/src/docs/man/import-args.txt ---------------------------------------------------------------------- diff --git a/src/docs/man/import-args.txt b/src/docs/man/import-args.txt index 93f65ba..49855ce 100644 --- a/src/docs/man/import-args.txt +++ b/src/docs/man/import-args.txt @@ -42,6 +42,10 @@ include::import-common-args.txt[] --split-by (column-name):: Column of the table used to split the table for parallel import +--split-limit (size):: + Upper Limit for each split 
+ size. Only applies to Integer and Date columns. + For date or timestamp fields it is calculated in seconds. + + --table (table-name):: The table to import http://git-wip-us.apache.org/repos/asf/sqoop/blob/e21529ac/src/docs/user/import.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt index df04157..342633a 100644 --- a/src/docs/user/import.txt +++ b/src/docs/user/import.txt @@ -73,6 +73,9 @@ Argument Description +\--split-by <column-name>+ Column of the table used to split work\ units. Cannot be used with\ +--autoreset-to-one-mapper+ option. ++\--split-limit <n>+ Upper Limit for each split size.\ + This only applies to Integer and Date columns.\ + For date or timestamp fields it is calculated in seconds. +\--autoreset-to-one-mapper+ Import should use one mapper if a table\ has no primary key and no split-by column\ is provided. Cannot be used with\ @@ -211,6 +214,17 @@ multi-column indices. If your table has no index column, or has a multi-column key, then you must also manually choose a splitting column. +User can override the +\--num-mappers+ by using +\--split-limit+ option. +Using the +\--split-limit+ parameter places a limit on the size of the split +section created. If the size of the split created is larger than the size +specified in this parameter, then the splits would be resized to fit within +this limit, and the number of splits will change according to that. This +affects actual number of mappers. If size of a split calculated based on +provided +\--num-mappers+ parameter exceeds +\--split-limit+ parameter then actual +number of mappers will be increased. If the value specified in +\--split-limit+ +parameter is 0 or negative, the parameter will be ignored altogether and +the split size will be calculated according to the number of mappers. 
+ If a table does not have a primary key defined and the +--split-by <col>+ is not provided, then import will fail unless the number of mappers is explicitly set to one with the +--num-mappers 1+ option http://git-wip-us.apache.org/repos/asf/sqoop/blob/e21529ac/src/java/org/apache/sqoop/SqoopOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index d7c9cbb..9405605 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -138,6 +138,7 @@ public class SqoopOptions implements Cloneable { @StoredAsProperty("codegen.auto.compile.dir") private boolean jarDirIsAuto; private String hadoopMapRedHome; // not serialized to metastore. @StoredAsProperty("db.split.column") private String splitByCol; + @StoredAsProperty("split.limit") private Integer splitLimit; @StoredAsProperty("db.where.clause") private String whereClause; @StoredAsProperty("db.query") private String sqlQuery; @StoredAsProperty("db.query.boundary") private String boundaryQuery; @@ -1178,6 +1179,14 @@ public class SqoopOptions implements Cloneable { this.splitByCol = splitBy; } + public Integer getSplitLimit() { + return splitLimit; + } + + public void setSplitLimit(Integer splitLimit) { + this.splitLimit = splitLimit; + } + public String getWhereClause() { return whereClause; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/e21529ac/src/java/org/apache/sqoop/config/ConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/config/ConfigurationConstants.java b/src/java/org/apache/sqoop/config/ConfigurationConstants.java index 2070b63..e19c17b 100644 --- a/src/java/org/apache/sqoop/config/ConfigurationConstants.java +++ b/src/java/org/apache/sqoop/config/ConfigurationConstants.java @@ -95,6 +95,11 @@ public final class ConfigurationConstants { */ 
public static final String MAPRED_DISTCACHE_CONF_PARAM = "tmpjars"; + /** + * The Configuration property identifying the split size. + */ + public static final String PROP_SPLIT_LIMIT = "split.limit"; + private ConfigurationConstants() { // Disable Explicit Object Creation } http://git-wip-us.apache.org/repos/asf/sqoop/blob/e21529ac/src/java/org/apache/sqoop/config/ConfigurationHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/config/ConfigurationHelper.java b/src/java/org/apache/sqoop/config/ConfigurationHelper.java index 8dc2061..298907d 100644 --- a/src/java/org/apache/sqoop/config/ConfigurationHelper.java +++ b/src/java/org/apache/sqoop/config/ConfigurationHelper.java @@ -213,6 +213,25 @@ public final class ConfigurationHelper { return ret; } + /** + * Stores in configuration the size of single hadoop input split. + * + * @param config Configuration to store the split size. + * @param splitLimit The size of single hadoop input split. + */ + public static void setSplitLimit(Configuration config, long splitLimit) { + config.setLong(ConfigurationConstants.PROP_SPLIT_LIMIT, splitLimit); + } + + /** + * Retrieves the size of single hadoop input split. + * + * @param config Configuration to retrieve the split size. + * @return Split size. 
+ */ + public static long getSplitLimit(Configuration config) { + return config.getInt(ConfigurationConstants.PROP_SPLIT_LIMIT, -1); + } public static boolean isLocalJobTracker(Configuration conf) { // If framework is set to YARN, then we can't be running in local mode if ("yarn".equalsIgnoreCase(conf http://git-wip-us.apache.org/repos/asf/sqoop/blob/e21529ac/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java index 7521464..388ce7d 100644 --- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java @@ -320,6 +320,11 @@ public class DataDrivenImportJob extends ImportJobBase { job.getConfiguration().setLong(LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY, options.getInlineLobLimit()); + if (options.getSplitLimit() != null) { + org.apache.sqoop.config.ConfigurationHelper.setSplitLimit( + job.getConfiguration(), options.getSplitLimit()); + } + LOG.debug("Using InputFormat: " + inputFormatClass); job.setInputFormatClass(inputFormatClass); } finally { http://git-wip-us.apache.org/repos/asf/sqoop/blob/e21529ac/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBInputFormat.java b/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBInputFormat.java index 2c59fe5..db96e41 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBInputFormat.java +++ b/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBInputFormat.java @@ -77,13 +77,27 @@ public class DataDrivenDBInputFormat<T extends DBWritable> * into InputSplits. 
*/ protected DBSplitter getSplitter(int sqlDataType) { + return getSplitter(sqlDataType, 0); + } + + /** + * @return the DBSplitter implementation to use to divide the table/query + * into InputSplits. + */ + protected DBSplitter getSplitter(int sqlDataType, long splitLimit) { switch (sqlDataType) { case Types.NUMERIC: case Types.DECIMAL: + if(splitLimit >= 0) { + throw new IllegalArgumentException("split-limit is supported only with Integer and Date columns"); + } return new BigDecimalSplitter(); case Types.BIT: case Types.BOOLEAN: + if(splitLimit >= 0) { + throw new IllegalArgumentException("split-limit is supported only with Integer and Date columns"); + } return new BooleanSplitter(); case Types.INTEGER: @@ -95,15 +109,24 @@ public class DataDrivenDBInputFormat<T extends DBWritable> case Types.REAL: case Types.FLOAT: case Types.DOUBLE: + if(splitLimit >= 0) { + throw new IllegalArgumentException("split-limit is supported only with Integer and Date columns"); + } return new FloatSplitter(); case Types.NVARCHAR: case Types.NCHAR: + if(splitLimit >= 0) { + throw new IllegalArgumentException("split-limit is supported only with Integer and Date columns"); + } return new NTextSplitter(); case Types.CHAR: case Types.VARCHAR: case Types.LONGVARCHAR: + if(splitLimit >= 0) { + throw new IllegalArgumentException("split-limit is supported only with Integer and Date columns"); + } return new TextSplitter(); case Types.DATE: @@ -114,6 +137,9 @@ public class DataDrivenDBInputFormat<T extends DBWritable> default: // TODO: Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB, // BLOB, ARRAY, STRUCT, REF, DATALINK, and JAVA_OBJECT. 
+ if(splitLimit >= 0) { + throw new IllegalArgumentException("split-limit is supported only with Integer and Date columns"); + } return null; } } @@ -125,12 +151,15 @@ public class DataDrivenDBInputFormat<T extends DBWritable> int targetNumTasks = ConfigurationHelper.getJobNumMaps(job); String boundaryQuery = getDBConf().getInputBoundingQuery(); + long splitLimit = org.apache.sqoop.config.ConfigurationHelper + .getSplitLimit(job.getConfiguration()); // If user do not forced us to use his boundary query and we don't have to // bacause there is only one mapper we will return single split that // separates nothing. This can be considerably more optimal for a large // table with no index. if (1 == targetNumTasks - && (boundaryQuery == null || boundaryQuery.isEmpty())) { + && (boundaryQuery == null || boundaryQuery.isEmpty()) + && splitLimit <= 0) { List<InputSplit> singletonSplit = new ArrayList<InputSplit>(); singletonSplit.add(new com.cloudera.sqoop.mapreduce.db. DataDrivenDBInputFormat.DataDrivenDBInputSplit("1=1", "1=1")); @@ -160,7 +189,7 @@ public class DataDrivenDBInputFormat<T extends DBWritable> sqlDataType = Types.BIGINT; } - DBSplitter splitter = getSplitter(sqlDataType); + DBSplitter splitter = getSplitter(sqlDataType, splitLimit); if (null == splitter) { throw new IOException("Sqoop does not have the splitter for the given" + " SQL data type. 
Please use either different split column (argument" http://git-wip-us.apache.org/repos/asf/sqoop/blob/e21529ac/src/java/org/apache/sqoop/mapreduce/db/DateSplitter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/db/DateSplitter.java b/src/java/org/apache/sqoop/mapreduce/db/DateSplitter.java index 31e9351..9b94283 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/DateSplitter.java +++ b/src/java/org/apache/sqoop/mapreduce/db/DateSplitter.java @@ -42,6 +42,10 @@ public class DateSplitter extends IntegerSplitter { private static final Log LOG = LogFactory.getLog(DateSplitter.class); + //Factor to convert the value to milliseconds. + //For Split limit we take input as seconds. So we need to convert to milliseconds + private static final long MS_IN_SEC = 1000L; + public List<InputSplit> split(Configuration conf, ResultSet results, String colName) throws SQLException { @@ -69,8 +73,11 @@ public class DateSplitter extends IntegerSplitter { return splits; } + // For split size we are using seconds. So we need to convert to milliseconds. + long splitLimit = org.apache.sqoop.config.ConfigurationHelper.getSplitLimit(conf) * MS_IN_SEC; + // Gather the split point integers - List<Long> splitPoints = split(numSplits, minVal, maxVal); + List<Long> splitPoints = split(numSplits,splitLimit, minVal, maxVal); List<InputSplit> splits = new ArrayList<InputSplit>(); // Turn the split points into a set of intervals. 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e21529ac/src/java/org/apache/sqoop/mapreduce/db/IntegerSplitter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/db/IntegerSplitter.java b/src/java/org/apache/sqoop/mapreduce/db/IntegerSplitter.java index e6fefc6..5f8f937 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/IntegerSplitter.java +++ b/src/java/org/apache/sqoop/mapreduce/db/IntegerSplitter.java @@ -60,8 +60,10 @@ public class IntegerSplitter implements DBSplitter { return splits; } + long splitLimit = org.apache.sqoop.config.ConfigurationHelper.getSplitLimit(conf); + // Get all the split points together. - List<Long> splitPoints = split(numSplits, minVal, maxVal); + List<Long> splitPoints = split(numSplits,splitLimit, minVal, maxVal); if (LOG.isDebugEnabled()) { LOG.debug(String.format("Splits: [%,28d to %,28d] into %d parts", minVal, maxVal, numSplits)); @@ -112,8 +114,15 @@ public class IntegerSplitter implements DBSplitter { * [5, 8) * [8, 12) * [12, 18] note the closed interval for the last split. + * + * @param numSplits Number of split chunks. + * @param splitLimit Limit the split size. + * @param minVal Minimum value of the set to split. + * @param maxVal Maximum value of the set to split. + * @return Split values inside the set. + * @throws SQLException In case of SQL exception. */ - public List<Long> split(long numSplits, long minVal, long maxVal) + public List<Long> split(long numSplits,long splitLimit, long minVal, long maxVal) throws SQLException { List<Long> splits = new ArrayList<Long>(); @@ -124,6 +133,20 @@ public class IntegerSplitter implements DBSplitter { // and add 1 if the current split index is less than the < the remainder. // This is guaranteed to add up to remainder and not surpass the value. 
long splitSize = (maxVal - minVal) / numSplits; + double splitSizeDouble = ((double)maxVal - (double)minVal) / (double)numSplits; + + if (splitLimit > 0 && splitSizeDouble > splitLimit) { + // If split size is greater than limit then do the same thing with larger + // amount of splits. + LOG.debug("Adjusting split size " + splitSize + + " because it's greater than limit " + splitLimit); + long newSplits = (maxVal - minVal) / splitLimit; + return split(newSplits != numSplits ? newSplits : newSplits + 1, + splitLimit, minVal, maxVal); + } + LOG.info("Split size: " + splitSize + "; Num splits: " + numSplits + + " from: " + minVal + " to: " + maxVal); + long remainder = (maxVal - minVal) % numSplits; long curVal = minVal; http://git-wip-us.apache.org/repos/asf/sqoop/blob/e21529ac/src/java/org/apache/sqoop/tool/BaseSqoopTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java index c97bb58..4e2e66d 100644 --- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java +++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java @@ -82,6 +82,7 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool { public static final String CLEAR_STAGING_TABLE_ARG = "clear-staging-table"; public static final String COLUMNS_ARG = "columns"; public static final String SPLIT_BY_ARG = "split-by"; + public static final String SPLIT_LIMIT_ARG = "split-limit"; public static final String WHERE_ARG = "where"; public static final String HADOOP_HOME_ARG = "hadoop-home"; public static final String HADOOP_MAPRED_HOME_ARG = "hadoop-mapred-home"; http://git-wip-us.apache.org/repos/asf/sqoop/blob/e21529ac/src/java/org/apache/sqoop/tool/ImportTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java index c79e044..39af42c 
100644 --- a/src/java/org/apache/sqoop/tool/ImportTool.java +++ b/src/java/org/apache/sqoop/tool/ImportTool.java @@ -661,6 +661,14 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { .withDescription("Column of the table used to split work units") .withLongOpt(SPLIT_BY_ARG) .create()); + importOpts + .addOption(OptionBuilder + .withArgName("size") + .hasArg() + .withDescription( + "Upper Limit of rows per split for split columns of Date/Time/Timestamp and integer types. For date or timestamp fields it is calculated in seconds. split-limit should be greater than 0") + .withLongOpt(SPLIT_LIMIT_ARG) + .create()); importOpts.addOption(OptionBuilder.withArgName("where clause") .hasArg().withDescription("WHERE clause to use during import") .withLongOpt(WHERE_ARG) @@ -887,6 +895,10 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { out.setSplitByCol(in.getOptionValue(SPLIT_BY_ARG)); } + if (in.hasOption(SPLIT_LIMIT_ARG)) { + out.setSplitLimit(Integer.parseInt(in.getOptionValue(SPLIT_LIMIT_ARG))); + } + if (in.hasOption(WHERE_ARG)) { out.setWhereClause(in.getOptionValue(WHERE_ARG)); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/e21529ac/src/test/org/apache/sqoop/mapreduce/db/TestIntegerSplitter.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/mapreduce/db/TestIntegerSplitter.java b/src/test/org/apache/sqoop/mapreduce/db/TestIntegerSplitter.java index 136afc7..e93b6ad 100644 --- a/src/test/org/apache/sqoop/mapreduce/db/TestIntegerSplitter.java +++ b/src/test/org/apache/sqoop/mapreduce/db/TestIntegerSplitter.java @@ -76,38 +76,38 @@ public class TestIntegerSplitter extends TestCase { } public void testEvenSplits() throws SQLException { - List<Long> splits = new IntegerSplitter().split(10, 0, 100); + List<Long> splits = new IntegerSplitter().split(10,-1, 0, 100); long [] expected = { 0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, }; 
assertLongArrayEquals(expected, toLongArray(splits)); } public void testOddSplits() throws SQLException { - List<Long> splits = new IntegerSplitter().split(10, 0, 95); + List<Long> splits = new IntegerSplitter().split(10,-1, 0, 95); long [] expected = { 0, 10, 20, 30, 40, 50, 59, 68, 77, 86, 95, }; assertLongArrayEquals(expected, toLongArray(splits)); } public void testSingletonSplit() throws SQLException { - List<Long> splits = new IntegerSplitter().split(1, 5, 5); + List<Long> splits = new IntegerSplitter().split(1,-1, 5, 5); long [] expected = { 5, 5 }; assertLongArrayEquals(expected, toLongArray(splits)); } public void testSingletonSplit2() throws SQLException { // Same test, but overly-high numSplits - List<Long> splits = new IntegerSplitter().split(5, 5, 5); + List<Long> splits = new IntegerSplitter().split(5,-1, 5, 5); long [] expected = { 5, 5 }; assertLongArrayEquals(expected, toLongArray(splits)); } public void testTooManySplits() throws SQLException { - List<Long> splits = new IntegerSplitter().split(5, 3, 5); + List<Long> splits = new IntegerSplitter().split(5,-1, 3, 5); long [] expected = { 3, 4, 5, 5}; assertLongArrayEquals(expected, toLongArray(splits)); } public void testExactSplitsAsInterval() throws SQLException { - List<Long> splits = new IntegerSplitter().split(5, 1, 5); + List<Long> splits = new IntegerSplitter().split(5,-1, 1, 5); long [] expected = { 1, 2, 3, 4, 5, 5}; assertLongArrayEquals(expected, toLongArray(splits)); } @@ -119,8 +119,32 @@ public class TestIntegerSplitter extends TestCase { * @throws SQLException */ public void testBigIntSplits() throws SQLException { - List<Long> splits = new IntegerSplitter().split(4, 14, + List<Long> splits = new IntegerSplitter().split(4,-1, 14, 7863696997872966707L); assertEquals(splits.size(), 5); } + + public void testEvenSplitsWithLimit() throws SQLException { + List<Long> splits = new IntegerSplitter().split(5, 10, 0, 100); + long [] expected = { 0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100 }; + 
assertLongArrayEquals(expected, toLongArray(splits)); + } + + public void testOddSplitsWithLimit() throws SQLException { + List<Long> splits = new IntegerSplitter().split(5, 10, 0, 95); + long [] expected = { 0, 10, 20, 30, 40, 50, 59, 68, 77, 86, 95}; + assertLongArrayEquals(expected, toLongArray(splits)); + } + + public void testSplitWithBiggerLimit() throws SQLException { + List<Long> splits = new IntegerSplitter().split(10, 15, 0, 100); + long [] expected = {0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100}; + assertLongArrayEquals(expected, toLongArray(splits)); + } + + public void testFractionalSplitWithLimit() throws SQLException { + List<Long> splits = new IntegerSplitter().split(5, 1, 1, 10); + long [] expected = {1,2, 3, 4, 5, 6, 7, 8, 9, 10, 10}; + assertLongArrayEquals(expected, toLongArray(splits)); + } }
