Repository: sqoop
Updated Branches:
  refs/heads/trunk e247f76bf -> cfe503744
SQOOP-1411: The number of tasks is not set properly in PGBulkloadExportManager (Masatake Iwasaki via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/cfe50374 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/cfe50374 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/cfe50374 Branch: refs/heads/trunk Commit: cfe503744b885defa0998462b4210bee12dec518 Parents: e247f76 Author: Jarek Jarcec Cecho <[email protected]> Authored: Sun Aug 10 12:48:09 2014 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Sun Aug 10 12:48:09 2014 -0700 ---------------------------------------------------------------------- .../apache/sqoop/mapreduce/ExportJobBase.java | 4 ++-- .../org/apache/sqoop/mapreduce/JobBase.java | 22 +++++++++++++++++--- .../postgresql/PGBulkloadExportJob.java | 14 ++++--------- .../manager/PGBulkloadManagerManualTest.java | 6 ++++++ 4 files changed, 31 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/cfe50374/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java index 9f510b9..54c27ee 100644 --- a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java +++ b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java @@ -257,8 +257,8 @@ public class ExportJobBase extends JobBase { } @Override - protected int configureNumTasks(Job job) throws IOException { - int numMaps = super.configureNumTasks(job); + protected int configureNumMapTasks(Job job) throws IOException { + int numMaps = super.configureNumMapTasks(job); job.getConfiguration().setInt(EXPORT_MAP_TASKS_KEY, numMaps); return numMaps; } 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cfe50374/src/java/org/apache/sqoop/mapreduce/JobBase.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/JobBase.java b/src/java/org/apache/sqoop/mapreduce/JobBase.java index 92a78ac..032d408 100644 --- a/src/java/org/apache/sqoop/mapreduce/JobBase.java +++ b/src/java/org/apache/sqoop/mapreduce/JobBase.java @@ -285,20 +285,36 @@ public class JobBase { } /** - * Configure the number of map/reduce tasks to use in the job. + * Configure the number of map/reduce tasks to use in the job, + * returning the number of map tasks for backward compatibility. */ protected int configureNumTasks(Job job) throws IOException { + int numMapTasks = configureNumMapTasks(job); + configureNumReduceTasks(job); + return numMapTasks; + } + + /** + * Configure the number of map tasks to use in the job. + */ + protected int configureNumMapTasks(Job job) throws IOException { int numMapTasks = options.getNumMappers(); if (numMapTasks < 1) { numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS; LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers."); } - ConfigurationHelper.setJobNumMaps(job, numMapTasks); - job.setNumReduceTasks(0); return numMapTasks; } + /** + * Configure the number of reduce tasks to use in the job. + */ + protected int configureNumReduceTasks(Job job) throws IOException { + job.setNumReduceTasks(0); + return 0; + } + /** Set the main job that will be run. 
*/ protected void setJob(Job job) { mrJob = job; http://git-wip-us.apache.org/repos/asf/sqoop/blob/cfe50374/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java index 1e2ad9f..32fe077 100644 --- a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java @@ -139,7 +139,6 @@ public class PGBulkloadExportJob extends ExportJobBase { conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); conf.setInt("mapred.map.max.attempts", 1); conf.setInt("mapred.reduce.max.attempts", 1); - conf.setIfUnset("mapred.reduce.tasks", "1"); if (context.getOptions().doClearStagingTable()) { conf.setBoolean("pgbulkload.clear.staging.table", true); } @@ -189,16 +188,11 @@ public class PGBulkloadExportJob extends ExportJobBase { @Override - protected int configureNumTasks(Job job) throws IOException { - SqoopOptions options = context.getOptions(); - int numMapTasks = options.getNumMappers(); - if (numMapTasks < 1) { - numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS; - LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers."); + protected int configureNumReduceTasks(Job job) throws IOException { + if (job.getNumReduceTasks() < 1) { + job.setNumReduceTasks(1); } - - ConfigurationHelper.setJobNumMaps(job, numMapTasks); - return numMapTasks; + return job.getNumReduceTasks(); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/cfe50374/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java b/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java index 
4d03a8b..fc5fd6d 100644 --- a/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java +++ b/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java @@ -192,6 +192,12 @@ public class PGBulkloadManagerManualTest extends TestExport { } + public void testMultiReduceExportWithNewProp() throws IOException, SQLException { + String[] genericargs = newStrArray(null, "-Dmapreduce.job.reduces=2"); + multiFileTestWithGenericArgs(2, 10, 2, genericargs); + } + + public void testExportWithTablespace() throws IOException, SQLException { String[] genericargs = newStrArray(null, "-Dpgbulkload.staging.tablespace=" + TABLESPACE);
