SQOOP-1428: Sqoop2: From/To: Rebase against Sqoop2 branch for SQOOP-777
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/cf448a22 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/cf448a22 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/cf448a22 Branch: refs/heads/SQOOP-1367 Commit: cf448a22916dba988ef30d56f6b7d9c9c7269a51 Parents: 4283e8e Author: Abraham Elmahrek <[email protected]> Authored: Mon Aug 11 11:45:40 2014 -0700 Committer: Abraham Elmahrek <[email protected]> Committed: Mon Aug 11 15:13:24 2014 -0700 ---------------------------------------------------------------------- .../org/apache/sqoop/framework/JobManager.java | 4 ++-- .../mapreduce/MapreduceExecutionEngine.java | 4 +++- .../apache/sqoop/job/mr/ConfigurationUtils.java | 20 +++++++------------- .../org/apache/sqoop/job/mr/SqoopMapper.java | 10 +++++----- .../job/mr/SqoopOutputFormatLoadExecutor.java | 2 +- .../sqoop/job/mr/TestConfigurationUtils.java | 4 ++-- .../mapreduce/MapreduceSubmissionEngine.java | 3 ++- 7 files changed, 22 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/core/src/main/java/org/apache/sqoop/framework/JobManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java index e0bf011..d7d8962 100644 --- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java @@ -351,8 +351,8 @@ public class JobManager implements Reconfigurable { request.setJobId(job.getPersistenceId()); request.setNotificationUrl(notificationBaseUrl + jobId); Class<? extends IntermediateDataFormat<?>> dataFormatClass = - connector.getIntermediateDataFormat(); - request.setIntermediateDataFormat(connector.getIntermediateDataFormat()); + fromConnector.getIntermediateDataFormat(); + request.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat()); // Create request object // Let's register all important jars http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java index 82b195a..ff328cb 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java @@ -45,7 +45,9 @@ public class MapreduceExecutionEngine extends ExecutionEngine { return new MRSubmissionRequest(); } - public void prepareSubmission(MRSubmissionRequest request) { + public void prepareSubmission(SubmissionRequest gRequest) { + MRSubmissionRequest request = (MRSubmissionRequest)gRequest; + // Add jar dependencies addDependencies(request); http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java index c60ae68..476689a 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java @@ -235,21 +235,15 @@ public final class ConfigurationUtils { * @param job MapReduce Job object * @param schema Schema */ - public static void setFromConnectorSchema(Job job, Schema schema) { + public static void setConnectorSchema(ConnectorType type, Job job, Schema schema) { if(schema != null) { - job.getCredentials().addSecretKey(SCHEMA_FROM_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes()); - } - } + switch (type) { + case FROM: + job.getCredentials().addSecretKey(SCHEMA_FROM_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes()); - /** - * Persist To Connector generated schema. - * - * @param job MapReduce Job object - * @param schema Schema - */ - public static void setToConnectorSchema(Job job, Schema schema) { - if(schema != null) { - job.getCredentials().addSecretKey(SCHEMA_TO_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes()); + case TO: + job.getCredentials().addSecretKey(SCHEMA_TO_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes()); + } } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index 2daaee3..c3b6ae9 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -65,10 +65,14 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR); Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName); + // Propagate connector schema in every case for now + // TODO: Change to coditional choosing between Connector schemas. + Schema schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf); + String intermediateDataFormatName = conf.get(JobConstants .INTERMEDIATE_DATA_FORMAT); data = (IntermediateDataFormat) ClassUtils.instantiate(intermediateDataFormatName); - data.setSchema(ConfigurationUtils.getConnectorSchema(conf)); + data.setSchema(schema); dataOut = new SqoopWritable(); // Objects that should be pass to the Executor execution @@ -76,10 +80,6 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, Object configConnection = null; Object configJob = null; - // Propagate connector schema in every case for now - // TODO: Change to coditional choosing between Connector schemas. - Schema schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf); - // Get configs for extractor subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); configConnection = ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.FROM, conf); http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index 123737e..bed99a2 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -73,7 +73,7 @@ public class SqoopOutputFormatLoadExecutor { producer = new SqoopRecordWriter(); data = (IntermediateDataFormat) ClassUtils.instantiate(context .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT)); - data.setSchema(ConfigurationUtils.getConnectorSchema(context.getConfiguration())); + data.setSchema(ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, context.getConfiguration())); } public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() { http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java index 7e434b7..09f5695 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java @@ -96,13 +96,13 @@ public class TestConfigurationUtils { // // @Test // public void testConnectorSchema() throws Exception { -// ConfigurationUtils.setFromConnectorSchema(job, getSchema("a")); +// ConfigurationUtils.setConnectorSchema(job, getSchema("a")); // assertEquals(getSchema("a"), ConfigurationUtils.getFromConnectorSchema(jobConf)); // } // // @Test // public void testConnectorSchemaNull() throws Exception { -// ConfigurationUtils.setFromConnectorSchema(job, null); +// ConfigurationUtils.setConnectorSchema(job, null); // assertNull(ConfigurationUtils.getFromConnectorSchema(jobConf)); // } // http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java ---------------------------------------------------------------------- diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java index 3c21421..fd423cb 100644 --- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java +++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java @@ -207,7 +207,8 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.FROM, job, request.getFrameworkConnectionConfig(ConnectorType.FROM)); ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.TO, job, request.getFrameworkConnectionConfig(ConnectorType.TO)); ConfigurationUtils.setConfigFrameworkJob(job, request.getConfigFrameworkJob()); - ConfigurationUtils.setConnectorSchema(job, request.getSummary().getConnectorSchema()); + // @TODO(Abe): Persist TO schema. + ConfigurationUtils.setConnectorSchema(ConnectorType.FROM, job, request.getSummary().getConnectorSchema()); if(request.getJobName() != null) { job.setJobName("Sqoop: " + request.getJobName());
