Updated Branches:
  refs/heads/master 298fbaa0a -> 8c725ac79

CRUNCH-96: Rename the SecondarySort example to SecondarySortExample and add
javadoc to lib.SecondarySort, based on greid's feedback


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/8c725ac7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/8c725ac7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/8c725ac7

Branch: refs/heads/master
Commit: 8c725ac7915294bd4e2133655e57ce66a5c06c0b
Parents: db0ce8e
Author: Josh Wills <[email protected]>
Authored: Fri Oct 19 17:49:51 2012 -0700
Committer: Josh Wills <[email protected]>
Committed: Sun Oct 21 05:07:55 2012 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/examples/SecondarySort.java  |  163 --------------
 .../crunch/examples/SecondarySortExample.java      |  164 +++++++++++++++
 .../java/org/apache/crunch/lib/SecondarySort.java  |   10 +-
 3 files changed, 169 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8c725ac7/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java
----------------------------------------------------------------------
diff --git 
a/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java 
b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java
deleted file mode 100644
index 3e08046..0000000
--- 
a/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySort.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.examples;
-
-import java.io.Serializable;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.To;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.writable.Writables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import com.google.common.base.Splitter;
-
-@SuppressWarnings("serial")
-public class SecondarySort extends Configured implements Tool, Serializable {
-
-  static enum COUNTERS {
-    CORRUPT_TIMESTAMP,
-    CORRUPT_LINE
-  }
-
-  // Input records are comma separated. The first field is the grouping key. The
-  // second is the one to sort on (a long in this implementation). The rest is
-  // the payload, which is carried along with the sort key rather than sorted.
-
-  // For example:
-
-  // one,1,1
-  // one,2,-3
-  // two,4,5
-  // two,2,6
-  // two,1,7,9
-  // three,0,-1
-  // one,-5,10
-  // one,-10,garbage
-
-  private static final char SPLIT_ON = ',';
-  private static final Splitter INPUT_SPLITTER = 
Splitter.on(SPLIT_ON).trimResults().omitEmptyStrings().limit(3);
-
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length != 2) {
-      System.err.println();
-      System.err.println("Usage: " + this.getClass().getName()
-          + " [generic options] input output");
-      System.err.println();
-      GenericOptionsParser.printGenericCommandUsage(System.err);
-      return 1;
-    }
-    // Create an object to coordinate pipeline creation and execution.
-    Pipeline pipeline = new MRPipeline(SecondarySort.class, getConf());
-    // Reference a given text file as a collection of Strings.
-    PCollection<String> lines = pipeline.readTextFile(args[0]);
-
-    // Define a function that parses each line in a PCollection of Strings into
-    // a nested pair (key, (sortKey, payload)): records are grouped by the key
-    // and then sorted by sortKey. Each group's (sortKey, payload) pairs are
-    // then passed to the next DoFn as an Iterable.
-    PTable<String, Pair<Long, String>> pairs = 
lines.parallelDo("extract_records",
-        new DoFn<String, Pair<String, Pair<Long, String>>>() {
-          @Override
-          public void process(String line, Emitter<Pair<String, Pair<Long, 
String>>> emitter) {
-            int i = 0;
-            String key = "";
-            long timestamp = 0;
-            String value = "";
-            for (String element : INPUT_SPLITTER.split(line)) {
-              switch (++i) {
-              case 1:
-                key = element;
-                break;
-              case 2:
-                try {
-                  timestamp = Long.parseLong(element);
-                } catch (NumberFormatException e) {
-                  System.out.println("Timestamp not in long format '" + line + 
"'");
-                  this.getCounter(COUNTERS.CORRUPT_TIMESTAMP).increment(1); // NOTE(review): the line is still emitted below with timestamp 0
-                }
-                break;
-              case 3:
-                value = element;
-                break;
-              default: // unreachable: INPUT_SPLITTER yields at most three fields
-                System.err.println("i = " + i + " should never happen!");
-                break;
-              }
-            }
-            if (i == 3) {
-              Long sortby = new Long(timestamp);
-              emitter.emit(Pair.of(key, Pair.of(sortby, value)));
-            } else {
-              this.getCounter(COUNTERS.CORRUPT_LINE).increment(1);
-            }
-          }}, Avros.tableOf(Avros.strings(), Avros.pairs(Avros.longs(), 
Avros.strings())));
-
-    // The output of the above input will be (with one reducer):
-
-    // one : [[-10,garbage],[-5,10],[1,1],[2,-3]]
-    // three : [[0,-1]]
-    // two : [[1,7,9],[2,6],[4,5]]
-
-    org.apache.crunch.lib.SecondarySort.sortAndApply(pairs,
-        new DoFn<Pair<String, Iterable<Pair<Long, String>>>, String>() {
-          final StringBuilder sb = new StringBuilder();
-          @Override
-          public void process(Pair<String, Iterable<Pair<Long, String>>> 
input, Emitter<String> emitter) {
-            sb.setLength(0);
-            sb.append(input.first());
-            sb.append(" : [");
-            boolean first = true;
-            for(Pair<Long, String> pair : input.second()) {
-              if (first) {
-                first = false;
-              } else {
-                sb.append(',');
-              }
-              sb.append(pair);
-            }
-            sb.append("]");
-            emitter.emit(sb.toString());
-          }
-        }, Writables.strings()).write(To.textFile(args[1]));
-
-    // Execute the pipeline as a MapReduce.
-    return pipeline.done().succeeded() ? 0 : 1;
-  }
-
-  public static void main(String[] args) throws Exception {
-    int exitCode = -1;
-    try {
-      exitCode = ToolRunner.run(new Configuration(), new SecondarySort(), 
args);
-    } catch (Throwable e) {
-      e.printStackTrace();
-    }
-    System.exit(exitCode);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8c725ac7/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySortExample.java
----------------------------------------------------------------------
diff --git 
a/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySortExample.java
 
b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySortExample.java
new file mode 100644
index 0000000..998bd7f
--- /dev/null
+++ 
b/crunch-examples/src/main/java/org/apache/crunch/examples/SecondarySortExample.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.examples;
+
+import java.io.Serializable;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.To;
+import org.apache.crunch.lib.SecondarySort;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.common.base.Splitter;
+
+/** Example that uses {@link SecondarySort} to group records by key and sort each group by a long. */
+@SuppressWarnings("serial")
+public class SecondarySortExample extends Configured implements Tool, Serializable {
+
+  static enum COUNTERS {
+    CORRUPT_TIMESTAMP,
+    CORRUPT_LINE
+  }
+
+  // Input records are comma separated. The first field is the grouping key and
+  // the second is the sort key (a long in this implementation). The rest of the
+  // line, which may itself contain commas, is the payload.
+
+  // For example:
+
+  // one,1,1
+  // one,2,-3
+  // two,4,5
+  // two,2,6
+  // two,1,7,9
+  // three,0,-1
+  // one,-5,10
+  // one,-10,garbage
+
+  private static final char SPLIT_ON = ',';
+  private static final Splitter INPUT_SPLITTER = Splitter.on(SPLIT_ON).trimResults().omitEmptyStrings().limit(3);
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length != 2) {
+      System.err.println();
+      System.err.println("Usage: " + this.getClass().getName()
+          + " [generic options] input output");
+      System.err.println();
+      GenericOptionsParser.printGenericCommandUsage(System.err);
+      return 1;
+    }
+    // Create an object to coordinate pipeline creation and execution.
+    Pipeline pipeline = new MRPipeline(SecondarySortExample.class, getConf());
+    // Reference a given text file as a collection of Strings.
+    PCollection<String> lines = pipeline.readTextFile(args[0]);
+
+    // Parse each line into a nested pair (key, (sortKey, payload)): records are
+    // grouped by key and then sorted by sortKey. Lines with fewer than three fields
+    // or a non-numeric sortKey are counted (CORRUPT_LINE / CORRUPT_TIMESTAMP) and dropped.
+    // Each group's (sortKey, payload) pairs reach the next DoFn as an Iterable.
+    PTable<String, Pair<Long, String>> pairs = lines.parallelDo("extract_records",
+        new DoFn<String, Pair<String, Pair<Long, String>>>() {
+          @Override
+          public void process(String line, Emitter<Pair<String, Pair<Long, String>>> emitter) {
+            int i = 0;
+            String key = "";
+            long timestamp = 0;
+            String value = "";
+            for (String element : INPUT_SPLITTER.split(line)) {
+              // INPUT_SPLITTER is limited to three fields, so only cases 1-3 can occur.
+              switch (++i) {
+              case 1:
+                key = element;
+                break;
+              case 2:
+                try {
+                  timestamp = Long.parseLong(element);
+                } catch (NumberFormatException e) {
+                  System.err.println("Timestamp not in long format '" + line + "'");
+                  this.getCounter(COUNTERS.CORRUPT_TIMESTAMP).increment(1);
+                  return; // drop the line rather than emit it with a made-up sort key of 0
+                }
+                break;
+              case 3:
+                value = element;
+                break;
+              }
+            }
+            if (i == 3) {
+              Long sortBy = Long.valueOf(timestamp);
+              emitter.emit(Pair.of(key, Pair.of(sortBy, value)));
+            } else {
+              this.getCounter(COUNTERS.CORRUPT_LINE).increment(1);
+            }
+          }}, Avros.tableOf(Avros.strings(), Avros.pairs(Avros.longs(), Avros.strings())));
+
+    // The output of the above input will be (with one reducer):
+
+    // one : [[-10,garbage],[-5,10],[1,1],[2,-3]]
+    // three : [[0,-1]]
+    // two : [[1,7,9],[2,6],[4,5]]
+
+    SecondarySort.sortAndApply(pairs,
+        new DoFn<Pair<String, Iterable<Pair<Long, String>>>, String>() {
+          final StringBuilder sb = new StringBuilder();
+          @Override
+          public void process(Pair<String, Iterable<Pair<Long, String>>> input, Emitter<String> emitter) {
+            sb.setLength(0);
+            sb.append(input.first());
+            sb.append(" : [");
+            boolean first = true;
+            for (Pair<Long, String> pair : input.second()) {
+              if (first) {
+                first = false;
+              } else {
+                sb.append(',');
+              }
+              sb.append(pair);
+            }
+            sb.append("]");
+            emitter.emit(sb.toString());
+          }
+        }, Writables.strings()).write(To.textFile(args[1]));
+
+    // Execute the pipeline as a MapReduce.
+    return pipeline.done().succeeded() ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int exitCode = -1;
+    try {
+      exitCode = ToolRunner.run(new Configuration(), new SecondarySortExample(), args);
+    } catch (Throwable e) {
+      e.printStackTrace();
+    }
+    System.exit(exitCode);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8c725ac7/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java 
b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
index ebf7fb4..30639b1 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
@@ -35,6 +35,11 @@ import org.apache.hadoop.conf.Configuration;
 
 /**
  * Utilities for performing a secondary sort on a {@code PTable<K, Pair<V1, 
V2>>} collection.
+ * <p>
+ * Secondary sorts are usually performed during sessionization: given a 
collection
+ * of events, we want to group them by a key (such as a user ID), then sort 
the grouped
+ * records by an auxiliary key (such as a timestamp), and then perform some 
additional
+ * processing on the sorted records.
  */
 public class SecondarySort {
   
@@ -95,11 +100,6 @@ public class SecondarySort {
     }
 
     @Override
-    public void setConfigurationForTest(Configuration conf) {
-      intern.setConfigurationForTest(conf);
-    }
-
-    @Override
     public void initialize() {
       intern.setContext(getContext());
     }

Reply via email to