Updated Branches: refs/heads/master 453bff6fe -> d36b69ab8
CRUNCH-156: Refactor PipelineExecution interface to provide more info/control over running jobs. Contributed by Chao Shi. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d36b69ab Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d36b69ab Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d36b69ab Branch: refs/heads/master Commit: d36b69ab8d07daaca3ffdad6082b26acd6d1e49b Parents: 453bff6 Author: Josh Wills <[email protected]> Authored: Sat Mar 2 11:04:30 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Sat Mar 2 11:04:30 2013 -0800 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/CancelJobsIT.java | 53 +++-- .../java/org/apache/crunch/PipelineExecution.java | 34 +++- .../lib/jobcontrol/CrunchControlledJob.java | 6 +- .../org/apache/crunch/impl/mem/MemPipeline.java | 36 +-- .../java/org/apache/crunch/impl/mr/MRPipeline.java | 8 +- .../org/apache/crunch/impl/mr/exec/MRExecutor.java | 198 +++++++++------ 6 files changed, 198 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/d36b69ab/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java b/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java index ecdc4e0..ff01a2f 100644 --- a/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java +++ b/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java @@ -20,7 +20,7 @@ package org.apache.crunch; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.util.concurrent.CancellationException; +import java.io.IOException; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.To; @@ -36,36 +36,49 @@ public class CancelJobsIT { @Rule 
public TemporaryPath tmpDir = TemporaryPaths.create(); - + @Test public void testRun() throws Exception { - run(false); + PipelineExecution pe = run(); + pe.waitUntilDone(); + PipelineResult pr = pe.getResult(); + assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus()); + assertEquals(2, pr.getStageResults().size()); } @Test - public void testCancel() throws Exception { - run(true); + public void testKill() throws Exception { + PipelineExecution pe = run(); + pe.kill(); + pe.waitUntilDone(); + assertEquals(PipelineExecution.Status.KILLED, pe.getStatus()); + } + + @Test + public void testKillMultipleTimes() throws Exception { + PipelineExecution pe = run(); + for (int i = 0; i < 10; i++) { + pe.kill(); + } + pe.waitUntilDone(); + assertEquals(PipelineExecution.Status.KILLED, pe.getStatus()); + } + + @Test + public void testKillAfterDone() throws Exception { + PipelineExecution pe = run(); + pe.waitUntilDone(); + assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus()); + pe.kill(); // expect no-op + assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus()); } - public void run(boolean cancel) throws Exception { + public PipelineExecution run() throws IOException { String shakes = tmpDir.copyResourceFileName("shakes.txt"); String out = tmpDir.getFileName("cancel"); Pipeline p = new MRPipeline(CancelJobsIT.class, tmpDir.getDefaultConfiguration()); PCollection<String> words = p.readTextFile(shakes); p.write(words.count().top(20), To.textFile(out)); - PipelineExecution pe = p.runAsync(); - if (cancel) { - boolean cancelled = false; - pe.cancel(true); - try { - pe.get(); - } catch (CancellationException e) { - cancelled = true; - } - assertTrue(cancelled); - } else { - PipelineResult pr = pe.get(); - assertEquals(2, pr.getStageResults().size()); - } + return p.runAsync(); // need to hack to slow down job start up if this test becomes flaky. 
} } http://git-wip-us.apache.org/repos/asf/crunch/blob/d36b69ab/crunch/src/main/java/org/apache/crunch/PipelineExecution.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/PipelineExecution.java b/crunch/src/main/java/org/apache/crunch/PipelineExecution.java index e74738f..fc6bb91 100644 --- a/crunch/src/main/java/org/apache/crunch/PipelineExecution.java +++ b/crunch/src/main/java/org/apache/crunch/PipelineExecution.java @@ -17,12 +17,38 @@ */ package org.apache.crunch; -import com.google.common.util.concurrent.ListenableFuture; +import java.util.concurrent.TimeUnit; /** - * A handle that extends the {@code ListenableFuture} interface to allow clients - * to control a Crunch pipeline as it runs. + * A handle to allow clients to control a Crunch pipeline as it runs. + * + * This interface is thread-safe. */ -public interface PipelineExecution extends ListenableFuture<PipelineResult> { +public interface PipelineExecution { + + enum Status { READY, RUNNING, SUCCEEDED, FAILED, KILLED } + + /** Returns the .dot file that allows a client to graph the Crunch execution plan for this + * pipeline. + */ String getPlanDotFile(); + + /** Blocks until the pipeline completes or the specified waiting time elapses. */ + void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException; + + /** Blocks until the pipeline completes, i.e. {@code SUCCEEDED}, {@code FAILED} or {@code KILLED}. */ + void waitUntilDone() throws InterruptedException; + + Status getStatus(); + + /** Retrieve the result of a pipeline if it has been completed, otherwise {@code null}. */ + PipelineResult getResult(); + + /** + * Kills the pipeline if it is running, no-op otherwise. + * + * This method only delivers a kill signal to the pipeline, and does not guarantee the pipeline exits on return. + * To wait for the pipeline to exit completely, use {@link #waitUntilDone()} after this call. 
+ */ + void kill() throws InterruptedException; } http://git-wip-us.apache.org/repos/asf/crunch/blob/d36b69ab/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java index f30ada3..223673e 100644 --- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java +++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java @@ -33,10 +33,10 @@ import org.apache.hadoop.util.StringUtils; * This class encapsulates a MapReduce job and its dependency. It monitors the * states of the depending jobs and updates the state of this job. A job starts * in the WAITING state. If it does not have any depending jobs, or all of the - * depending jobs are in SUCCESS state, then the job state will become READY. If + * depending jobs are in SUCCEEDED state, then the job state will become READY. If * any depending jobs fail, the job will fail too. When in READY state, the job * can be submitted to Hadoop for execution, with the state changing into - * RUNNING state. From RUNNING state, the job can get into SUCCESS or FAILED + * RUNNING state. From RUNNING state, the job can get into SUCCEEDED or FAILED * state, depending the status of the job execution. */ public class CrunchControlledJob { @@ -244,7 +244,7 @@ public class CrunchControlledJob { /** * Check the state of this running job. The state may remain the same, become - * SUCCESS or FAILED. + * SUCCEEDED or FAILED. 
*/ protected void checkRunningState() throws IOException, InterruptedException { try { http://git-wip-us.apache.org/repos/asf/crunch/blob/d36b69ab/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java index 9c958a2..d7c1a4f 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -20,10 +20,7 @@ package org.apache.crunch.impl.mem; import java.io.IOException; import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -227,48 +224,37 @@ public class MemPipeline implements Pipeline { public String getPlanDotFile() { return ""; } - - @Override - public void addListener(Runnable listener, Executor executor) { - lf.addListener(listener, executor); - } @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return lf.cancel(mayInterruptIfRunning); + public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException { + // no-op } @Override - public PipelineResult get() throws InterruptedException, ExecutionException { - return lf.get(); + public void waitUntilDone() throws InterruptedException { + // no-op } @Override - public PipelineResult get(long timeout, TimeUnit unit) throws InterruptedException, - ExecutionException, TimeoutException { - return lf.get(timeout, unit); + public Status getStatus() { + return Status.SUCCEEDED; } @Override - public boolean isCancelled() { - return lf.isCancelled(); + public PipelineResult getResult() { + return PipelineResult.EMPTY; } @Override - 
public boolean isDone() { - return lf.isDone(); + public void kill() { } }; } @Override public PipelineResult run() { - try { - return runAsync().get(); - } catch (Exception e) { - LOG.error("Exception running pipeline", e); - return PipelineResult.EMPTY; - } + activeTargets.clear(); + return PipelineResult.EMPTY; } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/d36b69ab/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index 19110b3..00cf486 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -59,7 +59,6 @@ import org.apache.hadoop.fs.Path; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ListenableFuture; /** * Pipeline implementation that is executed within Hadoop MapReduce. @@ -158,8 +157,11 @@ public class MRPipeline implements Pipeline { @Override public PipelineResult run() { try { - return runAsync().get(); - } catch (Exception e) { + PipelineExecution pipelineExecution = runAsync(); + pipelineExecution.waitUntilDone(); + return pipelineExecution.getResult(); + } catch (InterruptedException e) { + // TODO: How to handle this without changing signature? 
LOG.error("Exception running pipeline", e); return PipelineResult.EMPTY; } http://git-wip-us.apache.org/repos/asf/crunch/blob/d36b69ab/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java index 791fe28..901f91a 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java @@ -20,6 +20,9 @@ package org.apache.crunch.impl.mr.exec; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,21 +36,24 @@ import org.apache.crunch.impl.mr.collect.PCollectionImpl; import org.apache.crunch.materialize.MaterializableIterable; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.AbstractFuture; -import com.google.common.util.concurrent.ListenableFuture; /** * * */ -public class MRExecutor { +public class MRExecutor implements PipelineExecution { private static final Log LOG = LogFactory.getLog(MRExecutor.class); private final CrunchJobControl control; private final Map<PCollectionImpl<?>, Set<Target>> outputTargets; private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize; - + private final CountDownLatch doneSignal = new CountDownLatch(1); + private final CountDownLatch killSignal = new CountDownLatch(1); + private AtomicReference<Status> status = new AtomicReference<Status>(Status.READY); + private PipelineResult result; + private Thread monitorThread; + private String planDotFile; public MRExecutor(Class<?> jarClass, Map<PCollectionImpl<?>, Set<Target>> outputTargets, 
@@ -55,6 +61,12 @@ public class MRExecutor { this.control = new CrunchJobControl(jarClass.toString()); this.outputTargets = outputTargets; this.toMaterialize = toMaterialize; + this.monitorThread = new Thread(new Runnable() { + @Override + public void run() { + monitorLoop(); + } + }); } public void addJob(CrunchJob job) { @@ -66,95 +78,117 @@ public class MRExecutor { } public PipelineExecution execute() { - FutureImpl fi = new FutureImpl(); - fi.init(); - return fi; + monitorThread.start(); + return this; } - - private class FutureImpl extends AbstractFuture<PipelineResult> implements PipelineExecution { - @Override - public String getPlanDotFile() { - return planDotFile; - } - - public void init() { - Thread t = new Thread() { - @Override - public void run() { - try { - Thread controlThread = new Thread(control); - controlThread.start(); - while (!control.allFinished()) { - Thread.sleep(1000); - } - control.stop(); - } catch (InterruptedException e) { - setException(e); - return; - } - - List<CrunchControlledJob> failures = control.getFailedJobList(); - if (!failures.isEmpty()) { - System.err.println(failures.size() + " job failure(s) occurred:"); - for (CrunchControlledJob job : failures) { - System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage()); - } - } - List<PipelineResult.StageResult> stages = Lists.newArrayList(); - for (CrunchControlledJob job : control.getSuccessfulJobList()) { - try { - stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters())); - } catch (Exception e) { - LOG.error("Exception thrown fetching job counters for stage: " + job.getJobName(), e); - } + + /** Monitors running status. It is called in {@code MonitorThread}. 
*/ + private void monitorLoop() { + try { + Thread controlThread = new Thread(control); + controlThread.start(); + while (killSignal.getCount() > 0 && !control.allFinished()) { + killSignal.await(1, TimeUnit.SECONDS); + } + control.stop(); + killAllRunningJobs(); + + List<CrunchControlledJob> failures = control.getFailedJobList(); + if (!failures.isEmpty()) { + System.err.println(failures.size() + " job failure(s) occurred:"); + for (CrunchControlledJob job : failures) { + System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage()); + } + } + List<PipelineResult.StageResult> stages = Lists.newArrayList(); + for (CrunchControlledJob job : control.getSuccessfulJobList()) { + try { + stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters())); + } catch (Exception e) { + LOG.error("Exception thrown fetching job counters for stage: " + job.getJobName(), e); + } + } + + for (PCollectionImpl<?> c : outputTargets.keySet()) { + if (toMaterialize.containsKey(c)) { + MaterializableIterable iter = toMaterialize.get(c); + if (iter.isSourceTarget()) { + iter.materialize(); + c.materializeAt((SourceTarget) iter.getSource()); } - - for (PCollectionImpl<?> c : outputTargets.keySet()) { - if (toMaterialize.containsKey(c)) { - MaterializableIterable iter = toMaterialize.get(c); - if (iter.isSourceTarget()) { - iter.materialize(); - c.materializeAt((SourceTarget) iter.getSource()); - } - } else { - boolean materialized = false; - for (Target t : outputTargets.get(c)) { - if (!materialized) { - if (t instanceof SourceTarget) { - c.materializeAt((SourceTarget) t); - materialized = true; - } else { - SourceTarget st = t.asSourceTarget(c.getPType()); - if (st != null) { - c.materializeAt(st); - materialized = true; - } - } + } else { + boolean materialized = false; + for (Target t : outputTargets.get(c)) { + if (!materialized) { + if (t instanceof SourceTarget) { + c.materializeAt((SourceTarget) t); + materialized = true; + } else { 
+ SourceTarget st = t.asSourceTarget(c.getPType()); + if (st != null) { + c.materializeAt(st); + materialized = true; } } } } + } + } - set(new PipelineResult(stages)); + synchronized (this) { + result = new PipelineResult(stages); + if (killSignal.getCount() == 0) { + status.set(Status.KILLED); + } else { + status.set(result.succeeded() ? Status.SUCCEEDED : Status.FAILED); } - }; - t.start(); - } - - @Override - public void interruptTask() { - if (!control.allFinished()) { - control.stop(); } - for (CrunchControlledJob job : control.getRunningJobList()) { - if (!job.isCompleted()) { - try { - job.killJob(); - } catch (Exception e) { - LOG.error("Exception killing job: " + job.getJobName(), e); - } + } catch (InterruptedException e) { + throw new AssertionError(e); // Nobody should interrupt us. + } finally { + doneSignal.countDown(); + } + } + + private void killAllRunningJobs() { + for (CrunchControlledJob job : control.getRunningJobList()) { + if (!job.isCompleted()) { + try { + job.killJob(); + } catch (Exception e) { + LOG.error("Exception killing job: " + job.getJobName(), e); } } } } + + @Override + public String getPlanDotFile() { + return planDotFile; + } + + @Override + public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException { + doneSignal.await(timeout, timeUnit); + } + + @Override + public void waitUntilDone() throws InterruptedException { + doneSignal.await(); + } + + @Override + public synchronized Status getStatus() { + return status.get(); + } + + @Override + public synchronized PipelineResult getResult() { + return result; + } + + @Override + public void kill() throws InterruptedException { + killSignal.countDown(); + } }
