Updated Branches: refs/heads/trunk 5d767e796 -> 5bfd84e13
SQOOP-883: Remove input directory prior Sqoop import (Raghav Kumar Gautam via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/5bfd84e1 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/5bfd84e1 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/5bfd84e1 Branch: refs/heads/trunk Commit: 5bfd84e137e8460408775e4c62bdf052bc262337 Parents: 5d767e7 Author: Jarek Jarcec Cecho <[email protected]> Authored: Wed Jun 12 08:55:50 2013 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Wed Jun 12 08:55:50 2013 -0700 ---------------------------------------------------------------------- src/docs/user/import.txt | 2 + src/java/org/apache/sqoop/SqoopOptions.java | 9 +++ .../org/apache/sqoop/tool/BaseSqoopTool.java | 1 + src/java/org/apache/sqoop/tool/ImportTool.java | 36 +++++++++ .../com/cloudera/sqoop/TestSqoopOptions.java | 55 +++++++++++++ .../cloudera/sqoop/mapreduce/TestImportJob.java | 82 ++++++++++++++++++++ 6 files changed, 185 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/5bfd84e1/src/docs/user/import.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt index 4a9a316..71b50d8 100644 --- a/src/docs/user/import.txt +++ b/src/docs/user/import.txt @@ -62,6 +62,8 @@ Argument Description +\--as-textfile+ Imports data as plain text (default) +\--boundary-query <statement>+ Boundary query to use for creating splits +\--columns <col,col,col...>+ Columns to import from table ++\--delete-target-dir+ Delete the import target directory\ + if it exists +\--direct+ Use direct import fast path +\--direct-split-size <n>+ Split the input stream every 'n' bytes\ when importing in direct mode http://git-wip-us.apache.org/repos/asf/sqoop/blob/5bfd84e1/src/java/org/apache/sqoop/SqoopOptions.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index 4be6a6a..01805f9 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -139,6 +139,7 @@ public class SqoopOptions implements Cloneable { @StoredAsProperty("hdfs.warehouse.dir") private String warehouseDir; @StoredAsProperty("hdfs.target.dir") private String targetDir; @StoredAsProperty("hdfs.append.dir") private boolean append; + @StoredAsProperty("hdfs.delete-target.dir") private boolean delete; @StoredAsProperty("hdfs.file.format") private FileLayout layout; @StoredAsProperty("direct.import") private boolean direct; // "direct mode." @StoredAsProperty("db.batch") private boolean batchMode; @@ -1437,6 +1438,14 @@ public class SqoopOptions implements Cloneable { return this.append; } + public void setDeleteMode(boolean doDelete) { + this.delete = doDelete; + } + + public boolean isDeleteMode() { + return this.delete; + } + /** * @return the destination file format */ http://git-wip-us.apache.org/repos/asf/sqoop/blob/5bfd84e1/src/java/org/apache/sqoop/tool/BaseSqoopTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java index 01a55e5..0eca991 100644 --- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java +++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java @@ -87,6 +87,7 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool { public static final String WAREHOUSE_DIR_ARG = "warehouse-dir"; public static final String TARGET_DIR_ARG = "target-dir"; public static final String APPEND_ARG = "append"; + public static final String DELETE_ARG = "delete-target-dir"; public static final String NULL_STRING = "null-string"; public static final String INPUT_NULL_STRING = 
"input-null-string"; public static final String NULL_NON_STRING = "null-non-string"; http://git-wip-us.apache.org/repos/asf/sqoop/blob/5bfd84e1/src/java/org/apache/sqoop/tool/ImportTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java index 1c57503..cb800b6 100644 --- a/src/java/org/apache/sqoop/tool/ImportTool.java +++ b/src/java/org/apache/sqoop/tool/ImportTool.java @@ -50,6 +50,7 @@ import com.cloudera.sqoop.metastore.JobStorageFactory; import com.cloudera.sqoop.util.AppendUtils; import com.cloudera.sqoop.util.ImportException; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileSystem; /** * Tool that performs database imports to HDFS. @@ -403,6 +404,10 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { return false; } + if (options.isDeleteMode()) { + deleteTargetDir(context); + } + if (null != tableName) { manager.importTable(context); } else { @@ -424,6 +429,22 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { return true; } + private void deleteTargetDir(ImportJobContext context) throws IOException { + + SqoopOptions options = context.getOptions(); + FileSystem fs = FileSystem.get(options.getConf()); + Path destDir = context.getDestination(); + + if (fs.exists(destDir)) { + fs.delete(destDir, true); + LOG.info("Destination directory " + destDir + " deleted."); + return; + } else { + LOG.info("Destination directory " + destDir + " is not present, " + + "hence not deleting."); + } + } + /** * @return the output path for the imported files; * in append mode this will point to a temporary folder. 
@@ -544,6 +565,10 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { .withDescription("Imports data in append mode") .withLongOpt(APPEND_ARG) .create()); + importOpts.addOption(OptionBuilder + .withDescription("Imports data in delete mode") + .withLongOpt(DELETE_ARG) + .create()); importOpts.addOption(OptionBuilder.withArgName("dir") .hasArg().withDescription("HDFS plain table destination") .withLongOpt(TARGET_DIR_ARG) @@ -758,6 +783,10 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { out.setAppendMode(true); } + if (in.hasOption(DELETE_ARG)) { + out.setDeleteMode(true); + } + if (in.hasOption(SQL_QUERY_ARG)) { out.setSqlQuery(in.getOptionValue(SQL_QUERY_ARG)); } @@ -905,6 +934,13 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { && options.getHCatTableName() != null) { throw new InvalidOptionsException("--hcatalog-table cannot be used " + " --warehouse-dir or --target-dir options"); + } else if (options.isDeleteMode() && options.isAppendMode()) { + throw new InvalidOptionsException("--append and --delete-target-dir can" + + " not be used together."); + } else if (options.isDeleteMode() && options.getIncrementalMode() + != SqoopOptions.IncrementalMode.None) { + throw new InvalidOptionsException("--delete-target-dir can not be used" + + " with incremental imports."); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/5bfd84e1/src/test/com/cloudera/sqoop/TestSqoopOptions.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/TestSqoopOptions.java b/src/test/com/cloudera/sqoop/TestSqoopOptions.java index c78cd87..03e2504 100644 --- a/src/test/com/cloudera/sqoop/TestSqoopOptions.java +++ b/src/test/com/cloudera/sqoop/TestSqoopOptions.java @@ -22,8 +22,10 @@ import java.util.Properties; import junit.framework.TestCase; +import org.apache.commons.lang.ArrayUtils; import com.cloudera.sqoop.lib.DelimiterSet; import 
com.cloudera.sqoop.tool.ImportTool; +import com.cloudera.sqoop.testutil.HsqldbTestServer; /** * Test aspects of the SqoopOptions class. @@ -378,4 +380,57 @@ public class TestSqoopOptions extends TestCase { } + //helper method to validate given import options + private void validateImportOptions(String[] extraArgs) throws Exception { + String [] args = { + "--connect", HsqldbTestServer.getUrl(), + "--table", "test", + "-m", "1", + }; + ImportTool importTool = new ImportTool(); + SqoopOptions opts = importTool.parseArguments( + (String []) ArrayUtils.addAll(args, extraArgs), null, null, false); + importTool.validateOptions(opts); + } + + //test compatibility of --delete-target-dir with import + public void testDeteleTargetDir() throws Exception { + String [] extraArgs = { + "--delete-target-dir", + }; + try { + validateImportOptions(extraArgs); + } catch(SqoopOptions.InvalidOptionsException ioe) { + fail("Unexpected InvalidOptionsException" + ioe); + } + } + + //test incompatibility of --delete-target-dir & --append with import + public void testDeleteTargetDirWithAppend() throws Exception { + String [] extraArgs = { + "--append", + "--delete-target-dir", + }; + try { + validateImportOptions(extraArgs); + fail("Expected InvalidOptionsException"); + } catch(SqoopOptions.InvalidOptionsException ioe) { + // Expected + } + } + + //test incompatibility of --delete-target-dir with incremental import + public void testDeleteWithIncrementalImport() throws Exception { + String [] extraArgs = { + "--incremental", "append", + "--delete-target-dir", + }; + try { + validateImportOptions(extraArgs); + fail("Expected InvalidOptionsException"); + } catch(SqoopOptions.InvalidOptionsException ioe) { + // Expected + } + } + } http://git-wip-us.apache.org/repos/asf/sqoop/blob/5bfd84e1/src/test/com/cloudera/sqoop/mapreduce/TestImportJob.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/mapreduce/TestImportJob.java 
b/src/test/com/cloudera/sqoop/mapreduce/TestImportJob.java index 6ab3b82..b22b2b6 100644 --- a/src/test/com/cloudera/sqoop/mapreduce/TestImportJob.java +++ b/src/test/com/cloudera/sqoop/mapreduce/TestImportJob.java @@ -21,13 +21,20 @@ package com.cloudera.sqoop.mapreduce; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; @@ -237,4 +244,79 @@ public class TestImportJob extends ImportJobTestCase { } } + // helper method to get contents of a given dir containing sequence files + private String[] getContent(Configuration conf, Path path) throws Exception { + FileSystem fs = FileSystem.getLocal(conf); + FileStatus[] stats = fs.listStatus(path); + String [] fileNames = new String[stats.length]; + for (int i = 0; i < stats.length; i++) { + fileNames[i] = stats[i].getPath().toString(); + } + + // Read all the files adding the value lines to the list. 
+ List<String> strings = new ArrayList<String>(); + for (String fileName : fileNames) { + if (fileName.startsWith("_") || fileName.startsWith(".")) { + continue; + } + + SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); + WritableComparable key = (WritableComparable) + reader.getKeyClass().newInstance(); + Writable value = (Writable) reader.getValueClass().newInstance(); + while (reader.next(key, value)) { + strings.add(value.toString()); + } + } + return strings.toArray(new String[0]); + } + + public void testDeleteTargetDir() throws Exception { + // Make sure that an import with --delete-target-dir succeeds both + // when the target directory is absent and when it already exists. + + // Create a table to attempt to import. + createTableForColType("VARCHAR(32)", "'meep'"); + + Configuration conf = new Configuration(); + + // Make sure the output dir does not exist + Path outputPath = new Path(new Path(getWarehouseDir()), getTableName()); + FileSystem fs = FileSystem.getLocal(conf); + fs.delete(outputPath, true); + assertTrue(!fs.exists(outputPath)); + + String[] argv = getArgv(true, new String[] { "DATA_COL0" }, conf); + argv = Arrays.copyOf(argv, argv.length + 1); + argv[argv.length - 1] = "--delete-target-dir"; + + Sqoop importer = new Sqoop(new ImportTool()); + try { + int ret = Sqoop.runSqoop(importer, argv); + assertTrue("Expected job to go through if target directory" + + " does not exist.", 0 == ret); + assertTrue(fs.exists(outputPath)); + // expecting one _SUCCESS file and one file containing data + assertTrue("Expecting two files in the directory.", + fs.listStatus(outputPath).length == 2); + String[] output = getContent(conf, outputPath); + assertEquals("Expected output and actual output should be same.", "meep", + output[0]); + + ret = Sqoop.runSqoop(importer, argv); + assertTrue("Expected job to go through if target directory exists.", + 0 == ret); + assertTrue(fs.exists(outputPath)); + // expecting one _SUCCESS file and one file containing data + 
assertTrue("Expecting two files in the directory.", + fs.listStatus(outputPath).length == 2); + output = getContent(conf, outputPath); + assertEquals("Expected output and actual output should be same.", "meep", + output[0]); + } catch (Exception e) { + // In debug mode, ImportException is wrapped in RuntimeException. + LOG.info("Got exceptional return (expected: ok). msg is: " + e); + } + } + }
