Repository: crunch Updated Branches: refs/heads/master 20fc3ab79 -> fa04e3c7b
Added the ability to specify the number of reducers when doing a sharded join. Signed-off-by: Josh Wills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/fa04e3c7 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/fa04e3c7 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/fa04e3c7 Branch: refs/heads/master Commit: fa04e3c7b31f2416efb2022757c1866f63e8b5f3 Parents: 20fc3ab Author: Joel <[email protected]> Authored: Thu Dec 24 17:55:03 2015 +0100 Committer: Josh Wills <[email protected]> Committed: Thu Mar 10 10:33:24 2016 -0800 ---------------------------------------------------------------------- .../crunch/lib/join/ShardedJoinStrategy.java | 24 ++++++++++++++++++++ 1 file changed, 24 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/fa04e3c7/crunch-core/src/main/java/org/apache/crunch/lib/join/ShardedJoinStrategy.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/ShardedJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/ShardedJoinStrategy.java index b881e66..2a38457 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/join/ShardedJoinStrategy.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/ShardedJoinStrategy.java @@ -63,6 +63,16 @@ public class ShardedJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> { public ShardedJoinStrategy(int numShards) { this(new ConstantShardingStrategy<K>(numShards)); } + + /** + * Instantiate with a constant number of shards to use for all keys. 
+ * + * @param numShards number of shards to use + * @param numReducers the amount of reducers to run the join with + */ + public ShardedJoinStrategy(int numShards, int numReducers) { + this(new ConstantShardingStrategy<K>(numShards), numReducers); + } /** * Instantiate with a custom sharding strategy. @@ -74,6 +84,20 @@ public class ShardedJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> { this.shardingStrategy = shardingStrategy; } + /** + * Instantiate with a custom sharding strategy and a specified number of reducers. + * + * @param shardingStrategy strategy to be used for sharding + * @param numReducers the amount of reducers to run the join with + */ + public ShardedJoinStrategy(ShardingStrategy<K> shardingStrategy, int numReducers) { + if (numReducers < 1) { + throw new IllegalArgumentException("Num reducers must be > 0, got " + numReducers); + } + this.wrappedJoinStrategy = new DefaultJoinStrategy<Pair<K, Integer>, U, V>(numReducers); + this.shardingStrategy = shardingStrategy; + } + @Override public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) {
