Updated Branches: refs/heads/sqoop2 c4f9ef846 -> a633fb0b3
SQOOP-737 Option to set number of extractors and loaders (Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/a633fb0b Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/a633fb0b Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/a633fb0b Branch: refs/heads/sqoop2 Commit: a633fb0b34dece59e9f1420985817a8da8f8cb24 Parents: c4f9ef8 Author: Bilung Lee <[email protected]> Authored: Tue Dec 4 11:50:21 2012 -0800 Committer: Bilung Lee <[email protected]> Committed: Tue Dec 4 12:28:23 2012 -0800 ---------------------------------------------------------------------- .../org/apache/sqoop/common/MutableContext.java | 15 +++++++ .../org/apache/sqoop/common/MutableMapContext.java | 10 +++++ .../jdbc/GenericJdbcImportPartitioner.java | 7 +-- .../connector/jdbc/TestImportPartitioner.java | 16 ++----- .../apache/sqoop/framework/FrameworkManager.java | 6 +++ .../apache/sqoop/framework/SubmissionRequest.java | 26 ++++++++++++ .../configuration/ImportJobConfiguration.java | 2 + .../framework/configuration/ThrottlingForm.java | 32 +++++++++++++++ .../main/resources/framework-resources.properties | 11 +++++ .../mapreduce/MapreduceExecutionEngine.java | 4 ++ .../java/org/apache/sqoop/job/JobConstants.java | 3 + .../apache/sqoop/job/MapreduceExecutionError.java | 3 + .../org/apache/sqoop/job/mr/SqoopInputFormat.java | 11 +++++- .../java/org/apache/sqoop/job/TestHdfsLoad.java | 2 +- .../java/org/apache/sqoop/job/TestMapReduce.java | 2 +- .../java/org/apache/sqoop/job/etl/Partitioner.java | 1 + .../mapreduce/MapreduceSubmissionEngine.java | 9 +++- 17 files changed, 140 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/common/src/main/java/org/apache/sqoop/common/MutableContext.java ---------------------------------------------------------------------- diff --git 
a/common/src/main/java/org/apache/sqoop/common/MutableContext.java b/common/src/main/java/org/apache/sqoop/common/MutableContext.java index 238bbfd..ecb97b5 100644 --- a/common/src/main/java/org/apache/sqoop/common/MutableContext.java +++ b/common/src/main/java/org/apache/sqoop/common/MutableContext.java @@ -30,4 +30,19 @@ public interface MutableContext extends ImmutableContext { */ public void setString(String key, String value); + /** + * Set long value for given key. + * + * @param key Key + * @param value New value + */ + public void setLong(String key, long value); + + /** + * Set integer value for given key. + * + * @param key Key + * @param value New value + */ + public void setInteger(String key, int value); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/common/src/main/java/org/apache/sqoop/common/MutableMapContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/common/MutableMapContext.java b/common/src/main/java/org/apache/sqoop/common/MutableMapContext.java index cd9d3e3..cb0c3e1 100644 --- a/common/src/main/java/org/apache/sqoop/common/MutableMapContext.java +++ b/common/src/main/java/org/apache/sqoop/common/MutableMapContext.java @@ -43,6 +43,16 @@ public class MutableMapContext extends MapContext implements Iterable<Map.Entry< } @Override + public void setLong(String key, long value) { + getOptions().put(key, Long.toString(value)); + } + + @Override + public void setInteger(String key, int value) { + getOptions().put(key, Integer.toString(value)); + } + + @Override public Iterator<Map.Entry<String, String>> iterator() { return getOptions().entrySet().iterator(); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java ---------------------------------------------------------------------- diff --git 
a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java index a6d3b52..0d9f0c0 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java @@ -23,21 +23,20 @@ import java.util.List; import org.apache.sqoop.common.ImmutableContext; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.job.Constants; import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.job.etl.Partitioner; public class GenericJdbcImportPartitioner extends Partitioner { - private int numberPartitions; + private long numberPartitions; private String partitionColumnName; private int partitionColumnType; private String partitionMinValue; private String partitionMaxValue; @Override - public List<Partition> getPartitions(ImmutableContext context, Object connectionC, Object jobC) { - numberPartitions = context.getInt(Constants.JOB_ETL_NUMBER_PARTITIONS, 10); + public List<Partition> getPartitions(ImmutableContext context, long maxPartitions, Object connectionC, Object jobC) { + numberPartitions = maxPartitions; partitionColumnName = context.getString( GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME); partitionColumnType = Integer.parseInt(context.getString( http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java 
b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java index d5db190..77c4739 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java @@ -18,7 +18,6 @@ package org.apache.sqoop.connector.jdbc; import java.sql.Types; -import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -31,7 +30,6 @@ import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; import org.apache.sqoop.job.Constants; import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.job.etl.Partitioner; -import org.junit.Test; public class TestImportPartitioner extends TestCase { @@ -52,13 +50,12 @@ public class TestImportPartitioner extends TestCase { context.setString( GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(START + NUMBER_OF_ROWS - 1)); - context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "5"); ConnectionConfiguration connConf = new ConnectionConfiguration(); ImportJobConfiguration jobConf = new ImportJobConfiguration(); Partitioner partitioner = new GenericJdbcImportPartitioner(); - List<Partition> partitions = partitioner.getPartitions(context, connConf, jobConf); + List<Partition> partitions = partitioner.getPartitions(context, 5, connConf, jobConf); verifyResult(partitions, new String[] { "-5 <= ICOL AND ICOL < -3", @@ -83,13 +80,12 @@ public class TestImportPartitioner extends TestCase { context.setString( GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(START + NUMBER_OF_ROWS - 1)); - context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "3"); ConnectionConfiguration connConf = new ConnectionConfiguration(); ImportJobConfiguration jobConf = new ImportJobConfiguration(); Partitioner partitioner = new GenericJdbcImportPartitioner(); - List<Partition> 
partitions = partitioner.getPartitions(context, connConf, jobConf); + List<Partition> partitions = partitioner.getPartitions(context, 3, connConf, jobConf); verifyResult(partitions, new String[] { "-5 <= ICOL AND ICOL < -1", @@ -112,13 +108,12 @@ public class TestImportPartitioner extends TestCase { context.setString( GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(START + NUMBER_OF_ROWS - 1)); - context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "13"); ConnectionConfiguration connConf = new ConnectionConfiguration(); ImportJobConfiguration jobConf = new ImportJobConfiguration(); Partitioner partitioner = new GenericJdbcImportPartitioner(); - List<Partition> partitions = partitioner.getPartitions(context, connConf, jobConf); + List<Partition> partitions = partitioner.getPartitions(context, 13, connConf, jobConf); verifyResult(partitions, new String[] { "-5 <= ICOL AND ICOL < -4", @@ -148,13 +143,12 @@ public class TestImportPartitioner extends TestCase { context.setString( GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf((double)(START + NUMBER_OF_ROWS - 1))); - context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "5"); ConnectionConfiguration connConf = new ConnectionConfiguration(); ImportJobConfiguration jobConf = new ImportJobConfiguration(); Partitioner partitioner = new GenericJdbcImportPartitioner(); - List<Partition> partitions = partitioner.getPartitions(context, connConf, jobConf); + List<Partition> partitions = partitioner.getPartitions(context, 5, connConf, jobConf); verifyResult(partitions, new String[] { "-5.0 <= DCOL AND DCOL < -3.0", @@ -185,7 +179,7 @@ public class TestImportPartitioner extends TestCase { ImportJobConfiguration jobConf = new ImportJobConfiguration(); Partitioner partitioner = new GenericJdbcImportPartitioner(); - List<Partition> partitions = partitioner.getPartitions(context, connConf, jobConf); + List<Partition> partitions = partitioner.getPartitions(context, 3, 
connConf, jobConf); verifyResult(partitions, new String[] { "-5.0 <= DCOL AND DCOL < -1.6666666666666665", http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java index 0cd6969..6674643 100644 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java +++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java @@ -440,6 +440,12 @@ public final class FrameworkManager { // Initialize the map-reduce part (all sort of required classes, ...) request.setOutputDirectory(jobConfiguration.output.outputDirectory); + // We pass the configured number of extractors and loaders directly to + // the underlying request object. In the future we might need to throttle this + // count based on other running jobs to meet our SLAs.
+ request.setExtractors(jobConfiguration.throttling.extractors); + request.setLoaders(jobConfiguration.throttling.loaders); + // Delegate rest of the job to execution engine executionEngine.prepareImportSubmission(request); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java index fb6b6a9..53d0039 100644 --- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java +++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java @@ -97,6 +97,16 @@ public class SubmissionRequest { */ String notificationUrl; + /** + * Number of extractors + */ + Integer extractors; + + /** + * Number of loaders + */ + Integer loaders; + public SubmissionRequest() { this.jars = new LinkedList<String>(); this.connectorContext = new MutableMapContext(); @@ -222,4 +232,20 @@ public class SubmissionRequest { public void setNotificationUrl(String url) { this.notificationUrl = url; } + + public Integer getExtractors() { + return extractors; + } + + public void setExtractors(Integer extractors) { + this.extractors = extractors; + } + + public Integer getLoaders() { + return loaders; + } + + public void setLoaders(Integer loaders) { + this.loaders = loaders; + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java index 8c4dcf1..c674fc2 100644 --- 
a/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java +++ b/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java @@ -27,4 +27,6 @@ import org.apache.sqoop.model.Form; public class ImportJobConfiguration { @Form public OutputForm output; + + @Form public ThrottlingForm throttling; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java new file mode 100644 index 0000000..c435f6b --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.framework.configuration; + +import org.apache.sqoop.model.FormClass; +import org.apache.sqoop.model.Input; + +/** + * Form to set up number of loaders and extractors + */ +@FormClass +public class ThrottlingForm { + + @Input public Integer extractors; + + @Input public Integer loaders; +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/core/src/main/resources/framework-resources.properties ---------------------------------------------------------------------- diff --git a/core/src/main/resources/framework-resources.properties b/core/src/main/resources/framework-resources.properties index 019f5ca..db40946 100644 --- a/core/src/main/resources/framework-resources.properties +++ b/core/src/main/resources/framework-resources.properties @@ -44,3 +44,14 @@ output.outputDirectory.help = Output directory for final data output.ignored.label = Ignored output.ignored.help = This value is ignored +# Throttling Form +# +throttling.label = Throttling resources +throttling.help = Set throttling boundaries to not overload your systems + +throttling.extractors.label = Extractors +throttling.extractors.help = Number of extractors that Sqoop will use + +throttling.loaders.label = Loaders +throttling.loaders.help = Number of loaders that Sqoop will use + http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java index 3248e77..e2163ff 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java +++ 
b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java @@ -68,6
+68,10 @@ public class MapreduceExecutionEngine extends ExecutionEngine { context.setString(JobConstants.JOB_ETL_EXTRACTOR, importer.getExtractor().getName()); context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName()); + if(request.getExtractors() != null) { + context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors()); + } + // TODO: This settings should be abstracted to core module at some point if(jobConf.output.outputFormat == OutputFormat.TEXT_FILE) { context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName()); http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java index d899fce..f5123a2 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java @@ -49,6 +49,9 @@ public final class JobConstants extends Constants { + "mr.output.codec"; + public static final String JOB_ETL_EXTRACTOR_NUM = PREFIX_JOB_CONFIG + + "etl.extractor.count"; + public static final String JOB_CONFIG_CLASS_CONNECTOR_CONNECTION = PREFIX_JOB_CONFIG + "config.class.connector.connection"; http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java index 30956f3..1dc12d1 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java +++ 
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java @@ -76,6 +76,9 @@ public enum MapreduceExecutionError implements ErrorCode { /** Unsupported output format type found **/ MAPRED_EXEC_0024("Unknown output format type"), + /** Got invalid number of partitions from Partitioner */ + MAPRED_EXEC_0025("Retrieved invalid number of partitions from Partitioner"), + ; private final String message; http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java index 8fcdc99..d191e03 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java @@ -30,7 +30,9 @@ import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.job.JobConstants; +import org.apache.sqoop.job.MapreduceExecutionError; import org.apache.sqoop.job.PrefixContext; import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.job.etl.Partitioner; @@ -62,7 +64,9 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> { Object connectorConnection = ConfigurationUtils.getConnectorConnection(conf); Object connectorJob = ConfigurationUtils.getConnectorJob(conf); - List<Partition> partitions = partitioner.getPartitions(connectorContext, connectorConnection, connectorJob); + long maxPartitions = conf.getLong(JobConstants.JOB_ETL_EXTRACTOR_NUM, 10); + + List<Partition> partitions = 
partitioner.getPartitions(connectorContext, maxPartitions, connectorConnection, connectorJob); List<InputSplit> splits = new LinkedList<InputSplit>(); for (Partition partition : partitions) { LOG.debug("Partition: " + partition); @@ -71,6 +75,11 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> { splits.add(split); } + if(splits.size() > maxPartitions) { + throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0025, + String.format("Got %d, max was %d", splits.size(), maxPartitions)); + } + return splits; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java index 21a2be9..4e6209d 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java @@ -204,7 +204,7 @@ public class TestHdfsLoad extends TestCase { public static class DummyPartitioner extends Partitioner { @Override - public List<Partition> getPartitions(ImmutableContext context, Object oc, Object oj) { + public List<Partition> getPartitions(ImmutableContext context, long maxPartitions, Object oc, Object oj) { List<Partition> partitions = new LinkedList<Partition>(); for (int id = START_ID; id <= NUMBER_OF_IDS; id++) { DummyPartition partition = new DummyPartition(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java index 745a3a4..c8caecd 100644 --- 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java @@ -120,7 +120,7 @@ public class TestMapReduce extends TestCase { public static class DummyPartitioner extends Partitioner { @Override - public List<Partition> getPartitions(ImmutableContext context, Object oc, Object oj) { + public List<Partition> getPartitions(ImmutableContext context, long maxPartitions, Object oc, Object oj) { List<Partition> partitions = new LinkedList<Partition>(); for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) { DummyPartition partition = new DummyPartition(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java index 3a525c4..9cd000c 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java @@ -28,6 +28,7 @@ import java.util.List; public abstract class Partitioner { public abstract List<Partition> getPartitions(ImmutableContext context, + long maxPartitions, Object connectionConfiguration, Object jobConfiguration); http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java ---------------------------------------------------------------------- diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java index a64a477..8f7864e 100644 --- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java +++ 
b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java @@ -208,8 +208,13 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { FileOutputFormat.setOutputPath(job, new Path(outputDirectory)); } - // TODO(jarcec): Harcoded no reducers - job.setNumReduceTasks(0); + // Set number of reducers as number of configured loaders or suppress + // reduce phase entirely if loaders are not set at all. + if(request.getLoaders() != null) { + job.setNumReduceTasks(request.getLoaders()); + } else { + job.setNumReduceTasks(0); + } job.setOutputFormatClass(request.getOutputFormatClass()); job.setOutputKeyClass(request.getOutputKeyClass());
