Repository: crunch
Updated Branches:
  refs/heads/master f0f75f3c7 -> f8cc90a75
CRUNCH-455: Support custom rawcomparators for sorting in the in-memory mode Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f8cc90a7 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f8cc90a7 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f8cc90a7 Branch: refs/heads/master Commit: f8cc90a751b7779ee67fab66160af311b9282c4a Parents: f0f75f3 Author: Josh Wills <[email protected]> Authored: Mon Aug 11 11:06:10 2014 -0700 Committer: Josh Wills <[email protected]> Committed: Mon Aug 11 11:06:10 2014 -0700 ---------------------------------------------------------------------- .../impl/mem/collect/MemGroupedTable.java | 13 +++- .../crunch/impl/mem/collect/Shuffler.java | 64 +++++++++++++++----- .../crunch/types/avro/AvroKeyConverter.java | 14 +---- .../java/org/apache/crunch/lib/SortTest.java | 41 +++++++++++++ 4 files changed, 104 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/f8cc90a7/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java index 172fe36..6efc062 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java @@ -24,6 +24,8 @@ import java.util.TreeMap; import org.apache.crunch.Aggregator; import org.apache.crunch.CombineFn; +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; import org.apache.crunch.GroupingOptions; import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; @@ -137,6 +139,15 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, 
Iterable<V>>> implemen @Override public PTable<K, V> ungroup() { - return parent; + return parallelDo("ungroup", new UngroupFn<K, V>(), parent.getPTableType()); + } + + private static class UngroupFn<K, V> extends DoFn<Pair<K, Iterable<V>>, Pair<K, V>> { + @Override + public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) { + for (V v : input.second()) { + emitter.emit(Pair.of(input.first(), v)); + } + } } } http://git-wip-us.apache.org/repos/asf/crunch/blob/f8cc90a7/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java index 2e8f9eb..489fab3 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java @@ -17,6 +17,7 @@ */ package org.apache.crunch.impl.mem.collect; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -26,12 +27,14 @@ import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; +import com.google.common.collect.ImmutableList; import org.apache.crunch.GroupingOptions; import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; import org.apache.crunch.impl.SingleUseIterable; import org.apache.crunch.types.PType; import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.ReflectionUtils; import com.google.common.base.Function; @@ -57,49 +60,80 @@ abstract class Shuffler<K, V> implements Iterable<Pair<K, Iterable<V>>> { public static <S, T> Shuffler<S, T> create(PType<S> keyType, GroupingOptions options, Pipeline pipeline) { - Map<S, Collection<T>> map = getMapForKeyType(keyType); + Map<Object, Collection<T>> map = 
getMapForKeyType(keyType); if (options != null) { + Job job; + try { + job = new Job(pipeline.getConfiguration()); + } catch (IOException e) { + throw new IllegalStateException("Could not create Job instance", e); + } + options.configure(job); if (Pair.class.equals(keyType.getTypeClass()) && options.getGroupingComparatorClass() != null) { PType<?> pairKey = keyType.getSubTypes().get(0); return new SecondarySortShuffler(getMapForKeyType(pairKey)); } else if (options.getSortComparatorClass() != null) { - RawComparator<S> rc = ReflectionUtils.newInstance(options.getSortComparatorClass(), - pipeline.getConfiguration()); - map = new TreeMap<S, Collection<T>>(rc); + RawComparator rc = ReflectionUtils.newInstance( + options.getSortComparatorClass(), + job.getConfiguration()); + map = new TreeMap<Object, Collection<T>>(rc); + return new MapShuffler<S, T>(map, keyType); } } - return new MapShuffler<S, T>(map); } - private static class HFunction<K, V> implements Function<Map.Entry<K, Collection<V>>, Pair<K, Iterable<V>>> { + private static class HFunction<K, V> implements Function<Map.Entry<Object, Collection<V>>, Pair<K, Iterable<V>>> { + private final PType<K> keyType; + + public HFunction(PType<K> keyType) { + this.keyType = keyType; + } + @Override - public Pair<K, Iterable<V>> apply(Map.Entry<K, Collection<V>> input) { - return Pair.<K, Iterable<V>>of(input.getKey(), new SingleUseIterable<V>(input.getValue())); + public Pair<K, Iterable<V>> apply(Map.Entry<Object, Collection<V>> input) { + K key; + if (keyType == null) { + key = (K) input.getKey(); + } else { + Object k = keyType.getConverter().convertInput(input.getKey(), null); + key = keyType.getInputMapFn().map(k); + } + return Pair.<K, Iterable<V>>of(key, new SingleUseIterable<V>(input.getValue())); } } private static class MapShuffler<K, V> extends Shuffler<K, V> { - private final Map<K, Collection<V>> map; - - public MapShuffler(Map<K, Collection<V>> map) { + private final Map<Object, Collection<V>> map; + 
private final PType<K> keyType; + + public MapShuffler(Map<Object, Collection<V>> map) { + this(map, null); + } + + public MapShuffler(Map<Object, Collection<V>> map, PType<K> keyType) { this.map = map; + this.keyType = keyType; } @Override public Iterator<Pair<K, Iterable<V>>> iterator() { return Iterators.transform(map.entrySet().iterator(), - new HFunction<K, V>()); + new HFunction<K, V>(keyType)); } @Override public void add(Pair<K, V> record) { - if (!map.containsKey(record.first())) { + Object key = record.first(); + if (keyType != null) { + key = keyType.getConverter().outputKey(keyType.getOutputMapFn().map((K) key)); + } + if (!map.containsKey(key)) { Collection<V> values = Lists.newArrayList(); - map.put(record.first(), values); + map.put(key, values); } - map.get(record.first()).add(record.second()); + map.get(key).add(record.second()); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/f8cc90a7/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java index 38437ab..b3b2eda 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java @@ -24,8 +24,6 @@ import org.apache.hadoop.io.NullWritable; class AvroKeyConverter<K> implements Converter<AvroWrapper<K>, NullWritable, K, Iterable<K>> { - private transient AvroWrapper<K> wrapper = null; - @Override public K convertInput(AvroWrapper<K> key, NullWritable value) { return key.datum(); @@ -33,8 +31,7 @@ class AvroKeyConverter<K> implements Converter<AvroWrapper<K>, NullWritable, K, @Override public AvroWrapper<K> outputKey(K value) { - getWrapper().datum(value); - return wrapper; + return new AvroKey<K>(value); } @Override @@ -44,7 +41,7 @@ 
class AvroKeyConverter<K> implements Converter<AvroWrapper<K>, NullWritable, K, @Override public Class<AvroWrapper<K>> getKeyClass() { - return (Class<AvroWrapper<K>>) getWrapper().getClass(); + return (Class<AvroWrapper<K>>) (Class) AvroKey.class; } @Override @@ -57,13 +54,6 @@ class AvroKeyConverter<K> implements Converter<AvroWrapper<K>, NullWritable, K, return true; } - private AvroWrapper<K> getWrapper() { - if (wrapper == null) { - wrapper = new AvroKey<K>(); - } - return wrapper; - } - @Override public Iterable<K> convertIterableInput(AvroWrapper<K> key, Iterable<NullWritable> value) { throw new UnsupportedOperationException("Should not be possible"); http://git-wip-us.apache.org/repos/asf/crunch/blob/f8cc90a7/crunch-core/src/test/java/org/apache/crunch/lib/SortTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/SortTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/SortTest.java new file mode 100644 index 0000000..d1f1fa0 --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/lib/SortTest.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lib; + +import com.google.common.collect.ImmutableList; +import org.apache.crunch.PCollection; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.mem.MemPipeline; +import org.junit.Test; + +import static org.apache.crunch.types.avro.Avros.longs; +import static org.apache.crunch.types.avro.Avros.pairs; +import static org.apache.crunch.types.avro.Avros.strings; +import static org.junit.Assert.assertEquals; + +public class SortTest { + + @Test + public void testInMemoryReverseAvro() throws Exception { + PCollection<Pair<String, Long>> pc = MemPipeline.typedCollectionOf(pairs(strings(), longs()), + Pair.of("a", 1L), Pair.of("c", 7L), Pair.of("b", 10L)); + PCollection<Pair<String, Long>> sorted = Sort.sortPairs(pc, Sort.ColumnOrder.by(2, Sort.Order.DESCENDING)); + assertEquals(ImmutableList.of(Pair.of("b", 10L), Pair.of("c", 7L), Pair.of("a", 1L)), + ImmutableList.copyOf(sorted.materialize())); + } +}
