Repository: crunch
Updated Branches:
  refs/heads/master bf58906b1 -> 9df580739
CRUNCH-346: Enforce crunch.max.reducers in all cases. Contributed by Jason Gauci.

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9df58073
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9df58073
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9df58073

Branch: refs/heads/master
Commit: 9df5807397196178ca8328fe5ebb5d98606df062
Parents: bf58906
Author: Josh Wills <[email protected]>
Authored: Mon Feb 17 17:50:35 2014 -0800
Committer: Josh Wills <[email protected]>
Committed: Mon Feb 17 17:50:35 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/crunch/util/PartitionUtils.java | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/9df58073/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java
index 25f8866..cdcc401 100644
--- a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java
+++ b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java
@@ -37,7 +37,12 @@ public class PartitionUtils {
 
   public static <T> int getRecommendedPartitions(PCollection<T> pcollection) {
     Configuration conf = pcollection.getPipeline().getConfiguration();
-    int recommended = getRecommendedPartitions(pcollection, conf);
+    return getRecommendedPartitions(pcollection, conf);
+  }
+
+  public static <T> int getRecommendedPartitions(PCollection<T> pcollection, Configuration conf) {
+    long bytesPerTask = conf.getLong(BYTES_PER_REDUCE_TASK, DEFAULT_BYTES_PER_REDUCE_TASK);
+    int recommended = 1 + (int) (pcollection.getSize() / bytesPerTask);
     int maxRecommended = conf.getInt(MAX_REDUCERS, DEFAULT_MAX_REDUCERS);
     if (maxRecommended > 0 && recommended > maxRecommended) {
       return maxRecommended;
@@ -45,9 +50,4 @@ public class PartitionUtils {
       return recommended;
     }
   }
-
-  public static <T> int getRecommendedPartitions(PCollection<T> pcollection, Configuration conf) {
-    long bytesPerTask = conf.getLong(BYTES_PER_REDUCE_TASK, DEFAULT_BYTES_PER_REDUCE_TASK);
-    return 1 + (int) (pcollection.getSize() / bytesPerTask);
-  }
 }
