Author: cutting
Date: Wed Dec  6 15:44:32 2006
New Revision: 483293

URL: http://svn.apache.org/viewvc?view=rev&rev=483293
Log:
HADOOP-779. Fix contrib/streaming to work correctly with gzipped input.
Contributed by Hairong.
Added:
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/build-contrib.xml
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=483293&r1=483292&r2=483293
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Dec  6 15:44:32 2006
@@ -5,6 +5,9 @@
  1. HADOOP-780. Use ReflectionUtils to instantiate key and value
    objects. (ab)

+ 2. HADOOP-779. Fix contrib/streaming to work correctly with gzipped
+    input files. (Hairong Kuang via cutting)
+
 Release 0.9.0 - 2006-12-01

Modified: lucene/hadoop/trunk/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/build-contrib.xml?view=diff&rev=483293&r1=483292&r2=483293
==============================================================================
--- lucene/hadoop/trunk/src/contrib/build-contrib.xml (original)
+++ lucene/hadoop/trunk/src/contrib/build-contrib.xml Wed Dec  6 15:44:32 2006
@@ -103,7 +103,7 @@
      srcdir="${src.test}"
      includes="**/*.java"
      destdir="${build.test}"
-     debug="${debug}">
+     debug="${javac.debug}">
      <classpath refid="test.classpath"/>
    </javac>
  </target>

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?view=diff&rev=483293&r1=483292&r2=483293
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Wed Dec  6 15:44:32 2006
@@ -45,7 +45,7 @@
     super(in, split, reporter, job, fs);
     gzipped_ = StreamInputFormat.isGzippedInput(job);
     if (gzipped_) {
-      din_ = new DataInputStream(new GZIPInputStream(in_));
+      din_ = new BufferedInputStream( (new GZIPInputStream(in_) ) );
     } else {
       din_ = in_;
     }
@@ -88,40 +88,24 @@
     Text tValue = (Text) value;
     byte[] line;

-    while (true) {
-      if (gzipped_) {
-        // figure EOS from readLine
-      } else {
-        long pos = in_.getPos();
-        if (pos >= end_) return false;
-      }
-
-      line = UTF8ByteArrayUtils.readLine((InputStream) in_);
-      if (line == null) return false;
-      try {
-        Text.validateUTF8(line);
-      } catch (MalformedInputException m) {
-        System.err.println("line=" + line + "|" + new Text(line));
-        System.out.flush();
-      }
-      try {
-        int tab = UTF8ByteArrayUtils.findTab(line);
-        if (tab == -1) {
-          tKey.set(line);
-          tValue.set("");
-        } else {
-          UTF8ByteArrayUtils.splitKeyVal(line, tKey, tValue, tab);
-        }
-        break;
-      } catch (MalformedInputException e) {
-        LOG.warn(StringUtils.stringifyException(e));
-      }
+    if ( !gzipped_ ) {
+      long pos = in_.getPos();
+      if (pos >= end_) return false;
+    }
+
+    line = UTF8ByteArrayUtils.readLine((InputStream) din_);
+    if (line == null) return false;
+    int tab = UTF8ByteArrayUtils.findTab(line);
+    if (tab == -1) {
+      tKey.set(line);
+      tValue.set("");
+    } else {
+      UTF8ByteArrayUtils.splitKeyVal(line, tKey, tValue, tab);
     }
     numRecStats(line, 0, line.length);
     return true;
   }

   boolean gzipped_;
-  GZIPInputStream zin_;
-  DataInputStream din_; // GZIP or plain
+  InputStream din_; // GZIP or plain
 }
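A note for readers following the StreamLineRecordReader change: the substance of the fix is that next() now reads from din_, a BufferedInputStream over a GZIPInputStream when the input is gzipped; the old code constructed the GZIPInputStream but then kept reading the raw in_ stream, so gzipped input was never actually decompressed. The end-of-split position check is kept only for the uncompressed case, since byte positions in a compressed stream do not correspond to split boundaries; for gzip, end of input is detected by readLine returning null. A minimal, self-contained sketch of the same technique, assuming nothing beyond the JDK (readLine below is a simplified stand-in for Hadoop's UTF8ByteArrayUtils.readLine, and the class name is illustrative):

  import java.io.*;
  import java.util.zip.GZIPInputStream;

  public class GzipLineReaderSketch {
    // Simplified stand-in for UTF8ByteArrayUtils.readLine: collects bytes
    // up to '\n' (exclusive); returns null only at end of stream.
    static byte[] readLine(InputStream in) throws IOException {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      int b = in.read();
      if (b == -1) return null;        // EOS before any byte: no more records
      while (b != -1 && b != '\n') {
        buf.write(b);
        b = in.read();
      }
      return buf.toByteArray();
    }

    public static void main(String[] args) throws IOException {
      boolean gzipped = args[0].endsWith(".gz");
      InputStream raw = new FileInputStream(args[0]);
      // Mirrors the patch: buffer a decompressing stream when gzipped,
      // otherwise read the raw stream directly.
      InputStream din = gzipped
          ? new BufferedInputStream(new GZIPInputStream(raw))
          : raw;
      byte[] line;
      while ((line = readLine(din)) != null) {
        System.out.println(new String(line, "UTF-8"));
      }
      din.close();
    }
  }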
Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java?view=auto&rev=483293
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java Wed Dec  6 15:44:32 2006
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * This class tests gzip input streaming in MapReduce local mode.
+ */
+public class TestGzipInput extends TestStreaming
+{
+
+  public TestGzipInput() throws IOException {
+  }
+
+  protected void createInput() throws IOException
+  {
+    GZIPOutputStream out = new GZIPOutputStream(
+        new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
+    out.write(input.getBytes("UTF-8"));
+    out.close();
+  }
+
+  protected String[] genArgs() {
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper", map,
+      "-combiner", combine,
+      "-reducer", reduce,
+      "-jobconf", "stream.recordreader.compression=gzip"
+    };
+
+  }
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestGzipInput().testCommandLine();
+  }
+
+}
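The heart of the new test is a gzip round trip: createInput() writes the fixture through GZIPOutputStream, and the fixed record reader decompresses it on the way back in. A standalone sketch of that round trip, assuming nothing beyond the JDK (the class name is illustrative; the file name and sample input mirror the test's):

  import java.io.*;
  import java.util.zip.GZIPInputStream;
  import java.util.zip.GZIPOutputStream;

  public class GzipRoundTripSketch {
    public static void main(String[] args) throws IOException {
      // File name and input mirror TestStreaming/TestGzipInput above.
      File f = new File("input.txt");
      String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";

      // Write the gzipped fixture the same way TestGzipInput.createInput does.
      GZIPOutputStream out = new GZIPOutputStream(new FileOutputStream(f));
      out.write(input.getBytes("UTF-8"));
      out.close();

      // Read it back through the buffered, decompressing stream the record
      // reader now builds, confirming the round trip line by line.
      BufferedReader in = new BufferedReader(new InputStreamReader(
          new BufferedInputStream(new GZIPInputStream(new FileInputStream(f))),
          "UTF-8"));
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line);
      }
      in.close();
      f.delete();
    }
  }

On a real job the same decompression path is enabled by the flag the test passes: -jobconf stream.recordreader.compression=gzip.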
\\n"; (split words into lines) - String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{".", "\\n"}); + protected String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{".", "\\n"}); // combine, reduce behave like /usr/bin/uniq. But also prepend lines with C, R. - String combine = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"C"}); - String reduce = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"R"}); - String outputExpect = "RCare\t\nRCblue\t\nRCbunnies\t\nRCpink\t\nRCred\t\nRCroses\t\nRCviolets\t\n"; + protected String combine = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"C"}); + protected String reduce = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"R"}); + protected String outputExpect = "RCare\t\nRCblue\t\nRCbunnies\t\nRCpink\t\nRCred\t\nRCroses\t\nRCviolets\t\n"; - StreamJob job; + private StreamJob job; public TestStreaming() throws IOException { @@ -52,14 +52,27 @@ utilTest.redirectIfAntJunit(); } - void createInput() throws IOException + protected void createInput() throws IOException { - String path = new File(".", INPUT_FILE).getAbsolutePath();// needed from junit forked vm - DataOutputStream out = new DataOutputStream(new FileOutputStream(path)); + DataOutputStream out = new DataOutputStream( + new FileOutputStream(INPUT_FILE.getAbsoluteFile())); out.write(input.getBytes("UTF-8")); out.close(); } + protected String[] genArgs() { + return new String[] { + "-input", INPUT_FILE.getAbsolutePath(), + "-output", OUTPUT_DIR.getAbsolutePath(), + "-mapper", map, + "-combiner", combine, + "-reducer", reduce, + //"-verbose", + //"-jobconf", "stream.debug=set" + "-jobconf", "keep.failed.task.files=true" + }; + } + public void testCommandLine() { try { @@ -68,30 +81,23 @@ // During tests, the default Configuration will use a local mapred // So don't specify -config or -cluster - String argv[] = new String[] { - "-input", INPUT_FILE, - "-output", OUTPUT_DIR, - "-mapper", map, - "-combiner", combine, - "-reducer", reduce, - //"-verbose", - //"-jobconf", "stream.debug=set" - "-jobconf", "keep.failed.task.files=true", - }; - job = new StreamJob(argv, mayExit); + job = new StreamJob(genArgs(), mayExit); job.go(); - File outFile = new File(".", OUTPUT_DIR + "/part-00000").getAbsoluteFile(); + File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile(); String output = StreamUtil.slurp(outFile); + outFile.delete(); System.err.println("outEx1=" + outputExpect); System.err.println(" out1=" + output); assertEquals(outputExpect, output); - } catch(Exception e) { failTrace(e); + } finally { + INPUT_FILE.delete(); + OUTPUT_DIR.delete(); } } - void failTrace(Exception e) + private void failTrace(Exception e) { StringWriter sw = new StringWriter(); e.printStackTrace(new PrintWriter(sw));