Updated Branches: refs/heads/master 23bad11d6 -> 10bf70489
CRUNCH-307: Limit the number of concurrently running jobs Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/96e39453 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/96e39453 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/96e39453 Branch: refs/heads/master Commit: 96e3945383d15a5d70b7a6bed2f02fcd0d79e58e Parents: 2a8b6c1 Author: Chao Shi <[email protected]> Authored: Wed Dec 4 21:26:19 2013 +0800 Committer: Chao Shi <[email protected]> Committed: Wed Dec 4 21:26:19 2013 +0800 ---------------------------------------------------------------------- .../lib/jobcontrol/CrunchJobControl.java | 14 +++- .../apache/crunch/impl/mr/exec/MRExecutor.java | 7 +- .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 2 +- .../crunch/impl/mr/run/RuntimeParameters.java | 2 + .../lib/jobcontrol/CrunchJobControlTest.java | 77 ++++++++++++++++++++ 5 files changed, 96 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/96e39453/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java index 47cfb94..ce7a6d9 100644 --- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java +++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java @@ -27,6 +27,8 @@ import com.google.common.collect.ImmutableList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.impl.mr.MRJob.State; +import org.apache.crunch.impl.mr.run.RuntimeParameters; +import 
org.apache.hadoop.conf.Configuration; /** * This class encapsulates a set of MapReduce jobs and its dependency. @@ -49,6 +51,7 @@ public class CrunchJobControl { private Log log = LogFactory.getLog(CrunchJobControl.class); private final String groupName; + private final int maxRunningJobs; /** * Construct a job control for a group of jobs. @@ -56,13 +59,14 @@ public class CrunchJobControl { * @param groupName * a name identifying this group */ - public CrunchJobControl(String groupName) { + public CrunchJobControl(Configuration conf, String groupName) { this.waitingJobs = new Hashtable<Integer, CrunchControlledJob>(); this.readyJobs = new Hashtable<Integer, CrunchControlledJob>(); this.runningJobs = new Hashtable<Integer, CrunchControlledJob>(); this.successfulJobs = new Hashtable<Integer, CrunchControlledJob>(); this.failedJobs = new Hashtable<Integer, CrunchControlledJob>(); this.groupName = groupName; + this.maxRunningJobs = conf.getInt(RuntimeParameters.MAX_RUNNING_JOBS, 5); } private static List<CrunchControlledJob> toList(Map<Integer, CrunchControlledJob> jobs) { @@ -190,8 +194,12 @@ public class CrunchJobControl { this.readyJobs = new Hashtable<Integer, CrunchControlledJob>(); for (CrunchControlledJob nextJob : oldJobs.values()) { - // Submitting Job to Hadoop - nextJob.submit(); + // Limit the number of concurrently running jobs. If we have reached that limit, + // stop submitting new jobs and wait until a running job completes.
+ if (runningJobs.size() < maxRunningJobs) { + // Submitting Job to Hadoop + nextJob.submit(); + } this.addToQueue(nextJob); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/96e39453/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java index a655b23..38344a2 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java @@ -69,9 +69,12 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe private String planDotFile; - public MRExecutor(Class<?> jarClass, Map<PCollectionImpl<?>, Set<Target>> outputTargets, + public MRExecutor( + Configuration conf, + Class<?> jarClass, + Map<PCollectionImpl<?>, Set<Target>> outputTargets, Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) { - this.control = new CrunchJobControl(jarClass.toString()); + this.control = new CrunchJobControl(conf, jarClass.toString()); this.outputTargets = outputTargets; this.toMaterialize = toMaterialize; this.monitorThread = new Thread(new Runnable() { http://git-wip-us.apache.org/repos/asf/crunch/blob/96e39453/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java index ac61fec..96c9125 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -159,7 +159,7 @@ public class MSCRPlanner { // Finally, construct the jobs from the prototypes and return. 
DotfileWriter dotfileWriter = new DotfileWriter(); - MRExecutor exec = new MRExecutor(jarClass, outputs, toMaterialize); + MRExecutor exec = new MRExecutor(conf, jarClass, outputs, toMaterialize); for (JobPrototype proto : Sets.newHashSet(assignments.values())) { dotfileWriter.addJobPrototype(proto); exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline, lastJobID)); http://git-wip-us.apache.org/repos/asf/crunch/blob/96e39453/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java index 987ccd3..0c9f229 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java @@ -36,6 +36,8 @@ public final class RuntimeParameters { public static final String DISABLE_DEEP_COPY = "crunch.disable.deep.copy"; + public static final String MAX_RUNNING_JOBS = "crunch.max.running.jobs"; + // Not instantiated private RuntimeParameters() { } http://git-wip-us.apache.org/repos/asf/crunch/blob/96e39453/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java new file mode 100644 index 0000000..562e99d --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol; + +import org.apache.crunch.impl.mr.MRJob; +import org.apache.crunch.impl.mr.run.RuntimeParameters; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; +import org.junit.Test; + +import java.io.IOException; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CrunchJobControlTest { + @Test + public void testMaxRunningJobs() throws IOException, InterruptedException { + Configuration conf = new Configuration(); + conf.setInt(RuntimeParameters.MAX_RUNNING_JOBS, 2); + CrunchJobControl jobControl = new CrunchJobControl(conf, "group"); + CrunchControlledJob job1 = createJob(1); + CrunchControlledJob job2 = createJob(2); + CrunchControlledJob job3 = createJob(3); + + // Submit job1 and job2. + jobControl.addJob(job1); + jobControl.addJob(job2); + jobControl.pollJobStatusAndStartNewOnes(); + verify(job1).submit(); + verify(job2).submit(); + + // Add job3 and expect it is pending. 
+ jobControl.addJob(job3); + jobControl.pollJobStatusAndStartNewOnes(); + verify(job3, never()).submit(); + + // Expect job3 to be submitted once job1 has completed. + setSuccess(job1); + jobControl.pollJobStatusAndStartNewOnes(); + verify(job3).submit(); + } + + private CrunchControlledJob createJob(int jobID) throws IOException, InterruptedException { + Job mrJob = mock(Job.class); + when(mrJob.getConfiguration()).thenReturn(new Configuration()); + CrunchControlledJob job = new CrunchControlledJob( + jobID, + mrJob, + mock(CrunchControlledJob.Hook.class), + mock(CrunchControlledJob.Hook.class)); + return spy(job); + } + + private void setSuccess(CrunchControlledJob job) throws IOException, InterruptedException { + when(job.checkState()).thenReturn(MRJob.State.SUCCESS); + when(job.getJobState()).thenReturn(MRJob.State.SUCCESS); + } +}
