Repository: crunch Updated Branches: refs/heads/master cd4b3887b -> d7443e392
CRUNCH-501: Detach PType values before calling Aggregator methods Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d7443e39 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d7443e39 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d7443e39 Branch: refs/heads/master Commit: d7443e3927aaec99563d8a48fe1341c5d5b8f85a Parents: cd4b388 Author: Josh Wills <[email protected]> Authored: Mon Feb 23 19:35:53 2015 -0800 Committer: Josh Wills <[email protected]> Committed: Mon Feb 23 19:35:53 2015 -0800 ---------------------------------------------------------------------- .../org/apache/crunch/io/avro/AvroModeIT.java | 6 ++--- .../java/org/apache/crunch/fn/Aggregators.java | 28 +++++++++++++++++--- .../impl/dist/collect/BaseGroupedTable.java | 5 ++-- .../impl/mem/collect/MemGroupedTable.java | 5 ++-- 4 files changed, 33 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/d7443e39/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java index ff66fd7..8fa55eb 100644 --- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java @@ -57,12 +57,12 @@ public class AvroModeIT implements Serializable { " ]\n" + "}"); - static final class FloatArray { + public static final class FloatArray { private final float[] values; - FloatArray() { + public FloatArray() { this(null); } - FloatArray(float[] values) { + public FloatArray(float[] values) { this.values = values; } float[] getValues() { http://git-wip-us.apache.org/repos/asf/crunch/blob/d7443e39/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java index 084cca4..e7aeb18 100644 --- a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java +++ b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java @@ -32,6 +32,7 @@ import org.apache.crunch.Tuple; import org.apache.crunch.Tuple3; import org.apache.crunch.Tuple4; import org.apache.crunch.TupleN; +import org.apache.crunch.types.PType; import org.apache.crunch.util.Tuples; import org.apache.hadoop.conf.Configuration; @@ -439,9 +440,23 @@ public final class Aggregators { * * @param aggregator The instance to wrap * @return A {@link CombineFn} delegating to {@code aggregator} + * + * @deprecated use the safer {@link #toCombineFn(Aggregator, PType)} instead. */ + @Deprecated public static final <K, V> CombineFn<K, V> toCombineFn(Aggregator<V> aggregator) { - return new AggregatorCombineFn<K, V>(aggregator); + return toCombineFn(aggregator, null); + } + + /** + * Wrap a {@link CombineFn} adapter around the given aggregator. + * + * @param aggregator The instance to wrap + * @param ptype The PType of the aggregated value (for detaching complex objects) + * @return A {@link CombineFn} delegating to {@code aggregator} + */ + public static final <K, V> CombineFn<K, V> toCombineFn(Aggregator<V> aggregator, PType<V> ptype) { + return new AggregatorCombineFn<K, V>(aggregator, ptype); } /** @@ -460,22 +475,27 @@ public final class Aggregators { */ private static class AggregatorCombineFn<K, V> extends CombineFn<K, V> { // TODO: Has to be fully qualified until CombineFn.Aggregator can be removed. - private final org.apache.crunch.Aggregator<V> aggregator; + private final Aggregator<V> aggregator; + private final PType<V> ptype; - public AggregatorCombineFn(org.apache.crunch.Aggregator<V> aggregator) { + public AggregatorCombineFn(Aggregator<V> aggregator, PType<V> ptype) { this.aggregator = aggregator; + this.ptype = ptype; } @Override public void initialize() { aggregator.initialize(getConfiguration()); + if (ptype != null) { + ptype.initialize(getConfiguration()); + } } @Override public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) { aggregator.reset(); for (V v : input.second()) { - aggregator.update(v); + aggregator.update(ptype == null ? v : ptype.getDetachedValue(v)); } for (V v : aggregator.results()) { emitter.emit(Pair.of(input.first(), v)); http://git-wip-us.apache.org/repos/asf/crunch/blob/d7443e39/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java index 064bba8..d87c8f5 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java @@ -90,12 +90,13 @@ public class BaseGroupedTable<K, V> extends PCollectionImpl<Pair<K, Iterable<V>> @Override public PTable<K, V> combineValues(Aggregator<V> agg) { - return combineValues(Aggregators.<K, V>toCombineFn(agg)); + return combineValues(Aggregators.<K, V>toCombineFn(agg, parent.getValueType())); } @Override public PTable<K, V> combineValues(Aggregator<V> combineAgg, Aggregator<V> reduceAgg) { - return combineValues(Aggregators.<K, V>toCombineFn(combineAgg), Aggregators.<K, V>toCombineFn(reduceAgg)); + return combineValues(Aggregators.<K, V>toCombineFn(combineAgg, parent.getValueType()), + Aggregators.<K, V>toCombineFn(reduceAgg, parent.getValueType())); } private static class Ungroup<K, V> extends DoFn<Pair<K, Iterable<V>>, Pair<K, V>> { http://git-wip-us.apache.org/repos/asf/crunch/blob/d7443e39/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java index 6efc062..0e4516a 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java @@ -119,12 +119,13 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen @Override public PTable<K, V> combineValues(Aggregator<V> agg) { - return combineValues(Aggregators.<K, V>toCombineFn(agg)); + return combineValues(Aggregators.<K, V>toCombineFn(agg, parent.getValueType())); } @Override public PTable<K, V> combineValues(Aggregator<V> combineAgg, Aggregator<V> reduceAgg) { - return combineValues(Aggregators.<K, V>toCombineFn(combineAgg), Aggregators.<K, V>toCombineFn(reduceAgg)); + return combineValues(Aggregators.<K, V>toCombineFn(combineAgg, parent.getValueType()), + Aggregators.<K, V>toCombineFn(reduceAgg, parent.getValueType())); } @Override
