Author: cutting
Date: Fri Jan 19 10:17:44 2007
New Revision: 497891

URL: http://svn.apache.org/viewvc?view=rev&rev=497891
Log:
HADOOP-830.  Improve mapreduce merge performance by buffering and merging map outputs as they arrive at reduce nodes.  Contributed by Devaraj.
Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jan 19 10:17:44 2007
@@ -47,6 +47,10 @@
 14. HADOOP-735.  Switch generated record code to use BytesWritable to
     represent fields of type 'buffer'. (Milind Bhandarkar via cutting)
 
+15. HADOOP-830.  Improve mapreduce merge performance by buffering and
+    merging multiple map outputs as they arrive at reduce nodes before
+    they're written to disk.  (Devaraj Das via cutting)
+
 Release 0.10.1 - 2007-01-10
 

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Fri Jan 19 10:17:44 2007
@@ -125,6 +125,18 @@
 </property>
 
 <property>
+  <name>fs.ramfs.impl</name>
+  <value>org.apache.hadoop.fs.InMemoryFileSystem</value>
+  <description>The FileSystem for ramfs: uris.</description>
+</property>
+
+<property>
+  <name>fs.inmemory.size.mb</name>
+  <value>75</value>
+  <description>The size of the in-memory filesystem instance in MB.</description>
+</property>
+
+<property>
   <name>dfs.datanode.bindAddress</name>
   <value>0.0.0.0</value>
   <description>

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java Fri Jan 19 10:17:44 2007
@@ -177,4 +177,11 @@
       return ((Buffer)out).getPos();
     }
 
+  public static long getChecksumLength(long size, int bytesPerSum) {
+    //the checksum length is one 4-byte sum per bytesPerSum-byte chunk of the
+    //file (rounded up), plus one 4-byte word and the version header
+    //written at the beginning of the checksum file.
+    return ((long)(Math.ceil((float)size/bytesPerSum)) + 1) * 4 +
+             CHECKSUM_VERSION.length;
+  }
+
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Fri Jan 19 10:17:44 2007
@@ -200,6 +200,13 @@
     return new Path(file.getParent(), "."+file.getName()+".crc");
   }
 
+  /** Return the length of the checksum file, given the size of the
+   * actual file.
+   **/
+  public static long getChecksumFileLength(long fileSize, int bytesPerSum) {
+    return FSDataOutputStream.getChecksumLength(fileSize, bytesPerSum);
+  }
+
   /** Return true iff file is a checksum file name.*/
   public static boolean isChecksumFile(Path file) {
     String name = file.getName();
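A quick worked example of the formula above (assuming the CHECKSUM_VERSION
header defined in FSDataOutputStream is the 4-byte "crc\0" sequence; this is
a sketch, not part of the patch):

    // checksum-file length for a 1 MB file at the default
    // io.bytes.per.checksum of 512
    long size = 1024 * 1024;
    int bytesPerSum = 512;
    int checksumVersionLength = 4;  // assumed: "crc\0" header bytes
    long sums = (long) Math.ceil((float) size / bytesPerSum); // 2048 chunks
    long length = (sums + 1) * 4 + checksumVersionLength;     // 8200 bytes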
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java?view=auto&rev=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java Fri Jan 19 10:17:44 2007
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
+
+/** An implementation of the in-memory filesystem. This implementation assumes
+ * that the file lengths are known ahead of time and the total length of all
+ * the files is below a certain number (like 100 MB, configurable). Use the API
+ * reserveSpaceWithCheckSum(Path f, int size) (described below) for reserving
+ * space in the FS. The uri of this filesystem starts with ramfs://.
+ * @author ddas
+ *
+ */
+public class InMemoryFileSystem extends FileSystem {
+  private URI uri;
+  private int fsSize;
+  private volatile int totalUsed;
+  private Path staticWorkingDir;
+  private int bytesPerSum;
+
+  //pathToFileAttribs is the final place where a file is put after it is closed
+  private Map<String, FileAttributes> pathToFileAttribs =
+    Collections.synchronizedMap(new HashMap<String, FileAttributes>());
+
+  //tempFileAttribs is a temp place which is updated while reserving memory for
+  //files we are going to create. It is read in the createRaw method and the
+  //temp key/value is discarded. If the file makes it to "close", then it
+  //ends up being in the pathToFileAttribs map.
+  private Map<String, FileAttributes> tempFileAttribs =
+    Collections.synchronizedMap(new HashMap<String, FileAttributes>());
+
+  public InMemoryFileSystem() {}
+
+  public InMemoryFileSystem(URI uri, Configuration conf) {
+    initialize(uri, conf);
+  }
+
+  //inherit javadoc
+  public void initialize(URI uri, Configuration conf) {
+    int size = Integer.parseInt(conf.get("fs.inmemory.size.mb", "100"));
+    this.fsSize = size * 1024 * 1024;
+    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+    this.staticWorkingDir = new Path(this.uri.getPath());
+    this.bytesPerSum = conf.getInt("io.bytes.per.checksum", 512);
+    LOG.info("Initialized InMemoryFileSystem: " + uri.toString() +
+             " of size (in bytes): " + fsSize);
+  }
+
+  //inherit javadoc
+  public URI getUri() {
+    return uri;
+  }
+
+  /** @deprecated */
+  public String getName() {
+    return uri.toString();
+  }
+
+  /**
+   * Return a 1x1 'inmemory' cell if the file exists.
+   * Return null otherwise.
+   */
+  public String[][] getFileCacheHints(Path f, long start, long len)
+      throws IOException {
+    if (! exists(f)) {
+      return null;
+    } else {
+      return new String[][] {{"inmemory"}};
+    }
+  }
+
+  private class InMemoryInputStream extends FSInputStream {
+    private DataInputBuffer din = new DataInputBuffer();
+    private FileAttributes fAttr;
+
+    public InMemoryInputStream(Path f) throws IOException {
+      fAttr = pathToFileAttribs.get(getPath(f));
+      if (fAttr == null) throw new FileNotFoundException("File " + f +
+                                                         " does not exist");
+      din.reset(fAttr.data, 0, fAttr.size);
+    }
+
+    public long getPos() throws IOException {
+      return din.getPosition();
+    }
+
+    public void seek(long pos) throws IOException {
+      if ((int)pos > fAttr.size)
+        throw new IOException("Cannot seek after EOF");
+      din.reset(fAttr.data, (int)pos, fAttr.size - (int)pos);
+    }
+
+    public int available() throws IOException {
+      return din.available();
+    }
+
+    public boolean markSupported() { return false; }
+
+    public int read() throws IOException {
+      return din.read();
+    }
+
+    public int read(byte[] b, int off, int len) throws IOException {
+      return din.read(b, off, len);
+    }
+
+    public long skip(long n) throws IOException { return din.skip(n); }
+  }
+
+  public FSInputStream openRaw(Path f) throws IOException {
+    return new InMemoryInputStream(f);
+  }
+
+  private class InMemoryOutputStream extends FSOutputStream {
+    private int count;
+    private FileAttributes fAttr;
+    private Path f;
+
+    public InMemoryOutputStream(Path f, FileAttributes fAttr)
+        throws IOException {
+      this.fAttr = fAttr;
+      this.f = f;
+    }
+
+    public long getPos() throws IOException {
+      return count;
+    }
+
+    public void close() throws IOException {
+      synchronized (InMemoryFileSystem.this) {
+        pathToFileAttribs.put(getPath(f), fAttr);
+      }
+    }
+
+    public void write(byte[] b, int off, int len) throws IOException {
+      if ((off < 0) || (off > b.length) || (len < 0) ||
+          ((off + len) > b.length) || ((off + len) < 0)) {
+        throw new IndexOutOfBoundsException();
+      } else if (len == 0) {
+        return;
+      }
+      int newcount = count + len;
+      if (newcount > fAttr.size) {
+        throw new IOException("Insufficient space");
+      }
+      System.arraycopy(b, off, fAttr.data, count, len);
+      count = newcount;
+    }
+
+    public void write(int b) throws IOException {
+      int newcount = count + 1;
+      if (newcount > fAttr.size) {
+        throw new IOException("Insufficient space");
+      }
+      fAttr.data[count] = (byte)b;
+      count = newcount;
+    }
+  }
+
+  public FSOutputStream createRaw(Path f, boolean overwrite, short replication,
+                                  long blockSize) throws IOException {
+    if (exists(f) && ! overwrite) {
+      throw new IOException("File already exists:"+f);
+    }
+    synchronized (this) {
+      FileAttributes fAttr = (FileAttributes)tempFileAttribs.remove(getPath(f));
+      if (fAttr != null)
+        return createRaw(f, fAttr);
+      return null;
+    }
+  }
+
+  public FSOutputStream createRaw(Path f, boolean overwrite, short replication,
+                                  long blockSize, Progressable progress)
+      throws IOException {
+    //ignore write-progress reporter for in-mem files
+    return createRaw(f, overwrite, replication, blockSize);
+  }
+
+  public FSOutputStream createRaw(Path f, FileAttributes fAttr)
+      throws IOException {
+    //the path is not added into the filesystem (in the pathToFileAttribs
+    //map) until close is called on the outputstream that this method is
+    //going to return
+    //Create an output stream out of the data byte array
+    return new InMemoryOutputStream(f, fAttr);
+  }
+
+  public void close() throws IOException {
+    super.close();
+    if (pathToFileAttribs != null)
+      pathToFileAttribs.clear();
+    pathToFileAttribs = null;
+    if (tempFileAttribs != null)
+      tempFileAttribs.clear();
+    tempFileAttribs = null;
+  }
+
+  /**
+   * Replication is not supported for the inmemory file system.
+   */
+  public short getReplication(Path src) throws IOException {
+    return 1;
+  }
+
+  public boolean setReplicationRaw(Path src, short replication)
+      throws IOException {
+    return true;
+  }
+
+  public boolean renameRaw(Path src, Path dst) throws IOException {
+    synchronized (this) {
+      FileAttributes fAttr = pathToFileAttribs.remove(getPath(src));
+      if (fAttr == null) return false;
+      pathToFileAttribs.put(getPath(dst), fAttr);
+      return true;
+    }
+  }
+
+  public boolean deleteRaw(Path f) throws IOException {
+    synchronized (this) {
+      FileAttributes fAttr = pathToFileAttribs.remove(getPath(f));
+      if (fAttr != null) {
+        fAttr.data = null;
+        totalUsed -= fAttr.size;
+        return true;
+      }
+      return false;
+    }
+  }
+
+  public boolean exists(Path f) throws IOException {
+    return pathToFileAttribs.containsKey(getPath(f));
+  }
+
+  /**
+   * Directory operations are not supported
+   */
+  public boolean isDirectory(Path f) throws IOException {
+    return false;
+  }
+
+  public long getLength(Path f) throws IOException {
+    return pathToFileAttribs.get(getPath(f)).size;
+  }
+
+  /**
+   * Directory operations are not supported
+   */
+  public Path[] listPathsRaw(Path f) throws IOException {
+    return null;
+  }
+
+  public void setWorkingDirectory(Path new_dir) {}
+
+  public Path getWorkingDirectory() {
+    return staticWorkingDir;
+  }
+
+  public boolean mkdirs(Path f) throws IOException {
+    return false;
+  }
+
+  /** lock operations are not supported */
+  public void lock(Path f, boolean shared) throws IOException {}
+  public void release(Path f) throws IOException {}
+
+  /** copy/move operations are not supported */
+  public void copyFromLocalFile(Path src, Path dst) throws IOException {}
+  public void moveFromLocalFile(Path src, Path dst) throws IOException {}
+  public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
+      throws IOException {}
+
+  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+      throws IOException {
+    return fsOutputFile;
+  }
+
+  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+      throws IOException {
+  }
+
+  public void reportChecksumFailure(Path p, FSInputStream in,
+                                    long inPos,
+                                    FSInputStream sums, long sumsPos) {
+  }
+
+  public long getBlockSize(Path f) throws IOException {
+    return getDefaultBlockSize();
+  }
+
+  public long getDefaultBlockSize() {
+    return 32 * 1024; //some random large number. can be anything actually
+  }
+
+  public short getDefaultReplication() {
+    return 1;
+  }
+
+  /** Some APIs exclusively for InMemoryFileSystem */
+
+  /** Register a path with its size. This will also register a checksum for
+   * the file that the user is trying to create. This is required since none
+   * of the FileSystem APIs accept the size of the file as argument. But since
+   * it is required for us to apriori know the size of the file we are going to
+   * create, the user must call this method for each file he wants to create
+   * and reserve memory for that file. We either succeed in reserving memory
+   * for both the main file and the checksum file and return true, or return
+   * false.
+   */
+  public boolean reserveSpaceWithCheckSum(Path f, int size) {
+    //get the size of the checksum file (we know it is going to be 'int'
+    //since this is an inmem fs with file sizes that will fit in 4 bytes)
+    int checksumSize = getChecksumFileLength(size);
+    synchronized (this) {
+      if (!canFitInMemory(size + checksumSize)) return false;
+      FileAttributes fileAttr;
+      FileAttributes checksumAttr;
+      try {
+        fileAttr = new FileAttributes(size);
+        checksumAttr = new FileAttributes(checksumSize);
+      } catch (OutOfMemoryError o) {
+        return false;
+      }
+      totalUsed += size + checksumSize;
+      tempFileAttribs.put(getPath(f), fileAttr);
+      tempFileAttribs.put(getPath(FileSystem.getChecksumFile(f)), checksumAttr);
+      return true;
+    }
+  }
+
+  public int getChecksumFileLength(int size) {
+    return (int)super.getChecksumFileLength(size, bytesPerSum);
+  }
+
+  /** This API getFiles could have been implemented over listPathsRaw
+   * but it is an overhead to maintain directory structures for this impl of
+   * the in-memory fs.
+   */
+  public Path[] getFiles(PathFilter filter) {
+    synchronized (this) {
+      List<String> closedFilesList = new ArrayList<String>();
+      Set<String> paths = pathToFileAttribs.keySet();
+      if (paths == null || paths.isEmpty()) return new Path[0];
+      Iterator<String> iter = paths.iterator();
+      while (iter.hasNext()) {
+        String f = iter.next();
+        if (filter.accept(new Path(f)))
+          closedFilesList.add(f);
+      }
+      String[] names =
+        closedFilesList.toArray(new String[closedFilesList.size()]);
+      Path[] results = new Path[names.length];
+      for (int i = 0; i < names.length; i++) {
+        results[i] = new Path(names[i]);
+      }
+      return results;
+    }
+  }
+
+  public int getFSSize() {
+    return fsSize;
+  }
+
+  public float getPercentUsed() {
+    return (float)totalUsed/fsSize;
+  }
+
+  private boolean canFitInMemory(int size) {
+    return (size + totalUsed < fsSize);
+  }
+
+  private String getPath(Path f) {
+    return f.toUri().getPath();
+  }
+
+  private static class FileAttributes {
+    private byte[] data;
+    private int size;
+
+    public FileAttributes(int size) {
+      this.size = size;
+      this.data = new byte[size];
+    }
+  }
+}
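For readers skimming the patch, a minimal usage sketch of the
reserve-then-write contract described in the class javadoc above. This is
illustrative only (the path, size, and ramfs authority are made up), and it
assumes the usual org.apache.hadoop.fs and conf imports:

    // Illustrative sketch, not part of the patch: space for the file and its
    // checksum must be reserved before creating a file in the in-memory fs.
    Configuration conf = new Configuration();
    InMemoryFileSystem ramFs =
      (InMemoryFileSystem) FileSystem.get(URI.create("ramfs://demo"), conf);
    Path p = new Path("/demo/part-0.out");  // hypothetical path
    int size = 4096;                        // length must be known up front
    if (ramFs.reserveSpaceWithCheckSum(p, size)) {
      FSDataOutputStream out = ramFs.create(p); // backed by reserved buffer
      out.write(new byte[size]);                // may not exceed reservation
      out.close();                              // file becomes visible on close
    } else {
      // fall back to the local filesystem when there isn't enough room
    }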
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Fri Jan 19 10:17:44 2007
@@ -21,6 +21,8 @@
 import java.io.*;
 import java.util.*;
 import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.rmi.server.UID;
 import java.security.MessageDigest;
 import org.apache.lucene.util.PriorityQueue;
@@ -1893,7 +1895,8 @@
     }
 
     /**
-     * Merges the contents of files passed in Path[]
+     * Merges the contents of files passed in Path[] using a max factor value
+     * that is already set
      * @param inNames the array of path names
      * @param deleteInputs true if the input files should be deleted when
      * unnecessary
@@ -1902,6 +1905,22 @@
      */
     public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs)
       throws IOException {
+      return merge(inNames, deleteInputs,
+                   (inNames.length < factor) ? inNames.length : factor);
+    }
+
+    /**
+     * Merges the contents of files passed in Path[]
+     * @param inNames the array of path names
+     * @param deleteInputs true if the input files should be deleted when
+     * unnecessary
+     * @param factor the factor that will be used as the maximum merge fan-in
+     * @return RawKeyValueIterator
+     * @throws IOException
+     */
+    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
+                                     int factor)
+      throws IOException {
       //get the segments from inNames
       ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
       for (int i = 0; i < inNames.length; i++) {
@@ -1911,7 +1930,7 @@
         s.doSync();
         a.add(s);
       }
-      factor = (inNames.length < factor) ? inNames.length : factor;
+      this.factor = factor;
       MergeQueue mQueue = new MergeQueue(a);
       return mQueue.merge();
     }
@@ -1948,26 +1967,48 @@
     /**
      * Clones the attributes (like compression) of the input file and creates
      * a corresponding Writer
-     * @param fileSys the FileSystem object
+     * @param ignoredFileSys the (ignored) FileSystem object
      * @param inputFile the path of the input file whose attributes should be
      * cloned
      * @param outputFile the path of the output file
     * @param prog the Progressable to report status during the file write
      * @return Writer
      * @throws IOException
+     * @deprecated call #cloneFileAttributes(Path,Path,Progressable) instead
      */
-    public Writer cloneFileAttributes(FileSystem fileSys, Path inputFile,
-        Path outputFile, Progressable prog) throws IOException {
-      Reader reader = new Reader(fileSys, inputFile, 4096, conf);
+    public Writer cloneFileAttributes(FileSystem ignoredFileSys,
+        Path inputFile, Path outputFile, Progressable prog)
+      throws IOException {
+      return cloneFileAttributes(inputFile, outputFile, prog);
+    }
+
+    /**
+     * Clones the attributes (like compression) of the input file and creates
+     * a corresponding Writer
+     * @param inputFile the path of the input file whose attributes should be
+     * cloned
+     * @param outputFile the path of the output file
+     * @param prog the Progressable to report status during the file write
+     * @return Writer
+     * @throws IOException
+     */
+    public Writer cloneFileAttributes(Path inputFile, Path outputFile,
+        Progressable prog) throws IOException {
+      FileSystem srcFileSys = inputFile.getFileSystem(conf);
+      Reader reader = new Reader(srcFileSys, inputFile, 4096, conf);
       boolean compress = reader.isCompressed();
       boolean blockCompress = reader.isBlockCompressed();
       CompressionCodec codec = reader.getCompressionCodec();
       reader.close();
+
+      FileSystem dstFileSys = outputFile.getFileSystem(conf);
       FSDataOutputStream out;
       if (prog != null)
-        out = fs.create(outputFile, true, memory/(factor+1), prog);
+        out = dstFileSys.create(outputFile, true,
+                                conf.getInt("io.file.buffer.size", 4096), prog);
       else
-        out = fs.create(outputFile, true, memory/(factor+1));
+        out = dstFileSys.create(outputFile, true,
+                                conf.getInt("io.file.buffer.size", 4096));
       Writer writer = createWriter(conf, out, keyClass, valClass,
                                    compress, blockCompress, codec);
       return writer;
     }
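The new three-argument merge lets callers override the configured fan-in; a
hedged sketch of how it combines with the Path-based cloneFileAttributes,
mirroring the ReduceTaskRunner code later in this patch (fs, localFs, and
conf are assumed to be set up by the caller; the input path is hypothetical):

    // Illustrative sketch, not part of the patch: merge a set of sequence
    // files in one pass, cloning the first input's compression settings.
    SequenceFile.Sorter sorter =
      new SequenceFile.Sorter(fs, conf.getOutputKeyComparator(),
                              conf.getMapOutputValueClass(), conf);
    Path[] inputs = fs.listPaths(new Path("/demo"));  // hypothetical inputs
    SequenceFile.Sorter.RawKeyValueIterator iter =
      sorter.merge(inputs, true, inputs.length);      // fan-in = all inputs
    SequenceFile.Writer writer =
      sorter.cloneFileAttributes(fs.makeQualified(inputs[0]),
                                 localFs.makeQualified(inputs[0]), null);
    sorter.writeFile(iter, writer);
    writer.close();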
@@ -2158,7 +2199,8 @@
           Path outputFile = conf.getLocalPath("mapred.local.dir",
                                     (outFile.suffix("." + passNo)).toString());
           Writer writer = cloneFileAttributes(fs,
-                          mStream[0].segmentPathName, outputFile, null);
+                          fs.makeQualified(mStream[0].segmentPathName),
+                          fs.makeQualified(outputFile), null);
           writer.sync = null; //disable sync for temp files
           writeFile(this, writer);
           writer.close();
@@ -2276,7 +2318,7 @@
       public boolean nextRawKey() throws IOException {
         if (in == null) {
           Reader reader = new Reader(fs, segmentPathName,
-              memory/(factor+1), segmentOffset,
+              conf.getInt("io.file.buffer.size", 4096), segmentOffset,
               segmentLength, conf);
 
           //sometimes we ignore syncs especially for temp merge files

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java Fri Jan 19 10:17:44 2007
@@ -29,6 +29,19 @@
   public static final long HEARTBEAT_INTERVAL = 10 * 1000;
 
   public static final long TASKTRACKER_EXPIRY_INTERVAL = 10 * 60 * 1000;
 
+  //for the inmemory filesystem (to do in-memory merge)
+  /**
+   * Constant denoting when a merge of in memory files will be triggered
+   */
+  public static final float MAX_INMEM_FILESYS_USE = 0.5f;
+
+  /**
+   * Constant denoting the max size (in terms of the fraction of the total
+   * size of the filesys) of a map output file that we will try
+   * to keep in mem. Ideally, this should be a factor of MAX_INMEM_FILESYS_USE
+   */
+  public static final float MAX_INMEM_FILESIZE_FRACTION =
+    MAX_INMEM_FILESYS_USE/2;
+
   //
   // Result codes
   //
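With the 75 MB default for fs.inmemory.size.mb set earlier in this patch,
these fractions work out as follows (a worked sketch, not committed code):

    final float MAX_INMEM_FILESYS_USE = 0.5f;
    final float MAX_INMEM_FILESIZE_FRACTION = MAX_INMEM_FILESYS_USE / 2;
    int fsSize = 75 * 1024 * 1024;   // fs.inmemory.size.mb from above

    // usage level at which a merge of in-memory files is triggered
    long mergeAt = (long)(fsSize * MAX_INMEM_FILESYS_USE);        // 39321600 (37.5 MB)
    // largest map output (plus its checksum) that is kept in memory
    long maxFile = (long)(fsSize * MAX_INMEM_FILESIZE_FRACTION);  // 19660800 (18.75 MB)
    // minimum closed files before the in-memory merge thread runs
    int minFiles =
      (int)(MAX_INMEM_FILESYS_USE / MAX_INMEM_FILESIZE_FRACTION); // 2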
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Fri Jan 19 10:17:44 2007
@@ -22,6 +22,8 @@
 
 import java.io.*;
 import java.net.*;
+
+import org.apache.hadoop.fs.InMemoryFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.*;
@@ -29,7 +31,7 @@
 
 /** The location of a map output file, as passed to a reduce task via the
  * {@link InterTrackerProtocol}. */ 
-class MapOutputLocation implements Writable {
+class MapOutputLocation implements Writable, MRConstants {
 
   static { // register a ctor
     WritableFactories.setFactory
@@ -162,4 +164,96 @@
     }
     return totalBytes;
   }
+
+  /**
+   * Get the map output into a local file (either in the inmemory fs or on the
+   * local fs) from the remote server.
+   * We use the file system so that we generate checksum files on the data.
+   * @param inMemFileSys the inmemory filesystem to write the file to
+   * @param localFileSys the local filesystem to write the file to
+   * @param localFilename the filename to write the data into
+   * @param reduce the reduce id to get the map output for
+   * @param timeout number of ms for connection and read timeout
+   * @return the path of the file that got created
+   * @throws IOException when something goes wrong
+   */
+  public Path getFile(InMemoryFileSystem inMemFileSys,
+                      FileSystem localFileSys,
+                      Path localFilename,
+                      int reduce,
+                      int timeout) throws IOException, InterruptedException {
+    boolean good = false;
+    long totalBytes = 0;
+    FileSystem fileSys = localFileSys;
+    Thread currentThread = Thread.currentThread();
+    URL path = new URL(toString() + "&reduce=" + reduce);
+    try {
+      URLConnection connection = path.openConnection();
+      if (timeout > 0) {
+        connection.setConnectTimeout(timeout);
+        connection.setReadTimeout(timeout);
+      }
+      InputStream input = connection.getInputStream();
+      OutputStream output = null;
+
+      //We will put a file in memory if it meets certain criteria:
+      //1. The size of the file should be less than 25% of the total inmem fs
+      //2. There is space available in the inmem fs
+
+      int length = connection.getContentLength();
+      int inMemFSSize = inMemFileSys.getFSSize();
+      int checksumLength = inMemFileSys.getChecksumFileLength(length);
+
+      boolean createInMem =
+        (((float)(length + checksumLength) / inMemFSSize <=
+          MAX_INMEM_FILESIZE_FRACTION) &&
+         inMemFileSys.reserveSpaceWithCheckSum(localFilename, length));
+
+      if (createInMem)
+        fileSys = inMemFileSys;
+      else
+        fileSys = localFileSys;
+
+      output = fileSys.create(localFilename);
+      try {
+        try {
+          byte[] buffer = new byte[64 * 1024];
+          if (currentThread.isInterrupted()) {
+            throw new InterruptedException();
+          }
+          int len = input.read(buffer);
+          while (len > 0) {
+            totalBytes += len;
+            output.write(buffer, 0, len);
+            if (currentThread.isInterrupted()) {
+              throw new InterruptedException();
+            }
+            len = input.read(buffer);
+          }
+        } finally {
+          output.close();
+        }
+      } finally {
+        input.close();
+      }
+      good = ((int) totalBytes) == connection.getContentLength();
+      if (!good) {
+        throw new IOException("Incomplete map output received for " + path +
+                              " (" + totalBytes + " instead of " +
+                              connection.getContentLength() + ")");
+      }
+    } finally {
+      if (!good) {
+        try {
+          fileSys.delete(localFilename);
+          totalBytes = 0;
+        } catch (Throwable th) {
+          // IGNORED because we are cleaning up
+        }
+        return null;
+      }
+    }
+    return fileSys.makeQualified(localFilename);
+  }
 }
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Jan 19 10:17:44 2007
@@ -233,11 +233,18 @@
 
     // open a file to collect map output
-    Path[] mapFiles = new Path[numMaps];
+    // since we don't know how many map outputs got merged in memory, we have
+    // to check whether a given map output exists, and if it does, add it to
+    // the list of files to merge; otherwise skip it.
+    List<Path> mapFilesList = new ArrayList<Path>();
     for(int i=0; i < numMaps; i++) {
-      mapFiles[i] = mapOutputFile.getInputFile(i, getTaskId());
+      Path f = mapOutputFile.getInputFile(i, getTaskId());
+      if (lfs.exists(f))
+        mapFilesList.add(f);
     }
-    
+    Path[] mapFiles = new Path[mapFilesList.size()];
+    mapFiles = mapFilesList.toArray(mapFiles);
+
     // spawn a thread to give sort progress heartbeats
     Thread sortProgress = new Thread() {
       public void run() {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Fri Jan 19 10:17:44 2007
@@ -18,16 +18,22 @@
 package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.InMemoryFileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
 import org.apache.hadoop.util.*;
 
 import java.io.*;
 import java.util.*;
+import java.net.*;
 import java.text.DecimalFormat;
 import org.apache.hadoop.util.Progressable;
 
 /** Runs a reduce task. */
-class ReduceTaskRunner extends TaskRunner {
+class ReduceTaskRunner extends TaskRunner implements MRConstants {
   /** Number of ms before timing out a copy */
   private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
@@ -80,11 +86,31 @@
   private long lastPollTime;
 
   /**
+   * A reference to the in memory file system for writing the map outputs to.
+   */
+  private InMemoryFileSystem inMemFileSys;
+
+  /**
    * A reference to the local file system for writing the map outputs to.
   */
  private FileSystem localFileSys;
 
  /**
+   * An instance of the sorter used for doing merge
+   */
+  private SequenceFile.Sorter sorter;
+
+  /**
+   * A reference to the throwable object (if merge throws an exception)
+   */
+  private volatile Throwable mergeThrowable;
+
+  /**
+   * A flag to indicate that merge is in progress
+   */
+  private volatile boolean mergeInProgress = false;
+
+  /**
    * The threads for fetching the files.
    */
   private MapOutputCopier[] copiers = null;
@@ -120,9 +146,28 @@
     public MapOutputLocation getLocation() { return loc; }
   }
 
-  private class PingTimer implements Progressable {
+  private class PingTimer extends Thread implements Progressable {
     Task task = getTask();
     TaskTracker tracker = getTracker();
+
+    public void run() {
+      LOG.info(task.getTaskId() + " Started thread: " + getName());
+      while (true) {
+        try {
+          progress();
+          Thread.sleep(Task.PROGRESS_INTERVAL);
+        }
+        catch (InterruptedException i) {
+          return;
+        }
+        catch (Throwable e) {
+          LOG.info(task.getTaskId() + " Thread Exception in " +
+                   "reporting sort progress\n" +
+                   StringUtils.stringifyException(e));
+          continue;
+        }
+      }
+    }
 
     public void progress() {
       task.reportProgress(tracker);
@@ -134,7 +179,6 @@
 
   /** Copies map outputs as they become available */
   private class MapOutputCopier extends Thread {
-    private PingTimer pingTimer = new PingTimer();
     private MapOutputLocation currentLocation = null;
     private int id = nextMapOutputCopierId++;
@@ -196,7 +240,7 @@
         
         try {
          start(loc);
-          size = copyOutput(loc, pingTimer);
+          size = copyOutput(loc);
         } catch (IOException e) {
           LOG.warn(reduceTask.getTaskId() + " copy failed: " +
                    loc.getMapTaskId() + " from " + loc.getHost());
@@ -215,13 +259,11 @@
 
     /** Copies a map output from a remote host, using raw RPC.
      * @param currentLocation the map output location to be copied
-     * @param pingee a status object to ping as we make progress
      * @return the size of the copied file
      * @throws IOException if there is an error copying the file
      * @throws InterruptedException if the copier should give up
      */
-    private long copyOutput(MapOutputLocation loc,
-                            Progressable pingee
+    private long copyOutput(MapOutputLocation loc
                             ) throws IOException, InterruptedException {
 
       String reduceId = reduceTask.getTaskId();
@@ -233,21 +275,42 @@
       // a working filename that will be unique to this attempt
       Path tmpFilename = new Path(finalFilename + "-" + id);
       // this copies the map output file
-      long bytes = loc.getFile(localFileSys, tmpFilename,
-                               reduceTask.getPartition(), pingee,
+      tmpFilename = loc.getFile(inMemFileSys, localFileSys, tmpFilename,
+                                reduceTask.getPartition(),
                                 STALLED_COPY_TIMEOUT);
+      if (tmpFilename == null)
+        throw new IOException("File " + finalFilename + "-" + id +
+                              " not created");
+      long bytes = -1;
       // lock the ReduceTaskRunner while we do the rename
       synchronized (ReduceTaskRunner.this) {
-        // if we can't rename the file, something is broken
-        if (!(new File(tmpFilename.toString()).
-              renameTo(new File(finalFilename.toString())))) {
-          localFileSys.delete(tmpFilename);
+        // if we can't rename the file, something is broken (and IOException
+        // will be thrown). This file could have been created in the inmemory
+        // fs or the localfs. So need to get the filesystem owning the path.
+        FileSystem fs = tmpFilename.getFileSystem(conf);
+        if (!fs.rename(tmpFilename, finalFilename)) {
+          fs.delete(tmpFilename);
           throw new IOException("failure to rename map output " +
                                 tmpFilename);
         }
+        bytes = fs.getLength(finalFilename);
+        LOG.info(reduceId + " done copying " + loc.getMapTaskId() +
+                 " output from " + loc.getHost() + ".");
+        //Create a thread to do merges. Synchronize access/update to
+        //mergeInProgress
+        if (!mergeInProgress && inMemFileSys.getPercentUsed() >=
+            MAX_INMEM_FILESYS_USE) {
+          LOG.info(reduceId + " InMemoryFileSystem " +
+                   inMemFileSys.getUri().toString() +
+                   " is " + inMemFileSys.getPercentUsed() +
+                   " full. Triggering merge");
+          InMemFSMergeThread m = new InMemFSMergeThread(inMemFileSys,
+                                   (LocalFileSystem)localFileSys, sorter);
+          m.setName("Thread for merging in memory files");
+          m.setDaemon(true);
+          mergeInProgress = true;
+          m.start();
+        }
       }
-      LOG.info(reduceTask.getTaskId() + " done copying " + loc.getMapTaskId() +
-               " output from " + loc.getHost() + ".");
       return bytes;
     }
@@ -258,7 +321,6 @@
     super(task, tracker, conf);
     this.mapOutputFile = new MapOutputFile();
     this.mapOutputFile.setConf(conf);
-    localFileSys = FileSystem.getLocal(conf);
 
     this.reduceTask = (ReduceTask)getTask();
     this.scheduledCopies = new ArrayList(100);
@@ -266,6 +328,18 @@
     this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
     this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
 
+    //we want to distinguish inmem fs instances for different reduces. Hence,
+    //append a unique string in the uri for the inmem fs name
+    URI uri = URI.create("ramfs://mapoutput" + reduceTask.hashCode());
+    inMemFileSys = (InMemoryFileSystem)FileSystem.get(uri, conf);
+    LOG.info(reduceTask.getTaskId() + " Created an InMemoryFileSystem, uri: "
+             + uri);
+    localFileSys = FileSystem.getLocal(conf);
+    //create an instance of the sorter
+    sorter =
+      new SequenceFile.Sorter(inMemFileSys, conf.getOutputKeyComparator(),
+                              conf.getMapOutputValueClass(), conf);
+
     // hosts -> next contact time
     this.penaltyBox = new Hashtable();
@@ -311,9 +385,14 @@
     // start the clock for bandwidth measurement
     long startTime = System.currentTimeMillis();
     long currentTime = startTime;
-    
+    PingTimer pingTimer = new PingTimer();
+    pingTimer.setName("Map output copy reporter for task " +
+                      reduceTask.getTaskId());
+    pingTimer.setDaemon(true);
+    pingTimer.start();
+    try {
     // loop until we get all required outputs or are killed
-    while (!killed && numCopied < numOutputs) {
+    while (!killed && numCopied < numOutputs && mergeThrowable == null) {
 
       LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) +
                " map output(s)");
@@ -382,12 +461,11 @@
         // new, just wait for a bit
         try {
           if (numInFlight == 0 && numScheduled == 0) {
-            getTask().reportProgress(getTracker());
             Thread.sleep(5000);
           }
         } catch (InterruptedException e) { } // IGNORE
 
-        while (!killed && numInFlight > 0) {
+        while (!killed && numInFlight > 0 && mergeThrowable == null) {
           LOG.debug(reduceTask.getTaskId() + " numInFlight = " +
                     numInFlight);
           CopyResult cr = getCopyResult();
@@ -404,7 +482,6 @@
             copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs
                                 + " at " +
                                 mbpsFormat.format(transferRate) +  " MB/s)");
-            getTask().reportProgress(getTracker());
           } else {
             // this copy failed, put it back onto neededOutputs
             neededOutputs.add(new Integer(cr.getMapId()));
@@ -454,7 +531,64 @@
         }
       }
-      return numCopied == numOutputs && !killed;
+      if (mergeThrowable != null) {
+        //set the task state to FAILED
+        TaskTracker tracker = ReduceTaskRunner.this.getTracker();
+        TaskTracker.TaskInProgress tip =
+          tracker.runningTasks.get(reduceTask.getTaskId());
+        tip.runstate = TaskStatus.State.FAILED;
+        try {
+          tip.cleanup();
+        } catch (Throwable ie2) {
+          // Ignore it, we are just trying to cleanup.
+        }
+        inMemFileSys.close();
+      }
+
+      //Do a merge of in-memory files (if there are any)
+      if (!killed && mergeThrowable == null) {
+        try {
+          //wait for an ongoing merge (if it is in flight) to complete
+          while (mergeInProgress) {
+            Thread.sleep(200);
+          }
+          LOG.info(reduceTask.getTaskId() +
+                   " Copying of all map outputs complete. " +
+                   "Initiating the last merge on the remaining files in " +
+                   inMemFileSys.getUri());
+          //initiate merge
+          Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
+          if (inMemClosedFiles.length == 0) {
+            LOG.info(reduceTask.getTaskId() + " Nothing to merge from " +
+                     inMemFileSys.getUri());
+            return numCopied == numOutputs;
+          }
+          RawKeyValueIterator rIter = sorter.merge(inMemClosedFiles, true,
+                                                   inMemClosedFiles.length);
+          //name this output file same as the name of the first file that is
+          //there in the current list of inmem files (this is guaranteed to be
+          //absent on the disk currently. So we don't overwrite a prev.
+          //created spill)
+          SequenceFile.Writer writer = sorter.cloneFileAttributes(
+            inMemFileSys.makeQualified(inMemClosedFiles[0]),
+            localFileSys.makeQualified(inMemClosedFiles[0]), null);
+          sorter.writeFile(rIter, writer);
+          writer.close();
+          LOG.info(reduceTask.getTaskId() +
+                   " Merge of the " + inMemClosedFiles.length +
+                   " files in InMemoryFileSystem complete." +
+                   " Local file is " + inMemClosedFiles[0]);
+        } catch (Throwable t) {
+          LOG.warn("Merge of the inmemory files threw an exception: " +
+                   StringUtils.stringifyException(t));
+          inMemFileSys.close();
+          return false;
+        }
+      }
+      return mergeThrowable == null && numCopied == numOutputs && !killed;
+    } finally {
+      pingTimer.interrupt();
+    }
   }
@@ -529,4 +663,60 @@
     }
   }
 
+  private class InMemFSMergeThread extends Thread {
+    private InMemoryFileSystem inMemFileSys;
+    private LocalFileSystem localFileSys;
+    private SequenceFile.Sorter sorter;
+
+    public InMemFSMergeThread(InMemoryFileSystem inMemFileSys,
+        LocalFileSystem localFileSys, SequenceFile.Sorter sorter) {
+      this.inMemFileSys = inMemFileSys;
+      this.localFileSys = localFileSys;
+      this.sorter = sorter;
+    }
+
+    public void run() {
+      LOG.info(reduceTask.getTaskId() + " Thread started: " + getName());
+      try {
+        //initiate merge
+        Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
+        //Note that the above Path[] could be of length 0 if all copies are
+        //in flight. So we make sure that we have some 'closed' map
+        //output files to merge to get the benefit of in-memory merge
+        if (inMemClosedFiles.length >=
+            (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
+          RawKeyValueIterator rIter = sorter.merge(inMemClosedFiles, true,
+                                                   inMemClosedFiles.length);
+          //name this output file same as the name of the first file that is
+          //there in the current list of inmem files (this is guaranteed to be
+          //absent on the disk currently. So we don't overwrite a prev.
+          //created spill)
+          SequenceFile.Writer writer = sorter.cloneFileAttributes(
+            inMemFileSys.makeQualified(inMemClosedFiles[0]),
+            localFileSys.makeQualified(inMemClosedFiles[0]), null);
+          sorter.writeFile(rIter, writer);
+          writer.close();
+          LOG.info(reduceTask.getTaskId() +
+                   " Merge of the " + inMemClosedFiles.length +
+                   " files in InMemoryFileSystem complete." +
+                   " Local file is " + inMemClosedFiles[0]);
+        }
+        else {
+          LOG.info(reduceTask.getTaskId() + " Nothing to merge from " +
+                   inMemFileSys.getUri());
+        }
+      } catch (Throwable t) {
+        LOG.warn("Merge of the inmemory files threw an exception: " +
+                 StringUtils.stringifyException(t));
+        ReduceTaskRunner.this.mergeThrowable = t;
+      }
+      finally {
+        mergeInProgress = false;
+      }
+    }
+  }
+
+  final private static PathFilter MAP_OUTPUT_FILTER = new PathFilter() {
+    public boolean accept(Path file) {
+      return file.toString().endsWith(".out");
+    }
+  };
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=497891&r1=497890&r2=497891
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Jan 19 10:17:44 2007
@@ -885,7 +885,7 @@
     class TaskInProgress {
         Task task;
         float progress;
-        TaskStatus.State runstate;
+        volatile TaskStatus.State runstate;
         long lastProgressReport;
         StringBuffer diagnosticInfo = new StringBuffer();
         TaskRunner runner;