Author: cutting
Date: Tue May 1 13:38:17 2007
New Revision: 534234

URL: http://svn.apache.org/viewvc?view=rev&rev=534234
Log:
HADOOP-1247.  Add support to contrib/streaming for aggregate package.  Contributed by Runping.
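
In effect, a streaming job can now pass "-reducer aggregate" and let the aggregate library (formerly Abacus) do the combining and reducing, provided the mapper emits records the library understands. A minimal sketch of the equivalent JobConf wiring follows; the class name AggregateWiringSketch is only illustrative, and the real wiring is the StreamJob.java change further down.

    // Sketch only: what "-reducer aggregate" amounts to at the JobConf level.
    // The streaming mapper is expected to emit records such as
    // "LongValueSum:<word>\t1"; the aggregate combiner/reducer sum them per key.
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorCombiner;
    import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer;

    public class AggregateWiringSketch {
      public static void configure(JobConf conf) {
        conf.setReducerClass(ValueAggregatorReducer.class);
        conf.setCombinerClass(ValueAggregatorCombiner.class);
      }
    }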
Added:
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StreamAggregate.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=534234&r1=534233&r2=534234
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue May 1 13:38:17 2007
@@ -297,6 +297,9 @@
 88. HADOOP-1272.  Extract inner classes from FSNamesystem into separate
     classes. (Dhruba Borthakur via tomwhite)
 
+89. HADOOP-1247.  Add support to contrib/streaming for aggregate
+    package, formerly called Abacus.  (Runping Qi via cutting)
+
 
 Release 0.12.3 - 2007-04-06

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=534234&r1=534233&r2=534234
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue May 1 13:38:17 2007
@@ -26,13 +26,10 @@
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
@@ -49,10 +46,11 @@
 import org.apache.commons.cli2.validation.Validator;
 
 import org.apache.commons.logging.*;
 
+import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorCombiner;
+import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
@@ -66,7 +64,7 @@
 import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
-import org.apache.log4j.helpers.OptionConverter;
+
 /** All the client-side work happens here.
  * (Jar packaging, MapRed job submission and monitoring)
  * @author Michel Tourn
@@ -213,7 +211,6 @@
       verbose_ = cmdLine.hasOption("-verbose");
       detailedUsage_ = cmdLine.hasOption("-info");
       debug_ = cmdLine.hasOption("-debug")? debug_ + 1 : debug_;
-      inputTagged_ = cmdLine.hasOption("-inputtagged");
 
       inputSpecs_.addAll(cmdLine.getValues("-input"));
       output_ = (String) cmdLine.getValue("-output");
@@ -709,19 +706,14 @@
     if (inReaderSpec_ == null && inputFormatSpec_ == null) {
       fmt = KeyValueTextInputFormat.class;
     } else if (inputFormatSpec_ != null) {
-      if ((inputFormatSpec_.compareToIgnoreCase("KeyValueTextInputFormat") == 0)
-          || (inputFormatSpec_
-              .compareToIgnoreCase("org.apache.hadoop.mapred.KeyValueTextInputFormat") == 0)) {
+      if (inputFormatSpec_.equals(KeyValueTextInputFormat.class.getName())
+          || inputFormatSpec_.equals(KeyValueTextInputFormat.class.getCanonicalName())) {
         fmt = KeyValueTextInputFormat.class;
-      } else if ((inputFormatSpec_
-          .compareToIgnoreCase("SequenceFileInputFormat") == 0)
-          || (inputFormatSpec_
-              .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileInputFormat") == 0)) {
+      } else if (inputFormatSpec_.equals(SequenceFileInputFormat.class.getName())
+          || inputFormatSpec_.equals(SequenceFileInputFormat.class.getCanonicalName())) {
         fmt = SequenceFileInputFormat.class;
-      } else if ((inputFormatSpec_
-          .compareToIgnoreCase("SequenceFileToLineInputFormat") == 0)
-          || (inputFormatSpec_
-              .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileToLineInputFormat") == 0)) {
+      } else if (inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getName())
+          || inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getCanonicalName())) {
         fmt = SequenceFileAsTextInputFormat.class;
       } else {
         c = StreamUtil.goodClassOrNull(inputFormatSpec_, defaultPackage);
@@ -774,12 +766,19 @@
     reducerNone_ = false;
     if (redCmd_ != null) {
       reducerNone_ = redCmd_.equals(REDUCE_NONE);
-      c = StreamUtil.goodClassOrNull(redCmd_, defaultPackage);
-      if (c != null) {
-        jobConf_.setReducerClass(c);
+      if (redCmd_.compareToIgnoreCase("aggregate") == 0) {
+        jobConf_.setReducerClass(ValueAggregatorReducer.class);
+        jobConf_.setCombinerClass(ValueAggregatorCombiner.class);
       } else {
-        jobConf_.setReducerClass(PipeReducer.class);
-        jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(redCmd_, "UTF-8"));
+
+        c = StreamUtil.goodClassOrNull(redCmd_, defaultPackage);
+        if (c != null) {
+          jobConf_.setReducerClass(c);
+        } else {
+          jobConf_.setReducerClass(PipeReducer.class);
+          jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(
+              redCmd_, "UTF-8"));
+        }
       }
     }

Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StreamAggregate.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StreamAggregate.java?view=auto&rev=534234
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StreamAggregate.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StreamAggregate.java Tue May 1 13:38:17 2007
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+import org.apache.hadoop.streaming.Environment;
+
+/**
+  Used to test the usage of external applications without adding
+  platform-specific dependencies.
+ */
+public class StreamAggregate extends TrApp
+{
+
+  public StreamAggregate()
+  {
+    super('.', ' ');
+  }
+
+  public void go() throws IOException
+  {
+    testParentJobConfToEnvVars();
+    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+    String line;
+
+    while ((line = in.readLine()) != null) {
+      String[] words = line.split(" ");
+      for (int i = 0; i < words.length; i++) {
+        String out = "LongValueSum:" + words[i].trim() + "\t" + "1";
+        System.out.println(out);
+      }
+    }
+  }
+
+  public static void main(String[] args) throws IOException
+  {
+    TrApp app = new StreamAggregate();
+    app.go();
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java?view=auto&rev=534234
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java Tue May 1 13:38:17 2007
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class tests hadoopStreaming in MapReduce local mode.
+ * It uses Hadoop Aggregate to count the number of word occurrences
+ * in the input.
+ */
+public class TestStreamAggregate extends TestCase
+{
+  protected File INPUT_FILE = new File("stream_aggregate_input.txt");
+  protected File OUTPUT_DIR = new File("stream_aggregate_out");
+  protected String input = "roses are red\nviolets are blue\nbunnies are pink\n";
+  // The map command parses input lines and generates a count entry for each word.
+  protected String map = StreamUtil.makeJavaCommand(StreamAggregate.class, new String[]{".", "\\n"});
+  // Use the aggregate combiner and reducer to aggregate the counts.
+  protected String outputExpect = "are\t3\nblue\t1\nbunnies\t1\npink\t1\nred\t1\nroses\t1\nviolets\t1\n";
+
+  private StreamJob job;
+
+  public TestStreamAggregate() throws IOException
+  {
+    UtilTest utilTest = new UtilTest(getClass().getName());
+    utilTest.checkUserDir();
+    utilTest.redirectIfAntJunit();
+  }
+
+  protected void createInput() throws IOException
+  {
+    DataOutputStream out = new DataOutputStream(
+        new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
+    out.write(input.getBytes("UTF-8"));
+    out.close();
+  }
+
+  protected String[] genArgs() {
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper", map,
+      "-reducer", "aggregate",
+      //"-verbose",
+      //"-jobconf", "stream.debug=set"
+      "-jobconf", "keep.failed.task.files=true",
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+    };
+  }
+
+  public void testCommandLine()
+  {
+    try {
+      try {
+        OUTPUT_DIR.getAbsoluteFile().delete();
+      } catch (Exception e) {
+      }
+
+      createInput();
+      boolean mayExit = false;
+
+      // During tests, the default Configuration will use a local mapred,
+      // so don't specify -config or -cluster.
+      job = new StreamJob(genArgs(), mayExit);
+      job.go();
+      File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
+      String output = StreamUtil.slurp(outFile);
+      outFile.delete();
+      System.err.println("outEx1=" + outputExpect);
+      System.err.println("  out1=" + output);
+      assertEquals(outputExpect, output);
+    } catch (Exception e) {
+      failTrace(e);
+    } finally {
+      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
+      INPUT_FILE.delete();
+      outFileCRC.delete();
+      OUTPUT_DIR.getAbsoluteFile().delete();
+    }
+  }
+
+  private void failTrace(Exception e)
+  {
+    StringWriter sw = new StringWriter();
+    e.printStackTrace(new PrintWriter(sw));
+    fail(sw.toString());
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    new TestStreamAggregate().testCommandLine();
+  }
+
+}
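
For reference, a caller outside the JUnit harness could drive the same option through StreamJob. This is only a sketch: the driver class, paths, and mapper command are placeholders, while the StreamJob(String[], boolean) constructor and go() are the same entry points used by TestStreamAggregate above.

    // Hypothetical driver, not part of this change: runs a streaming job that
    // lets the aggregate package do the combining and reducing.
    import org.apache.hadoop.streaming.StreamJob;

    public class AggregateStreamingDriver {
      public static void main(String[] args) throws Exception {
        String[] jobArgs = new String[] {
          "-input", "stream_aggregate_input.txt",   // placeholder input path
          "-output", "stream_aggregate_out",        // placeholder output directory
          "-mapper", "/path/to/mapper",             // must emit "LongValueSum:<key>\t<n>" records
          "-reducer", "aggregate"                   // selects the aggregate combiner/reducer
        };
        // mayExit = false, as in the test above.
        new StreamJob(jobArgs, false).go();
      }
    }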