Repository: crunch Updated Branches: refs/heads/master 5ef1c4ed2 -> ffca004e9
CRUNCH-669: Add an option to disable temp dir deletion in the finalize() method of a DistributedPipeline Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ffca004e Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ffca004e Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ffca004e Branch: refs/heads/master Commit: ffca004e9efe22b3e4e35e0273f5977db4d60761 Parents: 5ef1c4e Author: Josh Wills <[email protected]> Authored: Mon Apr 30 11:47:15 2018 -0700 Committer: Josh Wills <[email protected]> Committed: Mon Apr 30 11:47:15 2018 -0700 ---------------------------------------------------------------------- .../org/apache/crunch/impl/dist/DistributedPipeline.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/ffca004e/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java index 1deafd5..0afa766 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java @@ -52,7 +52,6 @@ import org.apache.crunch.io.From; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.ReadableSourceTarget; import org.apache.crunch.io.To; -import org.apache.crunch.io.impl.FileTargetImpl; import org.apache.crunch.materialize.MaterializableIterable; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; @@ -75,6 +74,7 @@ public abstract class DistributedPipeline implements Pipeline { private static final Random RANDOM = new Random(); private static final String CRUNCH_TMP_DIRS = "crunch.tmp.dirs"; + private static final String CRUNCH_PRESERVE_TEMP_DIR = "crunch.preserve.tmp.dir"; private final String name; protected final PCollectionFactory factory; @@ -82,6 +82,7 @@ public abstract class DistributedPipeline implements Pipeline { protected final Map<PCollectionImpl<?>, MaterializableIterable<?>> outputTargetsToMaterialize; protected final Map<PipelineCallable<?>, Set<Target>> allPipelineCallables; protected final Set<Target> appendedTargets; + private final boolean preserveTempDirectory; private Path tempDirectory; private int tempFileIndex; private int nextAnonymousStageId; @@ -105,6 +106,7 @@ public abstract class DistributedPipeline implements Pipeline { this.conf = conf; this.tempFileIndex = 0; this.nextAnonymousStageId = 0; + this.preserveTempDirectory = conf.getBoolean(CRUNCH_PRESERVE_TEMP_DIR, false); } public static boolean isTempDir(Job job, String outputPath) { @@ -497,7 +499,9 @@ public abstract class DistributedPipeline implements Pipeline { protected void finalize() throws Throwable { if (tempDirectory != null) { LOG.warn("Temp directory {} still exists; was Pipeline.done() called?", tempDirectory); - deleteTempDirectory(); + if (!preserveTempDirectory) { + deleteTempDirectory(); + } } super.finalize(); }
