Updated Branches: refs/heads/sqoop2 fdfc18c83 -> 30a1af0eb
SQOOP-684 Encode type of the job into executed map reduce job (Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/30a1af0e Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/30a1af0e Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/30a1af0e Branch: refs/heads/sqoop2 Commit: 30a1af0ebff33d505d72956f5400e79254915e93 Parents: c47a153 Author: Bilung Lee <[email protected]> Authored: Fri Nov 9 10:11:57 2012 -0800 Committer: Bilung Lee <[email protected]> Committed: Fri Nov 9 10:11:57 2012 -0800 ---------------------------------------------------------------------- .../main/java/org/apache/sqoop/core/CoreError.java | 7 ++- .../apache/sqoop/framework/ExecutionEngine.java | 16 +---- .../apache/sqoop/framework/FrameworkManager.java | 16 +++-- .../apache/sqoop/framework/SubmissionRequest.java | 51 +++++++++++---- .../execution/mapreduce/MRSubmissionRequest.java | 12 +--- .../mapreduce/MapreduceExecutionEngine.java | 12 +--- .../java/org/apache/sqoop/job/JobConstants.java | 1 + .../apache/sqoop/job/mr/ConfigurationUtils.java | 5 ++ .../java/org/apache/sqoop/job/mr/SqoopMapper.java | 25 ++++++- .../java/org/apache/sqoop/job/TestHdfsLoad.java | 4 + .../java/org/apache/sqoop/job/TestMapReduce.java | 3 + .../mapreduce/MapreduceSubmissionEngine.java | 9 +-- 12 files changed, 100 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/core/src/main/java/org/apache/sqoop/core/CoreError.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/core/CoreError.java b/core/src/main/java/org/apache/sqoop/core/CoreError.java index 29c0809..08034ed 100644 --- a/core/src/main/java/org/apache/sqoop/core/CoreError.java +++ b/core/src/main/java/org/apache/sqoop/core/CoreError.java @@ -93,7 +93,12 @@ public enum CoreError 
implements ErrorCode { CORE_0021("Error occurs during partitioner run"), /** Unable to parse because it is not properly delimited */ - CORE_0022("Unable to parse because it is not properly delimited"); + CORE_0022("Unable to parse because it is not properly delimited"), + + /** Unknown job type */ + CORE_0023("Unknown job type"), + + ; private final String message; http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java index e1ccdf6..ae14d9a 100644 --- a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java +++ b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java @@ -45,22 +45,10 @@ public abstract class ExecutionEngine { * Return new SubmissionRequest class or any subclass if it's needed by * execution and submission engine combination. 
* - * @param summary Submission summary - * @param connector Appropriate connector structure - * @param connectorConnection Connector connection configuration - * @param connectorJob Connector job configuration - * @param frameworkConnection Framework connection configuration - * @param frameworkJob Framework job configuration * @return New Submission request object */ - public SubmissionRequest createSubmissionRequest(MSubmission summary, - SqoopConnector connector, - Object connectorConnection, - Object connectorJob, - Object frameworkConnection, - Object frameworkJob) { - return new SubmissionRequest(summary, connector, - connectorConnection, connectorJob, frameworkConnection, frameworkJob); + public SubmissionRequest createSubmissionRequest() { + return new SubmissionRequest(); } /** http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java index 575a8bb..d04a100 100644 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java +++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java @@ -307,10 +307,16 @@ public final class FrameworkManager { // Create request object MSubmission summary = new MSubmission(jobId); - SubmissionRequest request = executionEngine.createSubmissionRequest( - summary, connector, - connectorConnection, connectorJob, - frameworkConnection, frameworkJob); + SubmissionRequest request = executionEngine.createSubmissionRequest(); + + // Save important variables to the submission request + request.setSummary(summary); + request.setConnector(connector); + request.setConfigConnectorConnection(connectorConnection); + request.setConfigConnectorJob(connectorJob); + request.setConfigFrameworkConnection(frameworkConnection); + 
request.setConfigFrameworkJob(frameworkJob); + request.setJobType(job.getType()); request.setJobName(job.getName()); request.setJobId(job.getPersistenceId()); @@ -329,6 +335,7 @@ public final class FrameworkManager { // Extra libraries that Sqoop code requires request.addJarForClass(JSONValue.class); + // Get connector callbacks switch (job.getType()) { case IMPORT: request.setConnectorCallbacks(connector.getImporter()); @@ -340,7 +347,6 @@ public final class FrameworkManager { throw new SqoopException(FrameworkError.FRAMEWORK_0005, "Unsupported job type " + job.getType().name()); } - LOG.debug("Using callbacks: " + request.getConnectorCallbacks()); // Initialize submission from connector perspective http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java index 9f471b5..8392a10 100644 --- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java +++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java @@ -20,6 +20,7 @@ package org.apache.sqoop.framework; import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.job.etl.CallbackBase; +import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MSubmission; import org.apache.sqoop.utils.ClassUtils; @@ -49,6 +50,11 @@ public class SubmissionRequest { long jobId; /** + * Job type + */ + MJob.Type jobType; + + /** * Connector instance associated with this submission request */ SqoopConnector connector; @@ -87,27 +93,20 @@ public class SubmissionRequest { String outputDirectory; - public SubmissionRequest(MSubmission submission, - SqoopConnector connector, - Object configConnectorConnection, - Object configConnectorJob, - 
Object configFrameworkConnection, - Object configFrameworkJob) { - this.summary = submission; - this.connector = connector; + public SubmissionRequest() { this.jars = new LinkedList<String>(); this.connectorContext = new MutableMapContext(); this.frameworkContext = new MutableMapContext(); - this.configConnectorConnection = configConnectorConnection; - this.configConnectorJob = configConnectorJob; - this.configFrameworkConnection = configFrameworkConnection; - this.configFrameworkJob = configFrameworkJob; } public MSubmission getSummary() { return summary; } + public void setSummary(MSubmission summary) { + this.summary = summary; + } + public String getJobName() { return jobName; } @@ -124,10 +123,22 @@ public class SubmissionRequest { this.jobId = jobId; } + public MJob.Type getJobType() { + return jobType; + } + + public void setJobType(MJob.Type jobType) { + this.jobType = jobType; + } + public SqoopConnector getConnector() { return connector; } + public void setConnector(SqoopConnector connector) { + this.connector = connector; + } + public List<String> getJars() { return jars; } @@ -156,18 +167,34 @@ public class SubmissionRequest { return configConnectorConnection; } + public void setConfigConnectorConnection(Object config) { + configConnectorConnection = config; + } + public Object getConfigConnectorJob() { return configConnectorJob; } + public void setConfigConnectorJob(Object config) { + configConnectorJob = config; + } + public Object getConfigFrameworkConnection() { return configFrameworkConnection; } + public void setConfigFrameworkConnection(Object config) { + configFrameworkConnection = config; + } + public Object getConfigFrameworkJob() { return configFrameworkJob; } + public void setConfigFrameworkJob(Object config) { + configFrameworkJob = config; + } + public MutableMapContext getConnectorContext() { return connectorContext; } 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java index 3f37222..32d598c 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java @@ -21,9 +21,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.framework.SubmissionRequest; -import org.apache.sqoop.model.MSubmission; /** * Map-reduce specific submission request containing all extra information @@ -42,14 +40,8 @@ public class MRSubmissionRequest extends SubmissionRequest { Class<? extends Writable> outputKeyClass; Class<? extends Writable> outputValueClass; - public MRSubmissionRequest(MSubmission submission, - SqoopConnector connector, - Object configConnectorConnection, - Object configConnectorJob, - Object configFrameworkConnection, - Object configFrameworkJob) { - super(submission, connector, configConnectorConnection, configConnectorJob, - configFrameworkConnection, configFrameworkJob); + public MRSubmissionRequest() { + super(); } public Class<? 
extends InputFormat> getInputFormatClass() { http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java index 77ca59b..4a5b305 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java @@ -19,7 +19,6 @@ package org.apache.sqoop.execution.mapreduce; import org.apache.hadoop.io.NullWritable; import org.apache.sqoop.common.MutableMapContext; -import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.framework.ExecutionEngine; import org.apache.sqoop.framework.SubmissionRequest; import org.apache.sqoop.job.JobConstants; @@ -29,7 +28,6 @@ import org.apache.sqoop.job.io.Data; import org.apache.sqoop.job.mr.SqoopFileOutputFormat; import org.apache.sqoop.job.mr.SqoopInputFormat; import org.apache.sqoop.job.mr.SqoopMapper; -import org.apache.sqoop.model.MSubmission; /** * @@ -37,14 +35,8 @@ import org.apache.sqoop.model.MSubmission; public class MapreduceExecutionEngine extends ExecutionEngine { @Override - public SubmissionRequest createSubmissionRequest(MSubmission summary, - SqoopConnector connector, - Object connectorConnection, - Object connectorJob, - Object frameworkConnection, - Object frameworkJob) { - return new MRSubmissionRequest(summary, connector, connectorConnection, - connectorJob, frameworkConnection, frameworkJob); + public SubmissionRequest createSubmissionRequest() { + return new MRSubmissionRequest(); } @Override 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java index 19ac91e..58b2a42 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java @@ -27,6 +27,7 @@ public final class JobConstants extends Constants { public static final String PREFIX_JOB_CONFIG = ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "job."; + public static final String JOB_TYPE = PREFIX_JOB_CONFIG + "type"; public static final String JOB_ETL_PARTITIONER = PREFIX_JOB_CONFIG + "etl.partitioner"; http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java index ae647ce..4aa2128 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java @@ -20,6 +20,7 @@ package org.apache.sqoop.job.mr; import org.apache.hadoop.conf.Configuration; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.model.FormUtils; +import org.apache.sqoop.model.MJob; import org.apache.sqoop.utils.ClassUtils; /** @@ -27,6 +28,10 @@ import org.apache.sqoop.utils.ClassUtils; */ public final class ConfigurationUtils { + public static MJob.Type getJobType(Configuration configuration) { + return MJob.Type.valueOf(configuration.get(JobConstants.JOB_TYPE)); + } + public static 
Object getConnectorConnection(Configuration configuration) { return loadConfiguration(configuration, JobConstants.JOB_CONFIG_CLASS_CONNECTOR_CONNECTION, http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index 6892b4b..dbe832a 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -49,14 +49,31 @@ public class SqoopMapper String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR); Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName); - PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT); - Object connectorConnection = ConfigurationUtils.getConnectorConnection(conf); - Object connectorJob = ConfigurationUtils.getConnectorJob(conf); + // Objects that should be passed to the Extractor execution + PrefixContext subContext = null; + Object configConnection = null; + Object configJob = null; + + // Executor is in connector space for IMPORT and in framework space for EXPORT + switch (ConfigurationUtils.getJobType(conf)) { + case IMPORT: + subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT); + configConnection = ConfigurationUtils.getConnectorConnection(conf); + configJob = ConfigurationUtils.getConnectorJob(conf); + break; + case EXPORT: + subContext = new PrefixContext(conf, ""); + configConnection = ConfigurationUtils.getFrameworkConnection(conf); + configJob = ConfigurationUtils.getFrameworkJob(conf); + break; + default: + throw new SqoopException(CoreError.CORE_0023); + } SqoopSplit split = context.getCurrentKey(); try { - 
extractor.run(connectorContext, connectorConnection, connectorJob, split.getPartition(), + extractor.run(subContext, configConnection, configJob, split.getPartition(), new MapDataWriter(context)); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java index 812dd8e..875a123 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java @@ -63,6 +63,7 @@ public class TestHdfsLoad extends TestCase { FileUtils.delete(outdir); Configuration conf = new Configuration(); + conf.set(JobConstants.JOB_TYPE, "IMPORT"); conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName()); @@ -80,6 +81,7 @@ public class TestHdfsLoad extends TestCase { FileUtils.delete(outdir); Configuration conf = new Configuration(); + conf.set(JobConstants.JOB_TYPE, "IMPORT"); conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName()); @@ -122,6 +124,7 @@ public class TestHdfsLoad extends TestCase { FileUtils.delete(outdir); Configuration conf = new Configuration(); + conf.set(JobConstants.JOB_TYPE, "IMPORT"); conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName()); @@ 
-139,6 +142,7 @@ public class TestHdfsLoad extends TestCase { FileUtils.delete(outdir); Configuration conf = new Configuration(); + conf.set(JobConstants.JOB_TYPE, "IMPORT"); conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName()); http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java index e269899..6e49cc2 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java @@ -55,6 +55,7 @@ public class TestMapReduce extends TestCase { public void testInputFormat() throws Exception { Configuration conf = new Configuration(); + conf.set(JobConstants.JOB_TYPE, "IMPORT"); conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); Job job = Job.getInstance(conf); @@ -71,6 +72,7 @@ public class TestMapReduce extends TestCase { public void testMapper() throws Exception { Configuration conf = new Configuration(); + conf.set(JobConstants.JOB_TYPE, "IMPORT"); conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); @@ -80,6 +82,7 @@ public class TestMapReduce extends TestCase { public void testOutputFormat() throws Exception { Configuration conf = new Configuration(); + conf.set(JobConstants.JOB_TYPE, "IMPORT"); conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); 
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java ---------------------------------------------------------------------- diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java index 15cb476..48dc073 100644 --- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java +++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java @@ -119,11 +119,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { */ @Override public boolean isExecutionEngineSupported(Class executionEngineClass) { - if(executionEngineClass == MapreduceExecutionEngine.class) { - return true; - } - - return false; + return executionEngineClass == MapreduceExecutionEngine.class; } /** @@ -137,6 +133,9 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { // Clone global configuration Configuration configuration = new Configuration(globalConfiguration); + // Serialize job type as it will be needed by underlying execution engine + configuration.set(JobConstants.JOB_TYPE, request.getJobType().name()); + // Serialize framework context into job configuration for(Map.Entry<String, String> entry: request.getFrameworkContext()) { configuration.set(entry.getKey(), entry.getValue());
