Author: acmurthy
Date: Fri Jul 11 08:33:13 2008
New Revision: 675991
URL: http://svn.apache.org/viewvc?rev=675991&view=rev
Log:
Merge -r 675988:675989 from trunk to branch-0.18 to fix HADOOP-3647
Modified:
hadoop/core/branches/branch-0.18/CHANGES.txt
hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/IFile.java
hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=675991&r1=675990&r2=675991&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Fri Jul 11 08:33:13 2008
@@ -744,6 +744,9 @@
HADOOP-3718. Fix KFSOutputStream::write(int) to output a byte instead of
an int, per the OutputStream contract. (Sriram Rao via cdouglas)
+ HADOOP-3647. Add debug logs to help track down a very occasional,
+ hard-to-reproduce bug in shuffle/merge on the reducer. (acmurthy)
+
Release 0.17.2 - Unreleased
BUG FIXES
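For readers following the new sanity checks, the IFile record framing they guard is: a vint key length, a vint value length, the raw key bytes, the raw value bytes, and finally two EOF_MARKER (-1) vints written by Writer.close(). The snippet below is a minimal standalone sketch, not part of this patch; the class name is illustrative and it assumes only the 0.18-era DataOutputBuffer, DataInputBuffer and WritableUtils APIs.

import java.io.IOException;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.WritableUtils;

public class IFileFramingSketch {
  private static final int EOF_MARKER = -1;

  public static void main(String[] args) throws IOException {
    byte[] key = "key0".getBytes();
    byte[] value = "value0".getBytes();

    // Frame one record the way IFile.Writer.append() does:
    // <vint keyLength><vint valueLength><key bytes><value bytes>
    DataOutputBuffer out = new DataOutputBuffer();
    WritableUtils.writeVInt(out, key.length);
    WritableUtils.writeVInt(out, value.length);
    out.write(key, 0, key.length);
    out.write(value, 0, value.length);

    // ... and terminate the stream with the EOF marker, as Writer.close() does
    WritableUtils.writeVInt(out, EOF_MARKER);
    WritableUtils.writeVInt(out, EOF_MARKER);

    // Read the first record's lengths back and apply the same negative-length
    // check that Reader.next() now performs on each record
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), 0, out.getLength());
    int keyLength = WritableUtils.readVInt(in);
    int valueLength = WritableUtils.readVInt(in);
    if (keyLength < 0 || valueLength < 0) {
      throw new IOException("Negative length: corrupt map-output?");
    }
    System.out.println("First record: (" + keyLength + ", " + valueLength + ")");
  }
}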
Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=675991&r1=675990&r2=675991&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/IFile.java Fri Jul 11 08:33:13 2008
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapred;
import java.io.EOFException;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -45,7 +47,7 @@
*/
class IFile {
- private static int EOF_MARKER = -1;
+ private static final int EOF_MARKER = -1;
/**
* <code>IFile.Writer</code> to write out intermediate map-outputs.
@@ -54,6 +56,7 @@
FSDataOutputStream out;
boolean ownOutputStream = false;
long start = 0;
+ FSDataOutputStream rawOut;
CompressionOutputStream compressedOut;
Compressor compressor;
@@ -79,6 +82,9 @@
public Writer(Configuration conf, FSDataOutputStream out,
Class<K> keyClass, Class<V> valueClass,
CompressionCodec codec) throws IOException {
+ this.rawOut = out;
+ this.start = this.rawOut.getPos();
+
if (codec != null) {
this.compressor = CodecPool.getCompressor(codec);
this.compressor.reset();
@@ -88,7 +94,6 @@
} else {
this.out = out;
}
- this.start = this.out.getPos();
this.keyClass = keyClass;
this.valueClass = valueClass;
@@ -100,34 +105,34 @@
}
public void close() throws IOException {
+ // Close the serializers
+ keySerializer.close();
+ valueSerializer.close();
+
// Write EOF_MARKER for key/value length
WritableUtils.writeVInt(out, EOF_MARKER);
WritableUtils.writeVInt(out, EOF_MARKER);
decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
if (compressOutput) {
- // Return the compressor
+ // Flush data from buffers into the compressor
+ out.flush();
+
+ // Flush & return the compressor
compressedOut.finish();
compressedOut.resetState();
CodecPool.returnCompressor(compressor);
}
-
- // Close the serializers
- keySerializer.close();
- valueSerializer.close();
// Close the stream
- if (out != null) {
- out.flush();
- compressedBytesWritten = out.getPos() - start;
-
- // Close the underlying stream iff we own it...
- if (ownOutputStream) {
- out.close();
- }
-
- out = null;
+ rawOut.flush();
+ compressedBytesWritten = rawOut.getPos() - start;
+
+ // Close the underlying stream iff we own it...
+ if (ownOutputStream) {
+ out.close();
}
+ out = null;
}
public void append(K key, V value) throws IOException {
@@ -141,12 +146,18 @@
// Append the 'key'
keySerializer.serialize(key);
int keyLength = buffer.getLength();
- if (keyLength == 0)
- throw new IOException("zero length keys not allowed: " + key);
+ if (keyLength < 0) {
+ throw new IOException("Negative key-length not allowed: " + keyLength
+
+ " for " + key);
+ }
// Append the 'value'
valueSerializer.serialize(value);
int valueLength = buffer.getLength() - keyLength;
+ if (valueLength < 0) {
+ throw new IOException("Negative value-length not allowed: " +
+ valueLength + " for " + value);
+ }
// Write the record out
WritableUtils.writeVInt(out, keyLength); // key length
@@ -165,8 +176,17 @@
public void append(DataInputBuffer key, DataInputBuffer value)
throws IOException {
int keyLength = key.getLength() - key.getPosition();
- int valueLength = value.getLength() - value.getPosition();
+ if (keyLength < 0) {
+ throw new IOException("Negative key-length not allowed: " + keyLength
+
+ " for " + key);
+ }
+ int valueLength = value.getLength() - value.getPosition();
+ if (valueLength < 0) {
+ throw new IOException("Negative value-length not allowed: " +
+ valueLength + " for " + value);
+ }
+
WritableUtils.writeVInt(out, keyLength);
WritableUtils.writeVInt(out, valueLength);
out.write(key.getData(), key.getPosition(), keyLength);
@@ -192,7 +212,7 @@
*/
public static class Reader<K extends Object, V extends Object> {
private static final int DEFAULT_BUFFER_SIZE = 128*1024;
- private static final int MAX_VINT_SIZE = 5;
+ private static final int MAX_VINT_SIZE = 9;
InputStream in;
Decompressor decompressor;
@@ -204,6 +224,8 @@
int bufferSize = DEFAULT_BUFFER_SIZE;
DataInputBuffer dataIn = new DataInputBuffer();
+ int recNo = 1;
+
public Reader(Configuration conf, FileSystem fs, Path file,
CompressionCodec codec) throws IOException {
this(conf, fs.open(file), fs.getFileStatus(file).getLen(), codec);
@@ -226,6 +248,15 @@
public long getLength() { return fileLength; }
+ /**
+ * Read up to len bytes into buf starting at offset off.
+ *
+ * @param buf buffer
+ * @param off offset
+ * @param len length of buffer
+ * @return the no. of bytes read
+ * @throws IOException
+ */
private int readData(byte[] buf, int off, int len) throws IOException {
int bytesRead = 0;
while (bytesRead < len) {
@@ -291,6 +322,16 @@
return false;
}
+ // Sanity check
+ if (keyLength < 0) {
+ throw new IOException("Rec# " + recNo + ": Negative key-length: " +
+ keyLength);
+ }
+ if (valueLength < 0) {
+ throw new IOException("Rec# " + recNo + ": Negative value-length: " +
+ valueLength);
+ }
+
final int recordLength = keyLength + valueLength;
// Check if we have the raw key/value in the buffer
@@ -299,7 +340,8 @@
// Sanity check
if ((dataIn.getLength() - dataIn.getPosition()) < recordLength) {
- throw new EOFException("Could read the next record");
+ throw new EOFException("Rec# " + recNo + ": Could read the next " +
+ " record");
}
}
@@ -310,9 +352,17 @@
value.reset(data, (pos + keyLength), valueLength);
// Position for the next record
- dataIn.skip(recordLength);
+ long skipped = dataIn.skip(recordLength);
+ if (skipped != recordLength) {
+ throw new IOException("Rec# " + recNo + ": Failed to skip past record
" +
+ "of length: " + recordLength);
+ }
+
+ // Record the bytes read
bytesRead += recordLength;
+ ++recNo;
+
return true;
}
@@ -324,9 +374,7 @@
}
// Close the underlying stream
- if (in != null) {
- in.close();
- }
+ in.close();
// Release the buffer
dataIn = null;
@@ -339,18 +387,34 @@
*/
public static class InMemoryReader<K, V> extends Reader<K, V> {
RamManager ramManager;
+ TaskAttemptID taskAttemptId;
- public InMemoryReader(RamManager ramManager,
+ public InMemoryReader(RamManager ramManager, TaskAttemptID taskAttemptId,
byte[] data, int start, int length) {
this.ramManager = ramManager;
+ this.taskAttemptId = taskAttemptId;
buffer = data;
fileLength = bufferSize = (length - start);
dataIn.reset(buffer, start, length);
}
+ private void dumpOnError() {
+ File dumpFile = new File("../output/" + taskAttemptId + ".dump");
+ System.err.println("Dumping corrupt map-output of " + taskAttemptId +
+ " to " + dumpFile.getAbsolutePath());
+ try {
+ FileOutputStream fos = new FileOutputStream(dumpFile);
+ fos.write(buffer, 0, bufferSize);
+ fos.close();
+ } catch (IOException ioe) {
+ System.err.println("Failed to dump map-output of " + taskAttemptId);
+ }
+ }
+
public boolean next(DataInputBuffer key, DataInputBuffer value)
throws IOException {
+ try {
// Sanity check
if (eof) {
throw new EOFException("Completed reading " + bytesRead);
@@ -369,6 +433,16 @@
return false;
}
+ // Sanity check
+ if (keyLength < 0) {
+ throw new IOException("Rec# " + recNo + ": Negative key-length: " +
+ keyLength);
+ }
+ if (valueLength < 0) {
+ throw new IOException("Rec# " + recNo + ": Negative value-length: " +
+ valueLength);
+ }
+
final int recordLength = keyLength + valueLength;
// Setup the key and value
@@ -380,14 +454,20 @@
// Position for the next record
long skipped = dataIn.skip(recordLength);
if (skipped != recordLength) {
- throw new IOException("Failed to skip past record of length: " +
+ throw new IOException("Rec# " + recNo + ": Failed to skip past record
of length: " +
recordLength);
}
// Record the byte
bytesRead += recordLength;
+ ++recNo;
+
return true;
+ } catch (IOException ioe) {
+ dumpOnError();
+ throw ioe;
+ }
}
public void close() {
Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=675991&r1=675990&r2=675991&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri Jul 11 08:33:13 2008
@@ -1002,18 +1002,14 @@
//The final index file output stream
FSDataOutputStream finalIndexOut = localFs.create(finalIndexFile, true,
4096);
- long segmentStart;
-
if (numSpills == 0) {
//create dummy files
for (int i = 0; i < partitions; i++) {
- segmentStart = finalOut.getPos();
+ long segmentStart = finalOut.getPos();
Writer<K, V> writer = new Writer<K, V>(job, finalOut,
keyClass, valClass, null);
- finalIndexOut.writeLong(segmentStart);
- finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
- finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
writer.close();
+ writeIndexRecord(finalIndexOut, finalOut, segmentStart, writer);
}
finalOut.close();
finalIndexOut.close();
@@ -1054,7 +1050,7 @@
job.getOutputKeyComparator(), reporter);
//write merged output to disk
- segmentStart = finalOut.getPos();
+ long segmentStart = finalOut.getPos();
Writer<K, V> writer =
new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
if (null == combinerClass || job.getCombineOnceOnly() ||
@@ -1097,6 +1093,8 @@
indexOut.writeLong(writer.getRawLength());
long segmentLength = out.getPos() - start;
indexOut.writeLong(segmentLength);
+ LOG.info("Index: (" + start + ", " + writer.getRawLength() + ", " +
+ segmentLength + ")");
}
} // MapOutputBuffer
Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=675991&r1=675990&r2=675991&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Jul 11 08:33:13 2008
@@ -52,6 +52,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
@@ -59,6 +60,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
@@ -663,6 +665,7 @@
/** Describes the output of a map; could either be on disk or in-memory. */
private class MapOutput {
final TaskID mapId;
+ final TaskAttemptID mapAttemptId;
final Path file;
final Configuration conf;
@@ -671,8 +674,10 @@
final boolean inMemory;
long size;
- public MapOutput(TaskID mapId, Configuration conf, Path file, long size) {
+ public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId,
+ Configuration conf, Path file, long size) {
this.mapId = mapId;
+ this.mapAttemptId = mapAttemptId;
this.conf = conf;
this.file = file;
@@ -683,8 +688,9 @@
this.inMemory = false;
}
- public MapOutput(TaskID mapId, byte[] data) {
+ public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, byte[] data) {
this.mapId = mapId;
+ this.mapAttemptId = mapAttemptId;
this.file = null;
this.conf = null;
@@ -1177,7 +1183,8 @@
// Copy map-output into an in-memory buffer
byte[] shuffleData = new byte[mapOutputLength];
MapOutput mapOutput =
- new MapOutput(mapOutputLoc.getTaskId(), shuffleData);
+ new MapOutput(mapOutputLoc.getTaskId(),
+ mapOutputLoc.getTaskAttemptId(), shuffleData);
int bytesRead = 0;
try {
@@ -1246,6 +1253,16 @@
);
}
+ // TODO: Remove this after a 'fix' for HADOOP-3647
+ if (mapOutputLength > 0) {
+ DataInputBuffer dib = new DataInputBuffer();
+ dib.reset(shuffleData, 0, shuffleData.length);
+ LOG.info("Rec #1 from " + mapOutputLoc.getTaskAttemptId() + " -> ("
+
+ WritableUtils.readVInt(dib) + ", " +
+ WritableUtils.readVInt(dib) + ") from " +
+ mapOutputLoc.getHost());
+ }
+
return mapOutput;
}
@@ -1260,8 +1277,8 @@
mapOutputLength, conf);
MapOutput mapOutput =
- new MapOutput(mapOutputLoc.getTaskId(), conf,
- localFileSys.makeQualified(localFilename),
+ new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(),
+ conf, localFileSys.makeQualified(localFilename),
mapOutputLength);
@@ -1806,7 +1823,7 @@
MapOutput mo = mapOutputsFilesInMemory.remove(0);
Reader<K, V> reader =
- new InMemoryReader<K, V>(ramManager,
+ new InMemoryReader<K, V>(ramManager, mo.mapAttemptId,
mo.data, 0, mo.data.length);
Segment<K, V> segment =
new Segment<K, V>(reader, true);
Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=675991&r1=675990&r2=675991&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Jul 11 08:33:13 2008
@@ -59,6 +59,7 @@
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
@@ -2422,9 +2423,9 @@
indexIn.seek(reduce * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
//read the offset and length of the partition data
- long startOffset = indexIn.readLong();
- long rawPartLength = indexIn.readLong();
- long partLength = indexIn.readLong();
+ final long startOffset = indexIn.readLong();
+ final long rawPartLength = indexIn.readLong();
+ final long partLength = indexIn.readLong();
indexIn.close();
indexIn = null;
@@ -2447,6 +2448,23 @@
*/
//open the map-output file
mapOutputIn = fileSys.open(mapOutputFileName);
+
+ // TODO: Remove this after a 'fix' for HADOOP-3647
+ // The clever trick here to reduce the impact of the extra seek for
+ // logging the first key/value lengths is to read the lengths before
+ // the second seek for the actual shuffle. The second seek is almost
+ // a no-op since it is very short (go back length of two VInts) and the
+ // data is almost guaranteed to be in the filesystem's buffers.
+ // WARN: This won't work for compressed map-outputs!
+ int firstKeyLength = 0;
+ int firstValueLength = 0;
+ if (partLength > 0) {
+ mapOutputIn.seek(startOffset);
+ firstKeyLength = WritableUtils.readVInt(mapOutputIn);
+ firstValueLength = WritableUtils.readVInt(mapOutputIn);
+ }
+
+
//seek to the correct offset for the reduce
mapOutputIn.seek(startOffset);
@@ -2472,7 +2490,8 @@
LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce +
" from map: " + mapId + " given " + partLength + "/" +
- rawPartLength);
+ rawPartLength + " from " + startOffset + " with (" +
+ firstKeyLength + ", " + firstValueLength + ")");
} catch (IOException ie) {
TaskTracker tracker =
(TaskTracker) context.getAttribute("task.tracker");