Updated Branches: refs/heads/trunk 64878c643 -> 92e94b911
SQOOP-1078: incremental import from database in direct mode (Tim Howe via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/92e94b91 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/92e94b91 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/92e94b91 Branch: refs/heads/trunk Commit: 92e94b911d203fafbd4f3784badd29431aa5bf78 Parents: 64878c6 Author: Jarek Jarcec Cecho <[email protected]> Authored: Wed Jun 19 15:38:40 2013 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Wed Jun 19 15:38:40 2013 -0700 ---------------------------------------------------------------------- src/java/org/apache/sqoop/util/AppendUtils.java | 117 +++++++++++++------ .../apache/sqoop/util/DirectImportUtils.java | 2 +- 2 files changed, 81 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/92e94b91/src/java/org/apache/sqoop/util/AppendUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/util/AppendUtils.java b/src/java/org/apache/sqoop/util/AppendUtils.java index 873c718..d7cd6c5 100644 --- a/src/java/org/apache/sqoop/util/AppendUtils.java +++ b/src/java/org/apache/sqoop/util/AppendUtils.java @@ -49,6 +49,8 @@ public class AppendUtils { private static final String FILEPART_SEPARATOR = "-"; private static final String FILEEXT_SEPARATOR = "."; + private static final Pattern DATA_PART_PATTERN = Pattern.compile("part.*-([0-9]{" + PARTITION_DIGITS + "}+).*"); + private ImportJobContext context = null; public AppendUtils(ImportJobContext context) { @@ -116,11 +118,10 @@ public class AppendUtils { int nextPartition = 0; FileStatus[] existingFiles = fs.listStatus(targetDir); if (existingFiles != null && existingFiles.length > 0) { - Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*"); for (FileStatus fileStat : existingFiles) { if (!fileStat.isDir()) { String filename = fileStat.getPath().getName(); - Matcher mat = patt.matcher(filename); + Matcher mat = DATA_PART_PATTERN.matcher(filename); if (mat.matches()) { int thisPart = Integer.parseInt(mat.group(1)); if (thisPart >= nextPartition) { @@ -140,52 +141,94 @@ public class AppendUtils { } /** - * Move files from source to target using a specified starting partition. + * Move selected files from source to target using a specified starting partition. + * + * Directories are moved without restriction. Note that the serial + * number of directories bears no relation to the file partition + * numbering. */ private void moveFiles(FileSystem fs, Path sourceDir, Path targetDir, int partitionStart) throws IOException { - NumberFormat numpart = NumberFormat.getInstance(); - numpart.setMinimumIntegerDigits(PARTITION_DIGITS); - numpart.setGroupingUsed(false); - Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*"); - FileStatus[] tempFiles = fs.listStatus(sourceDir); + /* list files in the source dir and check for errors */ + + FileStatus[] sourceFiles = fs.listStatus(sourceDir); - if (null == tempFiles) { + if (null == sourceFiles) { // If we've already checked that the dir exists, and now it can't be // listed, this is a genuine error (permissions, fs integrity, or other). throw new IOException("Could not list files from " + sourceDir); } - // Move and rename files & directories from temporary to target-dir thus - // appending file's next partition - for (FileStatus fileStat : tempFiles) { - if (!fileStat.isDir()) { - // Move imported data files - String filename = fileStat.getPath().getName(); - Matcher mat = patt.matcher(filename); - if (mat.matches()) { - String name = getFilename(filename); - String fileToMove = name.concat(numpart.format(partitionStart++)); - String extension = getFileExtension(filename); - if (extension != null) { - fileToMove = fileToMove.concat(extension); - } - LOG.debug("Filename: " + filename + " repartitioned to: " - + fileToMove); - fs.rename(fileStat.getPath(), new Path(targetDir, fileToMove)); - } - } else { - // Move directories (_logs & any other) - String dirName = fileStat.getPath().getName(); - Path path = new Path(targetDir, dirName); - int dirNumber = 0; - while (fs.exists(path)) { - path = new Path(targetDir, dirName.concat("-").concat( - numpart.format(dirNumber++))); + + /* state used throughout the entire move operation */ + + // pad the data partition number thusly + NumberFormat partFormat = NumberFormat.getInstance(); + partFormat.setMinimumIntegerDigits(PARTITION_DIGITS); + partFormat.setGroupingUsed(false); + + // where the data partitioning is currently at + int dataPart = partitionStart; + + + /* loop through all top-level files and copy matching ones */ + + for (FileStatus fileStatus : sourceFiles) { + String sourceFilename = fileStatus.getPath().getName(); + StringBuilder destFilename = new StringBuilder(); + + if (fileStatus.isDir()) { // move all subdirectories + // pass target dir as initial dest to prevent nesting inside preexisting dir + if (fs.rename(fileStatus.getPath(), targetDir)) { + LOG.debug("Directory: " + sourceFilename + " renamed to: " + sourceFilename); + } else { + int dirNumber = 0; + Path destPath; + do { + // clear the builder in case this isn't the first iteration + destFilename.setLength(0); + + // name-nnnnn? + destFilename + .append(sourceFilename) + .append("-") + .append(partFormat.format(dirNumber++)); + + destPath = new Path(targetDir, destFilename.toString()); + if (fs.exists(destPath)) + continue; + + /* + * There's still a race condition right here if an + * identically-named directory is created concurrently. + * It can be avoided by creating a parent dir for all + * migrated dirs, or by an intermediate rename. + */ + + } while (!fs.rename(fileStatus.getPath(), destPath)); + + LOG.debug("Directory: " + sourceFilename + " renamed to: " + destPath.getName()); } - LOG.debug("Directory: " + dirName + " renamed to: " + path.getName()); - fs.rename(fileStat.getPath(), path); + } else if (DATA_PART_PATTERN.matcher(sourceFilename).matches()) { // move only matching top-level files + do { + // clear the builder in case this isn't the first iteration + destFilename.setLength(0); + + // name-nnnnn + destFilename + .append(getFilename(sourceFilename)) + .append(partFormat.format(dataPart++)); + + // .ext? + String extension = getFileExtension(sourceFilename); + if (extension != null) + destFilename.append(getFileExtension(sourceFilename)); + } while (!fs.rename(fileStatus.getPath(), new Path(targetDir, destFilename.toString()))); + + LOG.debug("Filename: " + sourceFilename + " repartitioned to: " + destFilename.toString()); + } else { // ignore everything else + LOG.debug("Filename: " + sourceFilename + " ignored"); } } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/92e94b91/src/java/org/apache/sqoop/util/DirectImportUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/util/DirectImportUtils.java b/src/java/org/apache/sqoop/util/DirectImportUtils.java index e1bf22f..d801c8f 100644 --- a/src/java/org/apache/sqoop/util/DirectImportUtils.java +++ b/src/java/org/apache/sqoop/util/DirectImportUtils.java @@ -86,7 +86,7 @@ public final class DirectImportUtils { // This Writer will be closed by the caller. return new SplittableBufferedWriter( - new SplittingOutputStream(conf, destDir, "data-", + new SplittingOutputStream(conf, destDir, "part-m-", options.getDirectSplitSize(), getCodec(conf, options))); }
