Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 c4246c53f -> 1f4b7fd29


SQOOP-1830: GenericJdbcExtractor does not create Java date objects when extracting

(Veena Basavaraj via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/1f4b7fd2
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/1f4b7fd2
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/1f4b7fd2

Branch: refs/heads/sqoop2
Commit: 1f4b7fd29126ffcec00cf7e0723b609f10ad3317
Parents: c4246c5
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Wed Dec 3 10:34:48 2014 -0800
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Wed Dec 3 10:34:48 2014 -0800

----------------------------------------------------------------------
 .../apache/sqoop/job/etl/ExtractorContext.java  | 16 +++-
 .../org/apache/sqoop/job/etl/LoaderContext.java |  6 +-
 .../jdbc/GenericJdbcConnectorError.java         |  5 +-
 .../connector/jdbc/GenericJdbcExtractor.java    | 48 ++++++++++--
 .../sqoop/connector/jdbc/TestExtractor.java     | 79 +++++++++++++++++---
 .../sqoop/connector/jdbc/TestPartitioner.java   |  2 +-
 .../sqoop/connector/hdfs/TestExtractor.java     |  4 +-
 .../apache/sqoop/job/mr/SqoopInputFormat.java   |  5 +-
 .../org/apache/sqoop/job/mr/SqoopMapper.java    | 21 +++---
 9 files changed, 144 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/1f4b7fd2/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
index 4875ed0..1e0f0ec 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
@@ -19,6 +19,7 @@ package org.apache.sqoop.job.etl;
 
 import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.schema.Schema;
 
 /**
  * Context implementation for Extractor.
@@ -27,12 +28,14 @@ import org.apache.sqoop.etl.io.DataWriter;
  */
 public class ExtractorContext extends TransferableContext {
 
-  private DataWriter writer;
+  private final DataWriter writer;
 
+  private final Schema schema;
 
-  public ExtractorContext(ImmutableContext context, DataWriter writer) {
+  public ExtractorContext(ImmutableContext context, DataWriter writer, Schema schema) {
     super(context);
     this.writer = writer;
+    this.schema = schema;
   }
 
   /**
@@ -43,6 +46,13 @@ public class ExtractorContext extends TransferableContext {
   public DataWriter getDataWriter() {
     return writer;
   }
-
+  /**
+   * Return schema associated with FROM.
+   *
+   * @return
+   */
+  public Schema getSchema() {
+    return schema;
+  }
 
 }
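
The new getSchema() accessor means an Extractor implementation can now ask its context for the FROM schema while extracting. Below is a minimal sketch of wiring this up outside of MapReduce, modeled on the DummyWriter-based tests further down; the class name, schema name and column names are illustrative only:

import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Date;
import org.apache.sqoop.schema.type.FixedPoint;

public class ExtractorContextSketch {
  public static void main(String[] args) {
    // Schema describing the FROM side of the transfer
    Schema schema = new Schema("example");
    schema.addColumn(new FixedPoint("id")).addColumn(new Date("created"));

    // No-op writer, just enough to satisfy the constructor
    DataWriter writer = new DataWriter() {
      @Override public void writeArrayRecord(Object[] array) { }
      @Override public void writeStringRecord(String text) { }
      @Override public void writeRecord(Object obj) { }
    };

    ExtractorContext context = new ExtractorContext(new MutableMapContext(), writer, schema);

    // An extractor can now inspect the columns it is expected to produce
    System.out.println(context.getSchema().getColumns().size());   // prints 2
  }
}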

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1f4b7fd2/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java b/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java
index 563b9ad..9d556eb 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java
@@ -28,9 +28,9 @@ import org.apache.sqoop.schema.Schema;
  */
 public class LoaderContext extends TransferableContext {
 
-  private DataReader reader;
+  private final DataReader reader;
 
-  private Schema schema;
+  private final Schema schema;
 
   public LoaderContext(ImmutableContext context, DataReader reader, Schema schema) {
     super(context);
@@ -48,7 +48,7 @@ public class LoaderContext extends TransferableContext {
   }
 
   /**
-   * Return schema associated with this step.
+   * Return schema associated with TO.
    *
    * @return
    */

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1f4b7fd2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
index c291cb2..0fa4a32 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
@@ -81,7 +81,10 @@ public enum GenericJdbcConnectorError implements ErrorCode {
 
   GENERIC_JDBC_CONNECTOR_0019("Table name extraction not supported."),
 
-  GENERIC_JDBC_CONNECTOR_0020("Unknown direction.")
+  GENERIC_JDBC_CONNECTOR_0020("Unknown direction."),
+
+  GENERIC_JDBC_CONNECTOR_0021("Schema column size do not match the result set column size"),
+
   ;
 
   private final String message;
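
The new error code is raised in GenericJdbcExtractor below when the FROM schema and the result set disagree on column count. A small sketch of that usage; the helper method and class here are hypothetical, only the SqoopException(ErrorCode, String) pattern comes from the patch:

import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.GenericJdbcConnectorError;

public class ColumnCountCheckSketch {

  // Hypothetical helper mirroring the validation added to GenericJdbcExtractor
  static void checkColumnCount(int schemaColumns, int resultSetColumns) {
    if (schemaColumns != resultSetColumns) {
      // The second argument is free-form detail carried along with the error code
      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0021,
          schemaColumns + ":" + resultSetColumns);
    }
  }

  public static void main(String[] args) {
    checkColumnCount(4, 4);   // passes silently
    checkColumnCount(3, 4);   // throws SqoopException carrying GENERIC_JDBC_CONNECTOR_0021
  }
}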

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1f4b7fd2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
index af9320b..9a61701 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
@@ -20,13 +20,20 @@ package org.apache.sqoop.connector.jdbc;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.util.List;
 
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
-import org.apache.sqoop.job.etl.ExtractorContext;
+import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration;
 import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.ExtractorContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalDateTime;
+import org.joda.time.LocalTime;
+
 
 public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobConfiguration, GenericJdbcPartition> {

@@ -50,14 +57,41 @@ public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobCo
     rowsRead = 0;
     ResultSet resultSet = executor.executeQuery(query);
 
+    Schema schema = context.getSchema();
+    Column[] schemaColumns = schema.getColumns().toArray(new Column[schema.getColumns().size()]);
     try {
       ResultSetMetaData metaData = resultSet.getMetaData();
-      int column = metaData.getColumnCount();
+      int columnCount = metaData.getColumnCount();
+      if (schemaColumns.length != columnCount) {
+        throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0021, schemaColumns.length + ":" + columnCount);
+      }
       while (resultSet.next()) {
-        Object[] array = new Object[column];
-        for (int i = 0; i< column; i++) {
-          array[i] = resultSet.getObject(i + 1) == null ? GenericJdbcConnectorConstants.SQL_NULL_VALUE
-              : resultSet.getObject(i + 1);
+        Object[] array = new Object[columnCount];
+        for (int i = 0; i < columnCount; i++) {
+          // check type of the column
+          Column schemaColumn = schemaColumns[i];
+          if(resultSet.getObject(i + 1) == null) {
+            array[i] = GenericJdbcConnectorConstants.SQL_NULL_VALUE ;
+            continue;
+          }
+          switch (schemaColumn.getType()) {
+          case DATE:
+            // convert the sql date to JODA time as prescribed the Sqoop IDF spec
+            array[i] = LocalDate.fromDateFields((java.sql.Date)resultSet.getObject(i + 1));
+            break;
+          case DATE_TIME:
+            // convert the sql date time to JODA time as prescribed the Sqoop IDF spec
+            array[i] = LocalDateTime.fromDateFields((java.sql.Date)resultSet.getObject(i + 1));
+            break;
+          case TIME:
+            // convert the sql time to JODA time as prescribed the Sqoop IDF spec
+            array[i] = LocalTime.fromDateFields((java.sql.Date)resultSet.getObject(i + 1));
+            break;
+          default:
+            //for anything else
+            array[i] = resultSet.getObject(i + 1);
+
+          }
         }
         context.getDataWriter().writeArrayRecord(array);
         rowsRead++;
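
For anyone unfamiliar with Joda-Time, a standalone sketch of the conversions behind the new switch: the java.sql date/time classes all extend java.util.Date, so fromDateFields() can lift them into the LocalDate / LocalDateTime / LocalTime objects the Sqoop IDF spec prescribes. Values are hard-coded here rather than read from a ResultSet:

import java.sql.Time;
import java.sql.Timestamp;

import org.joda.time.LocalDate;
import org.joda.time.LocalDateTime;
import org.joda.time.LocalTime;

public class JodaConversionSketch {
  public static void main(String[] args) {
    // DATE column value -> org.joda.time.LocalDate
    java.sql.Date sqlDate = java.sql.Date.valueOf("2004-10-19");
    System.out.println(LocalDate.fromDateFields(sqlDate));          // 2004-10-19

    // DATE_TIME column value -> org.joda.time.LocalDateTime
    Timestamp sqlTimestamp = Timestamp.valueOf("2004-10-19 10:23:54");
    System.out.println(LocalDateTime.fromDateFields(sqlTimestamp)); // 2004-10-19T10:23:54.000

    // TIME column value -> org.joda.time.LocalTime
    Time sqlTime = Time.valueOf("10:23:54");
    System.out.println(LocalTime.fromDateFields(sqlTime));          // 10:23:54.000
  }
}

The DATE path is what the updated TestExtractor below exercises: its DummyWriter asserts that the DATECOL value arrives as a LocalDate whose toString() is the ISO form "2004-10-19".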

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1f4b7fd2/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
index d1e6805..8e1ce5b 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
@@ -17,13 +17,21 @@
  */
 package org.apache.sqoop.connector.jdbc;
 
+
 import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.MutableMapContext;
-import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration;
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration;
+import org.apache.sqoop.etl.io.DataWriter;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
-import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Date;
+import org.apache.sqoop.schema.type.Decimal;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.joda.time.LocalDate;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -31,6 +39,7 @@ import org.junit.Test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+
 public class TestExtractor {
 
   private final String tableName;
@@ -54,12 +63,12 @@ public class TestExtractor {
     if (!executor.existTable(tableName)) {
       executor.executeUpdate("CREATE TABLE "
           + executor.delimitIdentifier(tableName)
-          + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
+          + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)");
 
       for (int i = 0; i < NUMBER_OF_ROWS; i++) {
         int value = START + i;
         String sql = "INSERT INTO " + executor.delimitIdentifier(tableName)
-            + " VALUES(" + value + ", " + value + ", '" + value + "')";
+            + " VALUES(" + value + ", " + value + ", '" + value + "', '2004-10-19')";
         executor.executeUpdate(sql);
       }
     }
@@ -70,6 +79,7 @@ public class TestExtractor {
     executor.close();
   }
 
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   @Test
   public void testQuery() throws Exception {
     MutableContext context = new MutableMapContext();
@@ -88,7 +98,12 @@ public class TestExtractor {
 
     Extractor extractor = new GenericJdbcExtractor();
     DummyWriter writer = new DummyWriter();
-    ExtractorContext extractorContext = new ExtractorContext(context, writer);
+    Schema schema = new Schema("TestExtractor");
+    // dummy columns added, all we need is the column count to match to the
+    // result set
+    schema.addColumn(new FixedPoint("c1")).addColumn(new Decimal("c2")).addColumn(new Text("c3")).addColumn(new Date("c4"));
+
+    ExtractorContext extractorContext = new ExtractorContext(context, writer, schema);
 
     partition = new GenericJdbcPartition();
     partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665");
@@ -103,6 +118,7 @@ public class TestExtractor {
     extractor.extract(extractorContext, linkConfig, jobConfig, partition);
   }
 
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   @Test
   public void testSubquery() throws Exception {
     MutableContext context = new MutableMapContext();
@@ -115,15 +131,19 @@ public class TestExtractor {
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
-        "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "
-            + "(SELECT * FROM " + executor.delimitIdentifier(tableName)
-            + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
+        "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL,SQOOP_SUBQUERY_ALIAS.DATECOL FROM " + "(SELECT * FROM "
+            + executor.delimitIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
 
     GenericJdbcPartition partition;
 
     Extractor extractor = new GenericJdbcExtractor();
     DummyWriter writer = new DummyWriter();
-    ExtractorContext extractorContext = new ExtractorContext(context, writer);
+    Schema schema = new Schema("TestExtractor");
+    // dummy columns added, all we need is the column count to match to the
+    // result set
+    schema.addColumn(new FixedPoint("c1")).addColumn(new Text("c2")).addColumn(new Date("c3"));
+
+    ExtractorContext extractorContext = new ExtractorContext(context, writer, schema);
 
     partition = new GenericJdbcPartition();
     partition.setConditions("-50 <= ICOL AND ICOL < -16");
@@ -138,21 +158,56 @@ public class TestExtractor {
     extractor.extract(extractorContext, linkConfig, jobConfig, partition);
   }
 
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test(expected = SqoopException.class)
+  public void testIncorrectSchemaColumnSize() throws Exception {
+    MutableContext context = new MutableMapContext();
+
+    LinkConfiguration linkConfig = new LinkConfiguration();
+
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
+
+    context.setString(
+        GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
+        "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " + "(SELECT * FROM "
+            + executor.delimitIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
+
+    GenericJdbcPartition partition;
+
+    Extractor extractor = new GenericJdbcExtractor();
+    DummyWriter writer = new DummyWriter();
+    Schema schema = new Schema("TestIncorrectColumns");
+    ExtractorContext extractorContext = new ExtractorContext(context, writer, schema);
+
+    partition = new GenericJdbcPartition();
+    partition.setConditions("-50 <= ICOL AND ICOL < -16");
+    extractor.extract(extractorContext, linkConfig, jobConfig, partition);
+
+  }
+
   public class DummyWriter extends DataWriter {
     int indx = START;
 
     @Override
     public void writeArrayRecord(Object[] array) {
+      boolean parsedDate = false;
       for (int i = 0; i < array.length; i++) {
         if (array[i] instanceof Integer) {
-          assertEquals(indx, ((Integer)array[i]).intValue());
+          assertEquals(indx, ((Integer) array[i]).intValue());
         } else if (array[i] instanceof Double) {
           assertEquals((double)indx, ((Double)array[i]).doubleValue(), EPSILON);
-        } else {
+        } else if (array[i] instanceof String) {
           assertEquals(String.valueOf(indx), array[i].toString());
+        } else if (array[i] instanceof LocalDate) {
+          assertEquals("2004-10-19", array[i].toString());
+          parsedDate = true;
         }
       }
       indx++;
+      assertEquals(true, parsedDate);
     }
 
     @Override
@@ -165,4 +220,4 @@ public class TestExtractor {
       fail("This method should not be invoked.");
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1f4b7fd2/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
index dee0242..d62e494 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
@@ -477,7 +477,7 @@ public class TestPartitioner {
   }
 
   @Test
-  public void testPatitionWithNullValues() throws Exception {
+  public void testPartitionWithNullValues() throws Exception {
     MutableContext context = new MutableMapContext();
     context.setString(GenericJdbcConnectorConstants
         .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1f4b7fd2/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
index 0a6369f..f4b4454 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
@@ -129,7 +129,7 @@ public class TestExtractor extends TestHdfsBase {
       public void writeRecord(Object obj) {
         throw new AssertionError("Should not be writing object.");
       }
-    });
+    }, null);
 
     LinkConfiguration emptyLinkConfig = new LinkConfiguration();
     FromJobConfiguration emptyJobConfig = new FromJobConfiguration();
@@ -141,4 +141,4 @@ public class TestExtractor extends TestHdfsBase {
       Assert.assertTrue("Index " + (index + 1) + " was not visited", visited[index]);
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1f4b7fd2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
index d20c903..a562e02 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -54,6 +54,7 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
     return new SqoopRecordReader();
   }
 
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   @Override
   public List<InputSplit> getSplits(JobContext context)
       throws IOException, InterruptedException {
@@ -65,10 +66,10 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
     PrefixContext connectorContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
     Object connectorLinkConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf);
     Object connectorFromJobConfig = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
-    Schema schema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
+    Schema fromSchema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
 
     long maxPartitions = conf.getLong(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
-    PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, schema);
+    PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, fromSchema);
 
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, connectorLinkConfig, connectorFromJobConfig);
     List<InputSplit> splits = new LinkedList<InputSplit>();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1f4b7fd2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 664692a..b9dd11d 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -38,6 +38,7 @@ import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.etl.io.DataWriter;
 import org.apache.sqoop.job.io.SqoopWritable;
+import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.submission.counter.SqoopCounters;
 import org.apache.sqoop.utils.ClassUtils;
 
@@ -59,6 +60,7 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
   private IntermediateDataFormat<String> toDataFormat = null;
   private Matcher matcher;
 
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   @Override
   public void run(Context context) throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
@@ -66,16 +68,14 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
     String extractorName = conf.get(MRJobConstants.JOB_ETL_EXTRACTOR);
     Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
 
-    matcher = MatcherFactory.getMatcher(
-        MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf),
-        MRConfigurationUtils.getConnectorSchema(Direction.TO, conf));
+    Schema fromSchema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
+    Schema toSchema = MRConfigurationUtils.getConnectorSchema(Direction.TO, conf);
+    matcher = MatcherFactory.getMatcher(fromSchema, toSchema);
 
     String intermediateDataFormatName = conf.get(MRJobConstants.INTERMEDIATE_DATA_FORMAT);
-    fromDataFormat = (IntermediateDataFormat<String>) ClassUtils
-        .instantiate(intermediateDataFormatName);
+    fromDataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(intermediateDataFormatName);
     fromDataFormat.setSchema(matcher.getFromSchema());
-    toDataFormat = (IntermediateDataFormat<String>) ClassUtils
-        .instantiate(intermediateDataFormatName);
+    toDataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(intermediateDataFormatName);
     toDataFormat.setSchema(matcher.getToSchema());
 
     // Objects that should be passed to the Executor execution
@@ -84,7 +84,7 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
     Object fromJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
 
     SqoopSplit split = context.getCurrentKey();
-    ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context));
+    ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context), fromSchema);
 
     try {
       LOG.info("Starting progress service");
@@ -93,14 +93,13 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
       LOG.info("Running extractor class " + extractorName);
       extractor.extract(extractorContext, fromConfig, fromJob, split.getPartition());
       LOG.info("Extractor has finished");
-      context.getCounter(SqoopCounters.ROWS_READ)
-              .increment(extractor.getRowsRead());
+      context.getCounter(SqoopCounters.ROWS_READ).increment(extractor.getRowsRead());
     } catch (Exception e) {
       throw new SqoopException(MRExecutionError.MAPRED_EXEC_0017, e);
     } finally {
       LOG.info("Stopping progress service");
       progressService.shutdown();
-      if(!progressService.awaitTermination(5, TimeUnit.SECONDS)) {
+      if (!progressService.awaitTermination(5, TimeUnit.SECONDS)) {
         LOG.info("Stopping progress service with shutdownNow");
         progressService.shutdownNow();
       }
