http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestFreeFormQueryImport.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/TestFreeFormQueryImport.java b/src/test/org/apache/sqoop/TestFreeFormQueryImport.java new file mode 100644 index 0000000..2df4352 --- /dev/null +++ b/src/test/org/apache/sqoop/TestFreeFormQueryImport.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.sqoop;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import org.apache.sqoop.testutil.CommonArgs;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.junit.After;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
 * Test free form query import.
 */
public class TestFreeFormQueryImport extends ImportJobTestCase {

  private Log log;

  public TestFreeFormQueryImport() {
    this.log = LogFactory.getLog(TestFreeFormQueryImport.class.getName());
  }

  /**
   * @return the Log object to use for reporting during this test
   */
  protected Log getLogger() {
    return log;
  }

  /**
   * The names of the tables created by the current test; each one is dropped
   * in {@link #tearDown()}. Initialized eagerly so tearDown never iterates a
   * null list (previously only testSimpleJoin assigned it).
   */
  private List<String> tableNames = new ArrayList<String>();

  /**
   * Drop every table registered in {@link #tableNames}, logging (rather than
   * failing on) any SQLException, then run the superclass cleanup.
   */
  @After
  public void tearDown() {
    // Clean up the database on our way out.
    for (String tableName : tableNames) {
      try {
        dropTableIfExists(tableName);
      } catch (SQLException e) {
        log.warn("Error trying to drop table '" + tableName
            + "' on tearDown: " + e);
      }
    }
    super.tearDown();
  }

  /**
   * Create the argv to pass to Sqoop.
   * @param splitByCol column of the table used to split work.
   * @param query free form query to be used.
   * @return the argv as an array of strings.
   */
  protected String [] getArgv(String splitByCol, String query) {
    ArrayList<String> args = new ArrayList<String>();

    CommonArgs.addHadoopFlags(args);

    args.add("--connect");
    args.add(getConnectString());
    args.add("--target-dir");
    args.add(getWarehouseDir());
    args.add("--split-by");
    args.add(splitByCol);
    args.add("--num-mappers");
    args.add("2");
    args.add("--query");
    args.add(query);

    return args.toArray(new String[0]);
  }

  /**
   * Create two tables that share the common id column. Run free-form query
   * import on the result table that is created by joining the two tables on
   * the id column.
   */
  @Test
  public void testSimpleJoin() throws IOException {
    String [] types1 = { "SMALLINT", };
    String [] vals1 = { "1", };
    String tableName1 = getTableName();
    // Register before creating so tearDown also drops a table whose
    // creation failed part-way.
    tableNames.add(tableName1);
    createTableWithColTypes(types1, vals1);

    incrementTableNum();

    String [] types2 = { "SMALLINT", "VARCHAR(32)", };
    String [] vals2 = { "1", "'foo'", };
    String tableName2 = getTableName();
    tableNames.add(tableName2);
    createTableWithColTypes(types2, vals2);

    String query = "SELECT "
        + tableName1 + "." + getColName(0) + ", "
        + tableName2 + "." + getColName(1) + " "
        + "FROM " + tableName1 + " JOIN " + tableName2 + " ON ("
        + tableName1 + "." + getColName(0) + " = "
        + tableName2 + "." + getColName(0) + ") WHERE "
        + tableName1 + "." + getColName(0) + " < 3 AND $CONDITIONS";

    runImport(getArgv(tableName1 + "." + getColName(0), query));

    Path warehousePath = new Path(this.getWarehouseDir());
    Path filePath = new Path(warehousePath, "part-m-00000");
    String expectedVal = "1,foo";

    BufferedReader reader = null;
    if (!isOnPhysicalCluster()) {
      reader = new BufferedReader(
          new InputStreamReader(new FileInputStream(
          new File(filePath.toString()))));
    } else {
      FileSystem dfs = FileSystem.get(getConf());
      FSDataInputStream dis = dfs.open(filePath);
      reader = new BufferedReader(new InputStreamReader(dis));
    }
    try {
      String line = reader.readLine();
      assertEquals("QueryResult expected a different string",
          expectedVal, line);
    } finally {
      IOUtils.closeStream(reader);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestIncrementalImport.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/TestIncrementalImport.java b/src/test/org/apache/sqoop/TestIncrementalImport.java new file mode 100644 index 0000000..1ab9802 --- /dev/null +++ b/src/test/org/apache/sqoop/TestIncrementalImport.java @@ -0,0 +1,1348 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.sqoop.metastore.SavedJobsTestBase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.StringUtils; +import org.apache.sqoop.hive.HiveImport; +import org.apache.sqoop.manager.ConnManager; +import org.apache.sqoop.manager.HsqldbManager; +import org.apache.sqoop.manager.ManagerFactory; +import org.apache.sqoop.metastore.JobData; +import org.apache.sqoop.testutil.BaseSqoopTestCase; +import org.apache.sqoop.testutil.CommonArgs; +import org.apache.sqoop.tool.ImportTool; +import org.apache.sqoop.tool.JobTool; +import org.apache.sqoop.metastore.AutoGenericJobStorage; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + + +import static org.junit.Assert.*; + +/** + * Test the incremental import functionality. + * + * These all make use of the auto-connect hsqldb-based metastore. + * The metastore URL is configured to be in-memory, and drop all + * state between individual tests. + */ + +public class TestIncrementalImport { + + public static final Log LOG = LogFactory.getLog( + TestIncrementalImport.class.getName()); + + // What database do we read from. 
+ public static final String SOURCE_DB_URL = "jdbc:hsqldb:mem:incremental"; + public static final String AUTO_STORAGE_PASSWORD = ""; + public static final String AUTO_STORAGE_USERNAME = "SA"; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Before + public void setUp() throws Exception { + resetSourceDataSchema(); + } + + public static void resetSourceDataSchema() throws SQLException { + SqoopOptions options = new SqoopOptions(); + options.setConnectString(SOURCE_DB_URL); + options.setUsername(AUTO_STORAGE_USERNAME); + options.setPassword(AUTO_STORAGE_PASSWORD); + SavedJobsTestBase.resetSchema(options); + } + + public static Configuration newConf() { + Configuration conf = new Configuration(); + conf.set(AutoGenericJobStorage.AUTO_STORAGE_USER_KEY, AUTO_STORAGE_USERNAME); + conf.set(AutoGenericJobStorage.AUTO_STORAGE_PASS_KEY, AUTO_STORAGE_PASSWORD); + conf.set(AutoGenericJobStorage.AUTO_STORAGE_CONNECT_STRING_KEY, + SOURCE_DB_URL); + return conf; + } + + /** + * Assert that a table has a specified number of rows. + */ + private void assertRowCount(String table, int numRows) throws SQLException { + SqoopOptions options = new SqoopOptions(); + options.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options); + Connection c = manager.getConnection(); + PreparedStatement s = null; + ResultSet rs = null; + try { + s = c.prepareStatement("SELECT COUNT(*) FROM " + manager.escapeTableName(table)); + rs = s.executeQuery(); + if (!rs.next()) { + fail("No resultset"); + } + int realNumRows = rs.getInt(1); + assertEquals(numRows, realNumRows); + LOG.info("Expected " + numRows + " rows -- ok."); + } finally { + if (null != s) { + try { + s.close(); + } catch (SQLException sqlE) { + LOG.warn("exception: " + sqlE); + } + } + + if (null != rs) { + try { + rs.close(); + } catch (SQLException sqlE) { + LOG.warn("exception: " + sqlE); + } + } + } + } + + /** + * Insert rows with id = [low, hi) into tableName. 
+ */ + private void insertIdRows(String tableName, int low, int hi) + throws SQLException { + SqoopOptions options = new SqoopOptions(); + options.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options); + Connection c = manager.getConnection(); + PreparedStatement s = null; + try { + s = c.prepareStatement("INSERT INTO " + manager.escapeTableName(tableName) + " VALUES(?)"); + for (int i = low; i < hi; i++) { + s.setInt(1, i); + s.executeUpdate(); + } + + c.commit(); + } finally { + if(s != null) { + s.close(); + } + } + } + + /** + * Insert rows with id = [low, hi) into tableName with + * the timestamp column set to the specified ts. + */ + private void insertIdTimestampRows(String tableName, int low, int hi, + Timestamp ts) throws SQLException { + LOG.info("Inserting id rows in [" + low + ", " + hi + ") @ " + ts); + SqoopOptions options = new SqoopOptions(); + options.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options); + Connection c = manager.getConnection(); + PreparedStatement s = null; + try { + s = c.prepareStatement("INSERT INTO " + manager.escapeTableName(tableName) + " VALUES(?,?)"); + for (int i = low; i < hi; i++) { + s.setInt(1, i); + s.setTimestamp(2, ts); + s.executeUpdate(); + } + + c.commit(); + } finally { + s.close(); + } + } + + /** + * Insert rows with id = [low, hi) into tableName with + * id converted to string. 
+ */ + private void insertIdVarcharRows(String tableName, int low, int hi) + throws SQLException { + LOG.info("Inserting rows in [" + low + ", " + hi + ")"); + SqoopOptions options = new SqoopOptions(); + options.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options); + Connection c = manager.getConnection(); + PreparedStatement s = null; + try { + s = c.prepareStatement("INSERT INTO " + manager.escapeTableName(tableName) + " VALUES(?)"); + for (int i = low; i < hi; i++) { + s.setString(1, Integer.toString(i)); + s.executeUpdate(); + } + c.commit(); + } finally { + s.close(); + } + } + + /** + * Create a table with an 'id' column full of integers. + */ + private void createIdTable(String tableName, int insertRows) + throws SQLException { + SqoopOptions options = new SqoopOptions(); + options.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options); + Connection c = manager.getConnection(); + PreparedStatement s = null; + try { + s = c.prepareStatement("CREATE TABLE " + manager.escapeTableName(tableName) + "(id INT NOT NULL)"); + s.executeUpdate(); + c.commit(); + insertIdRows(tableName, 0, insertRows); + } finally { + s.close(); + } + } + + /** + * Create a table with an 'id' column full of integers and a + * last_modified column with timestamps. 
+ */ + private void createTimestampTable(String tableName, int insertRows, + Timestamp baseTime) throws SQLException { + SqoopOptions options = new SqoopOptions(); + options.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options); + Connection c = manager.getConnection(); + PreparedStatement s = null; + try { + s = c.prepareStatement("CREATE TABLE " + manager.escapeTableName(tableName) + "(id INT NOT NULL, " + + "last_modified TIMESTAMP)"); + s.executeUpdate(); + c.commit(); + insertIdTimestampRows(tableName, 0, insertRows, baseTime); + } finally { + s.close(); + } + } + + /** + * Create a table with an 'id' column of type varchar(20) + */ + private void createIdVarcharTable(String tableName, + int insertRows) throws SQLException { + SqoopOptions options = new SqoopOptions(); + options.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options); + Connection c = manager.getConnection(); + PreparedStatement s = null; + try { + s = c.prepareStatement("CREATE TABLE " + manager.escapeTableName(tableName) + "(id varchar(20) NOT NULL)"); + s.executeUpdate(); + c.commit(); + insertIdVarcharRows(tableName, 0, insertRows); + } finally { + s.close(); + } + } + + /** + * Delete all files in a directory for a table. + */ + public void clearDir(String tableName) { + try { + FileSystem fs = FileSystem.getLocal(new Configuration()); + Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); + Path tableDir = new Path(warehouse, tableName); + fs.delete(tableDir, true); + } catch (Exception e) { + fail("Got unexpected exception: " + StringUtils.stringifyException(e)); + } + } + + /** + * Look at a directory that should contain files full of an imported 'id' + * column. Assert that all numbers in [0, expectedNums) are present + * in order. 
+ */ + public void assertDirOfNumbers(String tableName, int expectedNums) { + try { + FileSystem fs = FileSystem.getLocal(new Configuration()); + Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); + Path tableDir = new Path(warehouse, tableName); + FileStatus [] stats = fs.listStatus(tableDir); + String [] fileNames = new String[stats.length]; + for (int i = 0; i < stats.length; i++) { + fileNames[i] = stats[i].getPath().toString(); + } + + Arrays.sort(fileNames); + + // Read all the files in sorted order, adding the value lines to the list. + List<String> receivedNums = new ArrayList<String>(); + for (String fileName : fileNames) { + if (fileName.startsWith("_") || fileName.startsWith(".")) { + continue; + } + + BufferedReader r = new BufferedReader( + new InputStreamReader(fs.open(new Path(fileName)))); + try { + while (true) { + String s = r.readLine(); + if (null == s) { + break; + } + + receivedNums.add(s.trim()); + } + } finally { + r.close(); + } + } + + assertEquals(expectedNums, receivedNums.size()); + + // Compare the received values with the expected set. + for (int i = 0; i < expectedNums; i++) { + assertEquals((int) i, (int) Integer.valueOf(receivedNums.get(i))); + } + } catch (Exception e) { + fail("Got unexpected exception: " + StringUtils.stringifyException(e)); + } + } + + /** + * Look at a directory that should contain files full of an imported 'id' + * column and 'last_modified' column. Assert that all numbers in [0, expectedNums) are present + * in order. 
+ */ + public void assertDirOfNumbersAndTimestamps(String tableName, int expectedNums) { + try { + FileSystem fs = FileSystem.getLocal(new Configuration()); + Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); + Path tableDir = new Path(warehouse, tableName); + FileStatus [] stats = fs.listStatus(tableDir); + String [] fileNames = new String[stats.length]; + for (int i = 0; i < stats.length; i++) { + fileNames[i] = stats[i].getPath().toString(); + } + + Arrays.sort(fileNames); + + // Read all the files in sorted order, adding the value lines to the list. + List<String> receivedNums = new ArrayList<String>(); + for (String fileName : fileNames) { + if (fileName.startsWith("_") || fileName.startsWith(".")) { + continue; + } + + BufferedReader r = new BufferedReader( + new InputStreamReader(fs.open(new Path(fileName)))); + try { + while (true) { + String s = r.readLine(); + if (null == s) { + break; + } + + receivedNums.add(s.trim()); + } + } finally { + r.close(); + } + } + + assertEquals(expectedNums, receivedNums.size()); + + // Compare the received values with the expected set. + for (int i = 0; i < expectedNums; i++) { + assertEquals((int) i, (int) Integer.valueOf(receivedNums.get(i).split(",")[0])); + } + } catch (Exception e) { + fail("Got unexpected exception: " + StringUtils.stringifyException(e)); + } + } + + /** + * Assert that a directory contains a file with exactly one line + * in it, containing the prescribed number 'val'. + */ + public void assertFirstSpecificNumber(String tableName, int val) { + try { + FileSystem fs = FileSystem.getLocal(new Configuration()); + Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); + Path tableDir = new Path(warehouse, tableName); + FileStatus [] stats = fs.listStatus(tableDir); + String [] filePaths = new String[stats.length]; + for (int i = 0; i < stats.length; i++) { + filePaths[i] = stats[i].getPath().toString(); + } + + // Read the first file that is not a hidden file. 
+ boolean foundVal = false; + for (String filePath : filePaths) { + String fileName = new Path(filePath).getName(); + if (fileName.startsWith("_") || fileName.startsWith(".")) { + continue; + } + + if (foundVal) { + // Make sure we don't have two or more "real" files in the dir. + fail("Got an extra data-containing file in this directory."); + } + + BufferedReader r = new BufferedReader( + new InputStreamReader(fs.open(new Path(filePath)))); + try { + String s = r.readLine(); + if (null == s) { + fail("Unexpected empty file " + filePath + "."); + } + assertEquals(val, (int) Integer.valueOf(s.trim())); + + String nextLine = r.readLine(); + if (nextLine != null) { + fail("Expected only one result, but got another line: " + nextLine); + } + + // Successfully got the value we were looking for. + foundVal = true; + } finally { + r.close(); + } + } + } catch (IOException e) { + fail("Got unexpected exception: " + StringUtils.stringifyException(e)); + } + } + + /** + * Assert that a directory contains a file with exactly one line + * in it, containing the prescribed number 'val'. + */ + public void assertSpecificNumber(String tableName, int val) { + try { + FileSystem fs = FileSystem.getLocal(new Configuration()); + Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); + Path tableDir = new Path(warehouse, tableName); + FileStatus [] stats = fs.listStatus(tableDir); + String [] filePaths = new String[stats.length]; + for (int i = 0; i < stats.length; i++) { + filePaths[i] = stats[i].getPath().toString(); + } + + // Read the first file that is not a hidden file. + boolean foundVal = false; + for (String filePath : filePaths) { + String fileName = new Path(filePath).getName(); + if (fileName.startsWith("_") || fileName.startsWith(".")) { + continue; + } + + if (foundVal) { + // Make sure we don't have two or more "real" files in the dir. 
+ fail("Got an extra data-containing file in this directory."); + } + + BufferedReader r = new BufferedReader( + new InputStreamReader(fs.open(new Path(filePath)))); + try { + String s = r.readLine(); + if (val == (int) Integer.valueOf(s.trim().split(",")[0])) { + if (foundVal) { + fail("Expected only one result, but got another line: " + s); + } + foundVal = true; + } + } finally { + r.close(); + } + } + } catch (IOException e) { + fail("Got unexpected exception: " + StringUtils.stringifyException(e)); + } + } + + public void runImport(SqoopOptions options, List<String> args) { + try { + Sqoop importer = new Sqoop(new ImportTool(), options.getConf(), options); + int ret = Sqoop.runSqoop(importer, args.toArray(new String[0])); + assertEquals("Failure during job", 0, ret); + } catch (Exception e) { + LOG.error("Got exception running Sqoop: " + + StringUtils.stringifyException(e)); + throw new RuntimeException(e); + } + } + + /** + * Return a list of arguments to import the specified table. + */ + private List<String> getArgListForTable(String tableName, boolean commonArgs, + boolean isAppend) { + return getArgListForTable(tableName, commonArgs, isAppend, false); + } + + /** + * Return a list of arguments to import the specified table. 
+ */ + private List<String> getArgListForTable(String tableName, boolean commonArgs, + boolean isAppend, boolean appendTimestamp) { + List<String> args = new ArrayList<String>(); + if (commonArgs) { + CommonArgs.addHadoopFlags(args); + } + args.add("--connect"); + args.add(SOURCE_DB_URL); + args.add("--table"); + args.add(tableName); + args.add("--warehouse-dir"); + args.add(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); + if (isAppend) { + args.add("--incremental"); + args.add("append"); + if (!appendTimestamp) { + args.add("--check-column"); + args.add("ID"); + } else { + args.add("--check-column"); + args.add("LAST_MODIFIED"); + } + } else { + args.add("--incremental"); + args.add("lastmodified"); + args.add("--check-column"); + args.add("LAST_MODIFIED"); + } + args.add("--columns"); + args.add("ID"); + args.add("-m"); + args.add("1"); + + return args; + } + + /** + * Return list of arguments to import by query. + * @return + */ + private List<String> getArgListForQuery(String query, String directoryName, + boolean commonArgs, boolean isAppend, boolean appendTimestamp) { + List<String> args = new ArrayList<String>(); + if (commonArgs) { + CommonArgs.addHadoopFlags(args); + } + + String [] directoryNames = directoryName.split("/"); + String className = directoryNames[directoryNames.length -1]; + + args.add("--connect"); + args.add(SOURCE_DB_URL); + args.add("--query"); + args.add(query); + args.add("--class-name"); + args.add(className); + args.add("--target-dir"); + args.add(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR + + System.getProperty("file.separator") + directoryName); + if (isAppend) { + args.add("--incremental"); + args.add("append"); + if (!appendTimestamp) { + args.add("--check-column"); + args.add("ID"); + } else { + args.add("--check-column"); + args.add("LAST_MODIFIED"); + } + } else { + args.add("--incremental"); + args.add("lastmodified"); + args.add("--check-column"); + args.add("LAST_MODIFIED"); + } + args.add("-m"); + args.add("1"); + + return args; + } + 
/** + * Create a job with the specified name, where the job performs + * an import configured with 'jobArgs'. + */ + private void createJob(String jobName, List<String> jobArgs) { + createJob(jobName, jobArgs, newConf()); + } + + /** + * Create a job with the specified name, where the job performs + * an import configured with 'jobArgs', using the provided configuration + * as defaults. + */ + private void createJob(String jobName, List<String> jobArgs, + Configuration conf) { + try { + SqoopOptions options = new SqoopOptions(); + options.setConf(conf); + Sqoop makeJob = new Sqoop(new JobTool(), conf, options); + + List<String> args = new ArrayList<String>(); + args.add("--create"); + args.add(jobName); + args.add("--"); + args.add("import"); + args.addAll(jobArgs); + + int ret = Sqoop.runSqoop(makeJob, args.toArray(new String[0])); + assertEquals("Failure to create job", 0, ret); + } catch (Exception e) { + LOG.error("Got exception running Sqoop to create job: " + + StringUtils.stringifyException(e)); + throw new RuntimeException(e); + } + } + + /** + * Run the specified job. + */ + private void runJob(String jobName) { + runJob(jobName, newConf()); + } + + /** + * Run the specified job. + */ + private void runJob(String jobName, Configuration conf) { + try { + SqoopOptions options = new SqoopOptions(); + options.setConf(conf); + Sqoop runJob = new Sqoop(new JobTool(), conf, options); + + List<String> args = new ArrayList<String>(); + args.add("--exec"); + args.add(jobName); + + int ret = Sqoop.runSqoop(runJob, args.toArray(new String[0])); + assertEquals("Failure to run job", 0, ret); + } catch (Exception e) { + LOG.error("Got exception running Sqoop to run job: " + + StringUtils.stringifyException(e)); + throw new RuntimeException(e); + } + } + + // Incremental import of an empty table, no metastore. 
+ @Test + public void testEmptyAppendImport() throws Exception { + final String TABLE_NAME = "emptyAppend1"; + createIdTable(TABLE_NAME, 0); + List<String> args = getArgListForTable(TABLE_NAME, true, true); + + Configuration conf = newConf(); + SqoopOptions options = new SqoopOptions(); + options.setConf(conf); + runImport(options, args); + + assertDirOfNumbers(TABLE_NAME, 0); + } + + // Incremental import of a filled table, no metastore. + @Test + public void testFullAppendImport() throws Exception { + final String TABLE_NAME = "fullAppend1"; + createIdTable(TABLE_NAME, 10); + List<String> args = getArgListForTable(TABLE_NAME, true, true); + + Configuration conf = newConf(); + SqoopOptions options = new SqoopOptions(); + options.setConf(conf); + runImport(options, args); + + assertDirOfNumbers(TABLE_NAME, 10); + } + + @Test + public void testEmptyJobAppend() throws Exception { + // Create a job and run an import on an empty table. + // Nothing should happen. + + final String TABLE_NAME = "emptyJob"; + createIdTable(TABLE_NAME, 0); + + List<String> args = getArgListForTable(TABLE_NAME, false, true); + createJob("emptyJob", args); + runJob("emptyJob"); + assertDirOfNumbers(TABLE_NAME, 0); + + // Running the job a second time should result in + // nothing happening, it's still empty. + runJob("emptyJob"); + assertDirOfNumbers(TABLE_NAME, 0); + } + + @Test + public void testEmptyThenFullJobAppend() throws Exception { + // Create an empty table. Import it; nothing happens. + // Add some rows. Verify they are appended. + + final String TABLE_NAME = "emptyThenFull"; + createIdTable(TABLE_NAME, 0); + + List<String> args = getArgListForTable(TABLE_NAME, false, true); + createJob(TABLE_NAME, args); + runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 0); + + // Now add some rows. + insertIdRows(TABLE_NAME, 0, 10); + + // Running the job a second time should import 10 rows. + runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 10); + + // Add some more rows. 
+ insertIdRows(TABLE_NAME, 10, 20); + + // Import only those rows. + runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 20); + } + + @Test + public void testEmptyThenFullJobAppendWithQuery() throws Exception { + // Create an empty table. Import it; nothing happens. + // Add some rows. Verify they are appended. + + final String TABLE_NAME = "withQuery"; + createIdTable(TABLE_NAME, 0); + clearDir(TABLE_NAME); + + final String QUERY = "SELECT id FROM \"withQuery\" WHERE $CONDITIONS"; + + List<String> args = getArgListForQuery(QUERY, TABLE_NAME, + false, true, false); + createJob(TABLE_NAME, args); + runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 0); + + // Now add some rows. + insertIdRows(TABLE_NAME, 0, 10); + + // Running the job a second time should import 10 rows. + runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 10); + + // Add some more rows. + insertIdRows(TABLE_NAME, 10, 20); + + // Import only those rows. + runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 20); + } + + @Test + public void testAppend() throws Exception { + // Create a table with data in it; import it. + // Then add more data, verify that only the incremental data is pulled. + + final String TABLE_NAME = "append"; + createIdTable(TABLE_NAME, 10); + + List<String> args = getArgListForTable(TABLE_NAME, false, true); + createJob(TABLE_NAME, args); + runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 10); + + // Add some more rows. + insertIdRows(TABLE_NAME, 10, 20); + + // Import only those rows. 
+ runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 20); + } + + @Test + public void testEmptyLastModified() throws Exception { + final String TABLE_NAME = "emptyLastModified"; + createTimestampTable(TABLE_NAME, 0, null); + List<String> args = getArgListForTable(TABLE_NAME, true, false); + + Configuration conf = newConf(); + SqoopOptions options = new SqoopOptions(); + options.setConf(conf); + runImport(options, args); + + assertDirOfNumbers(TABLE_NAME, 0); + } + + @Test + public void testEmptyLastModifiedWithNonExistingParentDirectory() throws Exception { + final String TABLE_NAME = "emptyLastModifiedNoParent"; + final String QUERY = "SELECT id, last_modified FROM \"" + TABLE_NAME + "\" WHERE $CONDITIONS"; + final String DIRECTORY = "non-existing/parents/" + TABLE_NAME; + createTimestampTable(TABLE_NAME, 0, null); + List<String> args = getArgListForQuery(QUERY, DIRECTORY, true, false, false); + + Configuration conf = newConf(); + SqoopOptions options = new SqoopOptions(); + options.setConf(conf); + runImport(options, args); + + assertDirOfNumbers(DIRECTORY, 0); + } + + @Test + public void testFullLastModifiedImport() throws Exception { + // Given a table of rows imported in the past, + // see that they are imported. + final String TABLE_NAME = "fullLastModified"; + Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100); + createTimestampTable(TABLE_NAME, 10, thePast); + + List<String> args = getArgListForTable(TABLE_NAME, true, false); + + Configuration conf = newConf(); + SqoopOptions options = new SqoopOptions(); + options.setConf(conf); + runImport(options, args); + + assertDirOfNumbers(TABLE_NAME, 10); + } + + @Test + public void testNoImportFromTheFuture() throws Exception { + // If last-modified dates for writes are serialized to be in the + // future w.r.t. an import, do not import these rows. 
+ + final String TABLE_NAME = "futureLastModified"; + Timestamp theFuture = new Timestamp(System.currentTimeMillis() + 1000000); + createTimestampTable(TABLE_NAME, 10, theFuture); + + List<String> args = getArgListForTable(TABLE_NAME, true, false); + + Configuration conf = newConf(); + SqoopOptions options = new SqoopOptions(); + options.setConf(conf); + runImport(options, args); + + assertDirOfNumbers(TABLE_NAME, 0); + } + + @Test + public void testEmptyJobLastMod() throws Exception { + // Create a job and run an import on an empty table. + // Nothing should happen. + + final String TABLE_NAME = "emptyJobLastMod"; + createTimestampTable(TABLE_NAME, 0, null); + + List<String> args = getArgListForTable(TABLE_NAME, false, false); + args.add("--append"); + createJob("emptyJobLastMod", args); + runJob("emptyJobLastMod"); + assertDirOfNumbers(TABLE_NAME, 0); + + // Running the job a second time should result in + // nothing happening, it's still empty. + runJob("emptyJobLastMod"); + assertDirOfNumbers(TABLE_NAME, 0); + } + + @Test + public void testEmptyThenFullJobLastMod() throws Exception { + // Create an empty table. Import it; nothing happens. + // Add some rows. Verify they are appended. + + final String TABLE_NAME = "emptyThenFullTimestamp"; + createTimestampTable(TABLE_NAME, 0, null); + + List<String> args = getArgListForTable(TABLE_NAME, false, false); + args.add("--append"); + createJob(TABLE_NAME, args); + runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 0); + + long importWasBefore = System.currentTimeMillis(); + + // Let some time elapse. + Thread.sleep(50); + + long rowsAddedTime = System.currentTimeMillis() - 5; + + // Check: we are adding rows after the previous import time + // and before the current time. + assertTrue(rowsAddedTime > importWasBefore); + assertTrue(rowsAddedTime < System.currentTimeMillis()); + + insertIdTimestampRows(TABLE_NAME, 0, 10, new Timestamp(rowsAddedTime)); + + // Running the job a second time should import 10 rows. 
+ runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 10); + + // Add some more rows. + importWasBefore = System.currentTimeMillis(); + Thread.sleep(50); + rowsAddedTime = System.currentTimeMillis() - 5; + assertTrue(rowsAddedTime > importWasBefore); + assertTrue(rowsAddedTime < System.currentTimeMillis()); + insertIdTimestampRows(TABLE_NAME, 10, 20, new Timestamp(rowsAddedTime)); + + // Import only those rows. + runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 20); + } + + @Test + public void testAppendWithTimestamp() throws Exception { + // Create a table with data in it; import it. + // Then add more data, verify that only the incremental data is pulled. + + final String TABLE_NAME = "appendTimestamp"; + Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100); + createTimestampTable(TABLE_NAME, 10, thePast); + + List<String> args = getArgListForTable(TABLE_NAME, false, false); + args.add("--append"); + createJob(TABLE_NAME, args); + runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 10); + + // Add some more rows. + long importWasBefore = System.currentTimeMillis(); + Thread.sleep(50); + long rowsAddedTime = System.currentTimeMillis() - 5; + assertTrue(rowsAddedTime > importWasBefore); + assertTrue(rowsAddedTime < System.currentTimeMillis()); + insertIdTimestampRows(TABLE_NAME, 10, 20, new Timestamp(rowsAddedTime)); + + // Import only those rows. + runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 20); + } + + @Test + public void testAppendWithString() throws Exception { + // Create a table with string column in it; + // incrementally import it on the string column - it should fail. 
+ + final String TABLE_NAME = "appendString"; + createIdVarcharTable(TABLE_NAME, 10); + + List<String> args = getArgListForTable(TABLE_NAME, false, true); + args.add("--append"); + createJob(TABLE_NAME, args); + + thrown.expect(RuntimeException.class); + thrown.reportMissingExceptionWithMessage("Expected incremental import on varchar column to fail"); + runJob(TABLE_NAME); + } + + @Test + public void testModifyWithTimestamp() throws Exception { + // Create a table with data in it; import it. + // Then modify some existing rows, and verify that we only grab + // those rows. + + final String TABLE_NAME = "modifyTimestamp"; + Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100); + createTimestampTable(TABLE_NAME, 10, thePast); + + List<String> args = getArgListForTable(TABLE_NAME, false, false); + createJob(TABLE_NAME, args); + runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 10); + + // Modify a row. + long importWasBefore = System.currentTimeMillis(); + Thread.sleep(50); + long rowsAddedTime = System.currentTimeMillis() - 5; + assertTrue(rowsAddedTime > importWasBefore); + assertTrue(rowsAddedTime < System.currentTimeMillis()); + SqoopOptions options = new SqoopOptions(); + options.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options); + Connection c = manager.getConnection(); + PreparedStatement s = null; + try { + s = c.prepareStatement("UPDATE " + manager.escapeTableName(TABLE_NAME) + " SET id=?, last_modified=? WHERE id=?"); + s.setInt(1, 4000); // the first row should have '4000' in it now. + s.setTimestamp(2, new Timestamp(rowsAddedTime)); + s.setInt(3, 0); + s.executeUpdate(); + c.commit(); + } finally { + s.close(); + } + + // Import only the new row. + clearDir(TABLE_NAME); + runJob(TABLE_NAME); + assertFirstSpecificNumber(TABLE_NAME, 4000); + } + @Test + public void testUpdateModifyWithTimestamp() throws Exception { + // Create a table with data in it; import it. 
+ // Then modify some existing rows, and verify that we only grab + // those rows. + + final String TABLE_NAME = "updateModifyTimestamp"; + Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100); + createTimestampTable(TABLE_NAME, 10, thePast); + + List<String> args = getArgListForTable(TABLE_NAME, false, false); + + Configuration conf = newConf(); + SqoopOptions options = new SqoopOptions(); + options.setConf(conf); + runImport(options, args); + assertDirOfNumbers(TABLE_NAME, 10); + + // Modify a row. + long importWasBefore = System.currentTimeMillis(); + Thread.sleep(50); + long rowsAddedTime = System.currentTimeMillis() - 5; + assertTrue(rowsAddedTime > importWasBefore); + assertTrue(rowsAddedTime < System.currentTimeMillis()); + SqoopOptions options2 = new SqoopOptions(); + options2.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options2); + Connection c = manager.getConnection(); + PreparedStatement s = null; + try { + s = c.prepareStatement("UPDATE " + manager.escapeTableName(TABLE_NAME) + " SET id=?, last_modified=? WHERE id=?"); + s.setInt(1, 4000); // the first row should have '4000' in it now. + s.setTimestamp(2, new Timestamp(rowsAddedTime)); + s.setInt(3, 0); + s.executeUpdate(); + c.commit(); + } finally { + s.close(); + } + + // Update the new row. + args.add("--last-value"); + args.add(new Timestamp(importWasBefore).toString()); + args.add("--merge-key"); + args.add("ID"); + conf = newConf(); + options = new SqoopOptions(); + options.setConf(conf); + runImport(options, args); + assertSpecificNumber(TABLE_NAME, 4000); + } + + @Test + public void testUpdateModifyWithTimestampWithQuery() throws Exception { + // Create an empty table. Import it; nothing happens. + // Add some rows. Verify they are appended. 
+ + final String TABLE_NAME = "UpdateModifyWithTimestampWithQuery"; + Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100); + createTimestampTable(TABLE_NAME, 10, thePast); + + final String QUERY = "SELECT id, last_modified FROM \"UpdateModifyWithTimestampWithQuery\" WHERE $CONDITIONS"; + + List<String> args = getArgListForQuery(QUERY, TABLE_NAME, + true, false, false); + + Configuration conf = newConf(); + SqoopOptions options = new SqoopOptions(); + options.setConf(conf); + runImport(options, args); + assertDirOfNumbersAndTimestamps(TABLE_NAME, 10); + + // Modify a row. + long importWasBefore = System.currentTimeMillis(); + Thread.sleep(50); + long rowsAddedTime = System.currentTimeMillis() - 5; + assertTrue(rowsAddedTime > importWasBefore); + assertTrue(rowsAddedTime < System.currentTimeMillis()); + SqoopOptions options2 = new SqoopOptions(); + options2.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options2); + Connection c = manager.getConnection(); + PreparedStatement s = null; + try { + s = c.prepareStatement("UPDATE " + manager.escapeTableName(TABLE_NAME) + " SET id=?, last_modified=? WHERE id=?"); + s.setInt(1, 4000); // the first row should have '4000' in it now. + s.setTimestamp(2, new Timestamp(rowsAddedTime)); + s.setInt(3, 0); + s.executeUpdate(); + c.commit(); + } finally { + s.close(); + } + + // Update the new row. + args.add("--last-value"); + args.add(new Timestamp(importWasBefore).toString()); + args.add("--merge-key"); + args.add("ID"); + conf = newConf(); + options = new SqoopOptions(); + options.setConf(conf); + runImport(options, args); + assertSpecificNumber(TABLE_NAME, 4000); + } + + @Test + public void testUpdateModifyWithTimestampJob() throws Exception { + // Create a table with data in it; import it. + // Then modify some existing rows, and verify that we only grab + // those rows. 
+ + final String TABLE_NAME = "updateModifyTimestampJob"; + Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100); + createTimestampTable(TABLE_NAME, 10, thePast); + + List<String> args = getArgListForTable(TABLE_NAME, false, false); + args.add("--merge-key"); + args.add("ID"); + createJob(TABLE_NAME, args); + runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 10); + + // Modify a row. + long importWasBefore = System.currentTimeMillis(); + Thread.sleep(50); + long rowsAddedTime = System.currentTimeMillis() - 5; + assertTrue(rowsAddedTime > importWasBefore); + assertTrue(rowsAddedTime < System.currentTimeMillis()); + SqoopOptions options2 = new SqoopOptions(); + options2.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options2); + Connection c = manager.getConnection(); + PreparedStatement s = null; + try { + s = c.prepareStatement("UPDATE " + manager.escapeTableName(TABLE_NAME) + " SET id=?, last_modified=? WHERE id=?"); + s.setInt(1, 4000); // the first row should have '4000' in it now. + s.setTimestamp(2, new Timestamp(rowsAddedTime)); + s.setInt(3, 0); + s.executeUpdate(); + c.commit(); + } finally { + s.close(); + } + + // Update the new row. + runJob(TABLE_NAME); + assertSpecificNumber(TABLE_NAME, 4000); + } + + /** + * ManagerFactory returning an HSQLDB ConnManager which allows you to + * specify the current database timestamp. + */ + public static class InstrumentHsqldbManagerFactory extends ManagerFactory { + @Override + public ConnManager accept(JobData data) { + LOG.info("Using instrumented manager"); + return new InstrumentHsqldbManager(data.getSqoopOptions()); + } + } + + /** + * Hsqldb ConnManager that lets you set the current reported timestamp + * from the database, to allow testing of boundary conditions for imports. 
+ */ + public static class InstrumentHsqldbManager extends HsqldbManager { + private static Timestamp curTimestamp; + + public InstrumentHsqldbManager(SqoopOptions options) { + super(options); + } + + @Override + public Timestamp getCurrentDbTimestamp() { + return InstrumentHsqldbManager.curTimestamp; + } + + public static void setCurrentDbTimestamp(Timestamp t) { + InstrumentHsqldbManager.curTimestamp = t; + } + } + + @Test + public void testTimestampBoundary() throws Exception { + // Run an import, and then insert rows with the last-modified timestamp + // set to the exact time when the first import runs. Run a second import + // and ensure that we pick up the new data. + + long now = System.currentTimeMillis(); + + final String TABLE_NAME = "boundaryTimestamp"; + Timestamp thePast = new Timestamp(now - 100); + createTimestampTable(TABLE_NAME, 10, thePast); + + Timestamp firstJobTime = new Timestamp(now); + InstrumentHsqldbManager.setCurrentDbTimestamp(firstJobTime); + + // Configure the job to use the instrumented Hsqldb manager. + Configuration conf = newConf(); + conf.set(ConnFactory.FACTORY_CLASS_NAMES_KEY, + InstrumentHsqldbManagerFactory.class.getName()); + + List<String> args = getArgListForTable(TABLE_NAME, false, false); + args.add("--append"); + createJob(TABLE_NAME, args, conf); + runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 10); + + // Add some more rows with the timestamp equal to the job run timestamp. + insertIdTimestampRows(TABLE_NAME, 10, 20, firstJobTime); + assertRowCount(TABLE_NAME, 20); + + // Run a second job with the clock advanced by 100 ms. + Timestamp secondJobTime = new Timestamp(now + 100); + InstrumentHsqldbManager.setCurrentDbTimestamp(secondJobTime); + + // Import only those rows. 
+ runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 20); + } + + @Test + public void testIncrementalAppendTimestamp() throws Exception { + // Run an import, and then insert rows with the last-modified timestamp + // set to the exact time when the first import runs. Run a second import + // and ensure that we pick up the new data. + + long now = System.currentTimeMillis(); + + final String TABLE_NAME = "incrementalAppendTimestamp"; + Timestamp thePast = new Timestamp(now - 100); + createTimestampTable(TABLE_NAME, 10, thePast); + + Timestamp firstJobTime = new Timestamp(now); + InstrumentHsqldbManager.setCurrentDbTimestamp(firstJobTime); + + // Configure the job to use the instrumented Hsqldb manager. + Configuration conf = newConf(); + conf.set(ConnFactory.FACTORY_CLASS_NAMES_KEY, + InstrumentHsqldbManagerFactory.class.getName()); + + List<String> args = getArgListForTable(TABLE_NAME, false, true, true); + createJob(TABLE_NAME, args, conf); + runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 10); + + // Add some more rows with the timestamp equal to the job run timestamp. + insertIdTimestampRows(TABLE_NAME, 10, 20, firstJobTime); + assertRowCount(TABLE_NAME, 20); + + // Run a second job with the clock advanced by 100 ms. + Timestamp secondJobTime = new Timestamp(now + 100); + InstrumentHsqldbManager.setCurrentDbTimestamp(secondJobTime); + + // Import only those rows. + runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 20); + } + @Test + public void testIncrementalHiveAppendEmptyThenFull() throws Exception { + // This is to test Incremental Hive append feature. 
SQOOP-2470 + final String TABLE_NAME = "incrementalHiveAppendEmptyThenFull"; + Configuration conf = newConf(); + conf.set(ConnFactory.FACTORY_CLASS_NAMES_KEY, + InstrumentHsqldbManagerFactory.class.getName()); + clearDir(TABLE_NAME); + createIdTable(TABLE_NAME, 0); + List<String> args = new ArrayList<String>(); + args.add("--connect"); + args.add(SOURCE_DB_URL); + args.add("--table"); + args.add(TABLE_NAME); + args.add("--warehouse-dir"); + args.add(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); + args.add("--hive-import"); + args.add("--hive-table"); + args.add(TABLE_NAME + "hive"); + args.add("--incremental"); + args.add("append"); + args.add("--check-column"); + args.add("ID"); + args.add("-m"); + args.add("1"); + createJob(TABLE_NAME, args, conf); + HiveImport.setTestMode(true); + String hiveHome = org.apache.sqoop.SqoopOptions.getHiveHomeDefault(); + assertNotNull("hive.home was not set", hiveHome); + String testDataPath = new Path(new Path(hiveHome), "scripts/" + + "incrementalHiveAppendEmpty.q").toString(); + System.clearProperty("expected.script"); + System.setProperty("expected.script", + new File(testDataPath).getAbsolutePath()); + runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 0); + // Now add some rows. + insertIdRows(TABLE_NAME, 0, 10); + String testDataPath10 = new Path(new Path(hiveHome), "scripts/" + + "incrementalHiveAppend10.q").toString(); + System.clearProperty("expected.script"); + System.setProperty("expected.script", + new File(testDataPath10).getAbsolutePath()); + System.getProperty("expected.script"); + // Running the job a second time should import 10 rows. + runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 10); + // Add some more rows. + insertIdRows(TABLE_NAME, 10, 20); + String testDataPath20 = new Path(new Path(hiveHome), "scripts/" + + "incrementalHiveAppend20.q").toString(); + System.clearProperty("expected.script"); + System.setProperty("expected.script", + new File(testDataPath20).getAbsolutePath()); + // Import only those rows. 
+ runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 20); + } + + // SQOOP-1890 + @Test + public void testTableNameWithSpecialCharacters() throws Exception { + // Table name with special characters to verify proper table name escaping + final String TABLE_NAME = "my-table.ext"; + createIdTable(TABLE_NAME, 0); + + // Now add some rows. + insertIdRows(TABLE_NAME, 0, 10); + + List<String> args = getArgListForTable(TABLE_NAME, false, true); + createJob("emptyJob", args); + runJob("emptyJob"); + assertDirOfNumbers(TABLE_NAME, 10); + } + +} + http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestMerge.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/TestMerge.java b/src/test/org/apache/sqoop/TestMerge.java new file mode 100644 index 0000000..8eef8d4 --- /dev/null +++ b/src/test/org/apache/sqoop/TestMerge.java @@ -0,0 +1,375 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; +import org.apache.sqoop.testutil.CommonArgs; +import org.apache.sqoop.testutil.HsqldbTestServer; +import org.apache.sqoop.manager.ConnManager; +import org.apache.sqoop.testutil.BaseSqoopTestCase; +import org.apache.sqoop.tool.CodeGenTool; +import org.apache.sqoop.tool.ImportTool; +import org.apache.sqoop.tool.MergeTool; +import org.apache.sqoop.util.ClassLoaderStack; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.FileReader; +import org.apache.avro.file.SeekableInput; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.mapred.FsInput; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.Before; +import org.junit.Test; +import org.kitesdk.data.Dataset; +import org.kitesdk.data.DatasetReader; +import org.kitesdk.data.Datasets; + +import static org.apache.avro.generic.GenericData.Record; +import static org.junit.Assert.fail; + +/** + * Test that the merge tool works. 
+ */ +public class TestMerge extends BaseSqoopTestCase { + + private static final Log LOG = + LogFactory.getLog(TestMerge.class.getName()); + + protected ConnManager manager; + protected Connection conn; + + public static final String SOURCE_DB_URL = "jdbc:hsqldb:mem:merge"; + + private static final List<List<Integer>> initRecords = Arrays + .asList(Arrays.asList(new Integer(0), new Integer(0)), + Arrays.asList(new Integer(1), new Integer(42))); + + private static final List<List<Integer>> newRecords = Arrays.asList( + Arrays.asList(new Integer(1), new Integer(43)), + Arrays.asList(new Integer(3), new Integer(313))); + + private static final List<List<Integer>> mergedRecords = Arrays.asList( + Arrays.asList(new Integer(0), new Integer(0)), + Arrays.asList(new Integer(1), new Integer(43)), + Arrays.asList(new Integer(3), new Integer(313))); + + @Before + public void setUp() { + super.setUp(); + manager = getManager(); + try { + conn = manager.getConnection(); + } catch (SQLException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + public static final String TABLE_NAME = "MergeTable"; + private static final String OLD_PATH = "merge_old"; + private static final String NEW_PATH = "merge_new"; + private static final String FINAL_PATH = "merge_final"; + + public Configuration newConf() { + Configuration conf = new Configuration(); + if (!BaseSqoopTestCase.isOnPhysicalCluster()) { + conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS); + } + conf.set("mapred.job.tracker", "local"); + return conf; + } + + /** + * Create a SqoopOptions to connect to the manager. 
+ */ + public SqoopOptions getSqoopOptions(Configuration conf) { + SqoopOptions options = new SqoopOptions(conf); + options.setConnectString(HsqldbTestServer.getDbUrl()); + + return options; + } + + protected void createTable(List<List<Integer>> records) throws SQLException { + PreparedStatement s = conn.prepareStatement("DROP TABLE \"" + TABLE_NAME + "\" IF EXISTS"); + try { + s.executeUpdate(); + } finally { + s.close(); + } + + s = conn.prepareStatement("CREATE TABLE \"" + TABLE_NAME + + "\" (id INT NOT NULL PRIMARY KEY, val INT, LASTMOD timestamp)"); + try { + s.executeUpdate(); + } finally { + s.close(); + } + + for (List<Integer> record : records) { + final String values = StringUtils.join(record, ", "); + s = conn + .prepareStatement("INSERT INTO \"" + TABLE_NAME + "\" VALUES (" + values + ", now())"); + try { + s.executeUpdate(); + } finally { + s.close(); + } + } + + conn.commit(); + } + + @Test + public void testTextFileMerge() throws Exception { + runMergeTest(SqoopOptions.FileLayout.TextFile); + } + + @Test + public void testAvroFileMerge() throws Exception { + runMergeTest(SqoopOptions.FileLayout.AvroDataFile); + } + + @Test + public void testParquetFileMerge() throws Exception { + runMergeTest(SqoopOptions.FileLayout.ParquetFile); + } + + public void runMergeTest(SqoopOptions.FileLayout fileLayout) throws Exception { + createTable(initRecords); + + // Create a jar to use for the merging process; we'll load it + // into the current thread CL for when this runs. This needs + // to contain a different class name than used for the imports + // due to classloaderstack issues in the same JVM. 
+ final String MERGE_CLASS_NAME = "ClassForMerging"; + SqoopOptions options = getSqoopOptions(newConf()); + options.setTableName(TABLE_NAME); + options.setClassName(MERGE_CLASS_NAME); + + CodeGenTool codeGen = new CodeGenTool(); + Sqoop codeGenerator = new Sqoop(codeGen, options.getConf(), options); + int ret = Sqoop.runSqoop(codeGenerator, new String[0]); + if (0 != ret) { + fail("Nonzero exit from codegen: " + ret); + } + + List<String> jars = codeGen.getGeneratedJarFiles(); + String jarFileName = jars.get(0); + + // Now do the imports. + importData(OLD_PATH, fileLayout); + + // Check that we got records that meet our expected values. + checkData(OLD_PATH, initRecords, fileLayout); + + Thread.sleep(25); + + // Modify the data in the warehouse. + createTable(newRecords); + + Thread.sleep(25); + + // Do another import, into the "new" dir. + importData(NEW_PATH, fileLayout); + + checkData(NEW_PATH, newRecords, fileLayout); + + // Now merge the results! + ClassLoaderStack.addJarFile(jarFileName, MERGE_CLASS_NAME); + Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); + options = getSqoopOptions(newConf()); + options.setMergeOldPath(new Path(warehouse, OLD_PATH).toString()); + options.setMergeNewPath(new Path(warehouse, NEW_PATH).toString()); + options.setMergeKeyCol("ID"); + options.setTargetDir(new Path(warehouse, FINAL_PATH).toString()); + options.setClassName(MERGE_CLASS_NAME); + options.setExistingJarName(jarFileName); + + MergeTool mergeTool = new MergeTool(); + Sqoop merger = new Sqoop(mergeTool, options.getConf(), options); + ret = Sqoop.runSqoop(merger, new String[0]); + if (0 != ret) { + fail("Merge failed with exit code " + ret); + } + + checkData(FINAL_PATH, mergedRecords, fileLayout); + } + + private void checkData(String dataDir, List<List<Integer>> records, + SqoopOptions.FileLayout fileLayout) throws Exception { + for (List<Integer> record : records) { + assertRecordStartsWith(record, dataDir, fileLayout); + } + } + + private boolean 
valueMatches(GenericRecord genericRecord, List<Integer> recordVals) { + return recordVals.get(0).equals(genericRecord.get(0)) + && recordVals.get(1).equals(genericRecord.get(1)); + } + + private void importData(String targetDir, SqoopOptions.FileLayout fileLayout) { + SqoopOptions options; + options = getSqoopOptions(newConf()); + options.setTableName(TABLE_NAME); + options.setNumMappers(1); + options.setFileLayout(fileLayout); + options.setDeleteMode(true); + + Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); + options.setTargetDir(new Path(warehouse, targetDir).toString()); + + ImportTool importTool = new ImportTool(); + Sqoop importer = new Sqoop(importTool, options.getConf(), options); + int ret = Sqoop.runSqoop(importer, new String[0]); + if (0 != ret) { + fail("Initial import failed with exit code " + ret); + } + } + + /** + * @return true if the file specified by path 'p' contains a line + * that starts with 'prefix' + */ + protected boolean checkTextFileForLine(FileSystem fs, Path p, List<Integer> record) + throws IOException { + final String prefix = StringUtils.join(record, ','); + BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(p))); + try { + while (true) { + String in = r.readLine(); + if (null == in) { + break; // done with the file. 
+ } + + if (in.startsWith(prefix)) { + return true; + } + } + } finally { + r.close(); + } + + return false; + } + + private boolean checkAvroFileForLine(FileSystem fs, Path p, List<Integer> record) + throws IOException { + SeekableInput in = new FsInput(p, new Configuration()); + DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(); + FileReader<GenericRecord> reader = DataFileReader.openReader(in, datumReader); + reader.sync(0); + + while (reader.hasNext()) { + if (valueMatches(reader.next(), record)) { + return true; + } + } + + return false; + } + + private boolean checkParquetFileForLine(FileSystem fileSystem, Path path, List<Integer> record) throws IOException + { + Dataset<Record> parquetRecords = Datasets.load("dataset:" + path.getParent(), Record.class); + DatasetReader<Record> datasetReader = null; + try { + datasetReader = parquetRecords.newReader(); + for (GenericRecord genericRecord : datasetReader) { + if (valueMatches(genericRecord, record)) { + return true; + } + } + } + finally { + if (datasetReader != null) { + datasetReader.close(); + } + } + + return false; + } + + protected boolean checkFileForLine(FileSystem fs, Path p, SqoopOptions.FileLayout fileLayout, + List<Integer> record) throws IOException { + boolean result = false; + switch (fileLayout) { + case TextFile: + result = checkTextFileForLine(fs, p, record); + break; + case AvroDataFile: + result = checkAvroFileForLine(fs, p, record); + break; + case ParquetFile: + result = checkParquetFileForLine(fs, p, record); + break; + } + return result; + } + + /** + * Return true if there's a file in 'dirName' with a line that starts with + * 'prefix'. 
+ */ + protected boolean recordStartsWith(List<Integer> record, String dirName, + SqoopOptions.FileLayout fileLayout) + throws Exception { + Path warehousePath = new Path(LOCAL_WAREHOUSE_DIR); + Path targetPath = new Path(warehousePath, dirName); + + FileSystem fs = FileSystem.getLocal(new Configuration()); + FileStatus [] files = fs.listStatus(targetPath); + + if (null == files || files.length == 0) { + fail("Got no import files!"); + } + + for (FileStatus stat : files) { + Path p = stat.getPath(); + if (p.getName().startsWith("part-") || p.getName().endsWith(".parquet")) { + if (checkFileForLine(fs, p, fileLayout, record)) { + // We found the line. Nothing further to do. + return true; + } + } + } + + return false; + } + + protected void assertRecordStartsWith(List<Integer> record, String dirName, + SqoopOptions.FileLayout fileLayout) throws Exception { + if (!recordStartsWith(record, dirName, fileLayout)) { + fail("No record found that starts with [" + StringUtils.join(record, ", ") + "] in " + dirName); + } + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestMultiCols.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/TestMultiCols.java b/src/test/org/apache/sqoop/TestMultiCols.java new file mode 100644 index 0000000..1c932e9 --- /dev/null +++ b/src/test/org/apache/sqoop/TestMultiCols.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.sqoop.testutil.ImportJobTestCase; +import org.junit.Test; + +/** + * Test cases that import rows containing multiple columns, + * some of which may contain null values. + * + * Also test loading only selected columns from the db. + */ +public class TestMultiCols extends ImportJobTestCase { + + public static final Log LOG = LogFactory.getLog( + TestMultiCols.class.getName()); + + /** + * Do a full import verification test on a table containing one row. 
+ * @param types the types of the columns to insert + * @param insertVals the SQL text to use to insert each value + * @param validateLine the text to expect as a toString() of the entire row, + * as imported by the tool + */ + private void verifyTypes(String [] types , String [] insertVals, + String validateLine) { + verifyTypes(types, insertVals, validateLine, null); + } + + private void verifyTypes(String [] types , String [] insertVals, + String validateLine, String [] importColumns) { + + createTableWithColTypes(types, insertVals); + verifyImport(validateLine, importColumns); + LOG.debug("Verified input line as " + validateLine + " -- ok!"); + } + + @Test + public void testThreeStrings() { + String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" }; + String [] insertVals = { "'foo'", "'bar'", "'baz'" }; + String validateLine = "foo,bar,baz"; + + verifyTypes(types, insertVals, validateLine); + } + + @Test + public void testStringsWithNull1() { + String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" }; + String [] insertVals = { "'foo'", "null", "'baz'" }; + String validateLine = "foo,null,baz"; + + verifyTypes(types, insertVals, validateLine); + } + + @Test + public void testStringsWithNull2() { + String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" }; + String [] insertVals = { "null", "'foo'", "'baz'" }; + String validateLine = "null,foo,baz"; + + verifyTypes(types, insertVals, validateLine); + } + + @Test + public void testStringsWithNull3() { + String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" }; + String [] insertVals = { "'foo'", "'baz'", "null"}; + String validateLine = "foo,baz,null"; + + verifyTypes(types, insertVals, validateLine); + } + + @Test + public void testThreeInts() { + String [] types = { "INTEGER", "INTEGER", "INTEGER" }; + String [] insertVals = { "1", "2", "3" }; + String validateLine = "1,2,3"; + + verifyTypes(types, insertVals, validateLine); + } + + @Test + public void testIntsWithNulls() { + 
String [] types = { "INTEGER", "INTEGER", "INTEGER" }; + String [] insertVals = { "1", "null", "3" }; + String validateLine = "1,null,3"; + + verifyTypes(types, insertVals, validateLine); + } + + @Test + public void testMixed1() { + String [] types = { "INTEGER", "VARCHAR(32)", "DATE" }; + String [] insertVals = { "1", "'meep'", "'2009-12-31'" }; + String validateLine = "1,meep,2009-12-31"; + + verifyTypes(types, insertVals, validateLine); + } + + @Test + public void testMixed2() { + String [] types = { "INTEGER", "VARCHAR(32)", "DATE" }; + String [] insertVals = { "null", "'meep'", "'2009-12-31'" }; + String validateLine = "null,meep,2009-12-31"; + + verifyTypes(types, insertVals, validateLine); + } + + @Test + public void testMixed3() { + String [] types = { "INTEGER", "VARCHAR(32)", "DATE" }; + String [] insertVals = { "1", "'meep'", "null" }; + String validateLine = "1,meep,null"; + + verifyTypes(types, insertVals, validateLine); + } + + @Test + public void testMixed4() { + String [] types = { "NUMERIC", "INTEGER", "NUMERIC" }; + String [] insertVals = { "-42", "17", "33333333333333333333333.1714" }; + String validateLine = "-42,17,33333333333333333333333.1714"; + + verifyTypes(types, insertVals, validateLine); + } + + @Test + public void testMixed5() { + String [] types = { "NUMERIC", "INTEGER", "NUMERIC" }; + String [] insertVals = { "null", "17", "33333333333333333333333.0" }; + String validateLine = "null,17,33333333333333333333333.0"; + + verifyTypes(types, insertVals, validateLine); + } + + @Test + public void testMixed6() { + String [] types = { "NUMERIC", "INTEGER", "NUMERIC" }; + String [] insertVals = { "33333333333333333333333", "17", "-42"}; + String validateLine = "33333333333333333333333,17,-42"; + + verifyTypes(types, insertVals, validateLine); + } + + ////////////////////////////////////////////////////////////////////////// + // the tests below here test the --columns parameter and ensure that + // we can selectively import only certain 
columns. + ////////////////////////////////////////////////////////////////////////// + + @Test + public void testSkipFirstCol() { + String [] types = { "NUMERIC", "INTEGER", "NUMERIC" }; + String [] insertVals = { "33333333333333333333333", "17", "-42"}; + String validateLine = "17,-42"; + + String [] loadCols = {"DATA_COL1", "DATA_COL2"}; + + verifyTypes(types, insertVals, validateLine, loadCols); + } + + @Test + public void testSkipSecondCol() { + String [] types = { "NUMERIC", "INTEGER", "NUMERIC" }; + String [] insertVals = { "33333333333333333333333", "17", "-42"}; + String validateLine = "33333333333333333333333,-42"; + + String [] loadCols = {"DATA_COL0", "DATA_COL2"}; + + verifyTypes(types, insertVals, validateLine, loadCols); + } + + @Test + public void testSkipThirdCol() { + String [] types = { "NUMERIC", "INTEGER", "NUMERIC" }; + String [] insertVals = { "33333333333333333333333", "17", "-42"}; + String validateLine = "33333333333333333333333,17"; + + String [] loadCols = {"DATA_COL0", "DATA_COL1"}; + + verifyTypes(types, insertVals, validateLine, loadCols); + } + + /** + * This tests that the columns argument can handle comma-separated column + * names. So this is like having: + * --columns "DATA_COL0,DATA_COL1,DATA_COL2" + * as two args on a sqoop command line + * + * @throws IOException + */ + @Test + public void testSingleColumnsArg() throws IOException { + String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" }; + String [] insertVals = { "'foo'", "'bar'", "'baz'" }; + String validateLine = "foo,bar,baz"; + String [] loadCols = {"DATA_COL0,DATA_COL1,DATA_COL2"}; + + verifyTypes(types, insertVals, validateLine, loadCols); + } + + /** + * This tests that the columns argument can handle spaces between column + * names. 
So this is like having: + * --columns "DATA_COL0, DATA_COL1, DATA_COL2" + * as two args on a sqoop command line + * + * @throws IOException + */ + @Test + public void testColumnsWithSpaces() throws IOException { + String [] types = { "VARCHAR(32)", "VARCHAR(32)", "VARCHAR(32)" }; + String [] insertVals = { "'foo'", "'bar'", "'baz'" }; + String validateLine = "foo,bar,baz"; + String [] loadCols = {"DATA_COL0, DATA_COL1, DATA_COL2"}; + + verifyTypes(types, insertVals, validateLine, loadCols); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/TestMultiMaps.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/TestMultiMaps.java b/src/test/org/apache/sqoop/TestMultiMaps.java new file mode 100644 index 0000000..050e268 --- /dev/null +++ b/src/test/org/apache/sqoop/TestMultiMaps.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.mapred.Utils; +import org.apache.hadoop.util.ReflectionUtils; + +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.SqoopOptions.InvalidOptionsException; +import org.apache.sqoop.orm.CompilationManager; +import org.apache.sqoop.testutil.*; +import org.apache.sqoop.tool.ImportTool; +import org.apache.sqoop.util.ClassLoaderStack; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Test that using multiple mapper splits works. + */ +public class TestMultiMaps extends ImportJobTestCase { + + /** + * Create the argv to pass to Sqoop. + * @return the argv as an array of strings. + */ + protected String [] getArgv(boolean includeHadoopFlags, String [] colNames, + String splitByCol) { + String columnsString = ""; + for (String col : colNames) { + columnsString += col + ","; + } + + ArrayList<String> args = new ArrayList<String>(); + + if (includeHadoopFlags) { + CommonArgs.addHadoopFlags(args); + } + + args.add("--table"); + args.add(HsqldbTestServer.getTableName()); + args.add("--columns"); + args.add(columnsString); + args.add("--split-by"); + args.add(splitByCol); + args.add("--warehouse-dir"); + args.add(getWarehouseDir()); + args.add("--connect"); + args.add(HsqldbTestServer.getUrl()); + args.add("--as-sequencefile"); + args.add("--num-mappers"); + args.add("2"); + + return args.toArray(new String[0]); + } + + // this test just uses the two int table. 
+ protected String getTableName() { + return HsqldbTestServer.getTableName(); + } + + /** @return a list of Path objects for each data file */ + protected List<Path> getDataFilePaths() throws IOException { + List<Path> paths = new ArrayList<Path>(); + Configuration conf = new Configuration(); + if (!BaseSqoopTestCase.isOnPhysicalCluster()) { + conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS); + } + FileSystem fs = FileSystem.get(conf); + + FileStatus [] stats = fs.listStatus(getTablePath(), + new Utils.OutputFileUtils.OutputFilesFilter()); + + for (FileStatus stat : stats) { + paths.add(stat.getPath()); + } + + return paths; + } + + /** + * Given a comma-delimited list of integers, grab and parse the first int. + * @param str a comma-delimited list of values, the first of which is an int. + * @return the first field in the string, cast to int + */ + private int getFirstInt(String str) { + String [] parts = str.split(","); + return Integer.parseInt(parts[0]); + } + + public void runMultiMapTest(String splitByCol, int expectedSum) + throws IOException { + + String [] columns = HsqldbTestServer.getFieldNames(); + ClassLoader prevClassLoader = null; + SequenceFile.Reader reader = null; + + String [] argv = getArgv(true, columns, splitByCol); + runImport(argv); + try { + ImportTool importTool = new ImportTool(); + SqoopOptions opts = importTool.parseArguments( + getArgv(false, columns, splitByCol), + null, null, true); + + CompilationManager compileMgr = new CompilationManager(opts); + String jarFileName = compileMgr.getJarFilename(); + + prevClassLoader = ClassLoaderStack.addJarFile(jarFileName, + getTableName()); + + List<Path> paths = getDataFilePaths(); + Configuration conf = new Configuration(); + int curSum = 0; + + // We expect multiple files. We need to open all the files and sum up the + // first column across all of them. 
+ for (Path p : paths) { + reader = SeqFileReader.getSeqFileReader(p.toString()); + + // here we can actually instantiate (k, v) pairs. + Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf); + Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf); + + // We know that these values are two ints separated by a ',' + // character. Since this is all dynamic, though, we don't want to + // actually link against the class and use its methods. So we just + // parse this back into int fields manually. Sum them up and ensure + // that we get the expected total for the first column, to verify that + // we got all the results from the db into the file. + + // now sum up everything in the file. + while (reader.next(key) != null) { + reader.getCurrentValue(val); + curSum += getFirstInt(val.toString()); + } + + IOUtils.closeStream(reader); + reader = null; + } + + assertEquals("Total sum of first db column mismatch", expectedSum, + curSum); + } catch (InvalidOptionsException ioe) { + fail(ioe.toString()); + } catch (ParseException pe) { + fail(pe.toString()); + } finally { + IOUtils.closeStream(reader); + + if (null != prevClassLoader) { + ClassLoaderStack.setCurrentClassLoader(prevClassLoader); + } + } + } + + @Test + public void testSplitByFirstCol() throws IOException { + runMultiMapTest("INTFIELD1", HsqldbTestServer.getFirstColSum()); + } +}
