Author: acmurthy
Date: Fri Jun 6 17:08:54 2008
New Revision: 664208
URL: http://svn.apache.org/viewvc?rev=664208&view=rev
Log:
HADOOP-3366. Stall the shuffle while in-memory merge is in progress.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RamManager.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
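The patch below replaces the old RamManager class with a RamManager interface plus a ShuffleRamManager implementation whose reserve() stalls the calling shuffle copier (closing its connection and waiting) until the in-memory merge frees space, and whose unreserve() wakes the stalled copiers. The following is a minimal, standalone sketch of that reserve/unreserve protocol only; class and field names are illustrative, and it omits the connection close/reopen handling that the patched ShuffleRamManager performs.

class StallingRamManagerSketch {
  private final int maxSize;   // in-memory shuffle limit (e.g. from fs.inmemory.size.mb)
  private int size = 0;        // bytes currently reserved

  StallingRamManagerSketch(int maxSize) {
    this.maxSize = maxSize;
  }

  // Called by a shuffle copier before buffering a map-output in memory.
  // Stalls until the requested space fits under the limit.
  synchronized void reserve(int requestedSize) throws InterruptedException {
    while (size + requestedSize > maxSize) {
      wait();                  // stall the shuffle while the in-memory merge runs
    }
    size += requestedSize;
  }

  // Called once a buffered map-output has been merged and its memory released.
  synchronized void unreserve(int requestedSize) {
    size -= requestedSize;
    notifyAll();               // wake any stalled shuffle copiers
  }
}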
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=664208&r1=664207&r2=664208&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jun 6 17:08:54 2008
@@ -267,6 +267,9 @@
HADOOP-3501. Deprecate InMemoryFileSystem. (cutting via omalley)
+ HADOOP-3366. Stall the shuffle while in-memory merge is in progress.
+ (acmurthy)
+
OPTIMIZATIONS
HADOOP-3274. The default constructor of BytesWritable creates empty
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java?rev=664208&r1=664207&r2=664208&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java Fri Jun 6 17:08:54 2008
@@ -35,7 +35,7 @@
/**
* Constant denoting when a merge of in memory files will be triggered
*/
- public static final float MAX_INMEM_FILESYS_USE = 0.5f;
+ public static final float MAX_INMEM_FILESYS_USE = 0.66f;
/**
* Constant denoting the max size (in terms of the fraction of the total
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RamManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RamManager.java?rev=664208&r1=664207&r2=664208&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RamManager.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RamManager.java Fri Jun 6 17:08:54 2008
@@ -17,49 +17,26 @@
*/
package org.apache.hadoop.mapred;
-import org.apache.hadoop.conf.Configuration;
+import java.io.InputStream;
-class RamManager {
- volatile private int numReserved = 0;
- volatile private int size = 0;
- private final int maxSize;
-
- public RamManager(Configuration conf) {
- maxSize = conf.getInt("fs.inmemory.size.mb", 100) * 1024 * 1024;
- }
-
- synchronized boolean reserve(long requestedSize) {
- if (requestedSize > Integer.MAX_VALUE ||
- (size + requestedSize) > Integer.MAX_VALUE) {
- return false;
- }
-
- if ((size + requestedSize) < maxSize) {
- size += requestedSize;
- ++numReserved;
- return true;
- }
- return false;
- }
-
- synchronized void unreserve(int requestedSize) {
- size -= requestedSize;
- --numReserved;
- }
-
- int getUsedMemory() {
- return size;
- }
-
- float getPercentUsed() {
- return (float)size/maxSize;
- }
-
- int getReservedFiles() {
- return numReserved;
- }
-
- int getMemoryLimit() {
- return maxSize;
- }
+/**
+ * <code>RamManager</code> manages a memory pool of a configured limit.
+ */
+interface RamManager {
+ /**
+ * Reserve memory for data coming through the given input-stream.
+ *
+ * @param requestedSize size of memory requested
+ * @param in input stream
+ * @return <code>true</code> if memory was allocated immediately,
+ * else <code>false</code>
+ */
+ boolean reserve(int requestedSize, InputStream in);
+
+ /**
+ * Return memory to the pool.
+ *
+ * @param requestedSize size of memory returned to the pool
+ */
+ void unreserve(int requestedSize);
}
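The new interface keeps only the two calls a shuffle copier needs; the accounting getters move into the concrete ShuffleRamManager in ReduceTask.java below. A hedged caller-side sketch of the contract, assuming the package-private interface above is visible and using a placeholder byte stream in place of the real shuffle HTTP connection:

import java.io.ByteArrayInputStream;
import java.io.InputStream;

class RamManagerUsageSketch {
  // Illustrative only: reserve space before buffering a map-output segment,
  // return it once the segment has been merged (or the copy abandoned).
  static void copySegment(RamManager ram, int segmentSize) {
    InputStream in = new ByteArrayInputStream(new byte[segmentSize]); // stand-in for the shuffle stream
    boolean streamStillOpen = ram.reserve(segmentSize, in);
    try {
      if (!streamStillOpen) {
        // reserve() stalled and closed the stream; the real copier re-opens
        // the connection here before reading.
      }
      // ... read segmentSize bytes into an in-memory buffer ...
    } finally {
      ram.unreserve(segmentSize); // in the patch this happens after the in-memory merge
    }
  }
}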
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=664208&r1=664207&r2=664208&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Jun 6 17:08:54 2008
@@ -400,7 +400,7 @@
* A reference to the RamManager for writing the map outputs to.
*/
- private RamManager ramManager;
+ private ShuffleRamManager ramManager;
/**
* A reference to the local file system for writing the map outputs to.
@@ -709,6 +709,114 @@
}
}
+ class ShuffleRamManager implements RamManager {
+ /* Maximum percentage of the in-memory limit that a single shuffle can
+ * consume*/
+ private static final float MAX_SINGLE_SHUFFLE_SEGMENT_PERCENT = 0.25f;
+
+ private boolean closed = false;
+
+ volatile private int numClosed = 0;
+ volatile private int size = 0;
+ private final int maxSize;
+ private final int maxSingleShuffleLimit;
+
+ private Object dataAvailable = new Object();
+ private volatile int fullSize = 0;
+
+ public ShuffleRamManager(Configuration conf) {
+ maxSize = conf.getInt("fs.inmemory.size.mb", 100) * 1024 * 1024;
+ maxSingleShuffleLimit = (int)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_PERCENT);
+ LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize +
+ ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);
+ }
+
+ public synchronized boolean reserve(int requestedSize, InputStream in) {
+ while ((size + requestedSize) > maxSize) {
+ try {
+ // Close the connection
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException ie) {
+ LOG.info("Failed to close connection with: " + ie);
+ } finally {
+ in = null;
+ }
+ }
+
+ // Wait for memory to free up
+ wait();
+ } catch (InterruptedException ie) {}
+ }
+
+ size += requestedSize;
+
+ return (in != null);
+ }
+
+ public synchronized void unreserve(int requestedSize) {
+ size -= requestedSize;
+
+ synchronized (dataAvailable) {
+ fullSize -= requestedSize;
+ --numClosed;
+ }
+
+ // Notify the threads blocked on RamManager.reserve
+ notifyAll();
+ }
+
+ public void waitForDataToMerge() {
+ synchronized (dataAvailable) {
+ while (!closed &&
+ (getPercentUsed() < MAX_INMEM_FILESYS_USE ||
+ getReservedFiles() <
+ (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)
+ )
+ &&
+ (mergeThreshold <= 0 || getReservedFiles() < mergeThreshold)) {
+ try {
+ dataAvailable.wait();
+ } catch (InterruptedException ie) {}
+ }
+ }
+ }
+
+ public void closeInMemoryFile(int requestedSize) {
+ synchronized (dataAvailable) {
+ fullSize += requestedSize;
+ ++numClosed;
+ dataAvailable.notify();
+ }
+ }
+
+ public void close() {
+ synchronized (dataAvailable) {
+ closed = true;
+ LOG.info("Closed ram manager");
+ dataAvailable.notify();
+ }
+ }
+
+ float getPercentUsed() {
+ return (float)fullSize/maxSize;
+ }
+
+ int getReservedFiles() {
+ return numClosed;
+ }
+
+ int getMemoryLimit() {
+ return maxSize;
+ }
+
+ boolean canFitInMemory(long requestedSize) {
+ return (requestedSize < Integer.MAX_VALUE &&
+ requestedSize < maxSingleShuffleLimit);
+ }
+ }
+
/** Copies map outputs as they become available */
private class MapOutputCopier extends Thread {
// basic/unit connection timeout (in milliseconds)
@@ -878,11 +986,6 @@
if (mapOutput.inMemory) {
// Save it in the synchronized list of map-outputs
mapOutputsFilesInMemory.add(mapOutput);
-
- //notify the InMemFSMergeThread
- synchronized(ramManager){
- ramManager.notify();
- }
} else {
// Rename the temporary file to the final file;
// ensure it is on the same partition
@@ -941,13 +1044,39 @@
long compressedLength =
Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
- // Check if we can save the map-output in-memory
- boolean createInMem = ramManager.reserve(decompressedLength);
- if (createInMem) {
- LOG.info("Shuffling " + decompressedLength + " bytes (" +
+ // Check if this map-output can be saved in-memory
+ boolean canFitInMemory =
+ ramManager.canFitInMemory(decompressedLength);
+
+ if (canFitInMemory) {
+ int requestedSize = (int)decompressedLength;
+ // Check if we have enough buffer-space to keep map-output in-memory
+ boolean createdNow =
+ ramManager.reserve(requestedSize, input);
+
+ LOG.info("Shuffling " + requestedSize + " bytes (" +
compressedLength + " raw bytes) " +
"into RAM-FS from " + mapOutputLoc.getTaskAttemptId());
+ if (!createdNow) {
+ // Reconnect
+ try {
+ connection = mapOutputLoc.getOutputLocation().openConnection();
+ input = getInputStream(connection, DEFAULT_READ_TIMEOUT,
+ STALLED_COPY_TIMEOUT);
+ } catch (Throwable t) {
+ // Cleanup
+ ramManager.closeInMemoryFile(requestedSize);
+ ramManager.unreserve(requestedSize);
+
+ IOException ioe = new IOException("Failed to re-open " +
+ "connection to "
+
+
mapOutputLoc.getHost());
+ ioe.initCause(t);
+ throw ioe;
+ }
+ }
+
// Are map-outputs compressed?
if (codec != null) {
decompressor.reset();
@@ -985,14 +1114,17 @@
LOG.info("Read " + bytesRead + " bytes from map-output " +
"for " + mapOutputLoc.getTaskAttemptId());
+ if (canFitInMemory) {
+ byte[] shuffleData = ((DataOutputBuffer)output).getData();
+ mapOutput = new MapOutput(mapOutputLoc.getTaskId(),
+ ((DataOutputBuffer)output).getData());
+ ramManager.closeInMemoryFile(shuffleData.length);
+ } else {
mapOutput =
- (createInMem) ?
- new MapOutput(mapOutputLoc.getTaskId(),
- ((DataOutputBuffer)output).getData()) :
- new MapOutput(mapOutputLoc.getTaskId(), conf,
- localFileSys.makeQualified(localFilename),
- compressedLength);
-
+ new MapOutput(mapOutputLoc.getTaskId(), conf,
+ localFileSys.makeQualified(localFilename),
+ compressedLength);
+ }
} finally {
output.close();
}
@@ -1001,8 +1133,8 @@
}
// Sanity check
- good = createInMem ? (bytesRead == decompressedLength) :
- (bytesRead == compressedLength);
+ good = (canFitInMemory) ? (bytesRead == decompressedLength) :
+ (bytesRead == compressedLength);
if (!good) {
throw new IOException("Incomplete map output received for " +
mapOutputLoc.getTaskAttemptId() + " from " +
@@ -1142,7 +1274,7 @@
this.mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000);
// Setup the RamManager
- ramManager = new RamManager(conf);
+ ramManager = new ShuffleRamManager(conf);
ramfsMergeOutputSize =
(long)(MAX_INMEM_FILESYS_USE * ramManager.getMemoryLimit());
@@ -1507,10 +1639,9 @@
exitLocalFSMerge = true;
mapOutputFilesOnDisk.notify();
}
- synchronized (ramManager) {
- exitInMemMerge = true;
- ramManager.notify();
- }
+
+ exitInMemMerge = true;
+ ramManager.close();
//Do a merge of in-memory files (if there are any)
if (mergeThrowable == null) {
@@ -1736,13 +1867,13 @@
codec, mapFiles.toArray(new Path[mapFiles.size()]),
true, ioSortFactor, tmpDir,
conf.getOutputKeyComparator(), reporter);
- } catch (Exception e) {
+
+ Merger.writeFile(iter, writer, reporter);
writer.close();
+ } catch (Exception e) {
localFileSys.delete(outputPath, true);
throw new IOException (StringUtils.stringifyException(e));
}
- Merger.writeFile(iter, writer, reporter);
- writer.close();
synchronized (mapOutputFilesOnDisk) {
addToMapOutputFilesOnDisk(localFileSys.getFileStatus(outputPath));
@@ -1767,7 +1898,7 @@
}
private class InMemFSMergeThread extends Thread {
-
+
public InMemFSMergeThread() {
setName("Thread for merging in memory files");
setDaemon(true);
@@ -1777,45 +1908,10 @@
public void run() {
LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
try {
- while(!exitInMemMerge) {
- synchronized(ramManager) {
- while(!exitInMemMerge &&
- ((ramManager.getPercentUsed() < MAX_INMEM_FILESYS_USE ||
- ramManager.getReservedFiles() <
- (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION))
- &&
- (mergeThreshold <= 0 ||
- ramManager.getReservedFiles() < mergeThreshold))) {
- LOG.info(reduceTask.getTaskID() + " Thread waiting: " + getName());
- ramManager.wait();
- }
- }
- if(exitInMemMerge) {//to avoid running one extra time in the end
- break;
- }
- LOG.info(reduceTask.getTaskID() + " RamManager " +
- " is " + ramManager.getPercentUsed() + " full with " +
- mapOutputsFilesInMemory.size() + " files." +
- " Triggering merge");
- if (mapOutputsFilesInMemory.size() >=
- (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
- doInMemMerge();
- }
- else {
- LOG.info(reduceTask.getTaskID() + " Nothing to merge from map-outputs in-memory");
- }
- }
- //see if any remaining files are there for merge
- LOG.info(reduceTask.getTaskID() +
- " Copying of all map outputs complete. " +
- "Initiating the last merge on the remaining files " +
- "in-memory");
- if (mapOutputsFilesInMemory.size() == 0) {
- LOG.info(reduceTask.getTaskID() + "Nothing to merge from " +
- "in-memory map-outputs");
- return;
+ while (!exitInMemMerge) {
+ ramManager.waitForDataToMerge();
+ doInMemMerge();
}
- doInMemMerge();
} catch (Throwable t) {
LOG.warn(reduceTask.getTaskID() +
" Merge of the inmemory files threw an exception: "
@@ -1826,6 +1922,11 @@
@SuppressWarnings("unchecked")
private void doInMemMerge() throws IOException{
+ if (mapOutputsFilesInMemory.size() == 0) {
+ LOG.info("Noting to merge... ");
+ return;
+ }
+
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
//be absent on the disk currently. So we don't overwrite a prev.
@@ -1864,15 +1965,14 @@
combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter);
}
+ writer.close();
} catch (Exception e) {
//make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
- writer.close();
localFileSys.delete(outputPath, true);
throw (IOException)new IOException
("Intermedate merge failed").initCause(e);
}
- writer.close();
LOG.info(reduceTask.getTaskID() +
" Merge of the " + noInMemorySegments +
" files in-memory complete." +