Updated Branches: refs/heads/master c0564f181 -> 236d97dbb
CRUNCH-156: Add a runAsync option to Pipeline that returns a PipelineExecution interface that extends ListenableFuture and allows clients to interactively control running pipelines. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/236d97db Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/236d97db Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/236d97db Branch: refs/heads/master Commit: 236d97dbb249b3c4b98ec893893e2d2c591d0ae0 Parents: c0564f1 Author: Josh Wills <[email protected]> Authored: Tue Feb 26 20:14:49 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Wed Feb 27 11:54:19 2013 -0800 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/CancelJobsIT.java | 71 ++++++++ .../src/main/java/org/apache/crunch/Pipeline.java | 10 + .../java/org/apache/crunch/PipelineExecution.java | 28 +++ .../lib/jobcontrol/CrunchControlledJob.java | 1 - .../org/apache/crunch/impl/mem/MemPipeline.java | 58 ++++++- .../java/org/apache/crunch/impl/mr/MRPipeline.java | 49 ++---- .../org/apache/crunch/impl/mr/exec/MRExecutor.java | 134 ++++++++++++--- .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 14 +- 8 files changed, 302 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java b/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java new file mode 100644 index 0000000..ecdc4e0 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CancellationException; + +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.To; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.junit.Rule; +import org.junit.Test; + +/** + * + */ +public class CancelJobsIT { + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + @Test + public void testRun() throws Exception { + run(false); + } + + @Test + public void testCancel() throws Exception { + run(true); + } + + public void run(boolean cancel) throws Exception { + String shakes = tmpDir.copyResourceFileName("shakes.txt"); + String out = tmpDir.getFileName("cancel"); + Pipeline p = new MRPipeline(CancelJobsIT.class, tmpDir.getDefaultConfiguration()); + PCollection<String> words = p.readTextFile(shakes); + p.write(words.count().top(20), To.textFile(out)); + PipelineExecution pe = p.runAsync(); + if (cancel) { + boolean cancelled = false; + pe.cancel(true); + try { + pe.get(); + } catch (CancellationException e) { + cancelled = true; + } + assertTrue(cancelled); + } else { + PipelineResult pr = pe.get(); + assertEquals(2, 
pr.getStageResults().size()); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/Pipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/Pipeline.java b/crunch/src/main/java/org/apache/crunch/Pipeline.java index af1d86a..9540eac 100644 --- a/crunch/src/main/java/org/apache/crunch/Pipeline.java +++ b/crunch/src/main/java/org/apache/crunch/Pipeline.java @@ -19,6 +19,8 @@ package org.apache.crunch; import org.apache.hadoop.conf.Configuration; +import com.google.common.util.concurrent.ListenableFuture; + /** * Manages the state of a pipeline execution. * @@ -107,6 +109,14 @@ public interface Pipeline { PipelineResult run(); /** + * Constructs and starts a series of MapReduce jobs in order to write data to + * the output targets, but returns a {@code ListenableFuture} to allow clients to control + * job execution. + * @return + */ + PipelineExecution runAsync(); + + /** * Run any remaining jobs required to generate outputs and then clean up any * intermediate data files that were created in this run or previous calls to * {@code run}. http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/PipelineExecution.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/PipelineExecution.java b/crunch/src/main/java/org/apache/crunch/PipelineExecution.java new file mode 100644 index 0000000..e74738f --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/PipelineExecution.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import com.google.common.util.concurrent.ListenableFuture; + +/** + * A handle that extends the {@code ListenableFuture} interface to allow clients + * to control a Crunch pipeline as it runs. + */ +public interface PipelineExecution extends ListenableFuture<PipelineResult> { + String getPlanDotFile(); +} http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java index 396ea2d..f30ada3 100644 --- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java +++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.State; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; 
http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java index 488cdd9..9c958a2 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -20,6 +20,10 @@ package org.apache.crunch.impl.mem; import java.io.IOException; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -28,6 +32,7 @@ import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineExecution; import org.apache.crunch.PipelineResult; import org.apache.crunch.Source; import org.apache.crunch.TableSource; @@ -49,6 +54,8 @@ import org.apache.hadoop.mapreduce.Counters; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; public class MemPipeline implements Pipeline { @@ -212,9 +219,56 @@ public class MemPipeline implements Pipeline { } @Override - public PipelineResult run() { + public PipelineExecution runAsync() { activeTargets.clear(); - return PipelineResult.EMPTY; + final ListenableFuture<PipelineResult> lf = Futures.immediateFuture(PipelineResult.EMPTY); + return new PipelineExecution() { + @Override + public String getPlanDotFile() { + return ""; + } + + @Override + public void 
addListener(Runnable listener, Executor executor) { + lf.addListener(listener, executor); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return lf.cancel(mayInterruptIfRunning); + } + + @Override + public PipelineResult get() throws InterruptedException, ExecutionException { + return lf.get(); + } + + @Override + public PipelineResult get(long timeout, TimeUnit unit) throws InterruptedException, + ExecutionException, TimeoutException { + return lf.get(timeout, unit); + } + + @Override + public boolean isCancelled() { + return lf.isCancelled(); + } + + @Override + public boolean isDone() { + return lf.isDone(); + } + }; + } + + @Override + public PipelineResult run() { + try { + return runAsync().get(); + } catch (Exception e) { + LOG.error("Exception running pipeline", e); + return PipelineResult.EMPTY; + } } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index 2d4d137..19110b3 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -29,6 +29,7 @@ import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineExecution; import org.apache.crunch.PipelineResult; import org.apache.crunch.Source; import org.apache.crunch.SourceTarget; @@ -58,6 +59,7 @@ import org.apache.hadoop.fs.Path; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; /** * Pipeline implementation that is executed within Hadoop MapReduce. 
@@ -138,7 +140,14 @@ public class MRPipeline implements Pipeline { } public MRExecutor plan() { - MSCRPlanner planner = new MSCRPlanner(this, outputTargets); + Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize = Maps.newHashMap(); + for (PCollectionImpl<?> c : outputTargets.keySet()) { + if (outputTargetsToMaterialize.containsKey(c)) { + toMaterialize.put(c, outputTargetsToMaterialize.get(c)); + outputTargetsToMaterialize.remove(c); + } + } + MSCRPlanner planner = new MSCRPlanner(this, outputTargets, toMaterialize); try { return planner.plan(jarClass, conf); } catch (IOException e) { @@ -148,39 +157,17 @@ public class MRPipeline implements Pipeline { @Override public PipelineResult run() { - PipelineResult res = null; try { - res = plan().execute(); - } catch (CrunchRuntimeException e) { - LOG.error(e); + return runAsync().get(); + } catch (Exception e) { + LOG.error("Exception running pipeline", e); return PipelineResult.EMPTY; } - for (PCollectionImpl<?> c : outputTargets.keySet()) { - if (outputTargetsToMaterialize.containsKey(c)) { - MaterializableIterable iter = outputTargetsToMaterialize.get(c); - if (iter.isSourceTarget()) { - iter.materialize(); - c.materializeAt((SourceTarget) iter.getSource()); - } - outputTargetsToMaterialize.remove(c); - } else { - boolean materialized = false; - for (Target t : outputTargets.get(c)) { - if (!materialized) { - if (t instanceof SourceTarget) { - c.materializeAt((SourceTarget) t); - materialized = true; - } else { - SourceTarget st = t.asSourceTarget(c.getPType()); - if (st != null) { - c.materializeAt(st); - materialized = true; - } - } - } - } - } - } + } + + @Override + public PipelineExecution runAsync() { + PipelineExecution res = plan().execute(); outputTargets.clear(); return res; } http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java ---------------------------------------------------------------------- diff --git 
a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java index 9811600..791fe28 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java @@ -18,14 +18,23 @@ package org.apache.crunch.impl.mr.exec; import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.crunch.PipelineExecution; import org.apache.crunch.PipelineResult; +import org.apache.crunch.SourceTarget; +import org.apache.crunch.Target; import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob; import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl; +import org.apache.crunch.impl.mr.collect.PCollectionImpl; +import org.apache.crunch.materialize.MaterializableIterable; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.ListenableFuture; /** * @@ -36,41 +45,116 @@ public class MRExecutor { private static final Log LOG = LogFactory.getLog(MRExecutor.class); private final CrunchJobControl control; - - public MRExecutor(Class<?> jarClass) { + private final Map<PCollectionImpl<?>, Set<Target>> outputTargets; + private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize; + + private String planDotFile; + + public MRExecutor(Class<?> jarClass, Map<PCollectionImpl<?>, Set<Target>> outputTargets, + Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) { this.control = new CrunchJobControl(jarClass.toString()); + this.outputTargets = outputTargets; + this.toMaterialize = toMaterialize; } public void addJob(CrunchJob job) { this.control.addJob(job); } - public PipelineResult execute() { - try { - Thread controlThread = new Thread(control); - controlThread.start(); - while 
(!control.allFinished()) { - Thread.sleep(1000); - } - control.stop(); - } catch (InterruptedException e) { - LOG.info(e); + public void setPlanDotFile(String planDotFile) { + this.planDotFile = planDotFile; + } + + public PipelineExecution execute() { + FutureImpl fi = new FutureImpl(); + fi.init(); + return fi; + } + + private class FutureImpl extends AbstractFuture<PipelineResult> implements PipelineExecution { + @Override + public String getPlanDotFile() { + return planDotFile; } - List<CrunchControlledJob> failures = control.getFailedJobList(); - if (!failures.isEmpty()) { - System.err.println(failures.size() + " job failure(s) occurred:"); - for (CrunchControlledJob job : failures) { - System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage()); - } + + public void init() { + Thread t = new Thread() { + @Override + public void run() { + try { + Thread controlThread = new Thread(control); + controlThread.start(); + while (!control.allFinished()) { + Thread.sleep(1000); + } + control.stop(); + } catch (InterruptedException e) { + setException(e); + return; + } + + List<CrunchControlledJob> failures = control.getFailedJobList(); + if (!failures.isEmpty()) { + System.err.println(failures.size() + " job failure(s) occurred:"); + for (CrunchControlledJob job : failures) { + System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage()); + } + } + List<PipelineResult.StageResult> stages = Lists.newArrayList(); + for (CrunchControlledJob job : control.getSuccessfulJobList()) { + try { + stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters())); + } catch (Exception e) { + LOG.error("Exception thrown fetching job counters for stage: " + job.getJobName(), e); + } + } + + for (PCollectionImpl<?> c : outputTargets.keySet()) { + if (toMaterialize.containsKey(c)) { + MaterializableIterable iter = toMaterialize.get(c); + if (iter.isSourceTarget()) { + iter.materialize(); + 
c.materializeAt((SourceTarget) iter.getSource()); + } + } else { + boolean materialized = false; + for (Target t : outputTargets.get(c)) { + if (!materialized) { + if (t instanceof SourceTarget) { + c.materializeAt((SourceTarget) t); + materialized = true; + } else { + SourceTarget st = t.asSourceTarget(c.getPType()); + if (st != null) { + c.materializeAt(st); + materialized = true; + } + } + } + } + } + } + + set(new PipelineResult(stages)); + } + }; + t.start(); } - List<PipelineResult.StageResult> stages = Lists.newArrayList(); - for (CrunchControlledJob job : control.getSuccessfulJobList()) { - try { - stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters())); - } catch (Exception e) { - LOG.error("Exception thrown fetching job counters for stage: " + job.getJobName(), e); + + @Override + public void interruptTask() { + if (!control.allFinished()) { + control.stop(); + } + for (CrunchControlledJob job : control.getRunningJobList()) { + if (!job.isCompleted()) { + try { + job.killJob(); + } catch (Exception e) { + LOG.error("Exception killing job: " + job.getJobName(), e); + } + } } } - return new PipelineResult(stages); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java index 472505b..146bcbf 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -32,6 +32,7 @@ import org.apache.crunch.impl.mr.collect.InputCollection; import org.apache.crunch.impl.mr.collect.PCollectionImpl; import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; import org.apache.crunch.impl.mr.exec.MRExecutor; +import 
org.apache.crunch.materialize.MaterializableIterable; import org.apache.hadoop.conf.Configuration; import com.google.common.collect.HashMultimap; @@ -44,11 +45,14 @@ public class MSCRPlanner { private final MRPipeline pipeline; private final Map<PCollectionImpl<?>, Set<Target>> outputs; - - public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs) { + private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize; + + public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs, + Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) { this.pipeline = pipeline; this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR); this.outputs.putAll(outputs); + this.toMaterialize = toMaterialize; } // Used to ensure that we always build pipelines starting from the deepest @@ -148,13 +152,15 @@ public class MSCRPlanner { // Finally, construct the jobs from the prototypes and return. DotfileWriter dotfileWriter = new DotfileWriter(); - MRExecutor exec = new MRExecutor(jarClass); + MRExecutor exec = new MRExecutor(jarClass, outputs, toMaterialize); for (JobPrototype proto : Sets.newHashSet(assignments.values())) { dotfileWriter.addJobPrototype(proto); exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline)); } - conf.set(PlanningParameters.PIPELINE_PLAN_DOTFILE, dotfileWriter.buildDotfile()); + String planDotFile = dotfileWriter.buildDotfile(); + exec.setPlanDotFile(planDotFile); + conf.set(PlanningParameters.PIPELINE_PLAN_DOTFILE, planDotFile); return exec; }
