Updated Branches: refs/heads/master a65feb569 -> 17ec15584
CRUNCH-68: Fix command line parser for examples. Use GenericOptionsParser to deal with Hadoop options. Make examples exit with appropriate status codes. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/17ec1558 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/17ec1558 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/17ec1558 Branch: refs/heads/master Commit: 17ec1558455f2ba7ab099da5f028b4b6b01adda0 Parents: a65feb5 Author: Matthias Friedrich <[email protected]> Authored: Wed Sep 19 20:22:27 2012 +0200 Committer: Matthias Friedrich <[email protected]> Committed: Thu Sep 20 07:29:50 2012 +0200 ---------------------------------------------------------------------- .../apache/crunch/examples/AverageBytesByIP.java | 17 +++++++++----- .../org/apache/crunch/examples/TotalBytesByIP.java | 17 +++++++++----- .../java/org/apache/crunch/examples/WordCount.java | 17 +++++++++----- 3 files changed, 33 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/17ec1558/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java ---------------------------------------------------------------------- diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java b/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java index 868e38a..52b542a 100644 --- a/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java +++ b/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java @@ -29,6 +29,7 @@ import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineResult; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.types.writable.Writables; import org.apache.hadoop.conf.Configuration; @@ -47,7 +48,9 @@ public class AverageBytesByIP extends Configured implements Tool, Serializable { static final String logRegex = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\""; public int run(String[] args) throws Exception { - if (args.length != 2) { + String[] remainingArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); + + if (remainingArgs.length != 3) { System.err.println(); System.err.println("Two and only two arguments are accepted."); System.err.println("Usage: " + this.getClass().getName() + " [generic options] input output"); @@ -58,7 +61,7 @@ public class AverageBytesByIP extends Configured implements Tool, Serializable { // Create an object to coordinate pipeline creation and execution. Pipeline pipeline = new MRPipeline(AverageBytesByIP.class, getConf()); // Reference a given text file as a collection of Strings. - PCollection<String> lines = pipeline.readTextFile(args[0]); + PCollection<String> lines = pipeline.readTextFile(remainingArgs[1]); // Combiner used for summing up response size and count CombineFn<String, Pair<Long, Long>> stringPairOfLongsSumCombiner = CombineFn.pairAggregator(CombineFn.SUM_LONGS, @@ -75,10 +78,11 @@ public class AverageBytesByIP extends Configured implements Tool, Serializable { Writables.tableOf(Writables.strings(), Writables.doubles())); // write the result to a text file - pipeline.writeTextFile(avgs, args[1]); + pipeline.writeTextFile(avgs, remainingArgs[2]); // Execute the pipeline as a MapReduce. - pipeline.done(); - return 0; + PipelineResult result = pipeline.done(); + + return result.succeeded() ? 0 : 1; } // Function to calculate the average response size for a given ip address @@ -129,6 +133,7 @@ public class AverageBytesByIP extends Configured implements Tool, Serializable { }; public static void main(String[] args) throws Exception { - ToolRunner.run(new Configuration(), new AverageBytesByIP(), args); + int result = ToolRunner.run(new Configuration(), new AverageBytesByIP(), args); + System.exit(result); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/17ec1558/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java ---------------------------------------------------------------------- diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java b/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java index 1953e3a..59b05fa 100644 --- a/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java +++ b/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java @@ -28,6 +28,7 @@ import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineResult; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.types.writable.Writables; import org.apache.hadoop.conf.Configuration; @@ -46,7 +47,9 @@ public class TotalBytesByIP extends Configured implements Tool, Serializable { static final String logRegex = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\""; public int run(String[] args) throws Exception { - if (args.length != 2) { + String[] remainingArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); + + if (remainingArgs.length != 3) { System.err.println(); System.err.println("Two and only two arguments are accepted."); System.err.println("Usage: " + this.getClass().getName() + " [generic options] input output"); @@ -57,7 +60,7 @@ public class TotalBytesByIP extends Configured implements Tool, Serializable { // Create an object to coordinate pipeline creation and execution. Pipeline pipeline = new MRPipeline(TotalBytesByIP.class, getConf()); // Reference a given text file as a collection of Strings. - PCollection<String> lines = pipeline.readTextFile(args[0]); + PCollection<String> lines = pipeline.readTextFile(remainingArgs[1]); // Combiner used for summing up response size CombineFn<String, Long> longSumCombiner = CombineFn.SUM_LONGS(); @@ -67,10 +70,11 @@ public class TotalBytesByIP extends Configured implements Tool, Serializable { .parallelDo(extractIPResponseSize, Writables.tableOf(Writables.strings(), Writables.longs())).groupByKey() .combineValues(longSumCombiner); - pipeline.writeTextFile(ipAddrResponseSize, args[1]); + pipeline.writeTextFile(ipAddrResponseSize, remainingArgs[2]); // Execute the pipeline as a MapReduce. - pipeline.done(); - return 0; + PipelineResult result = pipeline.done(); + + return result.succeeded() ? 0 : 1; } // Function to parse apache log records @@ -101,6 +105,7 @@ public class TotalBytesByIP extends Configured implements Tool, Serializable { }; public static void main(String[] args) throws Exception { - ToolRunner.run(new Configuration(), new TotalBytesByIP(), args); + int result = ToolRunner.run(new Configuration(), new TotalBytesByIP(), args); + System.exit(result); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/17ec1558/crunch-examples/src/main/java/org/apache/crunch/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/WordCount.java b/crunch-examples/src/main/java/org/apache/crunch/examples/WordCount.java index e4ce25b..31d99d3 100644 --- a/crunch-examples/src/main/java/org/apache/crunch/examples/WordCount.java +++ b/crunch-examples/src/main/java/org/apache/crunch/examples/WordCount.java @@ -24,6 +24,7 @@ import org.apache.crunch.Emitter; import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineResult; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.types.writable.Writables; import org.apache.hadoop.conf.Configuration; @@ -34,7 +35,9 @@ import org.apache.hadoop.util.ToolRunner; public class WordCount extends Configured implements Tool, Serializable { public int run(String[] args) throws Exception { - if (args.length != 3) { + String[] remainingArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); + + if (remainingArgs.length != 3) { System.err.println(); System.err.println("Usage: " + this.getClass().getName() + " [generic options] input output"); System.err.println(); @@ -44,7 +47,7 @@ public class WordCount extends Configured implements Tool, Serializable { // Create an object to coordinate pipeline creation and execution. Pipeline pipeline = new MRPipeline(WordCount.class, getConf()); // Reference a given text file as a collection of Strings. - PCollection<String> lines = pipeline.readTextFile(args[1]); + PCollection<String> lines = pipeline.readTextFile(remainingArgs[1]); // Define a function that splits each line in a PCollection of Strings into // a @@ -64,13 +67,15 @@ public class WordCount extends Configured implements Tool, Serializable { PTable<String, Long> counts = words.count(); // Instruct the pipeline to write the resulting counts to a text file. - pipeline.writeTextFile(counts, args[2]); + pipeline.writeTextFile(counts, remainingArgs[2]); // Execute the pipeline as a MapReduce. - pipeline.done(); - return 0; + PipelineResult result = pipeline.done(); + + return result.succeeded() ? 0 : 1; } public static void main(String[] args) throws Exception { - ToolRunner.run(new Configuration(), new WordCount(), args); + int result = ToolRunner.run(new Configuration(), new WordCount(), args); + System.exit(result); } }
