Author: cutting
Date: Mon May 7 14:25:07 2007
New Revision: 535997

URL: http://svn.apache.org/viewvc?view=rev&rev=535997
Log:
HADOOP-1252. Changed MapReduce's allocation of local files to use
round-robin among configured devices, rather than a hashcode, also
improving error handling. Contributed by Devaraj.
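For orientation, the selection loop that the new LocalDirAllocator (added below) runs on each write request can be condensed into the following standalone sketch. This is an illustration only, not code from the patch: the class name, directory values, and the java.io.File free-space/writability checks are stand-ins for the DF and DiskChecker machinery the real AllocatorPerContext delegates to.

import java.io.File;
import java.io.IOException;

/**
 * Simplified, standalone version of the round-robin selection in
 * AllocatorPerContext.getLocalPathForWrite: starting at the directory after
 * the one used last, pick the first configured directory that has enough
 * free space and is actually writable.
 */
public class RoundRobinDirChooser {
  private final String[] localDirs;   // e.g. the dirs listed in mapred.local.dir
  private int dirNumLastAccessed = 0; // round-robin pointer, shared across calls

  public RoundRobinDirChooser(String[] localDirs) {
    this.localDirs = localDirs;
  }

  /** Pass size as -1 when the file size is not known in advance. */
  public synchronized File getPathForWrite(String relPath, long size)
      throws IOException {
    int numDirs = localDirs.length;
    for (int searched = 0; searched < numDirs; searched++) {
      File dir = new File(localDirs[dirNumLastAccessed]);
      // advance the pointer whether or not this directory is chosen, so the
      // next request starts on the following disk
      dirNumLastAccessed = (dirNumLastAccessed + 1) % numDirs;
      boolean hasSpace = (size < 0) || dir.getUsableSpace() > size;
      // the committed code performs this check via DiskChecker.checkDir
      if (hasSpace && (dir.isDirectory() || dir.mkdirs()) && dir.canWrite()) {
        return new File(dir, relPath);
      }
    }
    throw new IOException(
        "Could not find any valid local directory for " + relPath);
  }

  public static void main(String[] args) throws IOException {
    RoundRobinDirChooser chooser =
        new RoundRobinDirChooser(new String[] {"/tmp/disk0", "/tmp/disk1"});
    System.out.println(chooser.getPathForWrite("task_0001/file.out", 1 << 20));
  }
}

Compared with the previous conf.getLocalPath() calls, which hashed the path onto one of the configured directories, this keeps successive allocations spread across all disks and skips disks that are full or unwritable.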
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=535997&r1=535996&r2=535997 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Mon May 7 14:25:07 2007 @@ -351,6 +351,11 @@ 104. HADOOP-1200. Restore disk checking lost in HADOOP-1170. (Hairong Kuang via cutting) +105. HADOOP-1252. Changed MapReduce's allocation of local files to + use round-robin among available devices, rather than a hashcode. + More care is also taken to not allocate files on full or offline + drives. (Devaraj Das via cutting) + Release 0.12.3 - 2007-04-06 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java?view=diff&rev=535997&r1=535996&r2=535997 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java Mon May 7 14:25:07 2007 @@ -40,6 +40,10 @@ public abstract class ChecksumFileSystem extends FilterFileSystem { private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0}; + public static double getApproxChkSumLength(long size) { + return FSOutputSummer.CHKSUM_AS_FRACTION * size; + } + public ChecksumFileSystem(FileSystem fs) { super(fs); } @@ -343,6 +347,7 @@ private Checksum sum = new CRC32(); private int inSum; private int bytesPerSum; + private static final float CHKSUM_AS_FRACTION = 0.01f; public FSOutputSummer(ChecksumFileSystem fs, Path file, Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java?view=auto&rev=535997 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalDirAllocator.java Mon May 7 14:25:07 2007 @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.io.*; +import java.util.*; + +import org.apache.commons.logging.*; + +import org.apache.hadoop.util.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.conf.Configuration; + +/** An implementation of a round-robin scheme for disk allocation for creating + * files. The way it works is that it is kept track what disk was last + * allocated for a file write. For the current request, the next disk from + * the set of disks would be allocated if the free space on the disk is + * sufficient enough to accomodate the file that is being considered for + * creation. If the space requirements cannot be met, the next disk in order + * would be tried and so on till a disk is found with sufficient capacity. + * Once a disk with sufficient space is identified, a check is done to make + * sure that the disk is writable. Also, there is an API provided that doesn't + * take the space requirements into consideration but just checks whether the + * disk under consideration is writable (this should be used for cases where + * the file size is not known apriori). An API is provided to read a path that + * was created earlier. That API works by doing a scan of all the disks for the + * input pathname. + * This implementation also provides the functionality of having multiple + * allocators per JVM (one for each unique functionality or context, like + * mapred, dfs-client, etc.). It ensures that there is only one instance of + * an allocator per context per JVM. + * Note: + * 1. The contexts referred above are actually the configuration items defined + * in the Configuration class like "mapred.local.dir" (for which we want to + * control the dir allocations). The context-strings are exactly those + * configuration items. + * 2. This implementation does not take into consideration cases where + * a disk becomes read-only or goes out of space while a file is being written + * to (disks are shared between multiple processes, and so the latter situation + * is probable). + * 3. In the class implementation, "Disk" is referred to as "Dir", which + * actually points to the configured directory on the Disk which will be the + * parent for all file write/read allocations. + */ +public class LocalDirAllocator { + + //A Map from the config item names like "mapred.local.dir", + //"dfs.client.buffer.dir" to the instance of the AllocatorPerContext. 
This + //is a static object to make sure there exists exactly one instance per JVM + private static Map <String, AllocatorPerContext> contexts = + new TreeMap<String, AllocatorPerContext>(); + private String contextCfgItemName; + + /**Create an allocator object + * @param contextCfgItemName + */ + public LocalDirAllocator(String contextCfgItemName) { + this.contextCfgItemName = contextCfgItemName; + } + + /** This method must be used to obtain the dir allocation context for a + * particular value of the context name. The context name must be an item + * defined in the Configuration object for which we want to control the + * dir allocations (e.g., <code>mapred.local.dir</code>). The method will + * create a context for that name if it doesn't already exist. + */ + private AllocatorPerContext obtainContext(String contextCfgItemName) { + synchronized (contexts) { + AllocatorPerContext l = contexts.get(contextCfgItemName); + if (l == null) { + contexts.put(contextCfgItemName, + (l = new AllocatorPerContext(contextCfgItemName))); + } + return l; + } + } + + /** Get a path from the local FS. This method should be used if the size of + * the file is not known apriori. We go round-robin over the set of disks + * (via the configured dirs) and return the first complete path where + * we could create the parent directory of the passed path. + * @param pathStr the requested path (this will be created on the first + * available disk) + * @param conf the Configuration object + * @return the complete path to the file on a local disk + * @throws IOException + */ + public Path getLocalPathForWrite(String pathStr, + Configuration conf) throws IOException { + return getLocalPathForWrite(pathStr, -1, conf); + } + + /** Get a path from the local FS. Pass size as -1 if not known apriori. We + * round-robin over the set of disks (via the configured dirs) and return + * the first complete path which has enough space + * @param pathStr the requested path (this will be created on the first + * available disk) + * @param size the size of the file that is going to be written + * @param conf the Configuration object + * @return the complete path to the file on a local disk + * @throws IOException + */ + public Path getLocalPathForWrite(String pathStr, long size, + Configuration conf) throws IOException { + AllocatorPerContext context = obtainContext(contextCfgItemName); + return context.getLocalPathForWrite(pathStr, size, conf); + } + + /** Get a path from the local FS for reading. 
We search through all the + * configured dirs for the file's existence and return the complete + * path to the file when we find one + * @param pathStr the requested file (this will be searched) + * @param conf the Configuration object + * @return the complete path to the file on a local disk + * @throws IOException + */ + public Path getLocalPathToRead(String pathStr, + Configuration conf) throws IOException { + AllocatorPerContext context = obtainContext(contextCfgItemName); + return context.getLocalPathToRead(pathStr, conf); + } + + /** Method to check whether a context is valid + * @param contextCfgItemName + * @return true/false + */ + public static boolean isContextValid(String contextCfgItemName) { + synchronized (contexts) { + return contexts.containsKey(contextCfgItemName); + } + } + + private class AllocatorPerContext { + + private final Log LOG = + LogFactory.getLog("org.apache.hadoop.fs.AllocatorPerContext"); + + private int dirNumLastAccessed; + private FileSystem localFS; + private DF[] dirDF; + private String contextCfgItemName; + private String[] localDirs; + private String savedLocalDirs = ""; + + public AllocatorPerContext(String contextCfgItemName) { + this.contextCfgItemName = contextCfgItemName; + } + + /** This method gets called everytime before any read/write to make sure + * that any change to localDirs is reflected immediately. + */ + private void confChanged(Configuration conf) throws IOException { + String newLocalDirs = conf.get(contextCfgItemName); + if (!newLocalDirs.equals(savedLocalDirs)) { + localDirs = conf.getStrings(contextCfgItemName); + localFS = FileSystem.getLocal(conf); + int numDirs = localDirs.length; + dirDF = new DF[numDirs]; + for (int i = 0; i < numDirs; i++) { + try { + localFS.mkdirs(new Path(localDirs[i])); + } catch (IOException ie) { } //ignore + dirDF[i] = new DF(new File(localDirs[i]), 30000); + } + dirNumLastAccessed = 0; + savedLocalDirs = newLocalDirs; + } + } + + private Path createPath(String path) throws IOException { + Path file = new Path(new Path(localDirs[dirNumLastAccessed]), + path); + //check whether we are able to create a directory here. If the disk + //happens to be RDONLY we will fail + try { + DiskChecker.checkDir(new File(file.getParent().toUri().getPath())); + return file; + } catch (DiskErrorException d) { + LOG.warn(StringUtils.stringifyException(d)); + return null; + } + } + + /** Get a path from the local FS. This method should be used if the size of + * the file is not known apriori. We go round-robin over the set of disks + * (via the configured dirs) and return the first complete path where + * we could create the parent directory of the passed path. + */ + public synchronized Path getLocalPathForWrite(String path, + Configuration conf) throws IOException { + return getLocalPathForWrite(path, -1, conf); + } + + /** Get a path from the local FS. Pass size as -1 if not known apriori. 
We + * round-robin over the set of disks (via the configured dirs) and return + * the first complete path which has enough space + */ + public synchronized Path getLocalPathForWrite(String pathStr, long size, + Configuration conf) throws IOException { + confChanged(conf); + int numDirs = localDirs.length; + int numDirsSearched = 0; + //remove the leading slash from the path (to make sure that the uri + //resolution results in a valid path on the dir being checked) + if (pathStr.startsWith("/")) { + pathStr = pathStr.substring(1); + } + Path returnPath = null; + while (numDirsSearched < numDirs && returnPath == null) { + if (size >= 0) { + long capacity = dirDF[dirNumLastAccessed].getAvailable(); + if (capacity > size) { + returnPath = createPath(pathStr); + } + } else { + returnPath = createPath(pathStr); + } + dirNumLastAccessed++; + dirNumLastAccessed = dirNumLastAccessed % numDirs; + numDirsSearched++; + } + + if (returnPath != null) { + return returnPath; + } + + //no path found + throw new DiskErrorException("Could not find any valid local " + + "directory for " + pathStr); + } + + /** Get a path from the local FS for reading. We search through all the + * configured dirs for the file's existence and return the complete + * path to the file when we find one + */ + public synchronized Path getLocalPathToRead(String pathStr, + Configuration conf) throws IOException { + confChanged(conf); + int numDirs = localDirs.length; + int numDirsSearched = 0; + //remove the leading slash from the path (to make sure that the uri + //resolution results in a valid path on the dir being checked) + if (pathStr.startsWith("/")) { + pathStr = pathStr.substring(1); + } + while (numDirsSearched < numDirs) { + Path file = new Path(localDirs[numDirsSearched], pathStr); + if (localFS.exists(file)) { + return file; + } + numDirsSearched++; + } + + //no path found + throw new DiskErrorException ("Could not find " + pathStr +" in any of" + + " the configured local directories"); + } + } +} Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=535997&r1=535996&r2=535997 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Mon May 7 14:25:07 2007 @@ -2383,6 +2383,7 @@ int numSegments = sortedSegmentSizes.size(); int origFactor = factor; int passNo = 1; + LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir"); do { //get the factor for this pass of merge factor = getPassFactor(passNo, numSegments); @@ -2435,11 +2436,19 @@ return this; } else { //we want to spread the creation of temp files on multiple disks if - //available + //available under the space constraints + long approxOutputSize = 0; + for (SegmentDescriptor s : segmentsToMerge) { + approxOutputSize += s.segmentLength + + ChecksumFileSystem.getApproxChkSumLength( + s.segmentLength); + } Path tmpFilename = new Path(tmpDir, "intermediate").suffix("." 
+ passNo); - Path outputFile = conf.getLocalPath("mapred.local.dir", - tmpFilename.toString()); + + Path outputFile = lDirAlloc.getLocalPathForWrite( + tmpFilename.toString(), + approxOutputSize, conf); LOG.info("writing intermediate results to " + outputFile); Writer writer = cloneFileAttributes( fs.makeQualified(segmentsToMerge.get(0).segmentPathName), Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=535997&r1=535996&r2=535997 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon May 7 14:25:07 2007 @@ -140,7 +140,8 @@ for (int i = 0; i < mapIds.size(); i++) { String mapId = mapIds.get(i); Path mapOut = this.mapoutputFile.getOutputFile(mapId); - Path reduceIn = this.mapoutputFile.getInputFile(i, reduceId); + Path reduceIn = this.mapoutputFile.getInputFileForWrite(i,reduceId, + localFs.getLength(mapOut)); if (!localFs.mkdirs(reduceIn.getParent())) { throw new IOException("Mkdirs failed to create " + reduceIn.getParent().toString()); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?view=diff&rev=535997&r1=535996&r2=535997 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Mon May 7 14:25:07 2007 @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.conf.*; /** @@ -29,49 +30,108 @@ class MapOutputFile { private JobConf conf; + private LocalDirAllocator lDirAlloc = + new LocalDirAllocator("mapred.local.dir"); - /** Create a local map output file name. + /** Return the path to local map output file created earlier * @param mapTaskId a map task id */ public Path getOutputFile(String mapTaskId) throws IOException { - return conf.getLocalPath(mapTaskId+"/file.out"); + return lDirAlloc.getLocalPathToRead(mapTaskId+"/file.out", conf); } - /** Create a local map output index file name. + /** Create a local map output file name. + * @param mapTaskId a map task id + * @param size the size of the file + */ + public Path getOutputFileForWrite(String mapTaskId, long size) + throws IOException { + return lDirAlloc.getLocalPathForWrite(mapTaskId+"/file.out", size, conf); + } + + /** Return the path to a local map output index file created earlier * @param mapTaskId a map task id */ public Path getOutputIndexFile(String mapTaskId) throws IOException { - return conf.getLocalPath(mapTaskId+"/file.out.index"); + return lDirAlloc.getLocalPathToRead(mapTaskId + "/file.out.index", conf); } - /** Create a local map spill file name. + /** Create a local map output index file name. + * @param mapTaskId a map task id + * @param size the size of the file + */ + public Path getOutputIndexFileForWrite(String mapTaskId, long size) + throws IOException { + return lDirAlloc.getLocalPathForWrite(mapTaskId + "/file.out.index", + size, conf); + } + + /** Return a local map spill file created earlier. 
* @param mapTaskId a map task id * @param spillNumber the number */ public Path getSpillFile(String mapTaskId, int spillNumber) throws IOException { - return conf.getLocalPath(mapTaskId+"/spill" +spillNumber+".out"); + return lDirAlloc.getLocalPathToRead(mapTaskId+"/spill" +spillNumber+".out", + conf); } - /** Create a local map spill index file name. + /** Create a local map spill file name. + * @param mapTaskId a map task id + * @param spillNumber the number + * @param size the size of the file + */ + public Path getSpillFileForWrite(String mapTaskId, int spillNumber, + long size) throws IOException { + return lDirAlloc.getLocalPathForWrite(mapTaskId+ + "/spill" +spillNumber+".out", + size, conf); + } + + /** Return a local map spill index file created earlier * @param mapTaskId a map task id * @param spillNumber the number */ public Path getSpillIndexFile(String mapTaskId, int spillNumber) throws IOException { - return conf.getLocalPath(mapTaskId+"/spill" +spillNumber+".out.index"); + return lDirAlloc.getLocalPathToRead( + mapTaskId+"/spill" +spillNumber+".out.index", conf); } - /** Create a local reduce input file name. + /** Create a local map spill index file name. + * @param mapTaskId a map task id + * @param spillNumber the number + * @param size the size of the file + */ + public Path getSpillIndexFileForWrite(String mapTaskId, int spillNumber, + long size) throws IOException { + return lDirAlloc.getLocalPathForWrite( + mapTaskId+"/spill" +spillNumber+".out.index", size, conf); + } + + /** Return a local reduce input file created earlier * @param mapTaskId a map task id * @param reduceTaskId a reduce task id */ public Path getInputFile(int mapId, String reduceTaskId) throws IOException { // TODO *oom* should use a format here - return conf.getLocalPath(reduceTaskId+"/map_"+mapId+".out"); + return lDirAlloc.getLocalPathToRead(reduceTaskId + "/map_"+mapId+".out", + conf); + } + + /** Create a local reduce input file name. + * @param mapTaskId a map task id + * @param reduceTaskId a reduce task id + * @param size the size of the file + */ + public Path getInputFileForWrite(int mapId, String reduceTaskId, long size) + throws IOException { + // TODO *oom* should use a format here + return lDirAlloc.getLocalPathForWrite(reduceTaskId + "/map_"+mapId+".out", + size, conf); } /** Removes all of the files related to a task. */ Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?view=diff&rev=535997&r1=535996&r2=535997 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Mon May 7 14:25:07 2007 @@ -26,9 +26,11 @@ import org.apache.hadoop.fs.InMemoryFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.io.*; import org.apache.hadoop.metrics.MetricsRecord; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.conf.*; /** The location of a map output file, as passed to a reduce task via the * [EMAIL PROTECTED] InterTrackerProtocol}. */ @@ -174,7 +176,10 @@ * We use the file system so that we generate checksum files on the data. 
* @param inMemFileSys the inmemory filesystem to write the file to * @param localFileSys the local filesystem to write the file to + * @param shuffleMetrics the metrics context * @param localFilename the filename to write the data into + * @param lDirAlloc the LocalDirAllocator object + * @param conf the Configuration object * @param reduce the reduce id to get for * @param timeout number of ms for connection and read timeout * @return the path of the file that got created @@ -184,7 +189,8 @@ FileSystem localFileSys, MetricsRecord shuffleMetrics, Path localFilename, - int reduce, + LocalDirAllocator lDirAlloc, + Configuration conf, int reduce, int timeout) throws IOException, InterruptedException { boolean good = false; long totalBytes = 0; @@ -216,6 +222,11 @@ inMemFileSys.reserveSpaceWithCheckSum(localFilename, length)); if (createInMem) { fileSys = inMemFileSys; + } + else { + //now hit the localFS to find out a suitable location for the output + localFilename = lDirAlloc.getLocalPathForWrite( + localFilename.toUri().getPath(), length + checksumLength, conf); } output = fileSys.create(localFilename); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=535997&r1=535996&r2=535997 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon May 7 14:25:07 2007 @@ -58,6 +58,7 @@ private BytesWritable split = new BytesWritable(); private String splitClass; private InputSplit instantiatedSplit = null; + private final static int APPROX_HEADER_LENGTH = 150; private static final Log LOG = LogFactory.getLog(MapTask.class.getName()); @@ -391,11 +392,16 @@ //sort, combine and spill to disk private void sortAndSpillToDisk() throws IOException { synchronized (this) { - Path filename = mapOutputFile.getSpillFile(getTaskId(), numSpills); + //approximate the length of the output file to be the length of the + //buffer + header lengths for the partitions + long size = keyValBuffer.getLength() + + partitions * APPROX_HEADER_LENGTH; + Path filename = mapOutputFile.getSpillFileForWrite(getTaskId(), + numSpills, size); //we just create the FSDataOutputStream object here. 
out = localFs.create(filename); - Path indexFilename = mapOutputFile.getSpillIndexFile(getTaskId(), - numSpills); + Path indexFilename = mapOutputFile.getSpillIndexFileForWrite( + getTaskId(), numSpills, partitions * 16); indexOut = localFs.create(indexFilename); LOG.debug("opened "+ mapOutputFile.getSpillFile(getTaskId(), numSpills).getName()); @@ -479,14 +485,31 @@ } private void mergeParts() throws IOException { - Path finalOutputFile = mapOutputFile.getOutputFile(getTaskId()); - Path finalIndexFile = mapOutputFile.getOutputIndexFile(getTaskId()); + // get the approximate size of the final output/index files + long finalOutFileSize = 0; + long finalIndexFileSize = 0; + Path [] filename = new Path[numSpills]; + Path [] indexFileName = new Path[numSpills]; + + for(int i = 0; i < numSpills; i++) { + filename[i] = mapOutputFile.getSpillFile(getTaskId(), i); + indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i); + finalOutFileSize += localFs.getLength(filename[i]); + } + //make correction in the length to include the sequence file header + //lengths for each partition + finalOutFileSize += partitions * APPROX_HEADER_LENGTH; + + finalIndexFileSize = partitions * 16; + + Path finalOutputFile = mapOutputFile.getOutputFileForWrite(getTaskId(), + finalOutFileSize); + Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite( + getTaskId(), finalIndexFileSize); if (numSpills == 1) { //the spill is the final output - Path spillPath = mapOutputFile.getSpillFile(getTaskId(), 0); - Path spillIndexPath = mapOutputFile.getSpillIndexFile(getTaskId(), 0); - localFs.rename(spillPath, finalOutputFile); - localFs.rename(spillIndexPath, finalIndexFile); + localFs.rename(filename[0], finalOutputFile); + localFs.rename(indexFileName[0], finalIndexFile); return; } @@ -513,14 +536,6 @@ return; } { - Path [] filename = new Path[numSpills]; - Path [] indexFileName = new Path[numSpills]; - - for(int i = 0; i < numSpills; i++) { - filename[i] = mapOutputFile.getSpillFile(getTaskId(), i); - indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i); - } - //create a sorter object as we need access to the SegmentDescriptor //class and merge methods Sorter sorter = new Sorter(localFs, keyClass, valClass, job); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java?view=diff&rev=535997&r1=535996&r2=535997 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java Mon May 7 14:25:07 2007 @@ -17,18 +17,13 @@ */ package org.apache.hadoop.mapred; -import org.apache.hadoop.conf.*; - import java.io.*; /** Runs a map task. */ class MapTaskRunner extends TaskRunner { - private MapOutputFile mapOutputFile; public MapTaskRunner(Task task, TaskTracker tracker, JobConf conf) { super(task, tracker, conf); - this.mapOutputFile = new MapOutputFile(); - this.mapOutputFile.setConf(conf); } /** Delete any temporary files from previous failed attempts. */ @@ -37,13 +32,13 @@ return false; } - this.mapOutputFile.removeAll(getTask().getTaskId()); + mapOutputFile.removeAll(getTask().getTaskId()); return true; } /** Delete all of the temporary map output files. 
*/ public void close() throws IOException { LOG.info(getTask()+" done; removing files."); - this.mapOutputFile.removeAll(getTask().getTaskId()); + mapOutputFile.removeAll(getTask().getTaskId()); } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=535997&r1=535996&r2=535997 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Mon May 7 14:25:07 2007 @@ -61,6 +61,7 @@ import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.DiskChecker.DiskErrorException; import static org.apache.hadoop.mapred.Task.Counter.*; @@ -265,7 +266,14 @@ // the list of files to merge, otherwise not. List<Path> mapFilesList = new ArrayList<Path>(); for(int i=0; i < numMaps; i++) { - Path f = mapOutputFile.getInputFile(i, getTaskId()); + Path f; + try { + //catch and ignore DiskErrorException, since some map outputs will + //really be absent (inmem merge). + f = mapOutputFile.getInputFile(i, getTaskId()); + } catch (DiskErrorException d) { + continue; + } if (lfs.exists(f)) mapFilesList.add(f); } @@ -292,7 +300,7 @@ }; sortProgress.setName("Sort progress reporter for task "+getTaskId()); - Path tempDir = job.getLocalPath(getTaskId()); + Path tempDir = new Path(getTaskId()); WritableComparator comparator = job.getOutputValueGroupingComparator(); @@ -496,6 +504,11 @@ private Random random = null; + /** + * the max size of the merge output from ramfs + */ + private long ramfsMergeOutputSize; + /** Represents the result of an attempt to copy a map output */ private class CopyResult { @@ -523,6 +536,15 @@ public MapOutputLocation getLocation() { return loc; } } + private int extractMapIdFromPathName(Path pathname) { + //all paths end with map_<id>.out + String firstPathName = pathname.getName(); + int beginIndex = firstPathName.lastIndexOf("map_"); + int endIndex = firstPathName.lastIndexOf(".out"); + return Integer.parseInt(firstPathName.substring(beginIndex + + "map_".length(), endIndex)); + } + private Thread createProgressThread(final TaskUmbilicalProtocol umbilical) { //spawn a thread to give copy progress heartbeats Thread copyProgress = new Thread() { @@ -645,14 +667,17 @@ String reduceId = reduceTask.getTaskId(); LOG.info(reduceId + " Copying " + loc.getMapTaskId() + " output from " + loc.getHost() + "."); - // the place where the file should end up - Path finalFilename = conf.getLocalPath(reduceId + "/map_" + - loc.getMapId() + ".out"); + // a temp filename. 
If this file gets created in ramfs, we're fine, + // else, we will check the localFS to find a suitable final location + // for this path + Path filename = new Path("/" + reduceId + "/map_" + + loc.getMapId() + ".out"); // a working filename that will be unique to this attempt - Path tmpFilename = new Path(finalFilename + "-" + id); + Path tmpFilename = new Path(filename + "-" + id); // this copies the map output file tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics, - tmpFilename, reduceTask.getPartition(), + tmpFilename, lDirAlloc, + conf, reduceTask.getPartition(), STALLED_COPY_TIMEOUT); if (!neededOutputs.contains(loc.getMapId())) { if (tmpFilename != null) { @@ -662,7 +687,7 @@ return CopyResult.OBSOLETE; } if (tmpFilename == null) - throw new IOException("File " + finalFilename + "-" + id + + throw new IOException("File " + filename + "-" + id + " not created"); long bytes = -1; // lock the ReduceTask while we do the rename @@ -676,9 +701,12 @@ } bytes = fs.getLength(tmpFilename); + //resolve the final filename against the directory where the tmpFile + //got created + filename = new Path(tmpFilename.getParent(), filename.getName()); // if we can't rename the file, something is broken (and IOException // will be thrown). - if (!fs.rename(tmpFilename, finalFilename)) { + if (!fs.rename(tmpFilename, filename)) { fs.delete(tmpFilename); bytes = -1; throw new IOException("failure to rename map output " + @@ -766,6 +794,8 @@ inMemFileSys = (InMemoryFileSystem)FileSystem.get(uri, conf); LOG.info(reduceTask.getTaskId() + " Created an InMemoryFileSystem, uri: " + uri); + ramfsMergeOutputSize = (long)(MAX_INMEM_FILESYS_USE * + inMemFileSys.getFSSize()); localFileSys = FileSystem.getLocal(conf); //create an instance of the sorter sorter = @@ -1025,9 +1055,12 @@ //it is not guaranteed that this file will be present after merge //is called (we delete empty sequence files as soon as we see them //in the merge method) + int mapId = extractMapIdFromPathName(inMemClosedFiles[0]); + Path outputPath = mapOutputFile.getInputFileForWrite(mapId, + reduceTask.getTaskId(), ramfsMergeOutputSize); SequenceFile.Writer writer = sorter.cloneFileAttributes( inMemFileSys.makeQualified(inMemClosedFiles[0]), - localFileSys.makeQualified(inMemClosedFiles[0]), null); + localFileSys.makeQualified(outputPath), null); SequenceFile.Sorter.RawKeyValueIterator rIter = null; try { @@ -1046,7 +1079,7 @@ LOG.info(reduceTask.getTaskId() + " Merge of the " +inMemClosedFiles.length + " files in InMemoryFileSystem complete." 
+ - " Local file is " + inMemClosedFiles[0]); + " Local file is " + outputPath); } catch (Throwable t) { LOG.warn(reduceTask.getTaskId() + " Final merge of the inmemory files threw an exception: " + @@ -1151,9 +1184,16 @@ //it is not guaranteed that this file will be present after merge //is called (we delete empty sequence files as soon as we see them //in the merge method) + + //figure out the mapId + int mapId = extractMapIdFromPathName(inMemClosedFiles[0]); + + Path outputPath = mapOutputFile.getInputFileForWrite(mapId, + reduceTask.getTaskId(), ramfsMergeOutputSize); + SequenceFile.Writer writer = sorter.cloneFileAttributes( inMemFileSys.makeQualified(inMemClosedFiles[0]), - localFileSys.makeQualified(inMemClosedFiles[0]), null); + localFileSys.makeQualified(outputPath), null); SequenceFile.Sorter.RawKeyValueIterator rIter; try { rIter = sorter.merge(inMemClosedFiles, true, @@ -1162,7 +1202,7 @@ //make sure that we delete the ondisk file that we created //earlier when we invoked cloneFileAttributes writer.close(); - localFileSys.delete(inMemClosedFiles[0]); + localFileSys.delete(outputPath); throw new IOException (StringUtils.stringifyException(e)); } sorter.writeFile(rIter, writer); @@ -1170,7 +1210,7 @@ LOG.info(reduceTask.getTaskId() + " Merge of the " +inMemClosedFiles.length + " files in InMemoryFileSystem complete." + - " Local file is " + inMemClosedFiles[0]); + " Local file is " + outputPath); } else { LOG.info(reduceTask.getTaskId() + " Nothing to merge from " + Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=535997&r1=535996&r2=535997 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Mon May 7 14:25:07 2007 @@ -17,23 +17,15 @@ */ package org.apache.hadoop.mapred; -import org.apache.hadoop.conf.*; - import java.io.*; /** Runs a reduce task. 
*/ class ReduceTaskRunner extends TaskRunner { - /** - * for cleaning up old map outputs - */ - private MapOutputFile mapOutputFile; public ReduceTaskRunner(Task task, TaskTracker tracker, JobConf conf) throws IOException { super(task, tracker, conf); - this.mapOutputFile = new MapOutputFile(); - this.mapOutputFile.setConf(conf); } /** Assemble all of the map output files */ @@ -43,7 +35,7 @@ } // cleanup from failures - this.mapOutputFile.removeAll(getTask().getTaskId()); + mapOutputFile.removeAll(getTask().getTaskId()); return true; } @@ -52,6 +44,6 @@ public void close() throws IOException { LOG.info(getTask()+" done; removing files."); getTask().getProgress().setStatus("closed"); - this.mapOutputFile.removeAll(getTask().getTaskId()); + mapOutputFile.removeAll(getTask().getTaskId()); } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=535997&r1=535996&r2=535997 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Mon May 7 14:25:07 2007 @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.Writable; @@ -69,6 +70,7 @@ protected JobConf conf; protected MapOutputFile mapOutputFile = new MapOutputFile(); + protected LocalDirAllocator lDirAlloc; //////////////////////////////////////////// // Constructors @@ -293,6 +295,7 @@ this.conf = new JobConf(conf); } this.mapOutputFile.setConf(this.conf); + this.lDirAlloc = new LocalDirAllocator("mapred.local.dir"); } public Configuration getConf() { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=535997&r1=535996&r2=535997 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Mon May 7 14:25:07 2007 @@ -43,7 +43,12 @@ private TaskLog.Writer taskStdOutLogWriter; private TaskLog.Writer taskStdErrLogWriter; - + + /** + * for cleaning up old map outputs + */ + protected MapOutputFile mapOutputFile; + public TaskRunner(Task t, TaskTracker tracker, JobConf conf) { this.t = t; this.tracker = tracker; @@ -60,6 +65,8 @@ this.conf.getInt("mapred.userlog.limit.kb", 100) * 1024, this.conf.getBoolean("mapred.userlog.purgesplits", true), this.conf.getInt("mapred.userlog.retain.hours", 12)); + this.mapOutputFile = new MapOutputFile(); + this.mapOutputFile.setConf(conf); } public Task getTask() { return t; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=535997&r1=535996&r2=535997 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon May 7 14:25:07 2007 @@ -53,6 +53,7 @@ import 
org.apache.hadoop.fs.FSError; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; @@ -88,6 +89,7 @@ private boolean running = true; + private LocalDirAllocator localDirAllocator; String taskTrackerName; String localHostname; InetSocketAddress jobTrackAddr; @@ -657,10 +659,12 @@ // let the jsp pages get to the task tracker, config, and other relevant // objects FileSystem local = FileSystem.getLocal(conf); + this.localDirAllocator = new LocalDirAllocator("mapred.local.dir"); server.setAttribute("task.tracker", this); server.setAttribute("local.file.system", local); server.setAttribute("conf", conf); server.setAttribute("log", LOG); + server.setAttribute("localDirAllocator", localDirAllocator); server.addServlet("mapOutput", "/mapOutput", MapOutputServlet.class); server.start(); this.httpPort = server.getPort(); @@ -1872,15 +1876,19 @@ byte[] buffer = new byte[MAX_BYTES_TO_READ]; OutputStream outStream = response.getOutputStream(); JobConf conf = (JobConf) context.getAttribute("conf"); + LocalDirAllocator lDirAlloc = + (LocalDirAllocator)context.getAttribute("localDirAllocator"); FileSystem fileSys = (FileSystem) context.getAttribute("local.file.system"); // Index file - Path indexFileName = conf.getLocalPath(mapId+"/file.out.index"); + Path indexFileName = lDirAlloc.getLocalPathToRead( + mapId+"/file.out.index", conf); FSDataInputStream indexIn = null; // Map-output file - Path mapOutputFileName = conf.getLocalPath(mapId+"/file.out"); + Path mapOutputFileName = lDirAlloc.getLocalPathToRead( + mapId+"/file.out", conf); FSDataInputStream mapOutputIn = null; // true iff IOException was caused by attempt to access input
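Taken together, the pattern this change establishes across MapOutputFile, SequenceFile and the TaskTracker servlet is: allocate a path with a size hint via getLocalPathForWrite(), and locate the same relative path later via getLocalPathToRead(), which scans every configured directory. The following is a minimal usage sketch against the new API as committed here; the config value, task id and payload are made up, and the file must actually be written before the read-side lookup can succeed.

import java.io.FileOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;

/** Hypothetical caller, not part of the patch. */
public class LocalDirAllocatorExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // the "context" is the config key whose comma-separated dirs are spread over
    conf.set("mapred.local.dir", "/tmp/mapred-local-0,/tmp/mapred-local-1");
    LocalDirAllocator alloc = new LocalDirAllocator("mapred.local.dir");

    // pick a directory with at least ~1 MB free for this relative path;
    // pass -1 instead when the size is not known in advance
    Path out = alloc.getLocalPathForWrite("task_0001/file.out", 1 << 20, conf);
    FileOutputStream stream = new FileOutputStream(out.toUri().getPath());
    stream.write("map output bytes".getBytes());
    stream.close();

    // later, possibly from code that never saw 'out', resolve the file again:
    // the allocator searches all configured dirs for the relative path
    Path in = alloc.getLocalPathToRead("task_0001/file.out", conf);
    System.out.println("allocated " + out + ", resolved for read: " + in);
  }
}

Because writers no longer derive the absolute location from the path's hashcode, every reader of a previously allocated file (LocalJobRunner, ReduceTask's in-memory merge output, the map-output servlet above) has to go through getLocalPathToRead() rather than conf.getLocalPath(), which is why so many call sites change in this commit.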