CRUNCH-78: Add a secondary sort example. Signed-off-by: Josh Wills <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/167c9c28 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/167c9c28 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/167c9c28 Branch: refs/heads/master Commit: 167c9c2818400103f1f0860bbf0d7f2e31a46c2b Parents: e7b5feb Author: Alex Kozlov <[email protected]> Authored: Wed Sep 26 12:04:47 2012 -0700 Committer: Josh Wills <[email protected]> Committed: Tue Oct 2 20:18:36 2012 -0700 ---------------------------------------------------------------------- .../org/apache/crunch/examples/SecondarySort.java | 177 +++++++++++++++ 1 files changed, 177 insertions(+), 0 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/167c9c28/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java ---------------------------------------------------------------------- diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java new file mode 100644 index 0000000..dc2f8b0 --- /dev/null +++ b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.examples; + +import java.io.Serializable; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.GroupingOptions; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.To; +import org.apache.crunch.lib.join.JoinUtils; +import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.avro.Avros; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import com.google.common.base.Splitter; + +@SuppressWarnings("serial") +public class SecondarySort extends Configured implements Tool, Serializable { + + static enum COUNTERS { + CORRUPT_TIMESTAMP, + CORRUPT_LINE + } + + // Input records are comma separated. The first field is the grouping key. The + // second is the one to sort on (a long in this implementation). The rest is + // payload to be sorted; any further commas stay inside the payload. 
+ + // For example: + + // one,1,1 + // one,2,-3 + // two,4,5 + // two,2,6 + // two,1,7,9 + // three,0,-1 + // one,-5,10 + // one,-10,garbage + + private static final char SPLIT_ON = ','; + private static final Splitter INPUT_SPLITTER = Splitter.on(SPLIT_ON).trimResults().omitEmptyStrings().limit(3); + + @Override + public int run(String[] args) throws Exception { + if (args.length != 2) { + System.err.println(); + System.err.println("Usage: " + this.getClass().getName() + + " [generic options] input output"); + System.err.println(); + GenericOptionsParser.printGenericCommandUsage(System.err); + return 1; + } + // Create an object to coordinate pipeline creation and execution. + Pipeline pipeline = new MRPipeline(SecondarySort.class, getConf()); + // Reference a given text file as a collection of Strings. + PCollection<String> lines = pipeline.readTextFile(args[0]); + + // Define a function that parses each line in a PCollection of Strings into + // a pair of pairs, the first of which will be grouped by (first member) and + // then sorted by (second member). The second pair is payload which can be + // passed in an Iterable object. 
+ PTable<Pair<String, Long>, Pair<Long, String>> pairs = lines.parallelDo("extract_records", + new DoFn<String, Pair<Pair<String, Long>, Pair<Long, String>>>() { + @Override + public void process(String line, Emitter<Pair<Pair<String, Long>, Pair<Long, String>>> emitter) { + int i = 0; + String key = ""; + long timestamp = 0; + String value = ""; + for (String element : INPUT_SPLITTER.split(line)) { + switch (++i) { + case 1: + key = element; + break; + case 2: + try { + timestamp = Long.parseLong(element); + } catch (NumberFormatException e) { + System.out.println("Timestamp not in long format '" + line + "'"); + this.getCounter(COUNTERS.CORRUPT_TIMESTAMP).increment(1); + } + break; + case 3: + value = element; + break; + default: + System.err.println("i = " + i + " should never happen!"); + break; + } + } + if (i == 3) { + Long sortby = new Long(timestamp); + emitter.emit(new Pair<Pair<String, Long>, Pair<Long, String>>(new Pair<String, Long>(key, sortby), + new Pair<Long, String>(sortby, value))); + } else { + this.getCounter(COUNTERS.CORRUPT_LINE).increment(1); + } + }}, Avros.tableOf(Avros.pairs(Avros.strings(), Avros.longs()), Avros.pairs(Avros.longs(), Avros.strings()))); + + // Define partitioning and grouping properties + GroupingOptions groupingOptions = GroupingOptions.builder() + .numReducers(this.getConf().getInt("mapred.reduce.tasks", 1)) + .partitionerClass(JoinUtils.getPartitionerClass(AvroTypeFamily.getInstance())) + .groupingComparatorClass(JoinUtils.getGroupingComparator(AvroTypeFamily.getInstance())).build(); + + // Do the rest of the processing extracting a list of things according to + // groups defined in the groupingOptions + + // The output of the above input will be (with one reducer): + + // one : [[-10,garbage],[-5,10],[1,1],[2,-3]] + // three : [[0,-1]] + // two : [[1,7,9],[2,6],[4,5]] + + pairs.groupByKey(groupingOptions) + .parallelDo("group_records", + new DoFn<Pair<Pair<String, Long>, Iterable<Pair<Long, String>>>, String>() { + 
final StringBuilder sb = new StringBuilder(); + @Override + public void process(Pair<Pair<String, Long>, Iterable<Pair<Long, String>>> input, Emitter<String> emitter) { + sb.setLength(0); + sb.append(input.first().get(0)); + sb.append(" : ["); + boolean first = true; + for(Pair<Long, String> pair : input.second()) { + if (first) { + first = false; + } else { + sb.append(','); + } + sb.append(pair); + } + sb.append("]"); + emitter.emit(sb.toString()); + } + }, Writables.strings()).write(To.textFile(args[1])); + + // Execute the pipeline as a MapReduce job. + return pipeline.done().succeeded() ? 0 : 1; + } + + public static void main(String[] args) throws Exception { + int exitCode = -1; + try { + exitCode = ToolRunner.run(new Configuration(), new SecondarySort(), args); + } catch (Throwable e) { + e.printStackTrace(); + } + System.exit(exitCode); + } +}
