Updated Branches: refs/heads/master 298fbaa0a -> 8c725ac79
CRUNCH-96: Some renaming and additional javadoc for the secondary sort stuff based on greid's feedback Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/8c725ac7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/8c725ac7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/8c725ac7 Branch: refs/heads/master Commit: 8c725ac7915294bd4e2133655e57ce66a5c06c0b Parents: db0ce8e Author: Josh Wills <[email protected]> Authored: Fri Oct 19 17:49:51 2012 -0700 Committer: Josh Wills <[email protected]> Committed: Sun Oct 21 05:07:55 2012 -0700 ---------------------------------------------------------------------- .../org/apache/crunch/examples/SecondarySort.java | 163 -------------- .../crunch/examples/SecondarySortExample.java | 164 +++++++++++++++ .../java/org/apache/crunch/lib/SecondarySort.java | 10 +- 3 files changed, 169 insertions(+), 168 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8c725ac7/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java ---------------------------------------------------------------------- diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java deleted file mode 100644 index 3e08046..0000000 --- a/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.examples; - -import java.io.Serializable; - -import org.apache.crunch.DoFn; -import org.apache.crunch.Emitter; -import org.apache.crunch.PCollection; -import org.apache.crunch.PTable; -import org.apache.crunch.Pair; -import org.apache.crunch.Pipeline; -import org.apache.crunch.impl.mr.MRPipeline; -import org.apache.crunch.io.To; -import org.apache.crunch.types.avro.Avros; -import org.apache.crunch.types.writable.Writables; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.util.GenericOptionsParser; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -import com.google.common.base.Splitter; - -@SuppressWarnings("serial") -public class SecondarySort extends Configured implements Tool, Serializable { - - static enum COUNTERS { - CORRUPT_TIMESTAMP, - CORRUPT_LINE - } - - // Input records are comma separated. The first field is grouping record. The - // second is the one to sort on (a long in this implementation). The rest is - // payload to be sorted. - - // For example: - - // one,1,1 - // one,2,-3 - // two,4,5 - // two,2,6 - // two,1,7,9 - // three,0,-1 - // one,-5,10 - // one,-10,garbage - - private static final char SPLIT_ON = ','; - private static final Splitter INPUT_SPLITTER = Splitter.on(SPLIT_ON).trimResults().omitEmptyStrings().limit(3); - - @Override - public int run(String[] args) throws Exception { - if (args.length != 2) { - System.err.println(); - System.err.println("Usage: " + this.getClass().getName() - + " [generic options] input output"); - System.err.println(); - GenericOptionsParser.printGenericCommandUsage(System.err); - return 1; - } - // Create an object to coordinate pipeline creation and execution. - Pipeline pipeline = new MRPipeline(SecondarySort.class, getConf()); - // Reference a given text file as a collection of Strings. - PCollection<String> lines = pipeline.readTextFile(args[0]); - - // Define a function that parses each line in a PCollection of Strings into - // a pair of pairs, the first of which will be grouped by (first member) and - // the sorted by (second memeber). The second pair is payload which can be - // passed in an Iterable object. - PTable<String, Pair<Long, String>> pairs = lines.parallelDo("extract_records", - new DoFn<String, Pair<String, Pair<Long, String>>>() { - @Override - public void process(String line, Emitter<Pair<String, Pair<Long, String>>> emitter) { - int i = 0; - String key = ""; - long timestamp = 0; - String value = ""; - for (String element : INPUT_SPLITTER.split(line)) { - switch (++i) { - case 1: - key = element; - break; - case 2: - try { - timestamp = Long.parseLong(element); - } catch (NumberFormatException e) { - System.out.println("Timestamp not in long format '" + line + "'"); - this.getCounter(COUNTERS.CORRUPT_TIMESTAMP).increment(1); - } - break; - case 3: - value = element; - break; - default: - System.err.println("i = " + i + " should never happen!"); - break; - } - } - if (i == 3) { - Long sortby = new Long(timestamp); - emitter.emit(Pair.of(key, Pair.of(sortby, value))); - } else { - this.getCounter(COUNTERS.CORRUPT_LINE).increment(1); - } - }}, Avros.tableOf(Avros.strings(), Avros.pairs(Avros.longs(), Avros.strings()))); - - // The output of the above input will be (with one reducer): - - // one : [[-10,garbage],[-5,10],[1,1],[2,-3]] - // three : [[0,-1]] - // two : [[1,7,9],[2,6],[4,5]] - - org.apache.crunch.lib.SecondarySort.sortAndApply(pairs, - new DoFn<Pair<String, Iterable<Pair<Long, String>>>, String>() { - final StringBuilder sb = new StringBuilder(); - @Override - public void process(Pair<String, Iterable<Pair<Long, String>>> input, Emitter<String> emitter) { - sb.setLength(0); - sb.append(input.first()); - sb.append(" : ["); - boolean first = true; - for(Pair<Long, String> pair : input.second()) { - if (first) { - first = false; - } else { - sb.append(','); - } - sb.append(pair); - } - sb.append("]"); - emitter.emit(sb.toString()); - } - }, Writables.strings()).write(To.textFile(args[1])); - - // Execute the pipeline as a MapReduce. - return pipeline.done().succeeded() ? 0 : 1; - } - - public static void main(String[] args) throws Exception { - int exitCode = -1; - try { - exitCode = ToolRunner.run(new Configuration(), new SecondarySort(), args); - } catch (Throwable e) { - e.printStackTrace(); - } - System.exit(exitCode); - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8c725ac7/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySortExample.java ---------------------------------------------------------------------- diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySortExample.java b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySortExample.java new file mode 100644 index 0000000..998bd7f --- /dev/null +++ b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySortExample.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.examples; + +import java.io.Serializable; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.lib.SecondarySort; +import org.apache.crunch.io.To; +import org.apache.crunch.types.avro.Avros; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import com.google.common.base.Splitter; + +@SuppressWarnings("serial") +public class SecondarySortExample extends Configured implements Tool, Serializable { + + static enum COUNTERS { + CORRUPT_TIMESTAMP, + CORRUPT_LINE + } + + // Input records are comma separated. The first field is grouping record. The + // second is the one to sort on (a long in this implementation). The rest is + // payload to be sorted. + + // For example: + + // one,1,1 + // one,2,-3 + // two,4,5 + // two,2,6 + // two,1,7,9 + // three,0,-1 + // one,-5,10 + // one,-10,garbage + + private static final char SPLIT_ON = ','; + private static final Splitter INPUT_SPLITTER = Splitter.on(SPLIT_ON).trimResults().omitEmptyStrings().limit(3); + + @Override + public int run(String[] args) throws Exception { + if (args.length != 2) { + System.err.println(); + System.err.println("Usage: " + this.getClass().getName() + + " [generic options] input output"); + System.err.println(); + GenericOptionsParser.printGenericCommandUsage(System.err); + return 1; + } + // Create an object to coordinate pipeline creation and execution. + Pipeline pipeline = new MRPipeline(SecondarySortExample.class, getConf()); + // Reference a given text file as a collection of Strings. + PCollection<String> lines = pipeline.readTextFile(args[0]); + + // Define a function that parses each line in a PCollection of Strings into + // a pair of pairs, the first of which will be grouped by (first member) and + // the sorted by (second memeber). The second pair is payload which can be + // passed in an Iterable object. + PTable<String, Pair<Long, String>> pairs = lines.parallelDo("extract_records", + new DoFn<String, Pair<String, Pair<Long, String>>>() { + @Override + public void process(String line, Emitter<Pair<String, Pair<Long, String>>> emitter) { + int i = 0; + String key = ""; + long timestamp = 0; + String value = ""; + for (String element : INPUT_SPLITTER.split(line)) { + switch (++i) { + case 1: + key = element; + break; + case 2: + try { + timestamp = Long.parseLong(element); + } catch (NumberFormatException e) { + System.out.println("Timestamp not in long format '" + line + "'"); + this.getCounter(COUNTERS.CORRUPT_TIMESTAMP).increment(1); + } + break; + case 3: + value = element; + break; + default: + System.err.println("i = " + i + " should never happen!"); + break; + } + } + if (i == 3) { + Long sortby = new Long(timestamp); + emitter.emit(Pair.of(key, Pair.of(sortby, value))); + } else { + this.getCounter(COUNTERS.CORRUPT_LINE).increment(1); + } + }}, Avros.tableOf(Avros.strings(), Avros.pairs(Avros.longs(), Avros.strings()))); + + // The output of the above input will be (with one reducer): + + // one : [[-10,garbage],[-5,10],[1,1],[2,-3]] + // three : [[0,-1]] + // two : [[1,7,9],[2,6],[4,5]] + + SecondarySort.sortAndApply(pairs, + new DoFn<Pair<String, Iterable<Pair<Long, String>>>, String>() { + final StringBuilder sb = new StringBuilder(); + @Override + public void process(Pair<String, Iterable<Pair<Long, String>>> input, Emitter<String> emitter) { + sb.setLength(0); + sb.append(input.first()); + sb.append(" : ["); + boolean first = true; + for(Pair<Long, String> pair : input.second()) { + if (first) { + first = false; + } else { + sb.append(','); + } + sb.append(pair); + } + sb.append("]"); + emitter.emit(sb.toString()); + } + }, Writables.strings()).write(To.textFile(args[1])); + + // Execute the pipeline as a MapReduce. + return pipeline.done().succeeded() ? 0 : 1; + } + + public static void main(String[] args) throws Exception { + int exitCode = -1; + try { + exitCode = ToolRunner.run(new Configuration(), new SecondarySortExample(), args); + } catch (Throwable e) { + e.printStackTrace(); + } + System.exit(exitCode); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8c725ac7/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java index ebf7fb4..30639b1 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java +++ b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java @@ -35,6 +35,11 @@ import org.apache.hadoop.conf.Configuration; /** * Utilities for performing a secondary sort on a {@code PTable<K, Pair<V1, V2>>} collection. + * <p> + * Secondary sorts are usually performed during sessionization: given a collection + * of events, we want to group them by a key (such as a user ID), then sort the grouped + * records by an auxillary key (such as a timestamp), and then perform some additional + * processing on the sorted records. */ public class SecondarySort { @@ -95,11 +100,6 @@ public class SecondarySort { } @Override - public void setConfigurationForTest(Configuration conf) { - intern.setConfigurationForTest(conf); - } - - @Override public void initialize() { intern.setContext(getContext()); }
