Repository: crunch Updated Branches: refs/heads/master 1b8b15315 -> 0da8e3c3e
CRUNCH-353: When a job failure is noticed, MRPipeline kills all its running jobs and exits immediately. This saves slot-time. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/0da8e3c3 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/0da8e3c3 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/0da8e3c3 Branch: refs/heads/master Commit: 0da8e3c3e5b24ad70c1ab1a64ec6dd21d7fdea11 Parents: 1b8b153 Author: Chao Shi <[email protected]> Authored: Sat Feb 22 21:42:05 2014 +0800 Committer: Chao Shi <[email protected]> Committed: Sun Feb 23 16:55:07 2014 +0800 ---------------------------------------------------------------------- .../crunch/impl/mr/exec/MRExecutorIT.java | 110 +++++++++++++++++++ .../lib/jobcontrol/CrunchJobControl.java | 4 + .../apache/crunch/impl/mr/exec/MRExecutor.java | 2 +- 3 files changed, 115 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/0da8e3c3/crunch-core/src/it/java/org/apache/crunch/impl/mr/exec/MRExecutorIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mr/exec/MRExecutorIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mr/exec/MRExecutorIT.java new file mode 100644 index 0000000..729a12d --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/impl/mr/exec/MRExecutorIT.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.exec; + +import org.apache.commons.lang.time.StopWatch; +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.PipelineExecution; +import org.apache.crunch.impl.mr.MRJob; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.impl.mr.MRPipelineExecution; +import org.apache.crunch.io.From; +import org.apache.crunch.io.To; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.hadoop.mapreduce.Job; +import org.junit.Rule; +import org.junit.Test; + +import java.util.List; + +import static org.apache.crunch.types.writable.Writables.longs; +import static org.apache.crunch.types.writable.Writables.strings; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class MRExecutorIT { + + private static class SleepForeverFn extends DoFn<Long, Long> { + @Override + public void process(Long input, Emitter<Long> emitter) { + try { + Thread.sleep(Long.MAX_VALUE); + } catch (InterruptedException e) { + throw new CrunchRuntimeException(e); + } + } + } + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + /** + * Tests that the pipeline is stopped immediately when one of its jobs + * fails, and that the rest of the running jobs are killed. 
+ */ + @Test + public void testStopPipelineImmediatelyOnJobFailure() throws Exception { + String inPath = tmpDir.copyResourceFileName("shakes.txt"); + MRPipeline pipeline = new MRPipeline(MRExecutorIT.class); + + // Issue two jobs that sleep forever. + PCollection<String> in = pipeline.read(From.textFile(inPath)); + for (int i = 0; i < 2; i++) { + in.count() + .values() + .parallelDo(new SleepForeverFn(), longs()) + .write(To.textFile(tmpDir.getPath("out_" + i))); + } + MRPipelineExecution exec = pipeline.runAsync(); + + // Wait until both jobs have been submitted and are running. + List<MRJob> jobs = exec.getJobs(); + assertEquals(2, jobs.size()); + StopWatch watch = new StopWatch(); + watch.start(); + int numOfJobsSubmitted = 0; + while (numOfJobsSubmitted < 2 && watch.getTime() < 10000) { + numOfJobsSubmitted = 0; + for (MRJob job : jobs) { + if (job.getJobState() == MRJob.State.RUNNING) { + numOfJobsSubmitted++; + } + } + Thread.sleep(100); + } + assertEquals(2, numOfJobsSubmitted); + + // Kill one of them. + Job job0 = jobs.get(0).getJob(); + job0.killJob(); + + // Expect the pipeline to exit and the other job to be killed. 
+ StopWatch watch2 = new StopWatch(); + watch2.start(); + Job job1 = jobs.get(1).getJob(); + while (!job1.isComplete() && watch2.getTime() < 10000) { + Thread.sleep(100); + } + assertTrue(job1.isComplete()); + assertEquals(PipelineExecution.Status.FAILED, exec.getStatus()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/0da8e3c3/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java index ce7a6d9..aac2ffa 100644 --- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java +++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java @@ -221,6 +221,10 @@ public class CrunchJobControl { && this.runningJobs.size() == 0; } + synchronized public boolean anyFailures() { + return this.failedJobs.size() > 0; + } + /** * Checks the states of the running jobs Update the states of waiting jobs, and submits the jobs in * ready state (i.e. whose dependencies are all finished in success). 
http://git-wip-us.apache.org/repos/asf/crunch/blob/0da8e3c3/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java index 1137498..3eba7a1 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java @@ -108,7 +108,7 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe private void monitorLoop() { status.set(Status.RUNNING); try { - while (killSignal.getCount() > 0 && !control.allFinished()) { + while (killSignal.getCount() > 0 && !control.allFinished() && !control.anyFailures()) { control.pollJobStatusAndStartNewOnes(); killSignal.await(pollInterval.get(), TimeUnit.MILLISECONDS); }
