Author: acmurthy
Date: Thu Jun 5 14:10:19 2008
New Revision: 663738
URL: http://svn.apache.org/viewvc?rev=663738&view=rev
Log:
HADOOP-3326. Cleanup the local-fs and in-memory merge in the ReduceTask by
spawning only one thread each for the on-disk and in-memory merge. Contributed
by Sharad Agarwal.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=663738&r1=663737&r2=663738&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Jun 5 14:10:19 2008
@@ -466,6 +466,10 @@
HADOOP-2565. Remove DFSPath cache of FileStatus.
(Tsz Wo (Nicholas), SZE via hairong)
+ HADOOP-3326. Cleanup the local-fs and in-memory merge in the ReduceTask by
+ spawing only one thread each for the on-disk and in-memory merge.
+ (Sharad Agarwal via acmurthy)
+
Release 0.17.0 - 2008-05-18
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=663738&r1=663737&r2=663738&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
(original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Jun
5 14:10:19 2008
@@ -413,14 +413,14 @@
private volatile Throwable mergeThrowable;
/**
- * A flag to indicate that localFS merge is in progress
+ * A flag to indicate when to exit localFS merge
*/
- private volatile boolean localFSMergeInProgress = false;
+ private volatile boolean exitLocalFSMerge = false;
/**
- * A flag to indicate that merge is in progress
+ * A flag to indicate when to exit InMemMerge
*/
- private volatile boolean mergeInProgress = false;
+ private volatile boolean exitInMemMerge = false;
/**
* When we accumulate mergeThreshold number of files in ram, we merge/spill
@@ -873,27 +873,10 @@
if (mapOutput.inMemory) {
// Save it in the synchronized list of map-outputs
mapOutputsFilesInMemory.add(mapOutput);
-
- //Create a thread to do merges. Synchronize access/update to
- //mergeInProgress
- if (!mergeInProgress &&
- ((ramManager.getPercentUsed() >= MAX_INMEM_FILESYS_USE &&
- ramManager.getReservedFiles() >=
- (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION))
||
- (mergeThreshold > 0 &&
- ramManager.getReservedFiles() >= mergeThreshold)) &&
- mergeThrowable == null) {
- LOG.info(reduceId + " RamManager " +
- " is " + ramManager.getPercentUsed() + " full with " +
- mapOutputsFilesInMemory.size() + " files." +
- " Triggering merge");
-
- InMemFSMergeThread m =
- new InMemFSMergeThread((LocalFileSystem)localFileSys);
- m.setName("Thread for merging in-memory files");
- m.setDaemon(true);
- mergeInProgress = true;
- m.start();
+
+ //notify the InMemFSMergeThread
+ synchronized(ramManager){
+ ramManager.notify();
}
} else {
// Rename the temporary file to the final file;
@@ -908,7 +891,7 @@
}
synchronized (mapOutputFilesOnDisk) {
- mapOutputFilesOnDisk.add(localFileSys.getFileStatus(filename));
+ addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename));
}
}
@@ -1188,6 +1171,8 @@
DecimalFormat mbpsFormat = new DecimalFormat("0.00");
final Progress copyPhase =
reduceTask.getProgress().phase();
+ LocalFSMerger localFSMergerThread = null;
+ InMemFSMergeThread inMemFSMergeThread = null;
for (int i = 0; i < numOutputs; i++) {
copyPhase.addPhase(); // add sub-phase per file
@@ -1204,6 +1189,13 @@
copier.start();
}
+ //start the on-disk-merge thread
+ localFSMergerThread = new LocalFSMerger((LocalFileSystem)localFileSys);
+ //start the in memory merger thread
+ inMemFSMergeThread = new InMemFSMergeThread();
+ localFSMergerThread.start();
+ inMemFSMergeThread.start();
+
// start the clock for bandwidth measurement
long startTime = System.currentTimeMillis();
long currentTime = startTime;
@@ -1329,25 +1321,6 @@
}
}
- // Check if a on-disk merge can be done. This will help if there
- // are no copies to be fetched but sufficient copies to be merged.
- synchronized (mapOutputFilesOnDisk) {
- if (!localFSMergeInProgress
- && (mapOutputFilesOnDisk.size() >= (2 * ioSortFactor - 1))) {
- // make sure that only one thread merges the disk files
- localFSMergeInProgress = true;
- // start the on-disk-merge process
- LOG.info(reduceTask.getTaskID() + "We have " +
- mapOutputFilesOnDisk.size() + " map outputs on disk. " +
- "Triggering merge of " + ioSortFactor + " files");
- LocalFSMerger lfsm =
- new LocalFSMerger((LocalFileSystem)localFileSys);
- lfsm.setName("Thread for merging on-disk files");
- lfsm.setDaemon(true);
- lfsm.start();
- }
- }
-
// if we have no copies in flight and we can't schedule anything
// new, just wait for a bit
try {
@@ -1519,83 +1492,25 @@
}
}
+ // copiers are done, exit and notify the waiting merge threads
+ synchronized (mapOutputFilesOnDisk) {
+ exitLocalFSMerge = true;
+ mapOutputFilesOnDisk.notify();
+ }
+ synchronized (ramManager) {
+ exitInMemMerge = true;
+ ramManager.notify();
+ }
+
//Do a merge of in-memory files (if there are any)
if (mergeThrowable == null) {
try {
// Wait for the on-disk merge to complete
- while (localFSMergeInProgress) {
- Thread.sleep(200);
- }
+ localFSMergerThread.join();
//wait for an ongoing merge (if it is in flight) to complete
- while (mergeInProgress) {
- Thread.sleep(200);
- }
- LOG.info(reduceTask.getTaskID() +
- " Copying of all map outputs complete. " +
- "Initiating the last merge on the remaining files " +
- "in-memory");
- if (mergeThrowable != null) {
- //this could happen if the merge that
- //was in progress threw an exception
- throw mergeThrowable;
- }
- //initiate merge
- if (mapOutputsFilesInMemory.size() == 0) {
- LOG.info(reduceTask.getTaskID() + "Nothing to merge from " +
- "in-memory map-outputs");
- return (copiedMapOutputs.size() == numOutputs);
- }
- //name this output file same as the name of the first file that is
- //there in the current list of inmem files (this is guaranteed to
be
- //absent on the disk currently. So we don't overwrite a prev.
- //created spill). Also we need to create the output file now since
- //it is not guaranteed that this file will be present after merge
- //is called (we delete empty map-output files as soon as we see
them
- //in the merge method)
- TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
- Path outputPath =
- localFileSys.makeQualified(
- mapOutputFile.getInputFileForWrite(mapId,
- reduceTask.getTaskID(),
- ramfsMergeOutputSize));
- Writer writer =
- new Writer(conf, localFileSys, outputPath,
- conf.getMapOutputKeyClass(),
- conf.getMapOutputValueClass(),
- codec);
- List<Segment<K, V>> inMemorySegments = createInMemorySegments();
- int noInMemSegments = inMemorySegments.size();
- RawKeyValueIterator rIter = null;
- try {
- rIter = Merger.merge(conf, localFileSys,
- (Class<K>)conf.getMapOutputKeyClass(),
- (Class<V>)conf.getMapOutputValueClass(),
- inMemorySegments, inMemorySegments.size(),
- new
Path(reduceTask.getTaskID().toString()),
- conf.getOutputKeyComparator(), reporter);
-
- Merger.writeFile(rIter, writer, reporter);
- writer.close();
- } catch (Exception e) {
- //make sure that we delete the ondisk file that we created
earlier
- //when we invoked cloneFileAttributes
- writer.close();
- localFileSys.delete(outputPath, true);
- throw new IOException (StringUtils.stringifyException(e));
- }
- LOG.info(reduceTask.getTaskID() +
- " Merge of the " + noInMemSegments +
- " files in InMemoryFileSystem complete." +
- " Local file is " + outputPath +
- " of size " +
- localFileSys.getFileStatus(outputPath).getLen());
-
- FileStatus status = localFileSys.getFileStatus(outputPath);
- synchronized (mapOutputFilesOnDisk) {
- mapOutputFilesOnDisk.add(status);
- }
- } catch (Throwable t) {
+ inMemFSMergeThread.join();
+ } catch (Throwable t) {
LOG.warn(reduceTask.getTaskID() +
" Final merge of the inmemory files threw an exception: "
+
StringUtils.stringifyException(t));
@@ -1642,6 +1557,13 @@
}
}
+ private void addToMapOutputFilesOnDisk(FileStatus status) {
+ synchronized (mapOutputFilesOnDisk) {
+ mapOutputFilesOnDisk.add(status);
+ mapOutputFilesOnDisk.notify();
+ }
+ }
+
/**
* Queries the {@link TaskTracker} for a set of map-completion events from
* a given event ID.
@@ -1734,77 +1656,93 @@
public LocalFSMerger(LocalFileSystem fs) {
this.localFileSys = fs;
+ setName("Thread for merging on-disk files");
+ setDaemon(true);
}
@SuppressWarnings("unchecked")
public void run() {
try {
- List<Path> mapFiles = new ArrayList<Path>();
- long approxOutputSize = 0;
- int bytesPerSum =
- reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
- LOG.info(reduceTask.getTaskID()
- + " Merging map output files on disk");
- // 1. Prepare the list of files to be merged. This list is prepared
- // using a list of map output files on disk. Currently we merge
- // io.sort.factor files into 1.
- synchronized (mapOutputFilesOnDisk) {
- for (int i = 0; i < ioSortFactor; ++i) {
- FileStatus filestatus = mapOutputFilesOnDisk.first();
- mapOutputFilesOnDisk.remove(filestatus);
- mapFiles.add(filestatus.getPath());
- approxOutputSize += filestatus.getLen();
+ LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
+ while(!exitLocalFSMerge){
+ synchronized (mapOutputFilesOnDisk) {
+ while (!exitLocalFSMerge &&
+ mapOutputFilesOnDisk.size() < (2 * ioSortFactor - 1)) {
+ LOG.info(reduceTask.getTaskID() + " Thread waiting: " +
getName());
+ mapOutputFilesOnDisk.wait();
+ }
}
- }
-
- // sanity check
- if (mapFiles.size() == 0) {
- return;
- }
-
- // add the checksum length
- approxOutputSize += ChecksumFileSystem
- .getChecksumLength(approxOutputSize,
- bytesPerSum);
-
- // 2. Start the on-disk merge process
- Path outputPath =
- lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(),
- approxOutputSize, conf)
- .suffix(".merged");
- Writer writer =
- new Writer(conf, localFileSys, outputPath,
- conf.getMapOutputKeyClass(),
- conf.getMapOutputValueClass(),
- codec);
- RawKeyValueIterator iter = null;
- Path tmpDir = new Path(reduceTask.getTaskID().toString());
- final Reporter reporter = getReporter(umbilical);
- try {
- iter = Merger.merge(conf, localFileSys,
- conf.getMapOutputKeyClass(),
- conf.getMapOutputValueClass(),
- codec, mapFiles.toArray(new
Path[mapFiles.size()]),
- true, ioSortFactor, tmpDir,
- conf.getOutputKeyComparator(), reporter);
- } catch (Exception e) {
+ if(exitLocalFSMerge) {//to avoid running one extra time in the end
+ break;
+ }
+ List<Path> mapFiles = new ArrayList<Path>();
+ long approxOutputSize = 0;
+ int bytesPerSum =
+ reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
+ LOG.info(reduceTask.getTaskID() + "We have " +
+ mapOutputFilesOnDisk.size() + " map outputs on disk. " +
+ "Triggering merge of " + ioSortFactor + " files");
+ // 1. Prepare the list of files to be merged. This list is prepared
+ // using a list of map output files on disk. Currently we merge
+ // io.sort.factor files into 1.
+ synchronized (mapOutputFilesOnDisk) {
+ for (int i = 0; i < ioSortFactor; ++i) {
+ FileStatus filestatus = mapOutputFilesOnDisk.first();
+ mapOutputFilesOnDisk.remove(filestatus);
+ mapFiles.add(filestatus.getPath());
+ approxOutputSize += filestatus.getLen();
+ }
+ }
+
+ // sanity check
+ if (mapFiles.size() == 0) {
+ return;
+ }
+
+ // add the checksum length
+ approxOutputSize += ChecksumFileSystem
+ .getChecksumLength(approxOutputSize,
+ bytesPerSum);
+
+ // 2. Start the on-disk merge process
+ Path outputPath =
+ lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(),
+ approxOutputSize, conf)
+ .suffix(".merged");
+ Writer writer =
+ new Writer(conf, localFileSys, outputPath,
+ conf.getMapOutputKeyClass(),
+ conf.getMapOutputValueClass(),
+ codec);
+ RawKeyValueIterator iter = null;
+ Path tmpDir = new Path(reduceTask.getTaskID().toString());
+ final Reporter reporter = getReporter(umbilical);
+ try {
+ iter = Merger.merge(conf, localFileSys,
+ conf.getMapOutputKeyClass(),
+ conf.getMapOutputValueClass(),
+ codec, mapFiles.toArray(new
Path[mapFiles.size()]),
+ true, ioSortFactor, tmpDir,
+ conf.getOutputKeyComparator(), reporter);
+ } catch (Exception e) {
+ writer.close();
+ localFileSys.delete(outputPath, true);
+ throw new IOException (StringUtils.stringifyException(e));
+ }
+ Merger.writeFile(iter, writer, reporter);
writer.close();
- localFileSys.delete(outputPath, true);
- throw new IOException (StringUtils.stringifyException(e));
- }
- Merger.writeFile(iter, writer, reporter);
- writer.close();
-
- synchronized (mapOutputFilesOnDisk) {
- mapOutputFilesOnDisk.add(localFileSys.getFileStatus(outputPath));
- }
-
- LOG.info(reduceTask.getTaskID() +
- " Finished merging " + mapFiles.size() +
- " map output files on disk of total-size " +
- approxOutputSize + "." +
- " Local output file is " + outputPath + " of size " +
- localFileSys.getFileStatus(outputPath).getLen());
+
+ synchronized (mapOutputFilesOnDisk) {
+
addToMapOutputFilesOnDisk(localFileSys.getFileStatus(outputPath));
+ }
+
+ LOG.info(reduceTask.getTaskID() +
+ " Finished merging " + mapFiles.size() +
+ " map output files on disk of total-size " +
+ approxOutputSize + "." +
+ " Local output file is " + outputPath + " of size " +
+ localFileSys.getFileStatus(outputPath).getLen());
+ }
} catch (Throwable t) {
LOG.warn(reduceTask.getTaskID()
+ " Merging of the local FS files threw an exception: "
@@ -1812,94 +1750,126 @@
if (mergeThrowable == null) {
mergeThrowable = t;
}
- } finally {
- localFSMergeInProgress = false;
- }
+ }
}
}
private class InMemFSMergeThread extends Thread {
- private LocalFileSystem localFileSys;
- public InMemFSMergeThread( LocalFileSystem localFileSys) {
- this.localFileSys = localFileSys;
+ public InMemFSMergeThread() {
+ setName("Thread for merging in memory files");
+ setDaemon(true);
}
@SuppressWarnings("unchecked")
public void run() {
LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
try {
- if (mapOutputsFilesInMemory.size() >=
- (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
- //name this output file same as the name of the first file that is
- //there in the current list of inmem files (this is guaranteed to
- //be absent on the disk currently. So we don't overwrite a prev.
- //created spill). Also we need to create the output file now since
- //it is not guaranteed that this file will be present after merge
- //is called (we delete empty files as soon as we see them
- //in the merge method)
-
- //figure out the mapId
- TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
-
- Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
- reduceTask.getTaskID(), ramfsMergeOutputSize);
-
- Writer writer =
- new Writer(conf, localFileSys, outputPath,
- conf.getMapOutputKeyClass(),
- conf.getMapOutputValueClass(),
- codec);
-
- List<Segment<K, V>> inMemorySegments = createInMemorySegments();
- int noInMemorySegments = inMemorySegments.size();
-
- RawKeyValueIterator rIter = null;
- final Reporter reporter = getReporter(umbilical);
- try {
- rIter = Merger.merge(conf, localFileSys,
- (Class<K>)conf.getMapOutputKeyClass(),
- (Class<V>)conf.getMapOutputValueClass(),
- inMemorySegments, inMemorySegments.size(),
- new Path(reduceTask.getTaskID().toString()),
- conf.getOutputKeyComparator(), reporter);
- if (null == combinerClass) {
- Merger.writeFile(rIter, writer, reporter);
- } else {
- combineCollector.setWriter(writer);
- combineAndSpill(rIter, reduceCombineInputCounter);
+ while(!exitInMemMerge) {
+ synchronized(ramManager) {
+ while(!exitInMemMerge &&
+ ((ramManager.getPercentUsed() < MAX_INMEM_FILESYS_USE ||
+ ramManager.getReservedFiles() <
+ (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION))
+ &&
+ (mergeThreshold <= 0 ||
+ ramManager.getReservedFiles() < mergeThreshold))) {
+ LOG.info(reduceTask.getTaskID() + " Thread waiting: " +
getName());
+ ramManager.wait();
}
- } catch (Exception e) {
- //make sure that we delete the ondisk file that we created
- //earlier when we invoked cloneFileAttributes
- writer.close();
- localFileSys.delete(outputPath, true);
- throw (IOException)new IOException
- ("Intermedate merge failed").initCause(e);
}
- writer.close();
- LOG.info(reduceTask.getTaskID() +
- " Merge of the " + noInMemorySegments +
- " files in-memory complete." +
- " Local file is " + outputPath + " of size " +
- localFileSys.getFileStatus(outputPath).getLen());
-
- FileStatus status = localFileSys.getFileStatus(outputPath);
- synchronized (mapOutputFilesOnDisk) {
- mapOutputFilesOnDisk.add(status);
+ if(exitInMemMerge) {//to avoid running one extra time in the end
+ break;
}
+ LOG.info(reduceTask.getTaskID() + " RamManager " +
+ " is " + ramManager.getPercentUsed() + " full with " +
+ mapOutputsFilesInMemory.size() + " files." +
+ " Triggering merge");
+ if (mapOutputsFilesInMemory.size() >=
+ (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
+ doInMemMerge();
+ }
+ else {
+ LOG.info(reduceTask.getTaskID() + " Nothing to merge from
map-outputs in-memory");
+ }
+ }
+ //see if any remaining files are there for merge
+ LOG.info(reduceTask.getTaskID() +
+ " Copying of all map outputs complete. " +
+ "Initiating the last merge on the remaining files " +
+ "in-memory");
+ if (mapOutputsFilesInMemory.size() == 0) {
+ LOG.info(reduceTask.getTaskID() + "Nothing to merge from " +
+ "in-memory map-outputs");
+ return;
}
- else {
- LOG.info(reduceTask.getTaskID() + " Nothing to merge from
map-outputs in-memory");
- }
+ doInMemMerge();
} catch (Throwable t) {
LOG.warn(reduceTask.getTaskID() +
- " Intermediate Merge of the inmemory files threw an
exception: "
+ " Merge of the inmemory files threw an exception: "
+ StringUtils.stringifyException(t));
ReduceCopier.this.mergeThrowable = t;
}
- finally {
- mergeInProgress = false;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void doInMemMerge() throws IOException{
+ //name this output file same as the name of the first file that is
+ //there in the current list of inmem files (this is guaranteed to
+ //be absent on the disk currently. So we don't overwrite a prev.
+ //created spill). Also we need to create the output file now since
+ //it is not guaranteed that this file will be present after merge
+ //is called (we delete empty files as soon as we see them
+ //in the merge method)
+
+ //figure out the mapId
+ TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
+
+ Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
+ reduceTask.getTaskID(), ramfsMergeOutputSize);
+
+ Writer writer =
+ new Writer(conf, localFileSys, outputPath,
+ conf.getMapOutputKeyClass(),
+ conf.getMapOutputValueClass(),
+ codec);
+
+ List<Segment<K, V>> inMemorySegments = createInMemorySegments();
+ int noInMemorySegments = inMemorySegments.size();
+
+ RawKeyValueIterator rIter = null;
+ final Reporter reporter = getReporter(umbilical);
+ try {
+ rIter = Merger.merge(conf, localFileSys,
+ (Class<K>)conf.getMapOutputKeyClass(),
+ (Class<V>)conf.getMapOutputValueClass(),
+ inMemorySegments, inMemorySegments.size(),
+ new Path(reduceTask.getTaskID().toString()),
+ conf.getOutputKeyComparator(), reporter);
+ if (null == combinerClass) {
+ Merger.writeFile(rIter, writer, reporter);
+ } else {
+ combineCollector.setWriter(writer);
+ combineAndSpill(rIter, reduceCombineInputCounter);
+ }
+ } catch (Exception e) {
+ //make sure that we delete the ondisk file that we created
+ //earlier when we invoked cloneFileAttributes
+ writer.close();
+ localFileSys.delete(outputPath, true);
+ throw (IOException)new IOException
+ ("Intermedate merge failed").initCause(e);
+ }
+ writer.close();
+ LOG.info(reduceTask.getTaskID() +
+ " Merge of the " + noInMemorySegments +
+ " files in-memory complete." +
+ " Local file is " + outputPath + " of size " +
+ localFileSys.getFileStatus(outputPath).getLen());
+
+ FileStatus status = localFileSys.getFileStatus(outputPath);
+ synchronized (mapOutputFilesOnDisk) {
+ addToMapOutputFilesOnDisk(status);
}
}
}