Create a SecondarySort library function and update the SecondarySort example to use it.
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/521cea25 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/521cea25 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/521cea25 Branch: refs/heads/master Commit: 521cea2573b680c849f70e13e615e715dd2cdad0 Parents: 298fbaa Author: Josh Wills <[email protected]> Authored: Sat Oct 13 19:33:03 2012 -0700 Committer: Josh Wills <[email protected]> Committed: Sun Oct 21 05:07:54 2012 -0700 ---------------------------------------------------------------------- .../org/apache/crunch/examples/SecondarySort.java | 32 +-- .../java/org/apache/crunch/lib/SecondarySort.java | 166 +++++++++++++++ 2 files changed, 175 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/521cea25/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java ---------------------------------------------------------------------- diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java index dc2f8b0..3e08046 100644 --- a/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java +++ b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java @@ -21,15 +21,12 @@ import java.io.Serializable; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; -import org.apache.crunch.GroupingOptions; import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.To; -import org.apache.crunch.lib.join.JoinUtils; -import org.apache.crunch.types.avro.AvroTypeFamily; import org.apache.crunch.types.avro.Avros; import 
org.apache.crunch.types.writable.Writables; import org.apache.hadoop.conf.Configuration; @@ -85,10 +82,10 @@ public class SecondarySort extends Configured implements Tool, Serializable { // a pair of pairs, the first of which will be grouped by (first member) and // the sorted by (second memeber). The second pair is payload which can be // passed in an Iterable object. - PTable<Pair<String, Long>, Pair<Long, String>> pairs = lines.parallelDo("extract_records", - new DoFn<String, Pair<Pair<String, Long>, Pair<Long, String>>>() { + PTable<String, Pair<Long, String>> pairs = lines.parallelDo("extract_records", + new DoFn<String, Pair<String, Pair<Long, String>>>() { @Override - public void process(String line, Emitter<Pair<Pair<String, Long>, Pair<Long, String>>> emitter) { + public void process(String line, Emitter<Pair<String, Pair<Long, String>>> emitter) { int i = 0; String key = ""; long timestamp = 0; @@ -116,21 +113,11 @@ public class SecondarySort extends Configured implements Tool, Serializable { } if (i == 3) { Long sortby = new Long(timestamp); - emitter.emit(new Pair<Pair<String, Long>, Pair<Long, String>>(new Pair<String, Long>(key, sortby), - new Pair<Long, String>(sortby, value))); + emitter.emit(Pair.of(key, Pair.of(sortby, value))); } else { this.getCounter(COUNTERS.CORRUPT_LINE).increment(1); } - }}, Avros.tableOf(Avros.pairs(Avros.strings(), Avros.longs()), Avros.pairs(Avros.longs(), Avros.strings()))); - - // Define partitioning and grouping properties - GroupingOptions groupingOptions = GroupingOptions.builder() - .numReducers(this.getConf().getInt("mapred.reduce.tasks", 1)) - .partitionerClass(JoinUtils.getPartitionerClass(AvroTypeFamily.getInstance())) - .groupingComparatorClass(JoinUtils.getGroupingComparator(AvroTypeFamily.getInstance())).build(); - - // Do the rest of the processing extracting a list of things according to - // groups defined in the groupingOptions + }}, Avros.tableOf(Avros.strings(), Avros.pairs(Avros.longs(), 
Avros.strings()))); // The output of the above input will be (with one reducer): @@ -138,14 +125,13 @@ public class SecondarySort extends Configured implements Tool, Serializable { // three : [[0,-1]] // two : [[1,7,9],[2,6],[4,5]] - pairs.groupByKey(groupingOptions) - .parallelDo("group_records", - new DoFn<Pair<Pair<String, Long>, Iterable<Pair<Long, String>>>, String>() { + org.apache.crunch.lib.SecondarySort.sortAndApply(pairs, + new DoFn<Pair<String, Iterable<Pair<Long, String>>>, String>() { final StringBuilder sb = new StringBuilder(); @Override - public void process(Pair<Pair<String, Long>, Iterable<Pair<Long, String>>> input, Emitter<String> emitter) { + public void process(Pair<String, Iterable<Pair<Long, String>>> input, Emitter<String> emitter) { sb.setLength(0); - sb.append(input.first().get(0)); + sb.append(input.first()); sb.append(" : ["); boolean first = true; for(Pair<Long, String> pair : input.second()) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/521cea25/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java new file mode 100644 index 0000000..5a826fd --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import java.util.Collection; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.GroupingOptions; +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.lib.join.JoinUtils; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.ImmutableList; + +/** + * Utilities for performing a secondary sort on a PTable<K, Pair<V1, V2>> instance, i.e., sort on the + * key and then sort the values by V1. 
+ */ +public class SecondarySort { + + public static <K, V1, V2> PTable<K, Collection<Pair<V1, V2>>> sort(PTable<K, Pair<V1, V2>> input) { + PTypeFamily ptf = input.getTypeFamily(); + return sortAndApply(input, new SSUnpackFn<K, V1, V2>(), + ptf.tableOf(input.getKeyType(), ptf.collections(input.getValueType()))); + } + + public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input, + DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype) { + PTypeFamily ptf = input.getTypeFamily(); + PType<Pair<V1, V2>> valueType = input.getValueType(); + PTableType<Pair<K, V1>, Pair<V1, V2>> inter = ptf.tableOf( + ptf.pairs(input.getKeyType(), valueType.getSubTypes().get(0)), + valueType); + PTableType<K, Collection<Pair<V1, V2>>> out = ptf.tableOf(input.getKeyType(), + ptf.collections(input.getValueType())); + return input.parallelDo("SecondarySort.format", new SSFormatFn<K, V1, V2>(), inter) + .groupByKey( + GroupingOptions.builder() + .groupingComparatorClass(JoinUtils.getGroupingComparator(ptf)) + .partitionerClass(JoinUtils.getPartitionerClass(ptf)) + .build()) + .parallelDo("SecondarySort.apply", new SSWrapFn<K, V1, V2, T>(doFn), ptype); + } + + public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input, + DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype) { + PTypeFamily ptf = input.getTypeFamily(); + PType<Pair<V1, V2>> valueType = input.getValueType(); + PTableType<Pair<K, V1>, Pair<V1, V2>> inter = ptf.tableOf( + ptf.pairs(input.getKeyType(), valueType.getSubTypes().get(0)), + valueType); + PTableType<K, Collection<Pair<V1, V2>>> out = ptf.tableOf(input.getKeyType(), + ptf.collections(input.getValueType())); + return input.parallelDo("SecondarySort.format", new SSFormatFn<K, V1, V2>(), inter) + .groupByKey( + GroupingOptions.builder() + .groupingComparatorClass(JoinUtils.getGroupingComparator(ptf)) + .partitionerClass(JoinUtils.getPartitionerClass(ptf)) + .build()) + 
.parallelDo("SecondarySort.apply", new SSTableWrapFn<K, V1, V2, U, V>(doFn), ptype); + } + + private static class SSWrapFn<K, V1, V2, T> extends DoFn<Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>>, T> { + private final DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> intern; + + public SSWrapFn(DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> intern) { + this.intern = intern; + } + + @Override + public void configure(Configuration conf) { + intern.configure(conf); + } + + @Override + public void setConfigurationForTest(Configuration conf) { + intern.setConfigurationForTest(conf); + } + + @Override + public void initialize() { + intern.setContext(getContext()); + } + + @Override + public void process(Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>> input, Emitter<T> emitter) { + intern.process(Pair.of(input.first().first(), input.second()), emitter); + } + + @Override + public void cleanup(Emitter<T> emitter) { + intern.cleanup(emitter); + } + } + + private static class SSTableWrapFn<K, V1, V2, U, V> extends DoFn<Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>>, Pair<U, V>> { + private final DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> intern; + + public SSTableWrapFn(DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> intern) { + this.intern = intern; + } + + @Override + public void configure(Configuration conf) { + intern.configure(conf); + } + + @Override + public void setConfigurationForTest(Configuration conf) { + intern.setConfigurationForTest(conf); + } + + @Override + public void initialize() { + intern.setContext(getContext()); + } + + @Override + public void process(Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>> input, Emitter<Pair<U, V>> emitter) { + intern.process(Pair.of(input.first().first(), input.second()), emitter); + } + + @Override + public void cleanup(Emitter<Pair<U, V>> emitter) { + intern.cleanup(emitter); + } + } + + private static class SSFormatFn<K, V1, V2> extends MapFn<Pair<K, Pair<V1, V2>>, Pair<Pair<K, V1>, Pair<V1, V2>>> { + @Override + public Pair<Pair<K, V1>, Pair<V1, 
V2>> map(Pair<K, Pair<V1, V2>> input) { + return Pair.of(Pair.of(input.first(), input.second().first()), input.second()); + } + } + + private static class SSUnpackFn<K, V1, V2> extends + MapFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<K, Collection<Pair<V1, V2>>>> { + @Override + public Pair<K, Collection<Pair<V1, V2>>> map(Pair<K, Iterable<Pair<V1, V2>>> input) { + Collection<Pair<V1, V2>> c = ImmutableList.copyOf(input.second()); + return Pair.of(input.first(), c); + } + } +}
