Updated Branches: refs/heads/master d36b69ab8 -> 65a1d26f2
CRUNCH-176: CrunchTool improvements. Contributed by Dave Beech. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/65a1d26f Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/65a1d26f Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/65a1d26f Branch: refs/heads/master Commit: 65a1d26f2ca38b0a48ad22dfb9d37acbd369ea20 Parents: d36b69a Author: Josh Wills <[email protected]> Authored: Tue Mar 5 08:41:59 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Tue Mar 5 08:41:59 2013 -0800 ---------------------------------------------------------------------- .../src/main/java/org/apache/crunch/Pipeline.java | 2 - .../java/org/apache/crunch/util/CrunchTool.java | 29 ++++++++++++--- 2 files changed, 23 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/65a1d26f/crunch/src/main/java/org/apache/crunch/Pipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/Pipeline.java b/crunch/src/main/java/org/apache/crunch/Pipeline.java index 9540eac..84c720c 100644 --- a/crunch/src/main/java/org/apache/crunch/Pipeline.java +++ b/crunch/src/main/java/org/apache/crunch/Pipeline.java @@ -19,8 +19,6 @@ package org.apache.crunch; import org.apache.hadoop.conf.Configuration; -import com.google.common.util.concurrent.ListenableFuture; - /** * Manages the state of a pipeline execution. * http://git-wip-us.apache.org/repos/asf/crunch/blob/65a1d26f/crunch/src/main/java/org/apache/crunch/util/CrunchTool.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/util/CrunchTool.java b/crunch/src/main/java/org/apache/crunch/util/CrunchTool.java index 54ba9fc..ea66291 100644 --- a/crunch/src/main/java/org/apache/crunch/util/CrunchTool.java +++ b/crunch/src/main/java/org/apache/crunch/util/CrunchTool.java @@ -17,9 +17,13 @@ */ package org.apache.crunch.util; +import java.io.Serializable; + import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineExecution; +import org.apache.crunch.PipelineResult; import org.apache.crunch.Source; import org.apache.crunch.TableSource; import org.apache.crunch.Target; @@ -38,13 +42,14 @@ import org.apache.hadoop.util.Tool; * the Tool's run method. * */ -public abstract class CrunchTool extends Configured implements Tool { +public abstract class CrunchTool extends Configured implements Tool, Serializable { protected static final From from = new From(); protected static final To to = new To(); protected static final At at = new At(); - private Pipeline pipeline; + // Pipeline object itself isn't necessarily serializable. + private transient Pipeline pipeline; public CrunchTool() { this(false); @@ -90,12 +95,24 @@ public abstract class CrunchTool extends Configured implements Tool { public void writeTextFile(PCollection<?> pcollection, String pathName) { pipeline.writeTextFile(pcollection, pathName); } + + public <T> Iterable<T> materialize(PCollection<T> pcollection) { + return pipeline.materialize(pcollection); + } + + public PipelineResult run() { + return pipeline.run(); + } + + public PipelineExecution runAsync() { + return pipeline.runAsync(); + } - public void run() { - pipeline.run(); + public PipelineResult done() { + return pipeline.done(); } - public void done() { - pipeline.done(); + protected Pipeline getPipeline() { + return pipeline; } }
