Author: cutting Date: Fri Feb 23 09:57:01 2007 New Revision: 511039 URL: http://svn.apache.org/viewvc?view=rev&rev=511039 Log: HADOOP-1029. Fix streaming's input format to correctly seek to the start of splits. Contributed by Arun.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=511039&r1=511038&r2=511039 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Fri Feb 23 09:57:01 2007 @@ -119,6 +119,9 @@ 35. HADOOP-248. Optimize location of map outputs to not use random probes. (Devaraj Das via cutting) +36. HADOOP-1029. Fix streaming's input format to correctly seek to + the start of splits. (Arun C Murthy via cutting) + Release 0.11.2 - 2007-02-16 Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?view=diff&rev=511039&r1=511038&r2=511039 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Fri Feb 23 09:57:01 2007 @@ -72,16 +72,27 @@ JobConf job, Reporter reporter) throws IOException { FileSplit split = (FileSplit) genericSplit; - FileSystem fs = split.getPath().getFileSystem(job); LOG.info("getRecordReader start.....split=" + split); reporter.setStatus(split.toString()); - final long start = split.getStart(); - final long end = start + split.getLength(); - - FSDataInputStream in = fs.open(split.getPath()); + long start = split.getStart(); + long length = split.getLength(); - // will open the file and seek to the start of the split + // Open the file and seek to the start of the split + FileSystem fs = split.getPath().getFileSystem(job); + FSDataInputStream in = fs.open(split.getPath()); + if (isGzippedInput(job)) { + length = Long.MAX_VALUE; + } else if (start != 0) { + in.seek(start-1); + LineRecordReader.readLine(in, null); + long oldStart = start; + start = in.getPos(); + length -= (start - oldStart); + } + // Ugly hack! + split = new FileSplit(split.getPath(), start, length, job); + // Factory dispatch based on available params.. Class readerClass; String c = job.get("stream.recordreader.class"); Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?view=diff&rev=511039&r1=511038&r2=511039 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Fri Feb 23 09:57:01 2007 @@ -55,7 +55,7 @@ Reporter reporter, JobConf job, FileSystem fs) throws IOException { super(createStream(in, job), split.getStart(), - split.getStart() + split.getLength()); + (split.getStart() + split.getLength())); this.split = split ; this.reporter = reporter ; } @@ -92,21 +92,23 @@ Text tKey = (Text) key; Text tValue = (Text) value; - byte[] line = null ; + byte[] line = null ; + int lineLen = -1; if( super.next(dummyKey, innerValue) ){ - line = innerValue.getBytes(); + line = innerValue.getBytes(); + lineLen = innerValue.getLength(); }else{ return false; } if (line == null) return false; - int tab = UTF8ByteArrayUtils.findTab(line); + int tab = UTF8ByteArrayUtils.findTab(line, 0, lineLen); if (tab == -1) { - tKey.set(line); + tKey.set(line, 0, lineLen); tValue.set(""); } else { - UTF8ByteArrayUtils.splitKeyVal(line, tKey, tValue, tab); + UTF8ByteArrayUtils.splitKeyVal(line, 0, lineLen, tKey, tValue, tab); } - numRecStats(line, 0, line.length); + numRecStats(line, 0, lineLen); return true; } Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java?view=diff&rev=511039&r1=511038&r2=511039 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java Fri Feb 23 09:57:01 2007 @@ -33,38 +33,67 @@ /** * Find the first occured tab in a UTF-8 encoded string * @param utf a byte array containing a UTF-8 encoded string + * @param start starting offset + * @param length no. of bytes * @return position that first tab occures otherwise -1 */ - public static int findTab(byte [] utf) { - for(int i=0; i<utf.length; i++) { + public static int findTab(byte [] utf, int start, int length) { + for(int i=start; i<(start+length); i++) { if(utf[i]==(byte)'\t') { return i; } - } - return -1; + } + return -1; } /** + * Find the first occured tab in a UTF-8 encoded string + * @param utf a byte array containing a UTF-8 encoded string + * @return position that first tab occures otherwise -1 + */ + public static int findTab(byte [] utf) { + return findTab(utf, 0, utf.length); + } + + /** * split a UTF-8 byte array into key and value * assuming that the delimilator is at splitpos. * @param utf utf-8 encoded string + * @param start starting offset + * @param length no. of bytes * @param key contains key upon the method is returned * @param val contains value upon the method is returned * @param splitPos the split pos * @throws IOException */ - public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos) - throws IOException { - if(splitPos<0 || splitPos >= utf.length) - throw new IllegalArgumentException( - "splitPos must be in the range [0, "+splitPos+"]: " +splitPos); - byte [] keyBytes = new byte[splitPos]; - System.arraycopy(utf, 0, keyBytes, 0, splitPos); - int valLen = utf.length-splitPos-1; + public static void splitKeyVal(byte[] utf, int start, int length, + Text key, Text val, int splitPos) throws IOException { + if(splitPos<start || splitPos >= (start+length)) + throw new IllegalArgumentException( "splitPos must be in the range " + + "[" + start + ", " + (start+length) + "]: " + splitPos); + int keyLen = (splitPos-start); + byte [] keyBytes = new byte[keyLen]; + System.arraycopy(utf, start, keyBytes, 0, keyLen); + int valLen = (start+length)-splitPos-1; byte [] valBytes = new byte[valLen]; - System.arraycopy(utf,splitPos+1, valBytes, 0, valLen ); + System.arraycopy(utf, splitPos+1, valBytes, 0, valLen); key.set(keyBytes); val.set(valBytes); + } + + + /** + * split a UTF-8 byte array into key and value + * assuming that the delimilator is at splitpos. + * @param utf utf-8 encoded string + * @param key contains key upon the method is returned + * @param val contains value upon the method is returned + * @param splitPos the split pos + * @throws IOException + */ + public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos) + throws IOException { + splitKeyVal(utf, 0, utf.length, key, val, splitPos); } /**