Repository: crunch Updated Branches: refs/heads/master 5d9432fbc -> 463a131b4
CRUNCH-515: Decrease collision probability on temp dir cleanup. Contributed by Sean Owen. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/463a131b Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/463a131b Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/463a131b Branch: refs/heads/master Commit: 463a131b445e34456c020c95ae66edf7b9f345e7 Parents: 5d9432f Author: Josh Wills <[email protected]> Authored: Mon Oct 19 12:15:10 2015 -0700 Committer: Josh Wills <[email protected]> Committed: Mon Oct 19 12:15:10 2015 -0700 ---------------------------------------------------------------------- .../crunch/impl/dist/DistributedPipeline.java | 58 +++++++++++++------- 1 file changed, 39 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/463a131b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java index 049046a..d3fb0d0 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java @@ -99,7 +99,6 @@ public abstract class DistributedPipeline implements Pipeline { this.allPipelineCallables = Maps.newHashMap(); this.appendedTargets = Sets.newHashSet(); this.conf = conf; - this.tempDirectory = createTempDirectory(conf); this.tempFileIndex = 0; this.nextAnonymousStageId = 0; } @@ -115,8 +114,9 @@ public abstract class DistributedPipeline implements Pipeline { @Override public void setConfiguration(Configuration conf) { + // Clear any existing temp dir + deleteTempDirectory(); this.conf = conf; - this.tempDirectory = createTempDirectory(conf); } @Override @@ -390,19 +390,22 @@ public abstract class DistributedPipeline implements Pipeline { public Path createTempPath() { tempFileIndex++; - return new Path(tempDirectory, "p" + tempFileIndex); + return new Path(getTempDirectory(), "p" + tempFileIndex); } - private static Path createTempDirectory(Configuration conf) { - Path dir = createTemporaryPath(conf); - try { - FileSystem fs = dir.getFileSystem(conf); - fs.mkdirs(dir); - fs.deleteOnExit(dir); - } catch (IOException e) { - throw new RuntimeException("Cannot create job output directory " + dir, e); + private synchronized Path getTempDirectory() { + if (tempDirectory == null) { + Path dir = createTemporaryPath(conf); + try { + FileSystem fs = dir.getFileSystem(conf); + fs.mkdirs(dir); + fs.deleteOnExit(dir); + } catch (IOException e) { + throw new RuntimeException("Cannot create job output directory " + dir, e); + } + tempDirectory = dir; } - return dir; + return tempDirectory; } private static Path createTemporaryPath(Configuration conf) { @@ -427,21 +430,38 @@ public abstract class DistributedPipeline implements Pipeline { @Override public void cleanup(boolean force) { if (force || outputTargets.isEmpty()) { + deleteTempDirectory(); + } else { + LOG.warn("Not running cleanup while output targets remain."); + } + } + + private void cleanup() { + cleanup(false); + } + + private synchronized void deleteTempDirectory() { + Path toDelete = tempDirectory; + tempDirectory = null; + if (toDelete != null) { try { - FileSystem fs = tempDirectory.getFileSystem(conf); - if (fs.exists(tempDirectory)) { - fs.delete(tempDirectory, true); + FileSystem fs = toDelete.getFileSystem(conf); + if (fs.exists(toDelete)) { + fs.delete(toDelete, true); } } catch (IOException e) { LOG.info("Exception during cleanup", e); } - } else { - LOG.warn("Not running cleanup while output targets remain."); } } - private void cleanup() { - cleanup(false); + @Override + protected void finalize() throws Throwable { + if (tempDirectory != null) { + LOG.warn("Temp directory {} still exists; was Pipeline.done() called?", tempDirectory); + deleteTempDirectory(); + } + super.finalize(); } public int getNextAnonymousStageId() {
