Author: acmurthy
Date: Wed Jul 30 17:05:04 2008
New Revision: 681243
URL: http://svn.apache.org/viewvc?rev=681243&view=rev
Log:
HADOOP-3131. Fix reduce progress reporting for compressed intermediate data.
Contributed by Matei Zaharia.
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/io/TestSequenceFileMergeProgress.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/io/SequenceFile.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java
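
Note on the change (summary, not part of the commit message): before this patch the merge code in SequenceFile.Sorter and Merger advanced totalBytesProcessed by the uncompressed lengths of the keys and values it read, while the total used for the progress denominator comes from the segments' on-disk, possibly compressed, sizes. With compressed intermediate data the two are not comparable, so reduce progress was reported incorrectly. The patch instead measures how many raw bytes each read consumes, via a new getPosition() on the readers. The arithmetic below is illustrative only, with hypothetical sizes, and just shows why counting uncompressed bytes against a compressed denominator breaks:

    // Illustrative arithmetic only (hypothetical sizes, not from the patch).
    public class MergeProgressArithmetic {
      public static void main(String[] args) {
        long totalBytes = 100000;                  // compressed segment size on disk
        float progPerByte = 1.0f / (float) totalBytes;

        long uncompressedProcessed = 500000;       // old accounting: record lengths
        System.out.println(uncompressedProcessed * progPerByte);  // 5.0 -- overshoots 1.0

        long compressedProcessed = 100000;         // new accounting: raw position deltas
        System.out.println(compressedProcessed * progPerByte);    // 1.0 at end of merge
      }
    }
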
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=681243&r1=681242&r2=681243&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Jul 30 17:05:04 2008
@@ -199,6 +199,9 @@
HADOOP-3848. Cache calls to getSystemDir in the TaskTracker instead of
calling it for each task start. (acmurthy via omalley)
+ HADOOP-3131. Fix reduce progress reporting for compressed intermediate
+ data. (Matei Zaharia via acmurthy)
+
Release 0.18.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/SequenceFile.java?rev=681243&r1=681242&r2=681243&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/SequenceFile.java Wed Jul 30 17:05:04 2008
@@ -2808,7 +2808,6 @@
public boolean next() throws IOException {
if (size() == 0)
return false;
- int valLength;
if (minSegment != null) {
//minSegment is non-null for all invocations of next except the first
//one. For the first invocation, the priority queue is ready for use
@@ -2820,17 +2819,16 @@
}
}
minSegment = (SegmentDescriptor)top();
+ long startPos = minSegment.in.getPosition(); // Current position in stream
//save the raw key reference
rawKey = minSegment.getKey();
//load the raw value. Re-use the existing rawValue buffer
if (rawValue == null) {
rawValue = minSegment.in.createValueBytes();
}
- valLength = minSegment.nextRawValue(rawValue);
- if (progPerByte > 0) {
- totalBytesProcessed += rawKey.getLength() + valLength;
- mergeProgress.set(totalBytesProcessed * progPerByte);
- }
+ minSegment.nextRawValue(rawValue);
+ long endPos = minSegment.in.getPosition(); // End position after reading value
+ updateProgress(endPos - startPos);
return true;
}
@@ -2839,13 +2837,25 @@
}
private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
- if (ms.nextRawKey()) {
+ long startPos = ms.in.getPosition(); // Current position in stream
+ boolean hasNext = ms.nextRawKey();
+ long endPos = ms.in.getPosition(); // End position after reading key
+ updateProgress(endPos - startPos);
+ if (hasNext) {
adjustTop();
} else {
pop();
ms.cleanup();
}
}
+
+ private void updateProgress(long bytesProcessed) {
+ totalBytesProcessed += bytesProcessed;
+ if (progPerByte > 0) {
+ mergeProgress.set(totalBytesProcessed * progPerByte);
+ }
+ }
+
/** This is the single level merge that is called multiple times
* depending on the factor size and the number of segments
* @return RawKeyValueIterator
@@ -2874,6 +2884,8 @@
if (mStream[i].nextRawKey()) {
segmentsToMerge.add(mStream[i]);
segmentsConsidered++;
+ // Count the fact that we read some bytes in calling nextRawKey()
+ updateProgress(mStream[i].in.getPosition());
}
else {
mStream[i].cleanup();
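
The SequenceFile.Sorter hunks above replace the valLength-based accounting with position deltas taken from each segment's raw input stream, centralized in the new updateProgress() helper. Below is a minimal, standalone sketch of that pattern; it uses java.util.zip gzip in place of Hadoop's codecs and a counting wrapper in place of FSDataInputStream, and every name in it is hypothetical, not part of the patch:

    // Standalone sketch: progress measured against the raw (compressed) stream,
    // so the processed-byte count stays comparable to the on-disk segment size.
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;

    public class RawPositionProgressSketch {

      /** Tracks the position in the raw (compressed) stream, like getPosition(). */
      static class CountingInputStream extends FilterInputStream {
        private long pos = 0;
        CountingInputStream(InputStream in) { super(in); }
        @Override public int read() throws IOException {
          int b = super.read();
          if (b >= 0) pos++;
          return b;
        }
        @Override public int read(byte[] buf, int off, int len) throws IOException {
          int n = super.read(buf, off, len);
          if (n > 0) pos += n;
          return n;
        }
        long getPos() { return pos; }
      }

      public static void main(String[] args) throws IOException {
        // Build a small compressed "segment" in memory.
        ByteArrayOutputStream segment = new ByteArrayOutputStream();
        GZIPOutputStream gz = new GZIPOutputStream(segment);
        for (int i = 0; i < 10000; i++) {
          gz.write("valuevaluevaluevaluevaluevalue\n".getBytes("UTF-8"));
        }
        gz.close();

        long totalBytes = segment.size();          // on-disk (compressed) size
        float progPerByte = 1.0f / (float) totalBytes;

        CountingInputStream rawIn =
            new CountingInputStream(new ByteArrayInputStream(segment.toByteArray()));
        InputStream in = new GZIPInputStream(rawIn);   // decompressed view we read from

        long totalBytesProcessed = 0;
        byte[] record = new byte[64];
        while (true) {
          long startPos = rawIn.getPos();          // raw position before the read
          int n = in.read(record);
          totalBytesProcessed += rawIn.getPos() - startPos;  // compressed bytes consumed
          if (n < 0) {
            break;
          }
        }
        System.out.println("progress = " + (totalBytesProcessed * progPerByte)); // ~1.0
      }
    }
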
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=681243&r1=681242&r2=681243&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java Wed Jul 30 17:05:04 2008
@@ -24,6 +24,7 @@
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -214,7 +215,8 @@
private static final int DEFAULT_BUFFER_SIZE = 128*1024;
private static final int MAX_VINT_SIZE = 9;
- InputStream in;
+ FSDataInputStream rawIn; // Raw InputStream from file
+ InputStream in; // Possibly decompressed stream that we read
Decompressor decompressor;
long bytesRead = 0;
long fileLength = 0;
@@ -233,8 +235,9 @@
protected Reader() {}
- public Reader(Configuration conf, InputStream in, long length,
+ public Reader(Configuration conf, FSDataInputStream in, long length,
CompressionCodec codec) throws IOException {
+ this.rawIn = in;
if (codec != null) {
decompressor = CodecPool.getDecompressor(codec);
this.in = codec.createInputStream(in, decompressor);
@@ -248,6 +251,8 @@
public long getLength() { return fileLength; }
+ public long getPosition() throws IOException { return rawIn.getPos(); }
+
/**
* Read upto len bytes into buf starting at offset off.
*
@@ -399,6 +404,14 @@
dataIn.reset(buffer, start, length);
}
+ @Override
+ public long getPosition() throws IOException {
+ // InMemoryReader does not initialize streams like Reader, so in.getPos()
+ // would not work. Instead, return the number of uncompressed bytes read,
+ // which will be correct since in-memory data is not compressed.
+ return bytesRead;
+ }
+
private void dumpOnError() {
File dumpFile = new File("../output/" + taskAttemptId + ".dump");
System.err.println("Dumping corrupt map-output of " + taskAttemptId +
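
The IFile hunks above give both readers a getPosition() with the meaning the merge code relies on: the on-disk Reader reports the offset in the raw FSDataInputStream (compressed bytes), while InMemoryReader reports bytesRead, which is equivalent because in-memory map outputs are held uncompressed. A small sketch of that contract using stand-in types only (nothing here is the actual Hadoop API):

    // Stand-in types; illustrates the getPosition() contract, not Hadoop's classes.
    import java.io.IOException;
    import java.io.RandomAccessFile;

    interface PositionedReader {
      /** Bytes of the underlying segment consumed so far. */
      long getPosition() throws IOException;
    }

    class DiskSegmentReader implements PositionedReader {
      private final RandomAccessFile rawIn;        // stands in for FSDataInputStream
      DiskSegmentReader(RandomAccessFile rawIn) { this.rawIn = rawIn; }
      public long getPosition() throws IOException {
        return rawIn.getFilePointer();             // raw (possibly compressed) offset
      }
    }

    class InMemorySegmentReader implements PositionedReader {
      private long bytesRead;                      // advanced as records are read
      public long getPosition() { return bytesRead; }
    }
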
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=681243&r1=681242&r2=681243&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java Wed Jul 30 17:05:04 2008
@@ -141,6 +141,10 @@
fs.delete(file, false);
}
}
+
+ public long getPosition() throws IOException {
+ return reader.getPosition();
+ }
}
private static class MergeQueue<K extends Object, V extends Object>
@@ -221,7 +225,12 @@
}
private void adjustPriorityQueue(Segment<K, V> reader) throws IOException{
- if (reader.next()) {
+ long startPos = reader.getPosition();
+ boolean hasNext = reader.next();
+ long endPos = reader.getPosition();
+ totalBytesProcessed += endPos - startPos;
+ mergeProgress.set(totalBytesProcessed * progPerByte);
+ if (hasNext) {
adjustTop();
} else {
pop();
@@ -248,10 +257,6 @@
key = minSegment.getKey();
value = minSegment.getValue();
- totalBytesProcessed += (key.getLength()-key.getPosition()) +
- (value.getLength()-value.getPosition());
- mergeProgress.set(totalBytesProcessed * progPerByte);
-
return true;
}
@@ -293,8 +298,12 @@
// Initialize the segment at the last possible moment;
// this helps in ensuring we don't use buffers until we need them
segment.init();
-
- if (segment.next()) {
+ long startPos = segment.getPosition();
+ boolean hasNext = segment.next();
+ long endPos = segment.getPosition();
+ totalBytesProcessed += endPos - startPos;
+ mergeProgress.set(totalBytesProcessed * progPerByte);
+ if (hasNext) {
segmentsToMerge.add(segment);
segmentsConsidered++;
}
@@ -330,9 +339,11 @@
}
if (totalBytes != 0) //being paranoid
progPerByte = 1.0f / (float)totalBytes;
-
- // Reset bytes-processed to track the progress of the final merge
- totalBytesProcessed = 0;
+
+ if (totalBytes != 0)
+ mergeProgress.set(totalBytesProcessed * progPerByte);
+ else
+ mergeProgress.set(1.0f); // Last pass and no segments left - we're done
LOG.info("Down to the last merge-pass, with " + numSegments +
" segments left of total size: " + totalBytes + " bytes");
Added: hadoop/core/trunk/src/test/org/apache/hadoop/io/TestSequenceFileMergeProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/io/TestSequenceFileMergeProgress.java?rev=681243&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/io/TestSequenceFileMergeProgress.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/io/TestSequenceFileMergeProgress.java Wed Jul 30 17:05:04 2008
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
+import org.apache.hadoop.io.SequenceFile.Sorter.SegmentDescriptor;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.*;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.*;
+
+public class TestSequenceFileMergeProgress extends TestCase {
+ private static final Log LOG = FileInputFormat.LOG;
+ private static final int RECORDS = 10000;
+
+ public void testMergeProgressWithNoCompression() throws IOException {
+ runTest(SequenceFile.CompressionType.NONE);
+ }
+
+ public void testMergeProgressWithRecordCompression() throws IOException {
+ runTest(SequenceFile.CompressionType.RECORD);
+ }
+
+ public void testMergeProgressWithBlockCompression() throws IOException {
+ runTest(SequenceFile.CompressionType.BLOCK);
+ }
+
+ public void runTest(CompressionType compressionType) throws IOException {
+ JobConf job = new JobConf();
+ FileSystem fs = FileSystem.getLocal(job);
+ Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+ Path file = new Path(dir, "test.seq");
+ Path tempDir = new Path(dir, "tmp");
+
+ fs.delete(dir, true);
+ FileInputFormat.setInputPaths(job, dir);
+ fs.mkdirs(tempDir);
+
+ LongWritable tkey = new LongWritable();
+ Text tval = new Text();
+
+ SequenceFile.Writer writer =
+ SequenceFile.createWriter(fs, job, file, LongWritable.class, Text.class,
+ compressionType, new DefaultCodec());
+ try {
+ for (int i = 0; i < RECORDS; ++i) {
+ tkey.set(1234);
+ tval.set("valuevaluevaluevaluevaluevaluevaluevaluevaluevaluevalue");
+ writer.append(tkey, tval);
+ }
+ } finally {
+ writer.close();
+ }
+
+ long fileLength = fs.getFileStatus(file).getLen();
+ LOG.info("With compression = " + compressionType + ": "
+ + "compressed length = " + fileLength);
+
+ SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
+ job.getOutputKeyComparator(), job.getMapOutputKeyClass(),
+ job.getMapOutputValueClass(), job);
+ Path[] paths = new Path[] {file};
+ RawKeyValueIterator rIter = sorter.merge(paths, tempDir, false);
+ int count = 0;
+ while (rIter.next()) {
+ count++;
+ }
+ assertEquals(RECORDS, count);
+ assertEquals(1.0f, rIter.getProgress().get());
+ }
+
+}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java?rev=681243&r1=681242&r2=681243&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java Wed Jul 30 17:05:04 2008
@@ -24,10 +24,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.Progressable;
/**
@@ -73,11 +73,12 @@
};
public void runValueIterator(Path tmpDir, Pair[] vals,
- Configuration conf) throws IOException {
+ Configuration conf,
+ CompressionCodec codec) throws IOException {
FileSystem fs = tmpDir.getFileSystem(conf);
Path path = new Path(tmpDir, "data.in");
IFile.Writer<Text, Text> writer =
- new IFile.Writer<Text, Text>(conf, fs, path, Text.class, Text.class, null);
+ new IFile.Writer<Text, Text>(conf, fs, path, Text.class, Text.class, codec);
for(Pair p: vals) {
writer.append(new Text(p.key), new Text(p.value));
}
@@ -85,7 +86,7 @@
@SuppressWarnings("unchecked")
RawKeyValueIterator rawItr =
- Merger.merge(conf, fs, Text.class, Text.class, null, new Path[]{path},
+ Merger.merge(conf, fs, Text.class, Text.class, codec, new Path[]{path},
false, conf.getInt("io.sort.factor", 100), tmpDir,
new Text.Comparator(), new NullProgress());
@SuppressWarnings("unchecked") // WritableComparators are not generic
@@ -114,13 +115,25 @@
valItr.nextKey();
}
assertEquals(vals.length, i);
+ // make sure we have progress equal to 1.0
+ assertEquals(1.0f, rawItr.getProgress().get());
}
public void testValueIterator() throws Exception {
Path tmpDir = new Path("build/test/test.reduce.task");
Configuration conf = new Configuration();
for (Pair[] testCase: testCases) {
- runValueIterator(tmpDir, testCase, conf);
+ runValueIterator(tmpDir, testCase, conf, null);
+ }
+ }
+
+ public void testValueIteratorWithCompression() throws Exception {
+ Path tmpDir = new Path("build/test/test.reduce.task.compression");
+ Configuration conf = new Configuration();
+ DefaultCodec codec = new DefaultCodec();
+ codec.setConf(conf);
+ for (Pair[] testCase: testCases) {
+ runValueIterator(tmpDir, testCase, conf, codec);
}
}
}