Repository: sqoop Updated Branches: refs/heads/sqoop2 a2e87bef0 -> 90ec25b2a
SQOOP-1349: Sqoop2: Use configurable writable to get Intermediate Data Format (Veena Basavaraj via Abraham Elmahrek) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/90ec25b2 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/90ec25b2 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/90ec25b2 Branch: refs/heads/sqoop2 Commit: 90ec25b2a8ac44b1f42ac7d9eb0a1557717a3629 Parents: a2e87be Author: Abraham Elmahrek <[email protected]> Authored: Thu Nov 13 10:08:19 2014 -0800 Committer: Abraham Elmahrek <[email protected]> Committed: Thu Nov 13 10:08:19 2014 -0800 ---------------------------------------------------------------------- .../org/apache/sqoop/job/io/FieldTypes.java | 42 ---------------- .../org/apache/sqoop/job/io/SqoopWritable.java | 50 ++++++++++++++------ .../apache/sqoop/job/mr/SqoopInputFormat.java | 10 ++-- .../org/apache/sqoop/job/mr/SqoopMapper.java | 14 ++++-- .../job/mr/SqoopOutputFormatLoadExecutor.java | 16 ++++--- .../apache/sqoop/job/io/TestSqoopWritable.java | 5 +- .../mr/TestSqoopOutputFormatLoadExecutor.java | 12 ++--- 7 files changed, 66 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/90ec25b2/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java deleted file mode 100644 index e96dc6e..0000000 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.job.io; - -public final class FieldTypes { - - public static final int NULL = 0; - - public static final int BOOLEAN = 1; - - public static final int BYTE = 10; - public static final int CHAR = 11; - - public static final int SHORT = 20; - public static final int INT = 21; - public static final int LONG = 22; - - public static final int FLOAT = 50; - public static final int DOUBLE = 51; - - public static final int BIN = 100; - public static final int UTF = 101; - - private FieldTypes() { - // Disable explicit object creation - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/90ec25b2/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java index ed118d2..05b731a 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java @@ -18,42 +18,64 @@ */ package org.apache.sqoop.job.io; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.io.WritableComparable; +import org.apache.sqoop.connector.idf.IntermediateDataFormat; +import org.apache.sqoop.job.MRJobConstants; +import org.apache.sqoop.utils.ClassUtils; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -public class SqoopWritable implements WritableComparable<SqoopWritable> { - private String strData; +public class SqoopWritable implements Configurable, WritableComparable<SqoopWritable> { + private IntermediateDataFormat<?> dataFormat; + private Configuration conf; - public SqoopWritable() {} + public SqoopWritable() { + this(null); + } - public void setString(String data) { - strData = data; + public SqoopWritable(IntermediateDataFormat<?> dataFormat) { + this.dataFormat = dataFormat; } - public String getString() { - return strData; + public void setString(String data) { + this.dataFormat.setTextData(data); } + public String getString() { return dataFormat.getTextData(); } + @Override public void write(DataOutput out) throws IOException { - out.writeUTF(strData); + out.writeUTF(dataFormat.getTextData()); } @Override - public void readFields(DataInput in) throws IOException { - strData = in.readUTF(); - } + public void readFields(DataInput in) throws IOException { dataFormat.setTextData(in.readUTF()); } @Override - public int compareTo(SqoopWritable o) { - return strData.compareTo(o.getString()); - } + public int compareTo(SqoopWritable o) { return getString().compareTo(o.getString()); } @Override public String toString() { return getString(); } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + + if (dataFormat == null) { + String intermediateDataFormatName = conf.get(MRJobConstants.INTERMEDIATE_DATA_FORMAT); + this.dataFormat = (IntermediateDataFormat<?>) ClassUtils.instantiate(intermediateDataFormatName); + } + } + + @Override + public Configuration getConf() { + return conf; + } } 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/90ec25b2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java index 887b4bb..d20c903 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java @@ -30,10 +30,10 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; import org.apache.sqoop.common.Direction; +import org.apache.sqoop.common.PrefixContext; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.job.MRJobConstants; import org.apache.sqoop.job.MRExecutionError; -import org.apache.sqoop.common.PrefixContext; +import org.apache.sqoop.job.MRJobConstants; import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.job.etl.Partitioner; import org.apache.sqoop.job.etl.PartitionerContext; @@ -63,14 +63,14 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> { Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName); PrefixContext connectorContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); - Object connectorConnection = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf); - Object connectorJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf); + Object connectorLinkConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf); + Object connectorFromJobConfig = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf); Schema schema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf); long maxPartitions = 
conf.getLong(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, 10); PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, schema); - List<Partition> partitions = partitioner.getPartitions(partitionerContext, connectorConnection, connectorJob); + List<Partition> partitions = partitioner.getPartitions(partitionerContext, connectorLinkConfig, connectorFromJobConfig); List<InputSplit> splits = new LinkedList<InputSplit>(); for (Partition partition : partitions) { LOG.debug("Partition: " + partition); http://git-wip-us.apache.org/repos/asf/sqoop/blob/90ec25b2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index e25f404..664692a 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -107,13 +107,17 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, } } + // There are two IDF objects we carry around in memory during the sqoop job execution. + // The fromDataFormat has the fromSchema in it, the toDataFormat has the toSchema in it. 
+ // Before we do the writing to the toDataFormat object we do the matching process to negotiate between + // the two schemas and their corresponding column types before we write the data to the toDataFormat object private class SqoopMapDataWriter extends DataWriter { private Context context; private SqoopWritable writable; public SqoopMapDataWriter(Context context) { this.context = context; - this.writable = new SqoopWritable(); + this.writable = new SqoopWritable(toDataFormat); } @Override @@ -139,10 +143,10 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, if (LOG.isDebugEnabled()) { LOG.debug("Extracted data: " + fromDataFormat.getTextData()); } - - toDataFormat.setObjectData( matcher.getMatchingData( fromDataFormat.getObjectData() ) ); - - writable.setString(toDataFormat.getTextData()); + // NOTE: The fromDataFormat and the corresponding fromSchema are used only for the matching process + // The output of the mappers is finally written to the toDataFormat object after the matching process + // since the writable encapsulates the toDataFormat ==> new SqoopWritable(toDataFormat) + toDataFormat.setObjectData(matcher.getMatchingData(fromDataFormat.getObjectData())); context.write(writable, NullWritable.get()); } catch (Exception e) { throw new SqoopException(MRExecutionError.MAPRED_EXEC_0013, e); http://git-wip-us.apache.org/repos/asf/sqoop/blob/90ec25b2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index 579101e..49a66b9 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -63,6 +63,7 @@ public class SqoopOutputFormatLoadExecutor { private volatile boolean isTest = false; private String loaderName; + // NOTE: This constructor is only exposed for test cases and hence assumes CSVIntermediateDataFormat SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName){ this.isTest = isTest; this.loaderName = loaderName; @@ -79,6 +80,7 @@ public class SqoopOutputFormatLoadExecutor { MRConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration())); dataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(context .getConfiguration().get(MRJobConstants.INTERMEDIATE_DATA_FORMAT)); + // Using the TO schema since the SqoopMapDataWriter in the SqoopMapper encapsulates the toDataFormat dataFormat.setSchema(matcher.getToSchema()); } @@ -99,6 +101,7 @@ public class SqoopOutputFormatLoadExecutor { public void write(SqoopWritable key, NullWritable value) throws InterruptedException { free.acquire(); checkIfConsumerThrew(); + // NOTE: this is the place where data written from SqoopMapper writable is available to the SqoopOutputFormat dataFormat.setTextData(key.getString()); filled.release(); } @@ -227,24 +230,23 @@ public class SqoopOutputFormatLoadExecutor { // Objects that should be pass to the Executor execution PrefixContext subContext = null; - Object configConnection = null; - Object configJob = null; + Object connectorLinkConfig = null; + Object connectorToJobConfig = null; Schema schema = null; if (!isTest) { - // Using the TO schema since the IDF returns data in TO schema + // Using the TO schema since the SqoopMapDataWriter in the SqoopMapper encapsulates the toDataFormat schema = matcher.getToSchema(); - subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT); - configConnection = MRConfigurationUtils.getConnectorLinkConfig(Direction.TO, conf); - configJob =
MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf); + connectorLinkConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.TO, conf); + connectorToJobConfig = MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf); } // Create loader context LoaderContext loaderContext = new LoaderContext(subContext, reader, schema); LOG.info("Running loader class " + loaderName); - loader.load(loaderContext, configConnection, configJob); + loader.load(loaderContext, connectorLinkConfig, connectorToJobConfig); LOG.info("Loader has finished"); } catch (Throwable t) { readerFinished = true; http://git-wip-us.apache.org/repos/asf/sqoop/blob/90ec25b2/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java index 3207e53..b07a076 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java @@ -27,12 +27,13 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; +import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; import org.junit.Assert; import org.junit.Test; public class TestSqoopWritable { - private final SqoopWritable writable = new SqoopWritable(); + private final SqoopWritable writable = new SqoopWritable(new CSVIntermediateDataFormat()); @Test public void testStringInStringOut() { @@ -78,7 +79,7 @@ public class TestSqoopWritable { //Don't test what the data is, test that SqoopWritable can read it. 
InputStream instream = new ByteArrayInputStream(written); - SqoopWritable newWritable = new SqoopWritable(); + SqoopWritable newWritable = new SqoopWritable(new CSVIntermediateDataFormat()); DataInput in = new DataInputStream(instream); newWritable.readFields(in); Assert.assertEquals(testData, newWritable.getString()); http://git-wip-us.apache.org/repos/asf/sqoop/blob/90ec25b2/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java index 67e965d..7c40ad5 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java @@ -132,11 +132,10 @@ public class TestSqoopOutputFormatLoadExecutor { SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName()); RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); - SqoopWritable writable = new SqoopWritable(); + SqoopWritable writable = new SqoopWritable(dataFormat); try { for (int count = 0; count < 100; count++) { dataFormat.setTextData(String.valueOf(count)); - writable.setString(dataFormat.getTextData()); writer.write(writable, null); } } catch (SqoopException ex) { @@ -151,7 +150,7 @@ public class TestSqoopOutputFormatLoadExecutor { SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName()); RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); - SqoopWritable writable = new SqoopWritable(); + SqoopWritable writable = new SqoopWritable(dataFormat); for (int i 
= 0; i < 10; i++) { StringBuilder builder = new StringBuilder(); for (int count = 0; count < 100; count++) { @@ -161,7 +160,6 @@ public class TestSqoopOutputFormatLoadExecutor { } } dataFormat.setTextData(builder.toString()); - writable.setString(dataFormat.getTextData()); writer.write(writable, null); } writer.close(null); @@ -173,7 +171,7 @@ public class TestSqoopOutputFormatLoadExecutor { SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName()); RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); - SqoopWritable writable = new SqoopWritable(); + SqoopWritable writable = new SqoopWritable(dataFormat); StringBuilder builder = new StringBuilder(); for (int count = 0; count < 100; count++) { builder.append(String.valueOf(count)); @@ -182,7 +180,6 @@ public class TestSqoopOutputFormatLoadExecutor { } } dataFormat.setTextData(builder.toString()); - writable.setString(dataFormat.getTextData()); writer.write(writable, null); //Allow writer to complete. @@ -198,7 +195,7 @@ public class TestSqoopOutputFormatLoadExecutor { SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName()); RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter(); IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF(); - SqoopWritable writable = new SqoopWritable(); + SqoopWritable writable = new SqoopWritable(dataFormat); try { for (int i = 0; i < 10; i++) { StringBuilder builder = new StringBuilder(); @@ -209,7 +206,6 @@ public class TestSqoopOutputFormatLoadExecutor { } } dataFormat.setTextData(builder.toString()); - writable.setString(dataFormat.getTextData()); writer.write(writable, null); } writer.close(null);
