Repository: sqoop Updated Branches: refs/heads/sqoop2 c4246c53f -> 1f4b7fd29
SQOOP-1830: GenericJdBcExtractor does not create java date objects when extracting (Veena Basavaraj via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/1f4b7fd2 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/1f4b7fd2 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/1f4b7fd2 Branch: refs/heads/sqoop2 Commit: 1f4b7fd29126ffcec00cf7e0723b609f10ad3317 Parents: c4246c5 Author: Jarek Jarcec Cecho <[email protected]> Authored: Wed Dec 3 10:34:48 2014 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Wed Dec 3 10:34:48 2014 -0800 ---------------------------------------------------------------------- .../apache/sqoop/job/etl/ExtractorContext.java | 16 +++- .../org/apache/sqoop/job/etl/LoaderContext.java | 6 +- .../jdbc/GenericJdbcConnectorError.java | 5 +- .../connector/jdbc/GenericJdbcExtractor.java | 48 ++++++++++-- .../sqoop/connector/jdbc/TestExtractor.java | 79 +++++++++++++++++--- .../sqoop/connector/jdbc/TestPartitioner.java | 2 +- .../sqoop/connector/hdfs/TestExtractor.java | 4 +- .../apache/sqoop/job/mr/SqoopInputFormat.java | 5 +- .../org/apache/sqoop/job/mr/SqoopMapper.java | 21 +++--- 9 files changed, 144 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/1f4b7fd2/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java index 4875ed0..1e0f0ec 100644 --- a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java +++ b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java @@ -19,6 +19,7 @@ package org.apache.sqoop.job.etl; import org.apache.sqoop.common.ImmutableContext; import org.apache.sqoop.etl.io.DataWriter; +import org.apache.sqoop.schema.Schema; /** * Context implementation for Extractor. @@ -27,12 +28,14 @@ import org.apache.sqoop.etl.io.DataWriter; */ public class ExtractorContext extends TransferableContext { - private DataWriter writer; + private final DataWriter writer; + private final Schema schema; - public ExtractorContext(ImmutableContext context, DataWriter writer) { + public ExtractorContext(ImmutableContext context, DataWriter writer, Schema schema) { super(context); this.writer = writer; + this.schema = schema; } /** @@ -43,6 +46,13 @@ public class ExtractorContext extends TransferableContext { public DataWriter getDataWriter() { return writer; } - + /** + * Return schema associated with FROM. + * + * @return + */ + public Schema getSchema() { + return schema; + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/1f4b7fd2/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java b/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java index 563b9ad..9d556eb 100644 --- a/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java +++ b/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java @@ -28,9 +28,9 @@ import org.apache.sqoop.schema.Schema; */ public class LoaderContext extends TransferableContext { - private DataReader reader; + private final DataReader reader; - private Schema schema; + private final Schema schema; public LoaderContext(ImmutableContext context, DataReader reader, Schema schema) { super(context); @@ -48,7 +48,7 @@ public class LoaderContext extends TransferableContext { } /** - * Return schema associated with this step. + * Return schema associated with TO. * * @return */ http://git-wip-us.apache.org/repos/asf/sqoop/blob/1f4b7fd2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java index c291cb2..0fa4a32 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java @@ -81,7 +81,10 @@ public enum GenericJdbcConnectorError implements ErrorCode { GENERIC_JDBC_CONNECTOR_0019("Table name extraction not supported."), - GENERIC_JDBC_CONNECTOR_0020("Unknown direction.") + GENERIC_JDBC_CONNECTOR_0020("Unknown direction."), + + GENERIC_JDBC_CONNECTOR_0021("Schema column size do not match the result set column size"), + ; private final String message; http://git-wip-us.apache.org/repos/asf/sqoop/blob/1f4b7fd2/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java index af9320b..9a61701 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java @@ -20,13 +20,20 @@ package org.apache.sqoop.connector.jdbc; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.util.List; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration; import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration; -import org.apache.sqoop.job.etl.ExtractorContext; +import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration; import org.apache.sqoop.job.etl.Extractor; +import org.apache.sqoop.job.etl.ExtractorContext; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Column; +import org.joda.time.LocalDate; +import org.joda.time.LocalDateTime; +import org.joda.time.LocalTime; + public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobConfiguration, GenericJdbcPartition> { @@ -50,14 +57,41 @@ public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobCo rowsRead = 0; ResultSet resultSet = executor.executeQuery(query); + Schema schema = context.getSchema(); + Column[] schemaColumns = schema.getColumns().toArray(new Column[schema.getColumns().size()]); try { ResultSetMetaData metaData = resultSet.getMetaData(); - int column = metaData.getColumnCount(); + int columnCount = metaData.getColumnCount(); + if (schemaColumns.length != columnCount) { + throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0021, schemaColumns.length + ":" + columnCount); + } while (resultSet.next()) { - Object[] array = new Object[column]; - for (int i = 0; i< column; i++) { - array[i] = resultSet.getObject(i + 1) == null ? GenericJdbcConnectorConstants.SQL_NULL_VALUE - : resultSet.getObject(i + 1); + Object[] array = new Object[columnCount]; + for (int i = 0; i < columnCount; i++) { + // check type of the column + Column schemaColumn = schemaColumns[i]; + if(resultSet.getObject(i + 1) == null) { + array[i] = GenericJdbcConnectorConstants.SQL_NULL_VALUE ; + continue; + } + switch (schemaColumn.getType()) { + case DATE: + // convert the sql date to JODA time as prescribed the Sqoop IDF spec + array[i] = LocalDate.fromDateFields((java.sql.Date)resultSet.getObject(i + 1)); + break; + case DATE_TIME: + // convert the sql date time to JODA time as prescribed the Sqoop IDF spec + array[i] = LocalDateTime.fromDateFields((java.sql.Date)resultSet.getObject(i + 1)); + break; + case TIME: + // convert the sql time to JODA time as prescribed the Sqoop IDF spec + array[i] = LocalTime.fromDateFields((java.sql.Date)resultSet.getObject(i + 1)); + break; + default: + //for anything else + array[i] = resultSet.getObject(i + 1); + + } } context.getDataWriter().writeArrayRecord(array); rowsRead++; http://git-wip-us.apache.org/repos/asf/sqoop/blob/1f4b7fd2/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java index d1e6805..8e1ce5b 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java @@ -17,13 +17,21 @@ */ package org.apache.sqoop.connector.jdbc; + import org.apache.sqoop.common.MutableContext; import org.apache.sqoop.common.MutableMapContext; -import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration; +import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration; +import org.apache.sqoop.etl.io.DataWriter; import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.ExtractorContext; -import org.apache.sqoop.etl.io.DataWriter; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Date; +import org.apache.sqoop.schema.type.Decimal; +import org.apache.sqoop.schema.type.FixedPoint; +import org.apache.sqoop.schema.type.Text; +import org.joda.time.LocalDate; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -31,6 +39,7 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; + public class TestExtractor { private final String tableName; @@ -54,12 +63,12 @@ public class TestExtractor { if (!executor.existTable(tableName)) { executor.executeUpdate("CREATE TABLE " + executor.delimitIdentifier(tableName) - + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)"); for (int i = 0; i < NUMBER_OF_ROWS; i++) { int value = START + i; String sql = "INSERT INTO " + executor.delimitIdentifier(tableName) - + " VALUES(" + value + ", " + value + ", '" + value + "')"; + + " VALUES(" + value + ", " + value + ", '" + value + "', '2004-10-19')"; executor.executeUpdate(sql); } } @@ -70,6 +79,7 @@ public class TestExtractor { executor.close(); } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testQuery() throws Exception { MutableContext context = new MutableMapContext(); @@ -88,7 +98,12 @@ public class TestExtractor { Extractor extractor = new GenericJdbcExtractor(); DummyWriter writer = new DummyWriter(); - ExtractorContext extractorContext = new ExtractorContext(context, writer); + Schema schema = new Schema("TestExtractor"); + // dummy columns added, all we need is the column count to match to the + // result set + schema.addColumn(new FixedPoint("c1")).addColumn(new Decimal("c2")).addColumn(new Text("c3")).addColumn(new Date("c4")); + + ExtractorContext extractorContext = new ExtractorContext(context, writer, schema); partition = new GenericJdbcPartition(); partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665"); @@ -103,6 +118,7 @@ public class TestExtractor { extractor.extract(extractorContext, linkConfig, jobConfig, partition); } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Test public void testSubquery() throws Exception { MutableContext context = new MutableMapContext(); @@ -115,15 +131,19 @@ public class TestExtractor { FromJobConfiguration jobConfig = new FromJobConfiguration(); context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL, - "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " - + "(SELECT * FROM " + executor.delimitIdentifier(tableName) - + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS"); + "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL,SQOOP_SUBQUERY_ALIAS.DATECOL FROM " + "(SELECT * FROM " + + executor.delimitIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS"); GenericJdbcPartition partition; Extractor extractor = new GenericJdbcExtractor(); DummyWriter writer = new DummyWriter(); - ExtractorContext extractorContext = new ExtractorContext(context, writer); + Schema schema = new Schema("TestExtractor"); + // dummy columns added, all we need is the column count to match to the + // result set + schema.addColumn(new FixedPoint("c1")).addColumn(new Text("c2")).addColumn(new Date("c3")); + + ExtractorContext extractorContext = new ExtractorContext(context, writer, schema); partition = new GenericJdbcPartition(); partition.setConditions("-50 <= ICOL AND ICOL < -16"); @@ -138,21 +158,56 @@ public class TestExtractor { extractor.extract(extractorContext, linkConfig, jobConfig, partition); } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test(expected = SqoopException.class) + public void testIncorrectSchemaColumnSize() throws Exception { + MutableContext context = new MutableMapContext(); + + LinkConfiguration linkConfig = new LinkConfiguration(); + + linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER; + linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL; + + FromJobConfiguration jobConfig = new FromJobConfiguration(); + + context.setString( + GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL, + "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " + "(SELECT * FROM " + + executor.delimitIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS"); + + GenericJdbcPartition partition; + + Extractor extractor = new GenericJdbcExtractor(); + DummyWriter writer = new DummyWriter(); + Schema schema = new Schema("TestIncorrectColumns"); + ExtractorContext extractorContext = new ExtractorContext(context, writer, schema); + + partition = new GenericJdbcPartition(); + partition.setConditions("-50 <= ICOL AND ICOL < -16"); + extractor.extract(extractorContext, linkConfig, jobConfig, partition); + + } + public class DummyWriter extends DataWriter { int indx = START; @Override public void writeArrayRecord(Object[] array) { + boolean parsedDate = false; for (int i = 0; i < array.length; i++) { if (array[i] instanceof Integer) { - assertEquals(indx, ((Integer)array[i]).intValue()); + assertEquals(indx, ((Integer) array[i]).intValue()); } else if (array[i] instanceof Double) { assertEquals((double)indx, ((Double)array[i]).doubleValue(), EPSILON); - } else { + } else if (array[i] instanceof String) { assertEquals(String.valueOf(indx), array[i].toString()); + } else if (array[i] instanceof LocalDate) { + assertEquals("2004-10-19", array[i].toString()); + parsedDate = true; } } indx++; + assertEquals(true, parsedDate); } @Override @@ -165,4 +220,4 @@ public class TestExtractor { fail("This method should not be invoked."); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/1f4b7fd2/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java index dee0242..d62e494 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java @@ -477,7 +477,7 @@ public class TestPartitioner { } @Test - public void testPatitionWithNullValues() throws Exception { + public void testPartitionWithNullValues() throws Exception { MutableContext context = new MutableMapContext(); context.setString(GenericJdbcConnectorConstants .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL"); http://git-wip-us.apache.org/repos/asf/sqoop/blob/1f4b7fd2/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java index 0a6369f..f4b4454 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java @@ -129,7 +129,7 @@ public class TestExtractor extends TestHdfsBase { public void writeRecord(Object obj) { throw new AssertionError("Should not be writing object."); } - }); + }, null); LinkConfiguration emptyLinkConfig = new LinkConfiguration(); FromJobConfiguration emptyJobConfig = new FromJobConfiguration(); @@ -141,4 +141,4 @@ public class TestExtractor extends TestHdfsBase { Assert.assertTrue("Index " + (index + 1) + " was not visited", visited[index]); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/1f4b7fd2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java index d20c903..a562e02 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java @@ -54,6 +54,7 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> { return new SqoopRecordReader(); } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { @@ -65,10 +66,10 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> { PrefixContext connectorContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); Object connectorLinkConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf); Object connectorFromJobConfig = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf); - Schema schema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf); + Schema fromSchema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf); long maxPartitions = conf.getLong(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, 10); - PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, schema); + PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, fromSchema); List<Partition> partitions = partitioner.getPartitions(partitionerContext, connectorLinkConfig, connectorFromJobConfig); List<InputSplit> splits = new LinkedList<InputSplit>(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/1f4b7fd2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index 664692a..b9dd11d 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -38,6 +38,7 @@ import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.ExtractorContext; import org.apache.sqoop.etl.io.DataWriter; import org.apache.sqoop.job.io.SqoopWritable; +import org.apache.sqoop.schema.Schema; import org.apache.sqoop.submission.counter.SqoopCounters; import org.apache.sqoop.utils.ClassUtils; @@ -59,6 +60,7 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, private IntermediateDataFormat<String> toDataFormat = null; private Matcher matcher; + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public void run(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); @@ -66,16 +68,14 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, String extractorName = conf.get(MRJobConstants.JOB_ETL_EXTRACTOR); Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName); - matcher = MatcherFactory.getMatcher( - MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf), - MRConfigurationUtils.getConnectorSchema(Direction.TO, conf)); + Schema fromSchema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf); + Schema toSchema = MRConfigurationUtils.getConnectorSchema(Direction.TO, conf); + matcher = MatcherFactory.getMatcher(fromSchema, toSchema); String intermediateDataFormatName = conf.get(MRJobConstants.INTERMEDIATE_DATA_FORMAT); - fromDataFormat = (IntermediateDataFormat<String>) ClassUtils - .instantiate(intermediateDataFormatName); + fromDataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(intermediateDataFormatName); fromDataFormat.setSchema(matcher.getFromSchema()); - toDataFormat = (IntermediateDataFormat<String>) ClassUtils - .instantiate(intermediateDataFormatName); + toDataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(intermediateDataFormatName); toDataFormat.setSchema(matcher.getToSchema()); // Objects that should be passed to the Executor execution @@ -84,7 +84,7 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, Object fromJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf); SqoopSplit split = context.getCurrentKey(); - ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context)); + ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context), fromSchema); try { LOG.info("Starting progress service"); @@ -93,14 +93,13 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, LOG.info("Running extractor class " + extractorName); extractor.extract(extractorContext, fromConfig, fromJob, split.getPartition()); LOG.info("Extractor has finished"); - context.getCounter(SqoopCounters.ROWS_READ) - .increment(extractor.getRowsRead()); + context.getCounter(SqoopCounters.ROWS_READ).increment(extractor.getRowsRead()); } catch (Exception e) { throw new SqoopException(MRExecutionError.MAPRED_EXEC_0017, e); } finally { LOG.info("Stopping progress service"); progressService.shutdown(); - if(!progressService.awaitTermination(5, TimeUnit.SECONDS)) { + if (!progressService.awaitTermination(5, TimeUnit.SECONDS)) { LOG.info("Stopping progress service with shutdownNow"); progressService.shutdownNow(); }
