Updated Branches: refs/heads/master e0dbebf15 -> 9f53a5122
CRUNCH-282: Add a parameter to control the maximum number of reducers for a job Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9f53a512 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9f53a512 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9f53a512 Branch: refs/heads/master Commit: 9f53a5122200244ee0340a9e85ce1478d6628dbb Parents: e0dbebf Author: Josh Wills <[email protected]> Authored: Wed Oct 16 17:46:34 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Thu Oct 17 06:32:15 2013 -0700 ---------------------------------------------------------------------- .../org/apache/crunch/util/PartitionUtils.java | 21 ++++- .../apache/crunch/util/PartitionUtilsTest.java | 89 ++++++++++++++++++++ 2 files changed, 107 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/9f53a512/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java index 0a5c404..25f8866 100644 --- a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java +++ b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java @@ -21,14 +21,29 @@ import org.apache.crunch.PCollection; import org.apache.hadoop.conf.Configuration; /** - * + * Helper functions and settings for determining the number of reducers to use in a pipeline + * job created by the Crunch planner. */ public class PartitionUtils { public static final String BYTES_PER_REDUCE_TASK = "crunch.bytes.per.reduce.task"; public static final long DEFAULT_BYTES_PER_REDUCE_TASK = 1000L * 1000L * 1000L; - + + /** + * Set an upper limit on the number of reducers the Crunch planner will set for an MR + * job when it tries to determine how many reducers to use based on the input size. + */ + public static final String MAX_REDUCERS = "crunch.max.reducers"; + public static final int DEFAULT_MAX_REDUCERS = 500; + public static <T> int getRecommendedPartitions(PCollection<T> pcollection) { - return getRecommendedPartitions(pcollection, pcollection.getPipeline().getConfiguration()); + Configuration conf = pcollection.getPipeline().getConfiguration(); + int recommended = getRecommendedPartitions(pcollection, conf); + int maxRecommended = conf.getInt(MAX_REDUCERS, DEFAULT_MAX_REDUCERS); + if (maxRecommended > 0 && recommended > maxRecommended) { + return maxRecommended; + } else { + return recommended; + } } public static <T> int getRecommendedPartitions(PCollection<T> pcollection, Configuration conf) { http://git-wip-us.apache.org/repos/asf/crunch/blob/9f53a512/crunch-core/src/test/java/org/apache/crunch/util/PartitionUtilsTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/util/PartitionUtilsTest.java b/crunch-core/src/test/java/org/apache/crunch/util/PartitionUtilsTest.java new file mode 100644 index 0000000..ee82cea --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/util/PartitionUtilsTest.java @@ -0,0 +1,89 @@ +/* + * * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ +package org.apache.crunch.util; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; + +import org.apache.crunch.PCollection; +import org.apache.crunch.Pipeline; +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class PartitionUtilsTest { + + @Mock + private PCollection<String> pcollection; + + @Mock + private Pipeline pipeline; + + @Test + public void testBasic() throws Exception { + Configuration conf = new Configuration(); + when(pcollection.getSize()).thenReturn(7 * 1000L * 1000L * 1000L); + when(pcollection.getPipeline()).thenReturn(pipeline); + when(pipeline.getConfiguration()).thenReturn(conf); + assertEquals(8, PartitionUtils.getRecommendedPartitions(pcollection)); + } + + @Test + public void testBytesPerTask() throws Exception { + Configuration conf = new Configuration(); + conf.setLong(PartitionUtils.BYTES_PER_REDUCE_TASK, 500L * 1000L * 1000L); + when(pcollection.getSize()).thenReturn(7 * 1000L * 1000L * 1000L); + when(pcollection.getPipeline()).thenReturn(pipeline); + when(pipeline.getConfiguration()).thenReturn(conf); + assertEquals(15, PartitionUtils.getRecommendedPartitions(pcollection)); + } + + @Test + public void testDefaultMaxRecommended() throws Exception { + Configuration conf = new Configuration(); + when(pcollection.getSize()).thenReturn(1000 * 1000L * 1000L * 1000L); + when(pcollection.getPipeline()).thenReturn(pipeline); + when(pipeline.getConfiguration()).thenReturn(conf); + assertEquals(500, PartitionUtils.getRecommendedPartitions(pcollection)); + } + + @Test + public void testMaxRecommended() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(PartitionUtils.MAX_REDUCERS, 400); + when(pcollection.getSize()).thenReturn(1000 * 1000L * 1000L * 1000L); + when(pcollection.getPipeline()).thenReturn(pipeline); + when(pipeline.getConfiguration()).thenReturn(conf); + assertEquals(400, PartitionUtils.getRecommendedPartitions(pcollection)); + } + + @Test + public void testNegativeMaxRecommended() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(PartitionUtils.MAX_REDUCERS, -1); + when(pcollection.getSize()).thenReturn(1000 * 1000L * 1000L * 1000L); + when(pcollection.getPipeline()).thenReturn(pipeline); + when(pipeline.getConfiguration()).thenReturn(conf); + assertEquals(1001, PartitionUtils.getRecommendedPartitions(pcollection)); + } +}
