Repository: sqoop Updated Branches: refs/heads/trunk d4ff097ea -> 34e4efd0d
SQOOP-1138: incremental lastmodified should re-use output directory (Abraham Elmahrek via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/34e4efd0 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/34e4efd0 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/34e4efd0 Branch: refs/heads/trunk Commit: 34e4efd0d7a6d34b3b89a7d43271ebb5aa8193a9 Parents: d4ff097 Author: Jarek Jarcec Cecho <[email protected]> Authored: Mon Jul 14 22:24:20 2014 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Mon Jul 14 22:24:20 2014 -0700 ---------------------------------------------------------------------- src/java/org/apache/sqoop/tool/ImportTool.java | 123 ++++++++- .../cloudera/sqoop/TestIncrementalImport.java | 253 ++++++++++++++++++- 2 files changed, 369 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/34e4efd0/src/java/org/apache/sqoop/tool/ImportTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java index 81904ac..a3a2d0d 100644 --- a/src/java/org/apache/sqoop/tool/ImportTool.java +++ b/src/java/org/apache/sqoop/tool/ImportTool.java @@ -28,12 +28,17 @@ import java.sql.Types; import java.util.List; import java.util.Map; +import com.cloudera.sqoop.mapreduce.MergeJob; +import com.cloudera.sqoop.orm.TableClassName; +import com.cloudera.sqoop.util.ClassLoaderStack; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.StringUtils; import com.cloudera.sqoop.Sqoop; @@ 
-66,6 +71,9 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { // store check column type for incremental option private int checkColumnType; + // Set classloader for local job runner + private ClassLoader prevClassLoader = null; + public ImportTool() { this("import", false); } @@ -91,6 +99,34 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { } /** + * If jars must be loaded into the local environment, do so here. + */ + private void loadJars(Configuration conf, String ormJarFile, + String tableClassName) throws IOException { + + boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address")) + || "local".equals(conf.get("mapred.job.tracker")); + if (isLocal) { + // If we're using the LocalJobRunner, then instead of using the compiled + // jar file as the job source, we're running in the current thread. Push + // on another classloader that loads from that jar in addition to + // everything currently on the classpath. + this.prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile, + tableClassName); + } + } + + /** + * If any classloader was invoked by loadJars, free it here. + */ + private void unloadJars() { + if (null != this.prevClassLoader) { + // unload the special classloader for this jar. + ClassLoaderStack.setCurrentClassLoader(this.prevClassLoader); + } + } + + /** * @return true if the supplied options specify an incremental import. 
*/ private boolean isIncremental(SqoopOptions options) { @@ -256,6 +292,7 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { return true; } + FileSystem fs = FileSystem.get(options.getConf()); SqoopOptions.IncrementalMode incrementalMode = options.getIncrementalMode(); String nextIncrementalValue = null; @@ -280,6 +317,12 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { } break; case DateLastModified: + if (options.getMergeKeyCol() == null && !options.isAppendMode() + && fs.exists(getOutputPath(options, context.getTableName(), false))) { + throw new ImportException("--" + MERGE_KEY_ARG + " or " + "--" + APPEND_ARG + + " is required when using --" + this.INCREMENT_TYPE_ARG + + " lastmodified and the output directory exists."); + } checkColumnType = manager.getColumnTypes(options.getTableName(), options.getSqlQuery()).get(options.getIncrementalTestColumn()); nextVal = manager.getCurrentDbTimestamp(); @@ -382,6 +425,48 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { } /** + * Merge HDFS output directories + */ + protected void lastModifiedMerge(SqoopOptions options, ImportJobContext context) throws IOException { + FileSystem fs = FileSystem.get(options.getConf()); + if (context.getDestination() != null && fs.exists(context.getDestination())) { + Path userDestDir = getOutputPath(options, context.getTableName(), false); + if (fs.exists(userDestDir)) { + String tableClassName = null; + if (!context.getConnManager().isORMFacilitySelfManaged()) { + tableClassName = + new TableClassName(options).getClassForTable(context.getTableName()); + } + Path destDir = getOutputPath(options, context.getTableName()); + options.setExistingJarName(context.getJarFile()); + options.setClassName(tableClassName); + options.setMergeOldPath(userDestDir.toString()); + options.setMergeNewPath(context.getDestination().toString()); + // Merge to temporary directory so that original directory remains intact. 
+ options.setTargetDir(destDir.toString()); + + // Local job tracker needs jars in the classpath. + loadJars(options.getConf(), context.getJarFile(), context.getTableName()); + + MergeJob mergeJob = new MergeJob(options); + if (mergeJob.runMergeJob()) { + // Rename destination directory to proper location. + Path tmpDir = getOutputPath(options, context.getTableName()); + fs.rename(userDestDir, tmpDir); + fs.rename(destDir, userDestDir); + fs.delete(tmpDir, true); + } else { + LOG.error("Merge MapReduce job failed!"); + } + + unloadJars(); + } else { + fs.rename(context.getDestination(), userDestDir); + } + } + } + + /** * Import a table or query. * @return true if an import was performed, false otherwise. */ @@ -392,9 +477,11 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { // Generate the ORM code for the tables. jarFile = codeGenerator.generateORM(options, tableName); + Path outputPath = getOutputPath(options, tableName); + // Do the actual import. ImportJobContext context = new ImportJobContext(tableName, jarFile, - options, getOutputPath(options, tableName)); + options, outputPath); // If we're doing an incremental import, set up the // filtering conditions used to get the latest records. @@ -415,6 +502,8 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { if (options.isAppendMode()) { AppendUtils app = new AppendUtils(context); app.append(); + } else if (options.getIncrementalMode() == SqoopOptions.IncrementalMode.DateLastModified) { + lastModifiedMerge(options, context); } // If the user wants this table to be in Hive, perform that post-load. @@ -449,11 +538,20 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { * if importing to hbase, this may return null. 
*/ private Path getOutputPath(SqoopOptions options, String tableName) { + return getOutputPath(options, tableName, options.isAppendMode() + || options.getIncrementalMode().equals(SqoopOptions.IncrementalMode.DateLastModified)); + } + + /** + * @return the output path for the imported files; + * if importing to hbase, this may return null. + */ + private Path getOutputPath(SqoopOptions options, String tableName, boolean temp) { // Get output directory String hdfsWarehouseDir = options.getWarehouseDir(); String hdfsTargetDir = options.getTargetDir(); Path outputPath = null; - if (options.isAppendMode()) { + if (temp) { // Use temporary path, later removed when appending String salt = tableName; if(salt == null && options.getSqlQuery() != null) { @@ -586,6 +684,10 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { + " value of the primary key") .withLongOpt(SQL_QUERY_BOUNDARY) .create()); + importOpts.addOption(OptionBuilder.withArgName("column") + .hasArg().withDescription("Key column to use to join results") + .withLongOpt(MERGE_KEY_ARG) + .create()); addValidationOpts(importOpts); } @@ -798,6 +900,10 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { out.setBoundaryQuery(in.getOptionValue(SQL_QUERY_BOUNDARY)); } + if (in.hasOption(MERGE_KEY_ARG)) { + out.setMergeKeyCol(in.getOptionValue(MERGE_KEY_ARG)); + } + applyValidationOptions(in, out); } @@ -941,14 +1047,14 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { && options.getHCatTableName() != null) { throw new InvalidOptionsException("--hcatalog-table cannot be used " + " --warehouse-dir or --target-dir options"); - } else if (options.isDeleteMode() && options.isAppendMode()) { + } else if (options.isDeleteMode() && options.isAppendMode()) { throw new InvalidOptionsException("--append and --delete-target-dir can" + " not be used together."); - } else if (options.isDeleteMode() && options.getIncrementalMode() + } else if 
(options.isDeleteMode() && options.getIncrementalMode() != SqoopOptions.IncrementalMode.None) { throw new InvalidOptionsException("--delete-target-dir can not be used" + " with incremental imports."); - } + } } /** @@ -969,6 +1075,13 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { "You must specify an incremental import mode with --" + INCREMENT_TYPE_ARG + ". " + HELP_STR); } + + if (options.getIncrementalMode() == SqoopOptions.IncrementalMode.DateLastModified + && options.getFileLayout() == SqoopOptions.FileLayout.AvroDataFile) { + throw new InvalidOptionsException("--" + + INCREMENT_TYPE_ARG + " lastmodified cannot be used in conjunction with --" + + FMT_AVRODATAFILE_ARG + "." + HELP_STR); + } } @Override http://git-wip-us.apache.org/repos/asf/sqoop/blob/34e4efd0/src/test/com/cloudera/sqoop/TestIncrementalImport.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/TestIncrementalImport.java b/src/test/com/cloudera/sqoop/TestIncrementalImport.java index 8eadcdd..fd94552 100644 --- a/src/test/com/cloudera/sqoop/TestIncrementalImport.java +++ b/src/test/com/cloudera/sqoop/TestIncrementalImport.java @@ -323,10 +323,62 @@ public class TestIncrementalImport extends TestCase { } /** + * Look at a directory that should contain files full of an imported 'id' + * column and 'last_modified' column. Assert that all numbers in [0, expectedNums) are present + * in order. 
+ */ + public void assertDirOfNumbersAndTimestamps(String tableName, int expectedNums) { + try { + FileSystem fs = FileSystem.getLocal(new Configuration()); + Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); + Path tableDir = new Path(warehouse, tableName); + FileStatus [] stats = fs.listStatus(tableDir); + String [] fileNames = new String[stats.length]; + for (int i = 0; i < stats.length; i++) { + fileNames[i] = stats[i].getPath().toString(); + } + + Arrays.sort(fileNames); + + // Read all the files in sorted order, adding the value lines to the list. + List<String> receivedNums = new ArrayList<String>(); + for (String fileName : fileNames) { + if (fileName.startsWith("_") || fileName.startsWith(".")) { + continue; + } + + BufferedReader r = new BufferedReader( + new InputStreamReader(fs.open(new Path(fileName)))); + try { + while (true) { + String s = r.readLine(); + if (null == s) { + break; + } + + receivedNums.add(s.trim()); + } + } finally { + r.close(); + } + } + + assertEquals(expectedNums, receivedNums.size()); + + // Compare the received values with the expected set. + for (int i = 0; i < expectedNums; i++) { + assertEquals((int) i, (int) Integer.valueOf(receivedNums.get(i).split(",")[0])); + } + } catch (Exception e) { + fail("Got unexpected exception: " + StringUtils.stringifyException(e)); + } + } + + /** * Assert that a directory contains a file with exactly one line * in it, containing the prescribed number 'val'. */ - public void assertSpecificNumber(String tableName, int val) { + public void assertFirstSpecificNumber(String tableName, int val) { try { FileSystem fs = FileSystem.getLocal(new Configuration()); Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); @@ -375,6 +427,53 @@ public class TestIncrementalImport extends TestCase { } } + /** + * Check the first line of each non-hidden file in the table directory for + * the number 'val' in its leading field; fail if a data file follows the match. 
+ */ + public void assertSpecificNumber(String tableName, int val) { + try { + FileSystem fs = FileSystem.getLocal(new Configuration()); + Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR); + Path tableDir = new Path(warehouse, tableName); + FileStatus [] stats = fs.listStatus(tableDir); + String [] filePaths = new String[stats.length]; + for (int i = 0; i < stats.length; i++) { + filePaths[i] = stats[i].getPath().toString(); + } + + // Read the first file that is not a hidden file. + boolean foundVal = false; + for (String filePath : filePaths) { + String fileName = new Path(filePath).getName(); + if (fileName.startsWith("_") || fileName.startsWith(".")) { + continue; + } + + if (foundVal) { + // Make sure we don't have two or more "real" files in the dir. + fail("Got an extra data-containing file in this directory."); + } + + BufferedReader r = new BufferedReader( + new InputStreamReader(fs.open(new Path(filePath)))); + try { + String s = r.readLine(); + if (val == (int) Integer.valueOf(s.trim().split(",")[0])) { + if (foundVal) { + fail("Expected only one result, but got another line: " + s); + } + foundVal = true; + } + } finally { + r.close(); + } + } + } catch (IOException e) { + fail("Got unexpected exception: " + StringUtils.stringifyException(e)); + } + } + public void runImport(SqoopOptions options, List<String> args) { try { Sqoop importer = new Sqoop(new ImportTool(), options.getConf(), options); @@ -465,7 +564,7 @@ public class TestIncrementalImport extends TestCase { args.add("--incremental"); args.add("lastmodified"); args.add("--check-column"); - args.add("last_modified"); + args.add("LAST_MODIFIED"); } args.add("-m"); args.add("1"); @@ -858,6 +957,156 @@ public class TestIncrementalImport extends TestCase { // Import only the new row. 
clearDir(TABLE_NAME); runJob(TABLE_NAME); + assertFirstSpecificNumber(TABLE_NAME, 4000); + } + + public void testUpdateModifyWithTimestamp() throws Exception { + // Create a table with data in it; import it. + // Then modify some existing rows, and verify that we only grab + // those rows. + + final String TABLE_NAME = "updateModifyTimestamp"; + Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100); + createTimestampTable(TABLE_NAME, 10, thePast); + + List<String> args = getArgListForTable(TABLE_NAME, false, false); + + Configuration conf = newConf(); + SqoopOptions options = new SqoopOptions(); + options.setConf(conf); + runImport(options, args); + assertDirOfNumbers(TABLE_NAME, 10); + + // Modify a row. + long importWasBefore = System.currentTimeMillis(); + Thread.sleep(50); + long rowsAddedTime = System.currentTimeMillis() - 5; + assertTrue(rowsAddedTime > importWasBefore); + assertTrue(rowsAddedTime < System.currentTimeMillis()); + SqoopOptions options2 = new SqoopOptions(); + options2.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options2); + Connection c = manager.getConnection(); + PreparedStatement s = null; + try { + s = c.prepareStatement("UPDATE " + TABLE_NAME + + " SET id=?, last_modified=? WHERE id=?"); + s.setInt(1, 4000); // the first row should have '4000' in it now. + s.setTimestamp(2, new Timestamp(rowsAddedTime)); + s.setInt(3, 0); + s.executeUpdate(); + c.commit(); + } finally { + s.close(); + } + + // Update the new row. + args.add("--last-value"); + args.add(new Timestamp(importWasBefore).toString()); + args.add("--merge-key"); + args.add("id"); + conf = newConf(); + options = new SqoopOptions(); + options.setConf(conf); + runImport(options, args); + assertSpecificNumber(TABLE_NAME, 4000); + } + + public void testUpdateModifyWithTimestampWithQuery() throws Exception { + // Create a table with data in it; import it with a free-form query. + // Then modify an existing row and re-import with --merge-key. 
+ + final String TABLE_NAME = "UpdateModifyWithTimestampWithQuery"; + Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100); + createTimestampTable(TABLE_NAME, 10, thePast); + + final String QUERY = "SELECT id, last_modified FROM UpdateModifyWithTimestampWithQuery WHERE $CONDITIONS"; + + List<String> args = getArgListForQuery(QUERY, TABLE_NAME, + true, false, false); + + Configuration conf = newConf(); + SqoopOptions options = new SqoopOptions(); + options.setConf(conf); + runImport(options, args); + assertDirOfNumbersAndTimestamps(TABLE_NAME, 10); + + // Modify a row. + long importWasBefore = System.currentTimeMillis(); + Thread.sleep(50); + long rowsAddedTime = System.currentTimeMillis() - 5; + assertTrue(rowsAddedTime > importWasBefore); + assertTrue(rowsAddedTime < System.currentTimeMillis()); + SqoopOptions options2 = new SqoopOptions(); + options2.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options2); + Connection c = manager.getConnection(); + PreparedStatement s = null; + try { + s = c.prepareStatement("UPDATE " + TABLE_NAME + + " SET id=?, last_modified=? WHERE id=?"); + s.setInt(1, 4000); // the first row should have '4000' in it now. + s.setTimestamp(2, new Timestamp(rowsAddedTime)); + s.setInt(3, 0); + s.executeUpdate(); + c.commit(); + } finally { + s.close(); + } + + // Update the new row. + args.add("--last-value"); + args.add(new Timestamp(importWasBefore).toString()); + args.add("--merge-key"); + args.add("id"); + conf = newConf(); + options = new SqoopOptions(); + options.setConf(conf); + runImport(options, args); + assertSpecificNumber(TABLE_NAME, 4000); + } + + public void testUpdateModifyWithTimestampJob() throws Exception { + // Create a table with data in it; import it. + // Then modify some existing rows, and verify that we only grab + // those rows. 
+ + final String TABLE_NAME = "updateModifyTimestampJob"; + Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100); + createTimestampTable(TABLE_NAME, 10, thePast); + + List<String> args = getArgListForTable(TABLE_NAME, false, false); + args.add("--merge-key"); + args.add("id"); + createJob(TABLE_NAME, args); + runJob(TABLE_NAME); + assertDirOfNumbers(TABLE_NAME, 10); + + // Modify a row. + long importWasBefore = System.currentTimeMillis(); + Thread.sleep(50); + long rowsAddedTime = System.currentTimeMillis() - 5; + assertTrue(rowsAddedTime > importWasBefore); + assertTrue(rowsAddedTime < System.currentTimeMillis()); + SqoopOptions options2 = new SqoopOptions(); + options2.setConnectString(SOURCE_DB_URL); + HsqldbManager manager = new HsqldbManager(options2); + Connection c = manager.getConnection(); + PreparedStatement s = null; + try { + s = c.prepareStatement("UPDATE " + TABLE_NAME + + " SET id=?, last_modified=? WHERE id=?"); + s.setInt(1, 4000); // the first row should have '4000' in it now. + s.setTimestamp(2, new Timestamp(rowsAddedTime)); + s.setInt(3, 0); + s.executeUpdate(); + c.commit(); + } finally { + s.close(); + } + + // Update the new row. + runJob(TABLE_NAME); assertSpecificNumber(TABLE_NAME, 4000); }
