Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 a2e87bef0 -> 90ec25b2a


SQOOP-1349: Sqoop2: Use configurable writable to get Intermediate Data Format

(Veena Basavaraj via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/90ec25b2
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/90ec25b2
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/90ec25b2

Branch: refs/heads/sqoop2
Commit: 90ec25b2a8ac44b1f42ac7d9eb0a1557717a3629
Parents: a2e87be
Author: Abraham Elmahrek <[email protected]>
Authored: Thu Nov 13 10:08:19 2014 -0800
Committer: Abraham Elmahrek <[email protected]>
Committed: Thu Nov 13 10:08:19 2014 -0800

----------------------------------------------------------------------
 .../org/apache/sqoop/job/io/FieldTypes.java     | 42 ----------------
 .../org/apache/sqoop/job/io/SqoopWritable.java  | 50 ++++++++++++++------
 .../apache/sqoop/job/mr/SqoopInputFormat.java   | 10 ++--
 .../org/apache/sqoop/job/mr/SqoopMapper.java    | 14 ++++--
 .../job/mr/SqoopOutputFormatLoadExecutor.java   | 16 ++++---
 .../apache/sqoop/job/io/TestSqoopWritable.java  |  5 +-
 .../mr/TestSqoopOutputFormatLoadExecutor.java   | 12 ++---
 7 files changed, 66 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/90ec25b2/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
----------------------------------------------------------------------
diff --git 
a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java 
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
deleted file mode 100644
index e96dc6e..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.io;
-
-public final class FieldTypes {
-
-  public static final int NULL    = 0;
-
-  public static final int BOOLEAN = 1;
-
-  public static final int BYTE    = 10;
-  public static final int CHAR    = 11;
-
-  public static final int SHORT   = 20;
-  public static final int INT     = 21;
-  public static final int LONG    = 22;
-
-  public static final int FLOAT   = 50;
-  public static final int DOUBLE  = 51;
-
-  public static final int BIN     = 100;
-  public static final int UTF     = 101;
-
-  private FieldTypes() {
-    // Disable explicit object creation
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/90ec25b2/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
----------------------------------------------------------------------
diff --git 
a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java 
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
index ed118d2..05b731a 100644
--- 
a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
+++ 
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
@@ -18,42 +18,64 @@
  */
 package org.apache.sqoop.job.io;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparable;
 
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.job.MRJobConstants;
+import org.apache.sqoop.utils.ClassUtils;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-public class SqoopWritable implements WritableComparable<SqoopWritable> {
-  private String strData;
+public class SqoopWritable implements Configurable, 
WritableComparable<SqoopWritable> {
+  private IntermediateDataFormat<?> dataFormat;
+  private Configuration conf;
 
-  public SqoopWritable() {}
+  public SqoopWritable() {
+    this(null);
+  }
 
-  public void setString(String data) {
-    strData = data;
+  public SqoopWritable(IntermediateDataFormat<?> dataFormat) {
+    this.dataFormat = dataFormat;
   }
 
-  public String getString() {
-    return strData;
+  public void setString(String data) {
+    this.dataFormat.setTextData(data);
   }
 
+  public String getString() { return dataFormat.getTextData(); }
+
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeUTF(strData);
+    out.writeUTF(dataFormat.getTextData());
   }
 
   @Override
-  public void readFields(DataInput in) throws IOException {
-    strData = in.readUTF();
-  }
+  public void readFields(DataInput in) throws IOException { 
dataFormat.setTextData(in.readUTF()); }
 
   @Override
-  public int compareTo(SqoopWritable o) {
-    return strData.compareTo(o.getString());
-  }
+  public int compareTo(SqoopWritable o) { return 
getString().compareTo(o.getString()); }
 
   @Override
   public String toString() {
     return getString();
   }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+
+    if (dataFormat == null) {
+      String intermediateDataFormatName = 
conf.get(MRJobConstants.INTERMEDIATE_DATA_FORMAT);
+      this.dataFormat = (IntermediateDataFormat<?>) 
ClassUtils.instantiate(intermediateDataFormatName);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/90ec25b2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
----------------------------------------------------------------------
diff --git 
a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
 
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
index 887b4bb..d20c903 100644
--- 
a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ 
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -30,10 +30,10 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.job.MRExecutionError;
-import org.apache.sqoop.common.PrefixContext;
+import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;
@@ -63,14 +63,14 @@ public class SqoopInputFormat extends 
InputFormat<SqoopSplit, NullWritable> {
     Partitioner partitioner = (Partitioner) 
ClassUtils.instantiate(partitionerName);
 
     PrefixContext connectorContext = new PrefixContext(conf, 
MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
-    Object connectorConnection = 
MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf);
-    Object connectorJob = 
MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
+    Object connectorLinkConfig = 
MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf);
+    Object connectorFromJobConfig = 
MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
     Schema schema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, 
conf);
 
     long maxPartitions = conf.getLong(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, 
10);
     PartitionerContext partitionerContext = new 
PartitionerContext(connectorContext, maxPartitions, schema);
 
-    List<Partition> partitions = partitioner.getPartitions(partitionerContext, 
connectorConnection, connectorJob);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, 
connectorLinkConfig, connectorFromJobConfig);
     List<InputSplit> splits = new LinkedList<InputSplit>();
     for (Partition partition : partitions) {
       LOG.debug("Partition: " + partition);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/90ec25b2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git 
a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java 
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index e25f404..664692a 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -107,13 +107,17 @@ public class SqoopMapper extends Mapper<SqoopSplit, 
NullWritable, SqoopWritable,
     }
   }
 
+  // There are two IDF objects we carry around in memory during the sqoop job 
execution.
+  // The fromDataFormat has the fromSchema in it, the toDataFormat has the 
toSchema in it.
+  // Before we do the writing to the toDataFormat object we do the matching 
process to negotiate between 
+  // the two schemas and their corresponding column types before we write the 
data to the toDataFormat object
   private class SqoopMapDataWriter extends DataWriter {
     private Context context;
     private SqoopWritable writable;
 
     public SqoopMapDataWriter(Context context) {
       this.context = context;
-      this.writable = new SqoopWritable();
+      this.writable = new SqoopWritable(toDataFormat);
     }
 
     @Override
@@ -139,10 +143,10 @@ public class SqoopMapper extends Mapper<SqoopSplit, 
NullWritable, SqoopWritable,
         if (LOG.isDebugEnabled()) {
           LOG.debug("Extracted data: " + fromDataFormat.getTextData());
         }
-
-        toDataFormat.setObjectData( matcher.getMatchingData( 
fromDataFormat.getObjectData() ) );
-
-        writable.setString(toDataFormat.getTextData());
+        // NOTE: The fromDataFormat and the corresponding fromSchema is used 
only for the matching process
+        // The output of the mappers is finally written to the toDataFormat 
object after the matching process
+        // since the writable encapsulates the toDataFormat ==> new 
SqoopWritable(toDataFormat)
+        
toDataFormat.setObjectData(matcher.getMatchingData(fromDataFormat.getObjectData()));
         context.write(writable, NullWritable.get());
       } catch (Exception e) {
         throw new SqoopException(MRExecutionError.MAPRED_EXEC_0013, e);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/90ec25b2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git 
a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
 
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 579101e..49a66b9 100644
--- 
a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ 
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -63,6 +63,7 @@ public class SqoopOutputFormatLoadExecutor {
   private volatile boolean isTest = false;
   private String loaderName;
 
+  // NOTE: This method is only exposed for test cases and hence assumes 
CSVIntermediateDataFormat
   SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName){
     this.isTest = isTest;
     this.loaderName = loaderName;
@@ -79,6 +80,7 @@ public class SqoopOutputFormatLoadExecutor {
         MRConfigurationUtils.getConnectorSchema(Direction.TO, 
context.getConfiguration()));
     dataFormat = (IntermediateDataFormat<String>) 
ClassUtils.instantiate(context
         .getConfiguration().get(MRJobConstants.INTERMEDIATE_DATA_FORMAT));
+    // Using the TO schema since the SqoopDataWriter in the SqoopMapper 
encapsulates the toDataFormat
     dataFormat.setSchema(matcher.getToSchema());
   }
 
@@ -99,6 +101,7 @@ public class SqoopOutputFormatLoadExecutor {
     public void write(SqoopWritable key, NullWritable value) throws 
InterruptedException {
       free.acquire();
       checkIfConsumerThrew();
+      // NOTE: this is the place where data written from SqoopMapper writable 
is available to the SqoopOutputFormat
       dataFormat.setTextData(key.getString());
       filled.release();
     }
@@ -227,24 +230,23 @@ public class SqoopOutputFormatLoadExecutor {
 
         // Objects that should be pass to the Executor execution
         PrefixContext subContext = null;
-        Object configConnection = null;
-        Object configJob = null;
+        Object connectorLinkConfig = null;
+        Object connectorToJobConfig = null;
         Schema schema = null;
 
         if (!isTest) {
-          // Using the TO schema since the IDF returns data in TO schema
+          // Using the TO schema since the SqoopDataWriter in the SqoopMapper 
encapsulates the toDataFormat
           schema = matcher.getToSchema();
-
           subContext = new PrefixContext(conf, 
MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
-          configConnection = 
MRConfigurationUtils.getConnectorLinkConfig(Direction.TO, conf);
-          configJob = MRConfigurationUtils.getConnectorJobConfig(Direction.TO, 
conf);
+          connectorLinkConfig = 
MRConfigurationUtils.getConnectorLinkConfig(Direction.TO, conf);
+          connectorToJobConfig = 
MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf);
         }
 
         // Create loader context
         LoaderContext loaderContext = new LoaderContext(subContext, reader, 
schema);
 
         LOG.info("Running loader class " + loaderName);
-        loader.load(loaderContext, configConnection, configJob);
+        loader.load(loaderContext, connectorLinkConfig, connectorToJobConfig);
         LOG.info("Loader has finished");
       } catch (Throwable t) {
         readerFinished = true;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/90ec25b2/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java
----------------------------------------------------------------------
diff --git 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java
 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java
index 3207e53..b07a076 100644
--- 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java
+++ 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java
@@ -27,12 +27,13 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class TestSqoopWritable {
 
-  private final SqoopWritable writable = new SqoopWritable();
+  private final SqoopWritable writable = new SqoopWritable(new 
CSVIntermediateDataFormat());
 
   @Test
   public void testStringInStringOut() {
@@ -78,7 +79,7 @@ public class TestSqoopWritable {
 
     //Don't test what the data is, test that SqoopWritable can read it.
     InputStream instream = new ByteArrayInputStream(written);
-    SqoopWritable newWritable = new SqoopWritable();
+    SqoopWritable newWritable = new SqoopWritable(new 
CSVIntermediateDataFormat());
     DataInput in = new DataInputStream(instream);
     newWritable.readFields(in);
     Assert.assertEquals(testData, newWritable.getString());

http://git-wip-us.apache.org/repos/asf/sqoop/blob/90ec25b2/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
index 67e965d..7c40ad5 100644
--- 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+++ 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -132,11 +132,10 @@ public class TestSqoopOutputFormatLoadExecutor {
         SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName());
     RecordWriter<SqoopWritable, NullWritable> writer = 
executor.getRecordWriter();
     IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
-    SqoopWritable writable = new SqoopWritable();
+    SqoopWritable writable = new SqoopWritable(dataFormat);
     try {
       for (int count = 0; count < 100; count++) {
         dataFormat.setTextData(String.valueOf(count));
-        writable.setString(dataFormat.getTextData());
         writer.write(writable, null);
       }
     } catch (SqoopException ex) {
@@ -151,7 +150,7 @@ public class TestSqoopOutputFormatLoadExecutor {
         SqoopOutputFormatLoadExecutor(true, 
GoodContinuousLoader.class.getName());
     RecordWriter<SqoopWritable, NullWritable> writer = 
executor.getRecordWriter();
     IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
-    SqoopWritable writable = new SqoopWritable();
+    SqoopWritable writable = new SqoopWritable(dataFormat);
     for (int i = 0; i < 10; i++) {
       StringBuilder builder = new StringBuilder();
       for (int count = 0; count < 100; count++) {
@@ -161,7 +160,6 @@ public class TestSqoopOutputFormatLoadExecutor {
         }
       }
       dataFormat.setTextData(builder.toString());
-      writable.setString(dataFormat.getTextData());
       writer.write(writable, null);
     }
     writer.close(null);
@@ -173,7 +171,7 @@ public class TestSqoopOutputFormatLoadExecutor {
         SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName());
     RecordWriter<SqoopWritable, NullWritable> writer = 
executor.getRecordWriter();
     IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
-    SqoopWritable writable = new SqoopWritable();
+    SqoopWritable writable = new SqoopWritable(dataFormat);
     StringBuilder builder = new StringBuilder();
     for (int count = 0; count < 100; count++) {
       builder.append(String.valueOf(count));
@@ -182,7 +180,6 @@ public class TestSqoopOutputFormatLoadExecutor {
       }
     }
     dataFormat.setTextData(builder.toString());
-    writable.setString(dataFormat.getTextData());
     writer.write(writable, null);
 
     //Allow writer to complete.
@@ -198,7 +195,7 @@ public class TestSqoopOutputFormatLoadExecutor {
         SqoopOutputFormatLoadExecutor(true, 
ThrowingContinuousLoader.class.getName());
     RecordWriter<SqoopWritable, NullWritable> writer = 
executor.getRecordWriter();
     IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
-    SqoopWritable writable = new SqoopWritable();
+    SqoopWritable writable = new SqoopWritable(dataFormat);
     try {
       for (int i = 0; i < 10; i++) {
         StringBuilder builder = new StringBuilder();
@@ -209,7 +206,6 @@ public class TestSqoopOutputFormatLoadExecutor {
           }
         }
         dataFormat.setTextData(builder.toString());
-        writable.setString(dataFormat.getTextData());
         writer.write(writable, null);
       }
       writer.close(null);

Reply via email to