Updated Branches: refs/heads/master 1e06362b9 -> f69aa5d2a
CRUNCH-120: Add support for secondary sorts to the in-memory implementation Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/f69aa5d2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/f69aa5d2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/f69aa5d2 Branch: refs/heads/master Commit: f69aa5d2a4d46b04d53566e56e3c64eb7affeb40 Parents: 1e06362 Author: Josh Wills <[email protected]> Authored: Mon Nov 26 11:01:10 2012 -0800 Committer: Josh Wills <[email protected]> Committed: Mon Dec 3 09:09:25 2012 -0800 ---------------------------------------------------------------------- .../java/org/apache/crunch/GroupingOptions.java | 8 + .../crunch/impl/mem/collect/MemGroupedTable.java | 24 +-- .../apache/crunch/impl/mem/collect/Shuffler.java | 148 +++++++++++++++ .../org/apache/crunch/lib/SecondarySortTest.java | 53 +++++ 4 files changed, 212 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f69aa5d2/crunch/src/main/java/org/apache/crunch/GroupingOptions.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/GroupingOptions.java b/crunch/src/main/java/org/apache/crunch/GroupingOptions.java index e58b666..ea2d6c6 100644 --- a/crunch/src/main/java/org/apache/crunch/GroupingOptions.java +++ b/crunch/src/main/java/org/apache/crunch/GroupingOptions.java @@ -51,6 +51,14 @@ public class GroupingOptions { return sortComparatorClass; } + public Class<? extends RawComparator> getGroupingComparatorClass() { + return groupingComparatorClass; + } + + public Class<? extends Partitioner> getPartitionerClass() { + return partitionerClass; + } + public void configure(Job job) { if (partitionerClass != null) { job.setPartitionerClass(partitionerClass); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f69aa5d2/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java index 1f39632..ee27ecc 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java @@ -45,33 +45,15 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen private final MemTable<K, V> parent; - private static <S, T> Map<S, Collection<T>> createMapFor(PType<S> keyType, GroupingOptions options, Pipeline pipeline) { - if (options != null && options.getSortComparatorClass() != null) { - RawComparator<S> rc = ReflectionUtils.newInstance(options.getSortComparatorClass(), pipeline.getConfiguration()); - return new TreeMap<S, Collection<T>>(rc); - } else if (keyType != null && Comparable.class.isAssignableFrom(keyType.getTypeClass())) { - return new TreeMap<S, Collection<T>>(); - } - return Maps.newHashMap(); - } - private static <S, T> Iterable<Pair<S, Iterable<T>>> buildMap(MemTable<S, T> parent, GroupingOptions options) { PType<S> keyType = parent.getKeyType(); - Map<S, Collection<T>> map = createMapFor(keyType, options, parent.getPipeline()); + Shuffler<S, T> shuffler = Shuffler.create(keyType, options, parent.getPipeline()); for (Pair<S, T> pair : parent.materialize()) { - S key = pair.first(); - if (!map.containsKey(key)) { - map.put(key, Lists.<T> newArrayList()); - } - map.get(key).add(pair.second()); + shuffler.add(pair); } - List<Pair<S, Iterable<T>>> values = Lists.newArrayList(); - for (Map.Entry<S, Collection<T>> e : map.entrySet()) { - values.add(Pair.of(e.getKey(), (Iterable<T>) e.getValue())); - } - return values; + return shuffler; } public MemGroupedTable(MemTable<K, V> parent, GroupingOptions options) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f69aa5d2/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java new file mode 100644 index 0000000..afc04c3 --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mem.collect; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.crunch.GroupingOptions; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.types.PType; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.util.ReflectionUtils; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * In-memory versions of common MapReduce patterns for aggregating key-value data. + */ +abstract class Shuffler<K, V> implements Iterable<Pair<K, Iterable<V>>> { + + public abstract void add(Pair<K, V> record); + + private static <K, V> Map<K, V> getMapForKeyType(PType<?> ptype) { + if (ptype != null && Comparable.class.isAssignableFrom(ptype.getTypeClass())) { + return new TreeMap<K, V>(); + } else { + return Maps.newHashMap(); + } + } + + public static <S, T> Shuffler<S, T> create(PType<S> keyType, GroupingOptions options, + Pipeline pipeline) { + Map<S, Collection<T>> map = getMapForKeyType(keyType); + + if (options != null) { + if (Pair.class.equals(keyType.getTypeClass()) && options.getGroupingComparatorClass() != null) { + PType<?> pairKey = keyType.getSubTypes().get(0); + return new SecondarySortShuffler(getMapForKeyType(pairKey)); + } else if (options.getSortComparatorClass() != null) { + RawComparator<S> rc = ReflectionUtils.newInstance(options.getSortComparatorClass(), + pipeline.getConfiguration()); + map = new TreeMap<S, Collection<T>>(rc); + } + } + + return new MapShuffler<S, T>(map); + } + + private static class HFunction<K, V> implements Function<Map.Entry<K, Collection<V>>, Pair<K, Iterable<V>>> { + @Override + public Pair<K, Iterable<V>> apply(Map.Entry<K, Collection<V>> input) { + return Pair.of(input.getKey(), (Iterable<V>) input.getValue()); + } + } + + private static class MapShuffler<K, V> extends Shuffler<K, V> { + private final Map<K, Collection<V>> map; + + public MapShuffler(Map<K, Collection<V>> map) { + this.map = map; + } + + @Override + public Iterator<Pair<K, Iterable<V>>> iterator() { + return Iterators.transform(map.entrySet().iterator(), + new HFunction<K, V>()); + } + + @Override + public void add(Pair<K, V> record) { + if (!map.containsKey(record.first())) { + Collection<V> values = Lists.newArrayList(); + map.put(record.first(), values); + } + map.get(record.first()).add(record.second()); + } + } + + private static class SSFunction<K, SK, V> implements + Function<Map.Entry<K, List<Pair<SK, V>>>, Pair<Pair<K, SK>, Iterable<V>>> { + @Override + public Pair<Pair<K, SK>, Iterable<V>> apply(Entry<K, List<Pair<SK, V>>> input) { + List<Pair<SK, V>> values = input.getValue(); + Collections.sort(values, new Comparator<Pair<SK, V>>() { + @Override + public int compare(Pair<SK, V> o1, Pair<SK, V> o2) { + return ((Comparable) o1.first()).compareTo(o2.first()); + } + }); + Pair<K, SK> key = Pair.of(input.getKey(), values.get(0).first()); + return Pair.of(key, Iterables.transform(values, new Function<Pair<SK, V>, V>() { + @Override + public V apply(Pair<SK, V> input) { + return input.second(); + } + })); + } + } + + private static class SecondarySortShuffler<K, SK, V> extends Shuffler<Pair<K, SK>, V> { + + private Map<K, List<Pair<SK, V>>> map; + + public SecondarySortShuffler(Map<K, List<Pair<SK, V>>> map) { + this.map = map; + } + + @Override + public Iterator<Pair<Pair<K, SK>, Iterable<V>>> iterator() { + return Iterators.transform(map.entrySet().iterator(), new SSFunction<K, SK, V>()); + } + + @Override + public void add(Pair<Pair<K, SK>, V> record) { + K primary = record.first().first(); + if (!map.containsKey(primary)) { + map.put(primary, Lists.<Pair<SK, V>>newArrayList()); + } + map.get(primary).add(Pair.of(record.first().second(), record.second())); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f69aa5d2/crunch/src/test/java/org/apache/crunch/lib/SecondarySortTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/lib/SecondarySortTest.java b/crunch/src/test/java/org/apache/crunch/lib/SecondarySortTest.java new file mode 100644 index 0000000..933b986 --- /dev/null +++ b/crunch/src/test/java/org/apache/crunch/lib/SecondarySortTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import static org.apache.crunch.types.avro.Avros.*; +import static org.junit.Assert.assertEquals; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.mem.MemPipeline; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; + + +public class SecondarySortTest { + @Test + public void testInMemory() throws Exception { + PTable<Long, Pair<Long, String>> input = MemPipeline.typedTableOf(tableOf(longs(), pairs(longs(), strings())), + 1729L, Pair.of(17L, "a"), 100L, Pair.of(29L, "b"), 1729L, Pair.of(29L, "c")); + PCollection<String> letters = SecondarySort.sortAndApply(input, new StringifyFn(), strings()); + assertEquals(ImmutableList.of("b", "ac"), letters.materialize()); + } + + private static class StringifyFn extends DoFn<Pair<Long, Iterable<Pair<Long, String>>>, String> { + @Override + public void process(Pair<Long, Iterable<Pair<Long, String>>> input, Emitter<String> emitter) { + StringBuilder sb = new StringBuilder(); + for (Pair<Long, String> p : input.second()) { + sb.append(p.second()); + } + emitter.emit(sb.toString()); + } + } +}
