Repository: sqoop Updated Branches: refs/heads/sqoop2 9df8a53dd -> 452791676
SQOOP-1882: JobManager currently ignores the TO connector IDF and assumes all IDFs use String for the generic T (Veena Basavaraj via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/45279167 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/45279167 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/45279167 Branch: refs/heads/sqoop2 Commit: 4527916766bfadf10654d0c694db470ef4641724 Parents: 9df8a53 Author: Jarek Jarcec Cecho <[email protected]> Authored: Sun Dec 14 12:39:41 2014 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Sun Dec 14 12:39:41 2014 -0800 ---------------------------------------------------------------------- .../idf/CSVIntermediateDataFormat.java | 26 ------------- .../connector/idf/IntermediateDataFormat.java | 6 +-- .../idf/TestCSVIntermediateDataFormat.java | 2 +- .../org/apache/sqoop/driver/JobManager.java | 35 +++++++++-------- .../org/apache/sqoop/driver/JobRequest.java | 25 ++++++++---- .../mapreduce/MapreduceExecutionEngine.java | 7 +++- .../org/apache/sqoop/job/MRJobConstants.java | 7 +++- .../org/apache/sqoop/job/io/SqoopWritable.java | 23 ++++++----- .../org/apache/sqoop/job/mr/SqoopMapper.java | 40 ++++++++++---------- .../job/mr/SqoopOutputFormatLoadExecutor.java | 25 ++++++------ .../org/apache/sqoop/job/TestMapReduce.java | 11 ++++-- .../java/org/apache/sqoop/job/TestMatching.java | 4 +- .../mr/TestSqoopOutputFormatLoadExecutor.java | 10 ++--- 13 files changed, 113 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java 
b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java index dbe193d..275321a 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java @@ -23,24 +23,16 @@ import static org.apache.sqoop.connector.common.SqoopIDFUtils.*; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.common.FileFormat; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.type.AbstractComplexListType; import org.apache.sqoop.schema.type.Column; import org.apache.sqoop.schema.type.ColumnType; -import org.apache.sqoop.schema.type.FixedPoint; -import org.apache.sqoop.schema.type.FloatingPoint; import org.apache.sqoop.utils.ClassUtils; import org.joda.time.DateTime; import org.joda.time.LocalDate; import org.joda.time.LocalDateTime; import org.joda.time.LocalTime; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; import org.json.simple.JSONValue; -import org.json.simple.parser.JSONParser; import java.io.DataInput; import java.io.DataOutput; @@ -319,20 +311,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { return data.equals(((CSVIntermediateDataFormat) other).data); } - public int compareTo(IntermediateDataFormat<?> o) { - if (this == o) { - return 0; - } - if (this.equals(o)) { - return 0; - } - if (!(o instanceof CSVIntermediateDataFormat)) { - throw new IllegalStateException("Expected Data to be instance of " - + "CSVIntermediateFormat, but was an instance of " + o.getClass().getName()); - } - return data.compareTo(o.getCSVTextData()); - } - /** * Encode to the sqoop prescribed CSV String for every element in the objet array 
* @param objectArray @@ -392,10 +370,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> { } } - public String toString() { - return data; - } - /** * {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java index 93698a8..b8c8042 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java @@ -63,10 +63,10 @@ public abstract class IntermediateDataFormat<T> { * Set one row of data. If validate is set to true, the data is validated * against the schema. * - * @param data - A single row of data to be moved. + * @param obj - A single row of data to be moved. */ - public void setData(T data) { - this.data = data; + public void setData(T obj) { + this.data = obj; } /** * Get one row of data as CSV text. 
Use {@link #SqoopIDFUtils} for reading and writing http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java index 83a95ec..f6852a0 100644 --- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java @@ -51,7 +51,7 @@ import org.junit.Test; public class TestCSVIntermediateDataFormat { - private IntermediateDataFormat<?> dataFormat; + private CSVIntermediateDataFormat dataFormat; @Before public void setUp() { http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/core/src/main/java/org/apache/sqoop/driver/JobManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java index 01073d4..ff263ae 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java @@ -332,7 +332,6 @@ public class JobManager implements Reconfigurable { .instantiate(Driver.getInstance().getDriverJobConfigurationClass()); ConfigUtils.fromConfigs(job.getDriverConfig().getConfigs(), driverConfig); - // Create a job request for submit/execution JobRequest jobRequest = executionEngine.createJobRequest(); // Save important variables to the job request @@ -350,19 +349,23 @@ public class JobManager implements Reconfigurable { jobRequest.setJobName(job.getName()); jobRequest.setJobId(job.getPersistenceId()); 
jobRequest.setNotificationUrl(notificationBaseUrl + jobId); - Class<? extends IntermediateDataFormat<?>> dataFormatClass = fromConnector - .getIntermediateDataFormat(); - jobRequest.setIntermediateDataFormat(dataFormatClass); + jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat(), Direction.FROM); + jobRequest.setIntermediateDataFormat(toConnector.getIntermediateDataFormat(), Direction.TO); jobRequest.setFrom(fromConnector.getFrom()); jobRequest.setTo(toConnector.getTo()); // set all the jars addStandardJars(jobRequest); - addConnectorJars(jobRequest, fromConnector, toConnector, dataFormatClass); + addConnectorClass(jobRequest, fromConnector); + addConnectorClass(jobRequest, toConnector); + addConnectorIDFClass(jobRequest, fromConnector.getIntermediateDataFormat()); + addConnectorIDFClass(jobRequest, toConnector.getIntermediateDataFormat()); + addConnectorInitializerJars(jobRequest, Direction.FROM); addConnectorInitializerJars(jobRequest, Direction.TO); - addIDFJars(jobRequest); + addIDFDependentJars(jobRequest, Direction.FROM); + addIDFDependentJars(jobRequest, Direction.TO); // call the intialize method initializeConnector(jobRequest, Direction.FROM); @@ -375,11 +378,12 @@ public class JobManager implements Reconfigurable { return jobRequest; } - private void addConnectorJars(JobRequest jobRequest, SqoopConnector fromConnector, - SqoopConnector toConnector, Class<? extends IntermediateDataFormat<?>> dataFormatClass) { - jobRequest.addJarForClass(fromConnector.getClass()); - jobRequest.addJarForClass(toConnector.getClass()); - jobRequest.addJarForClass(dataFormatClass); + private void addConnectorClass(final JobRequest jobRequest, final SqoopConnector connector) { + jobRequest.addJarForClass(connector.getClass()); + } + + private void addConnectorIDFClass(final JobRequest jobRequest, Class<? 
extends IntermediateDataFormat<?>> idfClass) { + jobRequest.addJarForClass(idfClass); } private void addStandardJars(JobRequest jobRequest) { @@ -455,11 +459,10 @@ public class JobManager implements Reconfigurable { jobRequest.getJobConfig(direction)); } - @SuppressWarnings({ "unchecked", "rawtypes" }) - // TODO:SQOOP-1882 , should add the FROM and TO connector IDF jars - private void addIDFJars(JobRequest jobRequest) { - Class<? extends IntermediateDataFormat> idfClass = jobRequest.getIntermediateDataFormat(); - IntermediateDataFormat idf = (IntermediateDataFormat) ClassUtils.instantiate(idfClass); + @SuppressWarnings("unchecked") + private void addIDFDependentJars(JobRequest jobRequest, Direction direction) { + Class<? extends IntermediateDataFormat<?>> idfClass = jobRequest.getIntermediateDataFormat(direction); + IntermediateDataFormat<?> idf = ((IntermediateDataFormat<?>) ClassUtils.instantiate(idfClass)); jobRequest.addJars(idf.getJars()); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/core/src/main/java/org/apache/sqoop/driver/JobRequest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java index 8c1cc95..c9377a7 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java @@ -27,8 +27,10 @@ import org.apache.sqoop.job.etl.Transferable; import org.apache.sqoop.model.MSubmission; import org.apache.sqoop.utils.ClassUtils; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Set; /** * Submission details class is used when creating new submission and contains @@ -111,9 +113,14 @@ public class JobRequest { Integer loaders; /** - * The intermediate data format this submission should use. + * The intermediate data format this submission should use to read/extract. */ - Class<? 
extends IntermediateDataFormat> intermediateDataFormat; + Class<? extends IntermediateDataFormat<?>> fromIDF; + + /** + * The intermediate data format this submission should use to write/load. + */ + Class<? extends IntermediateDataFormat<?>> toIDF; public JobRequest() { this.jars = new LinkedList<String>(); @@ -191,7 +198,7 @@ public class JobRequest { } } - public void addJarForClass(Class klass) { + public void addJarForClass(Class<?> klass) { addJar(ClassUtils.jarForClass(klass)); } @@ -318,12 +325,16 @@ public class JobRequest { this.loaders = loaders; } - public Class<? extends IntermediateDataFormat> getIntermediateDataFormat() { - return intermediateDataFormat; + public Class<? extends IntermediateDataFormat<?>> getIntermediateDataFormat(Direction direction) { + return direction.equals(Direction.FROM) ? fromIDF : toIDF; } - public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat> intermediateDataFormat) { - this.intermediateDataFormat = intermediateDataFormat; + public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat<? 
extends Object>> intermediateDataFormat, Direction direction) { + if (direction.equals(Direction.FROM)) { + fromIDF = intermediateDataFormat; + } else if (direction.equals(Direction.TO)) { + toIDF = intermediateDataFormat; + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java index 9b3eb44..3f79325 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java @@ -18,6 +18,7 @@ package org.apache.sqoop.execution.mapreduce; import org.apache.hadoop.io.NullWritable; +import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.driver.ExecutionEngine; import org.apache.sqoop.driver.JobRequest; @@ -69,8 +70,10 @@ public class MapreduceExecutionEngine extends ExecutionEngine { context.setString(MRJobConstants.JOB_ETL_LOADER, to.getLoader().getName()); context.setString(MRJobConstants.JOB_ETL_FROM_DESTROYER, from.getDestroyer().getName()); context.setString(MRJobConstants.JOB_ETL_TO_DESTROYER, to.getDestroyer().getName()); - context.setString(MRJobConstants.INTERMEDIATE_DATA_FORMAT, - mrJobRequest.getIntermediateDataFormat().getName()); + context.setString(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, + mrJobRequest.getIntermediateDataFormat(Direction.FROM).getName()); + context.setString(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, + mrJobRequest.getIntermediateDataFormat(Direction.TO).getName()); if(mrJobRequest.getExtractors() != null) { 
context.setInteger(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors()); http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java index 67021a8..b7aa8c6 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java @@ -72,8 +72,11 @@ public final class MRJobConstants extends Constants { public static final String HADOOP_COMPRESS_CODEC = "mapred.output.compression.codec"; - public static final String INTERMEDIATE_DATA_FORMAT = - DriverConstants.PREFIX_EXECUTION_CONFIG + "intermediate.format"; + public static final String FROM_INTERMEDIATE_DATA_FORMAT = + DriverConstants.PREFIX_EXECUTION_CONFIG + "from.intermediate.format"; + + public static final String TO_INTERMEDIATE_DATA_FORMAT = + DriverConstants.PREFIX_EXECUTION_CONFIG + "to.intermediate.format"; private MRJobConstants() { // Disable explicit object creation http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java index 336ab97..ed182cb 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java @@ -21,7 +21,6 @@ package org.apache.sqoop.job.io; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.io.WritableComparable; - import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.job.MRJobConstants; import org.apache.sqoop.utils.ClassUtils; @@ -30,8 +29,12 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +/** + * Writable used to load the data to the {@link #Transferable} entity TO + */ + public class SqoopWritable implements Configurable, WritableComparable<SqoopWritable> { - private IntermediateDataFormat<?> dataFormat; + private IntermediateDataFormat<?> toIDF; private Configuration conf; public SqoopWritable() { @@ -39,22 +42,22 @@ public class SqoopWritable implements Configurable, WritableComparable<SqoopWrit } public SqoopWritable(IntermediateDataFormat<?> dataFormat) { - this.dataFormat = dataFormat; + this.toIDF = dataFormat; } public void setString(String data) { - this.dataFormat.setCSVTextData(data); + this.toIDF.setCSVTextData(data); } - public String getString() { return dataFormat.getCSVTextData(); } + public String getString() { return toIDF.getCSVTextData(); } @Override public void write(DataOutput out) throws IOException { - out.writeUTF(dataFormat.getCSVTextData()); + out.writeUTF(toIDF.getCSVTextData()); } @Override - public void readFields(DataInput in) throws IOException { dataFormat.setCSVTextData(in.readUTF()); } + public void readFields(DataInput in) throws IOException { toIDF.setCSVTextData(in.readUTF()); } @Override public int compareTo(SqoopWritable o) { return getString().compareTo(o.getString()); } @@ -68,9 +71,9 @@ public class SqoopWritable implements Configurable, WritableComparable<SqoopWrit public void setConf(Configuration conf) { this.conf = conf; - if (dataFormat == null) { - String intermediateDataFormatName = conf.get(MRJobConstants.INTERMEDIATE_DATA_FORMAT); - this.dataFormat = (IntermediateDataFormat<?>) ClassUtils.instantiate(intermediateDataFormatName); + if (toIDF == null) { + String toIDFClass = 
conf.get(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT); + this.toIDF = (IntermediateDataFormat<?>) ClassUtils.instantiate(toIDFClass); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index 7434243..dee0011 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -56,8 +56,8 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, * Service for reporting progress to mapreduce. */ private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor(); - private IntermediateDataFormat<String> fromDataFormat = null; - private IntermediateDataFormat<String> toDataFormat = null; + private IntermediateDataFormat<Object> fromIDF = null; + private IntermediateDataFormat<Object> toIDF = null; private Matcher matcher; @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -72,11 +72,12 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, Schema toSchema = MRConfigurationUtils.getConnectorSchema(Direction.TO, conf); matcher = MatcherFactory.getMatcher(fromSchema, toSchema); - String intermediateDataFormatName = conf.get(MRJobConstants.INTERMEDIATE_DATA_FORMAT); - fromDataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(intermediateDataFormatName); - fromDataFormat.setSchema(matcher.getFromSchema()); - toDataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(intermediateDataFormatName); - toDataFormat.setSchema(matcher.getToSchema()); + String fromIDFClass = conf.get(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT); + fromIDF = 
(IntermediateDataFormat<Object>) ClassUtils.instantiate(fromIDFClass); + fromIDF.setSchema(matcher.getFromSchema()); + String toIDFClass = conf.get(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT); + toIDF = (IntermediateDataFormat<Object>) ClassUtils.instantiate(toIDFClass); + toIDF.setSchema(matcher.getToSchema()); // Objects that should be passed to the Executor execution PrefixContext subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); @@ -107,45 +108,46 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, } // There are two IDF objects we carry around in memory during the sqoop job execution. - // The fromDataFormat has the fromSchema in it, the toDataFormat has the toSchema in it. - // Before we do the writing to the toDatFormat object we do the matching process to negotiate between - // the two schemas and their corresponding column types before we write the data to the toDataFormat object + // The fromIDF has the fromSchema in it, the toIDF has the toSchema in it. 
+ // Before we do the writing to the toIDF object we do the matching process to negotiate between + // the two schemas and their corresponding column types before we write the data to the toIDF object private class SqoopMapDataWriter extends DataWriter { private Context context; private SqoopWritable writable; public SqoopMapDataWriter(Context context) { this.context = context; - this.writable = new SqoopWritable(toDataFormat); + this.writable = new SqoopWritable(toIDF); } @Override public void writeArrayRecord(Object[] array) { - fromDataFormat.setObjectData(array); + fromIDF.setObjectData(array); writeContent(); } @Override public void writeStringRecord(String text) { - fromDataFormat.setCSVTextData(text); + fromIDF.setCSVTextData(text); writeContent(); } @Override public void writeRecord(Object obj) { - fromDataFormat.setData(obj.toString()); + fromIDF.setData(obj); writeContent(); } private void writeContent() { try { if (LOG.isDebugEnabled()) { - LOG.debug("Extracted data: " + fromDataFormat.getCSVTextData()); + LOG.debug("Extracted data: " + fromIDF.getCSVTextData()); } - // NOTE: The fromDataFormat and the corresponding fromSchema is used only for the matching process - // The output of the mappers is finally written to the toDataFormat object after the matching process - // since the writable encapsulates the toDataFormat ==> new SqoopWritable(toDataFormat) - toDataFormat.setObjectData(matcher.getMatchingData(fromDataFormat.getObjectData())); + // NOTE: The fromIDF and the corresponding fromSchema is used only for the matching process + // The output of the mappers is finally written to the toIDF object after the matching process + // since the writable encapsulates the toIDF ==> new SqoopWritable(toIDF) + toIDF.setObjectData(matcher.getMatchingData(fromIDF.getObjectData())); + // NOTE: We do not use the reducer to do the writing (a.k.a LOAD in ETL). 
Hence the mapper sets up the writable context.write(writable, NullWritable.get()); } catch (Exception e) { throw new SqoopException(MRExecutionError.MAPRED_EXEC_0013, e); http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index d664337..aaf771c 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.concurrent.*; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.JobContext; @@ -32,7 +33,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.connector.matcher.Matcher; import org.apache.sqoop.connector.matcher.MatcherFactory; @@ -53,7 +53,7 @@ public class SqoopOutputFormatLoadExecutor { private volatile boolean readerFinished = false; private volatile boolean writerFinished = false; - private volatile IntermediateDataFormat<String> dataFormat; + private volatile IntermediateDataFormat<? 
extends Object> toDataFormat; private Matcher matcher; private JobContext context; private SqoopRecordWriter writer; @@ -63,11 +63,11 @@ public class SqoopOutputFormatLoadExecutor { private volatile boolean isTest = false; private String loaderName; - // NOTE: This method is only exposed for test cases and hence assumes CSVIntermediateDataFormat - SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName){ + // NOTE: This method is only exposed for test cases + SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName, IntermediateDataFormat<?> idf) { this.isTest = isTest; this.loaderName = loaderName; - dataFormat = new CSVIntermediateDataFormat(); + toDataFormat = idf; writer = new SqoopRecordWriter(); matcher = null; } @@ -78,10 +78,10 @@ public class SqoopOutputFormatLoadExecutor { matcher = MatcherFactory.getMatcher( MRConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()), MRConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration())); - dataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(context - .getConfiguration().get(MRJobConstants.INTERMEDIATE_DATA_FORMAT)); + toDataFormat = (IntermediateDataFormat<?>) ClassUtils.instantiate(context + .getConfiguration().get(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT)); // Using the TO schema since the SqoopDataWriter in the SqoopMapper encapsulates the toDataFormat - dataFormat.setSchema(matcher.getToSchema()); + toDataFormat.setSchema(matcher.getToSchema()); } public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() { @@ -102,7 +102,7 @@ public class SqoopOutputFormatLoadExecutor { free.acquire(); checkIfConsumerThrew(); // NOTE: this is the place where data written from SqoopMapper writable is available to the SqoopOutputFormat - dataFormat.setCSVTextData(key.getString()); + toDataFormat.setCSVTextData(key.getString()); filled.release(); } @@ -158,7 +158,7 @@ public class SqoopOutputFormatLoadExecutor { return null; } try { - return 
dataFormat.getObjectData(); + return toDataFormat.getObjectData(); } finally { releaseSema(); } @@ -172,7 +172,7 @@ public class SqoopOutputFormatLoadExecutor { return null; } try { - return dataFormat.getCSVTextData(); + return toDataFormat.getCSVTextData(); } finally { releaseSema(); } @@ -185,7 +185,7 @@ public class SqoopOutputFormatLoadExecutor { return null; } try { - return dataFormat.getData(); + return toDataFormat.getData(); } catch (Throwable t) { readerFinished = true; LOG.error("Caught exception e while getting content ", t); @@ -215,6 +215,7 @@ public class SqoopOutputFormatLoadExecutor { private class ConsumerThread implements Runnable { + @SuppressWarnings({ "rawtypes", "unchecked" }) @Override public void run() { LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting"); http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java index 256c34d..47696cc 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java @@ -67,7 +67,9 @@ public class TestMapReduce { public void testSqoopInputFormat() throws Exception { Configuration conf = new Configuration(); conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); - conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); + conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); + conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); + Job job = new Job(conf); SqoopInputFormat inputformat = new SqoopInputFormat(); @@ -86,7 +88,9 @@ public class 
TestMapReduce { Configuration conf = new Configuration(); conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); - conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); + conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); + conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); + Job job = new Job(conf); // from and to have the same schema in this test case MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, MRJobTestUtil.getTestSchema()); @@ -106,7 +110,8 @@ public class TestMapReduce { conf.set(MRJobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName()); conf.set(MRJobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName()); - conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); + conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); + conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); Job job = new Job(conf); // from and to have the same schema in this test case http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java index 67c8525..1692ddb 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java @@ -128,8 +128,8 @@ public class TestMatching { Configuration conf = new Configuration(); 
conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); - conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, - CSVIntermediateDataFormat.class.getName()); + conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); + conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); Job job = new Job(conf); MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, from); http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java index d897125..ec0e886 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java @@ -121,7 +121,7 @@ public class TestSqoopOutputFormatLoadExecutor { @Before public void setUp() { conf = new Configuration(); - conf.setIfUnset(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); + conf.setIfUnset(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); } @@ -129,7 +129,7 @@ public class TestSqoopOutputFormatLoadExecutor { public void testWhenLoaderThrows() throws Throwable { conf.set(MRJobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName()); SqoopOutputFormatLoadExecutor executor = new - SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName()); + SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName(), new CSVIntermediateDataFormat()); 
RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); SqoopWritable writable = new SqoopWritable(dataFormat); @@ -147,7 +147,7 @@ public class TestSqoopOutputFormatLoadExecutor { public void testSuccessfulContinuousLoader() throws Throwable { conf.set(MRJobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName()); SqoopOutputFormatLoadExecutor executor = new - SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName()); + SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName(), new CSVIntermediateDataFormat()); RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); SqoopWritable writable = new SqoopWritable(dataFormat); @@ -168,7 +168,7 @@ public class TestSqoopOutputFormatLoadExecutor { @Test (expected = SqoopException.class) public void testSuccessfulLoader() throws Throwable { SqoopOutputFormatLoadExecutor executor = new - SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName()); + SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName(), new CSVIntermediateDataFormat()); RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); SqoopWritable writable = new SqoopWritable(dataFormat); @@ -192,7 +192,7 @@ public class TestSqoopOutputFormatLoadExecutor { public void testThrowingContinuousLoader() throws Throwable { conf.set(MRJobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName()); SqoopOutputFormatLoadExecutor executor = new - SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName()); + SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName(), new CSVIntermediateDataFormat()); RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); IntermediateDataFormat<?> dataFormat = 
MRJobTestUtil.getTestIDF(); SqoopWritable writable = new SqoopWritable(dataFormat);
