Updated Branches: refs/heads/master ceaa6a5e0 -> 1d844b3f1
CRUNCH-162: Add a Shard library for rebalancing the contents of PCollections Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/1d844b3f Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/1d844b3f Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/1d844b3f Branch: refs/heads/master Commit: 1d844b3f1f107cc645319080ff3e9e73f0dd72e2 Parents: ceaa6a5 Author: Josh Wills <[email protected]> Authored: Thu Jun 6 05:49:30 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Thu Jun 6 05:49:30 2013 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/crunch/lib/Aggregate.java | 22 ++++-- .../src/main/java/org/apache/crunch/lib/Shard.java | 65 +++++++++++++++ .../org/apache/crunch/util/PartitionUtils.java | 4 + 3 files changed, 84 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/1d844b3f/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java index d4109cc..d8388b3 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java @@ -33,11 +33,11 @@ import org.apache.crunch.PObject; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.fn.Aggregators; -import org.apache.crunch.fn.MapValuesFn; import org.apache.crunch.materialize.pobject.FirstElementPObject; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.util.PartitionUtils; import com.google.common.collect.Lists; @@ -52,15 +52,24 @@ public class Aggregate { 
* of their occurrences. */ public static <S> PTable<S, Long> count(PCollection<S> collect) { + return count(collect, PartitionUtils.getRecommendedPartitions(collect)); + } + + /** + * Returns a {@code PTable} that contains the unique elements of this collection mapped to a count + * of their occurrences. + */ + public static <S> PTable<S, Long> count(PCollection<S> collect, int numPartitions) { PTypeFamily tf = collect.getTypeFamily(); return collect.parallelDo("Aggregate.count", new MapFn<S, Pair<S, Long>>() { public Pair<S, Long> map(S input) { return Pair.of(input, 1L); } - }, tf.tableOf(collect.getPType(), tf.longs())).groupByKey() + }, tf.tableOf(collect.getPType(), tf.longs())) + .groupByKey(numPartitions) .combineValues(Aggregators.SUM_LONGS()); } - + /** * Returns the number of elements in the provided PCollection. * @@ -252,9 +261,8 @@ public class Aggregate { public static <K, V> PTable<K, Collection<V>> collectValues(PTable<K, V> collect) { PTypeFamily tf = collect.getTypeFamily(); final PType<V> valueType = collect.getValueType(); - return collect.groupByKey().parallelDo("collect", - new MapValuesFn<K, Iterable<V>, Collection<V>>() { - + return collect.groupByKey().mapValues("collect", + new MapFn<Iterable<V>, Collection<V>>() { @Override public void initialize() { valueType.initialize(getConfiguration()); @@ -267,6 +275,6 @@ public class Aggregate { } return collected; } - }, tf.tableOf(collect.getKeyType(), tf.collections(collect.getValueType()))); + }, tf.collections(collect.getValueType())); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/1d844b3f/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java b/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java new file mode 100644 index 0000000..07ba0db --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Shard.java @@ -0,0 +1,65 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.Pair; +import org.apache.crunch.types.PType; + +/** + * Utilities for controlling how the data in a {@code PCollection} is balanced across reducers + * and output files. + */ +public class Shard { + + /** + * Creates a {@code PCollection<T>} that has the same contents as its input argument but will + * be written to a fixed number of output files. This is useful for map-only jobs that process + * lots of input files but only write out a small amount of output per task.
+ * + * @param pc The {@code PCollection<T>} to rebalance + * @param numPartitions The number of output partitions to create + * @return A rebalanced {@code PCollection<T>} with the same contents as the input + */ + public static <T> PCollection<T> shard(PCollection<T> pc, int numPartitions) { + PType<T> pt = pc.getPType(); + return Aggregate.count(pc, numPartitions).parallelDo("shards", new ShardFn<T>(pt), pt); + } + + private static class ShardFn<T> extends DoFn<Pair<T, Long>, T> { + private final PType<T> ptype; + + public ShardFn(PType<T> ptype) { + this.ptype = ptype; + } + + @Override + public void initialize() { + ptype.initialize(getConfiguration()); + } + + @Override + public void process(Pair<T, Long> input, Emitter<T> emitter) { + for (int i = 0; i < input.second(); i++) { + emitter.emit(ptype.getDetachedValue(input.first())); + } + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/1d844b3f/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java index da8db6b..0a5c404 100644 --- a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java +++ b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java @@ -27,6 +27,10 @@ public class PartitionUtils { public static final String BYTES_PER_REDUCE_TASK = "crunch.bytes.per.reduce.task"; public static final long DEFAULT_BYTES_PER_REDUCE_TASK = 1000L * 1000L * 1000L; + public static <T> int getRecommendedPartitions(PCollection<T> pcollection) { + return getRecommendedPartitions(pcollection, pcollection.getPipeline().getConfiguration()); + } + public static <T> int getRecommendedPartitions(PCollection<T> pcollection, Configuration conf) { long bytesPerTask = conf.getLong(BYTES_PER_REDUCE_TASK, DEFAULT_BYTES_PER_REDUCE_TASK); return 1 + (int) 
(pcollection.getSize() / bytesPerTask);
