Repository: sqoop Updated Branches: refs/heads/sqoop2 fc32358ab -> 27d87b4f2
SQOOP-1862: Sqoop2: JDBC Connector To side needs to handle converting JODA objects to sql date (Veena Basavaraj via Abraham Elmahrek) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/27d87b4f Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/27d87b4f Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/27d87b4f Branch: refs/heads/sqoop2 Commit: 27d87b4f2f7fcbcdb52503016446b4de1dd5709a Parents: fc32358 Author: Abraham Elmahrek <[email protected]> Authored: Tue Feb 3 13:58:36 2015 -0800 Committer: Abraham Elmahrek <[email protected]> Committed: Tue Feb 3 13:58:36 2015 -0800 ---------------------------------------------------------------------- .../common/test/asserts/ProviderAsserts.java | 2 +- .../connector/jdbc/GenericJdbcExecutor.java | 44 ++++++++++++++++---- .../sqoop/connector/jdbc/GenericJdbcLoader.java | 3 +- .../apache/sqoop/connector/jdbc/TestLoader.java | 36 ++++++++++++++-- .../idf/TestCSVIntermediateDataFormat.java | 2 +- .../java/org/apache/sqoop/test/data/Cities.java | 9 ++-- .../jdbc/generic/FromHDFSToRDBMSTest.java | 16 +++---- .../jdbc/generic/FromRDBMSToHDFSTest.java | 8 ++-- .../jdbc/generic/TableStagedRDBMSTest.java | 18 ++++---- .../connector/kafka/FromRDBMSToKafkaTest.java | 8 ++-- 10 files changed, 102 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java ---------------------------------------------------------------------- diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java b/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java index fb4e7af..d8c3c8e 100644 --- a/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java +++ b/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java @@ -53,7 +53,7 @@ public class ProviderAsserts { int i = 1; for(Object expectedValue : values) { Object actualValue = rs.getObject(i); - assertEquals("Columns do not match on position: " + i, expectedValue, actualValue); + assertEquals("Columns do not match on position: " + i, expectedValue.toString(), actualValue.toString()); i++; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java index 5e7e4e6..7a01992 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java @@ -17,6 +17,15 @@ */ package org.apache.sqoop.connector.jdbc; +import org.apache.log4j.Logger; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.error.code.GenericJdbcConnectorError; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Column; +import org.joda.time.DateTime; +import org.joda.time.LocalDate; +import org.joda.time.LocalTime; + import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.DriverManager; @@ -25,10 +34,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; - -import org.apache.log4j.Logger; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.error.code.GenericJdbcConnectorError; +import java.sql.Timestamp; public class GenericJdbcExecutor { @@ -167,10 +173,34 @@ public class GenericJdbcExecutor { } } - public void addBatch(Object[] array) { + public void addBatch(Object[] array, Schema schema) { try { - for (int i=0; i<array.length; i++) { - preparedStatement.setObject(i+1, array[i]); + Column[] schemaColumns = schema.getColumnsArray(); + for (int i = 0; i < array.length; i++) { + Column schemaColumn = schemaColumns[i]; + switch (schemaColumn.getType()) { + case DATE: + // convert the JODA date to sql date + LocalDate date = (LocalDate) array[i]; + java.sql.Date sqlDate = new java.sql.Date(date.toDateTimeAtCurrentTime().getMillis()); + preparedStatement.setObject(i + 1, sqlDate); + break; + case DATE_TIME: + // convert the JODA date time to sql date + DateTime dateTime = (DateTime) array[i]; + Timestamp timestamp = new Timestamp(dateTime.getMillis()); + preparedStatement.setObject(i + 1, timestamp); + break; + case TIME: + // convert the JODA time to sql date + LocalTime time = (LocalTime) array[i]; + java.sql.Time sqlTime = new java.sql.Time(time.toDateTimeToday().getMillis()); + preparedStatement.setObject(i + 1, sqlTime); + break; + default: + // for anything else + preparedStatement.setObject(i + 1, array[i]); + } } preparedStatement.addBatch(); } catch (SQLException e) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java index 31fd876..ab1ac86 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java @@ -38,7 +38,6 @@ public class GenericJdbcLoader extends Loader<LinkConfiguration, ToJobConfigurat String password = linkConfig.linkConfig.password; GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password); executor.setAutoCommit(false); - String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL); executor.beginBatch(sql); try { @@ -48,7 +47,7 @@ public class GenericJdbcLoader extends Loader<LinkConfiguration, ToJobConfigurat while ((array = context.getDataReader().readArrayRecord()) != null) { numberOfRowsPerBatch++; - executor.addBatch(array); + executor.addBatch(array, context.getSchema()); if (numberOfRowsPerBatch == rowsPerBatch) { numberOfBatchesPerTransaction++; http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java index ba66510..2479f89 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java @@ -26,6 +26,17 @@ import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration; import org.apache.sqoop.etl.io.DataReader; import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.LoaderContext; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Date; +import org.apache.sqoop.schema.type.DateTime; + +import org.apache.sqoop.schema.type.Time; + +import org.apache.sqoop.schema.type.Decimal; +import org.apache.sqoop.schema.type.FixedPoint; +import org.apache.sqoop.schema.type.Text; +import org.joda.time.LocalDate; +import org.joda.time.LocalTime; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -64,7 +75,7 @@ public class TestLoader { if (!executor.existTable(tableName)) { executor.executeUpdate("CREATE TABLE " + executor.delimitIdentifier(tableName) - + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))"); + + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE, DATETIMECOL TIMESTAMP, TIMECOL TIME)"); } else { executor.deleteTableData(tableName); } @@ -75,6 +86,7 @@ public class TestLoader { executor.close(); } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testInsert() throws Exception { MutableContext context = new MutableMapContext(); @@ -87,11 +99,16 @@ public class TestLoader { ToJobConfiguration jobConfig = new ToJobConfiguration(); context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL, - "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?)"); + "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?,?,?,?)"); + Loader loader = new GenericJdbcLoader(); DummyReader reader = new DummyReader(); - LoaderContext loaderContext = new LoaderContext(context, reader, null); + Schema schema = new Schema("TestLoader"); + schema.addColumn(new FixedPoint("c1", 2L, true)).addColumn(new Decimal("c2", 5, 2)) + .addColumn(new Text("c3")).addColumn(new Date("c4")) + .addColumn(new DateTime("c5", false, false)).addColumn(new Time("c6", false)); + LoaderContext loaderContext = new LoaderContext(context, reader, schema); loader.load(loaderContext, linkConfig, jobConfig); int index = START; @@ -101,6 +118,10 @@ public class TestLoader { assertEquals(index, rs.getObject(1)); assertEquals((double) index, rs.getObject(2)); assertEquals(String.valueOf(index), rs.getObject(3)); + assertEquals("2004-10-19", rs.getObject(4).toString()); + assertEquals("2004-10-19 10:23:34.0", rs.getObject(5).toString()); + assertEquals("11:33:59", rs.getObject(6).toString()); + index++; } assertEquals(numberOfRows, index-START); @@ -111,11 +132,18 @@ public class TestLoader { @Override public Object[] readArrayRecord() { + LocalDate jodaDate= new LocalDate(2004, 10, 19); + org.joda.time.DateTime jodaDateTime= new org.joda.time.DateTime(2004, 10, 19, 10, 23, 34); + LocalTime time= new LocalTime(11, 33, 59); + if (index < numberOfRows) { Object[] array = new Object[] { START + index, (double) (START + index), - String.valueOf(START+index) }; + String.valueOf(START+index), + jodaDate, + jodaDateTime, + time}; index++; return array; } else { http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java index 2630a9d..9229639 100644 --- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java +++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java @@ -458,7 +458,7 @@ public class TestCSVIntermediateDataFormat { dataFormat = new CSVIntermediateDataFormat(schema); dataFormat.setCSVTextData("'2014-10-01'"); org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01); - assertEquals(date.toString(), dataFormat.getObjectData()[0].toString()); + assertEquals(date, dataFormat.getObjectData()[0]); } @Test http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/test/src/main/java/org/apache/sqoop/test/data/Cities.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/sqoop/test/data/Cities.java b/test/src/main/java/org/apache/sqoop/test/data/Cities.java index 1e90c59..2589061 100644 --- a/test/src/main/java/org/apache/sqoop/test/data/Cities.java +++ b/test/src/main/java/org/apache/sqoop/test/data/Cities.java @@ -35,6 +35,7 @@ public class Cities extends DataSet { "id", "id", "int", "country", "varchar(50)", + "some_date", "date", "city", "varchar(50)" ); @@ -43,10 +44,10 @@ public class Cities extends DataSet { @Override public DataSet loadBasicData() { - provider.insertRow(tableBaseName, 1, "USA", "San Francisco"); - provider.insertRow(tableBaseName, 2, "USA", "Sunnyvale"); - provider.insertRow(tableBaseName, 3, "Czech Republic", "Brno"); - provider.insertRow(tableBaseName, 4, "USA", "Palo Alto"); + provider.insertRow(tableBaseName, 1, "USA", "2004-10-23","San Francisco"); + provider.insertRow(tableBaseName, 2, "USA", "2004-10-24", "Sunnyvale"); + provider.insertRow(tableBaseName, 3, "Czech Republic", "2004-10-25", "Brno"); + provider.insertRow(tableBaseName, 4, "USA", "2004-10-26", "Palo Alto"); return this; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java index f82abc7..0b530b9 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java @@ -36,10 +36,10 @@ public class FromHDFSToRDBMSTest extends ConnectorTestCase { public void testBasic() throws Exception { createTableCities(); createFromFile("input-0001", - "1,'USA','San Francisco'", - "2,'USA','Sunnyvale'", - "3,'Czech Republic','Brno'", - "4,'USA','Palo Alto'" + "1,'USA','2004-10-23','San Francisco'", + "2,'USA','2004-10-24','Sunnyvale'", + "3,'Czech Republic','2004-10-25','Brno'", + "4,'USA','2004-10-26','Palo Alto'" ); // RDBMS link @@ -69,10 +69,10 @@ public class FromHDFSToRDBMSTest extends ConnectorTestCase { executeJob(job); assertEquals(4L, rowCount()); - assertRowInCities(1, "USA", "San Francisco"); - assertRowInCities(2, "USA", "Sunnyvale"); - assertRowInCities(3, "Czech Republic", "Brno"); - assertRowInCities(4, "USA", "Palo Alto"); + assertRowInCities(1, "USA", "2004-10-23", "San Francisco"); + assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale"); + assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno"); + assertRowInCities(4, "USA", "2004-10-26", "Palo Alto"); // Clean up testing table dropTable(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java index aa9f212..ced52cc 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java @@ -67,10 +67,10 @@ public class FromRDBMSToHDFSTest extends ConnectorTestCase { // Assert correct output assertTo( - "1,'USA','San Francisco'", - "2,'USA','Sunnyvale'", - "3,'Czech Republic','Brno'", - "4,'USA','Palo Alto'" + "1,'USA','2004-10-23','San Francisco'", + "2,'USA','2004-10-24','Sunnyvale'", + "3,'Czech Republic','2004-10-25','Brno'", + "4,'USA','2004-10-26','Palo Alto'" ); // Clean up testing table http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java index b648870..1d09b82 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java @@ -37,11 +37,11 @@ public class TableStagedRDBMSTest extends ConnectorTestCase { final String stageTableName = "STAGE_" + getTableName(); createTableCities(); createFromFile("input-0001", - "1,'USA','San Francisco'", - "2,'USA','Sunnyvale'", - "3,'Czech Republic','Brno'", - "4,'USA','Palo Alto'" - ); + "1,'USA','2004-10-23','San Francisco'", + "2,'USA','2004-10-24','Sunnyvale'", + "3,'Czech Republic','2004-10-25','Brno'", + "4,'USA','2004-10-26','Palo Alto'" + ); new Cities(provider, stageTableName).createTables(); // RDBMS link @@ -76,10 +76,10 @@ public class TableStagedRDBMSTest extends ConnectorTestCase { assertEquals(0L, provider.rowCount(stageTableName)); assertEquals(4L, rowCount()); - assertRowInCities(1, "USA", "San Francisco"); - assertRowInCities(2, "USA", "Sunnyvale"); - assertRowInCities(3, "Czech Republic", "Brno"); - assertRowInCities(4, "USA", "Palo Alto"); + assertRowInCities(1, "USA", "2004-10-23", "San Francisco"); + assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale"); + assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno"); + assertRowInCities(4, "USA", "2004-10-26", "Palo Alto"); // Clean up testing table provider.dropTable(stageTableName); http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java index 04d2835..8a09d7e 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java @@ -29,10 +29,10 @@ import org.testng.annotations.Test; public class FromRDBMSToKafkaTest extends KafkaConnectorTestCase { private static final String[] input = { - "1,'USA','San Francisco'", - "2,'USA','Sunnyvale'", - "3,'Czech Republic','Brno'", - "4,'USA','Palo Alto'" + "1,'USA','2004-10-23','San Francisco'", + "2,'USA','2004-10-24','Sunnyvale'", + "3,'Czech Republic','2004-10-25','Brno'", + "4,'USA','2004-10-26','Palo Alto'" }; @Test
