Updated Branches: refs/heads/trunk ab4bfb9a8 -> 549511378
SQOOP-906: Sqoop is always calling ConnectionManager.datetimeToQueryString with TIMESTAMP column type (Raghav Kumar Gautam via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/54951137 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/54951137 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/54951137 Branch: refs/heads/trunk Commit: 5495113781286de00473eb3d8c535f4288454082 Parents: ab4bfb9 Author: Jarek Jarcec Cecho <[email protected]> Authored: Mon Jul 15 10:34:38 2013 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Mon Jul 15 10:34:38 2013 -0700 ---------------------------------------------------------------------- .../org/apache/sqoop/manager/ConnManager.java | 5 + .../org/apache/sqoop/manager/OracleManager.java | 3 + src/java/org/apache/sqoop/tool/ImportTool.java | 3 +- .../cloudera/sqoop/TestIncrementalImport.java | 2 +- src/test/com/cloudera/sqoop/TestMerge.java | 4 +- .../com/cloudera/sqoop/ThirdPartyTests.java | 2 + .../sqoop/testutil/BaseSqoopTestCase.java | 9 +- .../oracle/OracleIncrementalImportTest.java | 187 +++++++++++++++++++ 8 files changed, 207 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/java/org/apache/sqoop/manager/ConnManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java index c84c859..f4b22f9 100644 --- a/src/java/org/apache/sqoop/manager/ConnManager.java +++ b/src/java/org/apache/sqoop/manager/ConnManager.java @@ -721,6 +721,11 @@ public abstract class ConnManager { * be inserted into a SQL statement, representing that date/time. */ public String datetimeToQueryString(String datetime, int columnType) { + if (columnType != Types.TIMESTAMP && columnType != Types.DATE) { + String msg = "Column type is neither timestamp nor date!"; + LOG.error(msg); + throw new RuntimeException(msg); + } return "'" + datetime + "'"; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/java/org/apache/sqoop/manager/OracleManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/OracleManager.java b/src/java/org/apache/sqoop/manager/OracleManager.java index 686bc19..f6f3afa 100644 --- a/src/java/org/apache/sqoop/manager/OracleManager.java +++ b/src/java/org/apache/sqoop/manager/OracleManager.java @@ -593,6 +593,9 @@ public class OracleManager if (columnType == Types.TIMESTAMP) { return "TO_TIMESTAMP('" + datetime + "', 'YYYY-MM-DD HH24:MI:SS.FF')"; } else if (columnType == Types.DATE) { + // converting timestamp of the form 2012-11-11 11:11:11.00 to + // date of the form 2011:11:11 11:11:11 + datetime = datetime.split("\\.")[0]; return "TO_DATE('" + datetime + "', 'YYYY-MM-DD HH24:MI:SS')"; } else { String msg = "Column type is neither timestamp nor date!"; http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/java/org/apache/sqoop/tool/ImportTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java index cb800b6..fbbde1d 100644 --- a/src/java/org/apache/sqoop/tool/ImportTool.java +++ b/src/java/org/apache/sqoop/tool/ImportTool.java @@ -283,7 +283,8 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { } break; case DateLastModified: - checkColumnType = Types.TIMESTAMP; + checkColumnType = manager.getColumnTypes(options.getTableName(), + options.getSqlQuery()).get(options.getIncrementalTestColumn()); nextVal = manager.getCurrentDbTimestamp(); if (null == nextVal) { throw new IOException("Could not get current time from database"); http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/test/com/cloudera/sqoop/TestIncrementalImport.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/TestIncrementalImport.java b/src/test/com/cloudera/sqoop/TestIncrementalImport.java index 02080df..8eadcdd 100644 --- a/src/test/com/cloudera/sqoop/TestIncrementalImport.java +++ b/src/test/com/cloudera/sqoop/TestIncrementalImport.java @@ -424,7 +424,7 @@ public class TestIncrementalImport extends TestCase { args.add("--incremental"); args.add("lastmodified"); args.add("--check-column"); - args.add("last_modified"); + args.add("LAST_MODIFIED"); } args.add("--columns"); args.add("id"); http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/test/com/cloudera/sqoop/TestMerge.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/TestMerge.java b/src/test/com/cloudera/sqoop/TestMerge.java index 5010cf2..cc1a3a9 100644 --- a/src/test/com/cloudera/sqoop/TestMerge.java +++ b/src/test/com/cloudera/sqoop/TestMerge.java @@ -160,7 +160,7 @@ public class TestMerge extends BaseSqoopTestCase { // Do an import of this data into the "old" dataset. options.setTargetDir(new Path(warehouse, "merge-old").toString()); options.setIncrementalMode(IncrementalMode.DateLastModified); - options.setIncrementalTestColumn("lastmod"); + options.setIncrementalTestColumn("LASTMOD"); ImportTool importTool = new ImportTool(); Sqoop importer = new Sqoop(importTool, options.getConf(), options); @@ -204,7 +204,7 @@ public class TestMerge extends BaseSqoopTestCase { options.setNumMappers(1); options.setTargetDir(new Path(warehouse, "merge-new").toString()); options.setIncrementalMode(IncrementalMode.DateLastModified); - options.setIncrementalTestColumn("lastmod"); + options.setIncrementalTestColumn("LASTMOD"); options.setIncrementalLastValue(new Timestamp(prevImportEnd).toString()); importTool = new ImportTool(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/test/com/cloudera/sqoop/ThirdPartyTests.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/ThirdPartyTests.java b/src/test/com/cloudera/sqoop/ThirdPartyTests.java index ada5c72..b2db1b4 100644 --- a/src/test/com/cloudera/sqoop/ThirdPartyTests.java +++ b/src/test/com/cloudera/sqoop/ThirdPartyTests.java @@ -44,6 +44,7 @@ import com.cloudera.sqoop.manager.PostgresqlImportTest; import org.apache.sqoop.manager.mysql.MySqlCallExportTest; import org.apache.sqoop.manager.oracle.OracleCallExportTest; +import org.apache.sqoop.manager.oracle.OracleIncrementalImportTest; import org.apache.sqoop.manager.sqlserver.SQLServerDatatypeExportDelimitedFileManualTest; import org.apache.sqoop.manager.sqlserver.SQLServerDatatypeExportSequenceFileManualTest; import org.apache.sqoop.manager.sqlserver.SQLServerDatatypeImportDelimitedFileManualTest; @@ -82,6 +83,7 @@ public final class ThirdPartyTests extends TestCase { suite.addTestSuite(OracleExportTest.class); suite.addTestSuite(OracleManagerTest.class); suite.addTestSuite(OracleCompatTest.class); + suite.addTestSuite(OracleIncrementalImportTest.class); // SQL Server suite.addTestSuite(SQLServerDatatypeExportDelimitedFileManualTest.class); http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java index 877d7f8..793c23e 100644 --- a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java +++ b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java @@ -306,8 +306,6 @@ public abstract class BaseSqoopTestCase extends TestCase { PreparedStatement statement = null; String createTableStr = null; String columnDefStr = ""; - String columnListStr = ""; - String valueListStr = ""; try { try { @@ -344,10 +342,13 @@ public abstract class BaseSqoopTestCase extends TestCase { } } - if (vals!=null) { + for (int count=0; vals != null && count < vals.length/colTypes.length; + ++count ) { + String columnListStr = ""; + String valueListStr = ""; for (int i = 0; i < colTypes.length; i++) { columnListStr += colNames[i]; - valueListStr += vals[i]; + valueListStr += vals[count * colTypes.length + i]; if (i < colTypes.length - 1) { columnListStr += ", "; valueListStr += ", "; http://git-wip-us.apache.org/repos/asf/sqoop/blob/54951137/src/test/org/apache/sqoop/manager/oracle/OracleIncrementalImportTest.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/oracle/OracleIncrementalImportTest.java b/src/test/org/apache/sqoop/manager/oracle/OracleIncrementalImportTest.java new file mode 100644 index 0000000..3bbb1b1 --- /dev/null +++ b/src/test/org/apache/sqoop/manager/oracle/OracleIncrementalImportTest.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.manager.oracle; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.manager.OracleUtils; +import com.cloudera.sqoop.testutil.CommonArgs; +import com.cloudera.sqoop.testutil.ImportJobTestCase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Writer; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +/** + * Test free form query import with the Oracle db. + */ +public class OracleIncrementalImportTest extends ImportJobTestCase { + + public static final Log LOG = LogFactory.getLog( + OracleIncrementalImportTest.class.getName()); + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + @Override + protected String getConnectString() { + return OracleUtils.CONNECT_STRING; + } + + @Override + protected SqoopOptions getSqoopOptions(Configuration conf) { + SqoopOptions opts = new SqoopOptions(conf); + OracleUtils.setOracleAuth(opts); + return opts; + } + + @Override + protected void dropTableIfExists(String table) throws SQLException { + OracleUtils.dropTable(table, getManager()); + } + + /** the names of the tables we're creating. */ + private List<String> tableNames; + + @Override + public void tearDown() { + // Clean up the database on our way out. + for (String tableName : tableNames) { + try { + dropTableIfExists(tableName); + } catch (SQLException e) { + LOG.warn("Error trying to drop table '" + tableName + + "' on tearDown: " + e); + } + } + super.tearDown(); + } + + /** + * Create the argv to pass to Sqoop. + * @param tableName tableName to be used. + * @param connPropsFileName connection properties to use + * @param checkColumnName name of the column to use for check-column + * @return the argv as an array of strings. + */ + protected String [] getArgv(String tableName, String connPropsFileName, + String checkColumnName) { + ArrayList<String> args = new ArrayList<String>(); + + CommonArgs.addHadoopFlags(args); + + args.add("--connect"); + args.add(getConnectString()); + args.add("--target-dir"); + args.add(getWarehouseDir()); + args.add("--num-mappers"); + args.add("1"); + args.add("--table"); + args.add(tableName); + args.add("--incremental"); + args.add("lastmodified"); + args.add("--check-column"); + args.add(checkColumnName); + args.add("--last-value"); + args.add("2000-01-01 01:01:01.0"); + args.add("--connection-param-file"); + args.add(connPropsFileName); + + return args.toArray(new String[0]); + } + + /** + * Create a tables with a date column. Run incremental import on the table + * with date column as check-column. + */ + public void testIncrementalImportWithLastModified() throws IOException { + tableNames = new ArrayList<String>(); + String [] types = { "INT", "VARCHAR(10)", "DATE", }; + String [] vals = { + "1", "'old_data'", + "TO_DATE('1999-01-01 11:11:11', 'YYYY-MM-DD HH24:MI:SS')", + "2", "'new_data'", + "TO_DATE('2000-11-11 23:23:23', 'YYYY-MM-DD HH24:MI:SS')", }; + String tableName = getTableName(); + tableNames.add(tableName); + createTableWithColTypes(types, vals); + // Some version of Oracle's jdbc drivers automatically convert date to + // timestamp. Since we don't want this to happen for this test, + // we must explicitly use a property file to control this behavior. + String connPropsFileName = "connection.properties"; + createFileWithContent(connPropsFileName, + "oracle.jdbc.mapDateToTimestamp=false"); + String[] args = getArgv(tableName, connPropsFileName, getColName(2)); + runImport(args); + + Path warehousePath = new Path(this.getWarehouseDir()); + Path filePath = new Path(warehousePath, "part-m-00000"); + String output = readLineFromPath(filePath); + String expectedVal = "2,new_data,2000-11-11"; + assertEquals("Incremental import result expected a different string", + expectedVal, output); + } + + private void createFileWithContent(String connPropsFileName, + String fileContent) throws IOException { + File file = new File(connPropsFileName); + if(file.exists()) + file.delete(); + Writer writer = new BufferedWriter(new FileWriter(connPropsFileName)); + writer.write(fileContent); + writer.close(); + } + + private String readLineFromPath(Path filePath) throws IOException { + BufferedReader reader = null; + if (!isOnPhysicalCluster()) { + reader = new BufferedReader(new InputStreamReader(new FileInputStream( + new File(filePath.toString())))); + } else { + FileSystem dfs = FileSystem.get(getConf()); + FSDataInputStream dis = dfs.open(filePath); + reader = new BufferedReader(new InputStreamReader(dis)); + } + String line = null; + try { + line = reader.readLine(); + } finally { + IOUtils.closeStream(reader); + } + return line; + } + +} +
