Repository: crunch Updated Branches: refs/heads/apache-crunch-0.8 d34302b9b -> 1aefb1a97
CRUNCH-346: Enforce crunch.max.reducers in all cases. Contributed by Jason Gauci. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/1aefb1a9 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/1aefb1a9 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/1aefb1a9 Branch: refs/heads/apache-crunch-0.8 Commit: 1aefb1a970075a595e112d9ad730521896fdd5d1 Parents: d34302b Author: Josh Wills <[email protected]> Authored: Mon Feb 17 17:50:35 2014 -0800 Committer: Josh Wills <[email protected]> Committed: Mon Feb 17 17:51:00 2014 -0800 ---------------------------------------------------------------------- .../java/org/apache/crunch/util/PartitionUtils.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/1aefb1a9/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java index 25f8866..cdcc401 100644 --- a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java +++ b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java @@ -37,7 +37,12 @@ public class PartitionUtils { public static <T> int getRecommendedPartitions(PCollection<T> pcollection) { Configuration conf = pcollection.getPipeline().getConfiguration(); - int recommended = getRecommendedPartitions(pcollection, conf); + return getRecommendedPartitions(pcollection, conf); + } + + public static <T> int getRecommendedPartitions(PCollection<T> pcollection, Configuration conf) { + long bytesPerTask = conf.getLong(BYTES_PER_REDUCE_TASK, DEFAULT_BYTES_PER_REDUCE_TASK); + int recommended = 1 + (int) (pcollection.getSize() / bytesPerTask); int maxRecommended = conf.getInt(MAX_REDUCERS, DEFAULT_MAX_REDUCERS); if (maxRecommended > 0 && recommended > maxRecommended) { return maxRecommended; @@ -45,9 +50,4 @@ public class PartitionUtils { return recommended; } } - - public static <T> int getRecommendedPartitions(PCollection<T> pcollection, Configuration conf) { - long bytesPerTask = conf.getLong(BYTES_PER_REDUCE_TASK, DEFAULT_BYTES_PER_REDUCE_TASK); - return 1 + (int) (pcollection.getSize() / bytesPerTask); - } }
