Author: cutting Date: Mon Jun 18 14:59:36 2007 New Revision: 548505 URL: http://svn.apache.org/viewvc?view=rev&rev=548505 Log: HADOOP-1193. Pool allocation of compression codecs. Contributed by Arun.
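The heart of this patch is a small generic pool, added as a private static CodecPool<T> inner class of SequenceFile, that caches Compressor and Decompressor instances per concrete class, so Writers and Readers reuse (possibly native) codec objects instead of constructing and finalizing them on every file open. Below is a minimal standalone sketch of that pattern for readers skimming the diff; it mirrors the names used in the patch (getCodec/returnCodec) but is a simplified illustration, not the committed class.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of the pooling pattern introduced by this change.
// The committed version lives as a private static inner class of SequenceFile.
class CodecPool<T> {
  // One free list per concrete (de)compressor class, e.g. ZlibCompressor.class.
  private final Map<Class<?>, List<T>> pool = new HashMap<Class<?>, List<T>>();

  /** Borrow a cached instance of the given class, or null if none is pooled yet. */
  public synchronized T getCodec(Class<?> codecClass) {
    List<T> codecList = pool.get(codecClass);
    return (codecList == null || codecList.isEmpty()) ? null : codecList.remove(0);
  }

  /** Hand an instance back so a later Writer/Reader can reuse it. */
  public synchronized void returnCodec(T codec) {
    if (codec == null) {
      return;                       // close() may run before a codec was ever obtained
    }
    List<T> codecList = pool.get(codec.getClass());
    if (codecList == null) {
      codecList = new ArrayList<T>();
      pool.put(codec.getClass(), codecList);
    }
    codecList.add(codec);
  }
}

In the patch, Writer.init() first tries compressorPool.getCodec(codec.getCompressorType()), falls back to codec.createCompressor() on a miss, and Writer.close() calls compressorPool.returnCodec(compressor); Reader does the same for its decompressors.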
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/BigMapOutput.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=548505&r1=548504&r2=548505 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Mon Jun 18 14:59:36 2007 @@ -164,6 +164,11 @@ files, so that disk space, writability, etc. is considered. (Dhruba Borthakur via cutting) + 52. HADOOP-1193. Pool allocation of compression codecs. This + eliminates a memory leak that could cause OutOfMemoryException, + and also substantially improves performance. + (Arun C Murthy via cutting) + Release 0.13.0 - 2007-06-08 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=548505&r1=548504&r2=548505 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Mon Jun 18 14:59:36 2007 @@ -21,8 +21,6 @@ import java.io.*; import java.util.*; import java.net.InetAddress; -import java.net.URI; -import java.net.URISyntaxException; import java.rmi.server.UID; import java.security.MessageDigest; import org.apache.commons.logging.*; @@ -30,6 +28,8 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionInputStream; import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.zlib.ZlibFactory; @@ -293,7 +293,7 @@ Class keyClass, Class valClass, boolean compress, boolean blockCompress, CompressionCodec codec, Metadata metadata) throws IOException { - if ((codec instanceof GzipCodec) && + if (codec != null && (codec instanceof GzipCodec) && !NativeCodeLoader.isNativeCodeLoaded() && !ZlibFactory.isNativeZlibLoaded()) { throw new IllegalArgumentException("SequenceFile doesn't work with " + @@ -315,25 +315,47 @@ /** * Construct the preferred type of 'raw' SequenceFile Writer. - * @param out The stream on top which the writer is to be constructed. + * @param fs The configured filesystem. + * @param conf The configuration. + * @param file The name of the file. * @param keyClass The 'key' type. * @param valClass The 'value' type. * @param compress Compress data? 
* @param blockCompress Compress blocks? + * @param codec The compression codec. + * @param progress + * @param metadata The metadata of the file. * @return Returns the handle to the constructed SequenceFile Writer. * @throws IOException */ private static Writer - createWriter(Configuration conf, FSDataOutputStream out, - Class keyClass, Class valClass, boolean compress, boolean blockCompress, - CompressionCodec codec) - throws IOException { - Writer writer = createWriter(conf, out, keyClass, valClass, compress, - blockCompress, codec, new Metadata()); - return writer; + createWriter(FileSystem fs, Configuration conf, Path file, + Class keyClass, Class valClass, + boolean compress, boolean blockCompress, + CompressionCodec codec, Progressable progress, Metadata metadata) + throws IOException { + if (codec != null && (codec instanceof GzipCodec) && + !NativeCodeLoader.isNativeCodeLoaded() && + !ZlibFactory.isNativeZlibLoaded()) { + throw new IllegalArgumentException("SequenceFile doesn't work with " + + "GzipCodec without native-hadoop code!"); } + Writer writer = null; + + if (!compress) { + writer = new Writer(fs, conf, file, keyClass, valClass, progress, metadata); + } else if (compress && !blockCompress) { + writer = new RecordCompressWriter(fs, conf, file, keyClass, valClass, + codec, progress, metadata); + } else { + writer = new BlockCompressWriter(fs, conf, file, keyClass, valClass, + codec, progress, metadata); + } + return writer; +} + /** * Construct the preferred type of 'raw' SequenceFile Writer. * @param conf The configuration. @@ -598,8 +620,16 @@ /** Write key/value pairs to a sequence-format file. */ public static class Writer { + /** + * A global compressor pool used to save the expensive + * construction/destruction of (possibly native) compression codecs. + */ + private static final CodecPool<Compressor> compressorPool = + new CodecPool<Compressor>(); + Configuration conf; FSDataOutputStream out; + boolean ownOutputStream = true; DataOutputBuffer buffer = new DataOutputBuffer(); Class keyClass; @@ -610,7 +640,8 @@ CompressionOutputStream deflateFilter = null; DataOutputStream deflateOut = null; Metadata metadata = null; - + Compressor compressor = null; + // Insert a globally unique 16-byte value every few entries, so that one // can seek into the middle of a file and then synchronize with record // starts and ends by scanning for this value. @@ -651,6 +682,7 @@ private Writer(Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, Metadata metadata) throws IOException { + this.ownOutputStream = false; init(null, conf, out, keyClass, valClass, false, null, metadata); initializeFileHeader(); @@ -703,7 +735,11 @@ this.metadata = metadata; if (this.codec != null) { ReflectionUtils.setConf(this.codec, this.conf); - this.deflateFilter = this.codec.createOutputStream(buffer); + compressor = compressorPool.getCodec(this.codec.getCompressorType()); + if (compressor == null) { + compressor = this.codec.createCompressor(); + } + this.deflateFilter = this.codec.createOutputStream(buffer, compressor); this.deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter)); } @@ -727,8 +763,15 @@ /** Close the file. */ public synchronized void close() throws IOException { + compressorPool.returnCodec(compressor); + if (out != null) { - out.close(); + out.flush(); + + // Close the underlying stream iff we own it... 
+ if (ownOutputStream) { + out.close(); + } out = null; } } @@ -849,6 +892,7 @@ private RecordCompressWriter(Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata) throws IOException { + this.ownOutputStream = false; super.init(null, conf, out, keyClass, valClass, true, codec, metadata); initializeFileHeader(); @@ -967,6 +1011,7 @@ private BlockCompressWriter(Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata) throws IOException { + this.ownOutputStream = false; super.init(null, conf, out, keyClass, valClass, true, codec, metadata); init(1000000); @@ -1035,9 +1080,8 @@ public synchronized void close() throws IOException { if (out != null) { writeBlock(); - out.close(); - out = null; } + super.close(); } public void sync() throws IOException { @@ -1107,6 +1151,13 @@ /** Reads key/value pairs from a sequence-format file. */ public static class Reader { + /** + * A global decompressor pool used to save the expensive + * construction/destruction of (possibly native) decompression codecs. + */ + private static final CodecPool<Decompressor> decompressorPool = + new CodecPool<Decompressor>(); + private Path file; private FSDataInputStream in; private DataOutputBuffer outBuf = new DataOutputBuffer(); @@ -1142,43 +1193,62 @@ private DataInputBuffer keyLenBuffer = null; private CompressionInputStream keyLenInFilter = null; private DataInputStream keyLenIn = null; + private Decompressor keyLenDecompressor = null; private DataInputBuffer keyBuffer = null; private CompressionInputStream keyInFilter = null; private DataInputStream keyIn = null; + private Decompressor keyDecompressor = null; private DataInputBuffer valLenBuffer = null; private CompressionInputStream valLenInFilter = null; private DataInputStream valLenIn = null; + private Decompressor valLenDecompressor = null; private DataInputBuffer valBuffer = null; private CompressionInputStream valInFilter = null; private DataInputStream valIn = null; + private Decompressor valDecompressor = null; /** Open the named file. 
*/ public Reader(FileSystem fs, Path file, Configuration conf) throws IOException { - this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf); + this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, false); } - private Reader(FileSystem fs, Path name, int bufferSize, - Configuration conf) throws IOException { - this.file = name; - this.in = fs.open(file, bufferSize); - this.end = fs.getLength(file); - this.conf = conf; - init(); + private Reader(FileSystem fs, Path file, int bufferSize, + Configuration conf, boolean tempReader) throws IOException { + this(fs, file, bufferSize, 0, fs.getLength(file), conf, tempReader); } private Reader(FileSystem fs, Path file, int bufferSize, long start, - long length, Configuration conf) throws IOException { + long length, Configuration conf, boolean tempReader) + throws IOException { this.file = file; this.in = fs.open(file, bufferSize); this.conf = conf; seek(start); this.end = in.getPos() + length; - init(); + init(tempReader); + } + + private Decompressor getPooledOrNewDecompressor() { + Decompressor decompressor = null; + decompressor = decompressorPool.getCodec(codec.getDecompressorType()); + if (decompressor == null) { + decompressor = codec.createDecompressor(); + } + return decompressor; } - private void init() throws IOException { + + /** + * Initialize the {@link Reader} + * @param tmpReader <code>true</code> if we are constructing a temporary + * reader {@link SequenceFile.Sorter.cloneFileAttributes}, + * and hence do not initialize every component; + * <code>false</code> otherwise. + * @throws IOException + */ + private void init(boolean tempReader) throws IOException { byte[] versionBlock = new byte[VERSION.length]; in.readFully(versionBlock); @@ -1245,33 +1315,48 @@ in.readFully(sync); // read sync bytes } - // Initialize - valBuffer = new DataInputBuffer(); - if (decompress) { - valInFilter = this.codec.createInputStream(valBuffer); - valIn = new DataInputStream(valInFilter); - } else { - valIn = valBuffer; - } - - if (blockCompressed) { - keyLenBuffer = new DataInputBuffer(); - keyBuffer = new DataInputBuffer(); - valLenBuffer = new DataInputBuffer(); - - keyLenInFilter = this.codec.createInputStream(keyLenBuffer); - keyLenIn = new DataInputStream(keyLenInFilter); - - keyInFilter = this.codec.createInputStream(keyBuffer); - keyIn = new DataInputStream(keyInFilter); + // Initialize... 
*not* if this we are constructing a temporary Reader + if (!tempReader) { + valBuffer = new DataInputBuffer(); + if (decompress) { + valDecompressor = getPooledOrNewDecompressor(); + valInFilter = codec.createInputStream(valBuffer, valDecompressor); + valIn = new DataInputStream(valInFilter); + } else { + valIn = valBuffer; + } - valLenInFilter = this.codec.createInputStream(valLenBuffer); - valLenIn = new DataInputStream(valLenInFilter); + if (blockCompressed) { + keyLenBuffer = new DataInputBuffer(); + keyBuffer = new DataInputBuffer(); + valLenBuffer = new DataInputBuffer(); + + keyLenDecompressor = getPooledOrNewDecompressor(); + keyLenInFilter = codec.createInputStream(keyLenBuffer, + keyLenDecompressor); + keyLenIn = new DataInputStream(keyLenInFilter); + + keyDecompressor = getPooledOrNewDecompressor(); + keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor); + keyIn = new DataInputStream(keyInFilter); + + valLenDecompressor = getPooledOrNewDecompressor(); + valLenInFilter = codec.createInputStream(valLenBuffer, + valLenDecompressor); + valLenIn = new DataInputStream(valLenInFilter); + } } } /** Close the file. */ public synchronized void close() throws IOException { + // Return the decompressors to the pool + decompressorPool.returnCodec(keyLenDecompressor); + decompressorPool.returnCodec(keyDecompressor); + decompressorPool.returnCodec(valLenDecompressor); + decompressorPool.returnCodec(valDecompressor); + + // Close the input-stream in.close(); } @@ -1755,6 +1840,49 @@ } + private static class CodecPool<T> { + + private Map<Class, List<T>> pool = new HashMap<Class, List<T>>(); + + public T getCodec(Class codecClass) { + T codec = null; + + // Check if an appropriate codec is available + synchronized (pool) { + if (pool.containsKey(codecClass)) { + List<T> codecList = pool.get(codecClass); + + if (codecList != null) { + synchronized (codecList) { + if (!codecList.isEmpty()) { + codec = codecList.remove(0); + } + } + } + } + } + + return codec; + } + + public void returnCodec(T codec) { + if (codec != null) { + Class codecClass = codec.getClass(); + synchronized (pool) { + if (!pool.containsKey(codecClass)) { + pool.put(codecClass, new ArrayList<T>()); + } + + List<T> codecList = pool.get(codecClass); + synchronized (codecList) { + codecList.add(codec); + } + } + } + } + + } + /** Sorts key/value pairs in a sequence-format file. 
* * <p>For best performance, applications should make sure that the {@link @@ -1951,7 +2079,7 @@ } continue; } - //int length = buffer.getLength() - start; + int keyLength = rawKeys.getLength() - keyOffset; if (count == keyOffsets.length) @@ -2026,7 +2154,8 @@ long segmentStart = out.getPos(); Writer writer = createWriter(conf, out, keyClass, valClass, - isCompressed, isBlockCompressed, codec); + isCompressed, isBlockCompressed, codec, + new Metadata()); if (!done) { writer.sync = null; // disable sync on temp files @@ -2036,14 +2165,12 @@ int p = pointers[i]; writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]); } - writer.sync(); - writer.out.flush(); - + writer.close(); if (!done) { // Save the segment length WritableUtils.writeVLong(indexOut, segmentStart); - WritableUtils.writeVLong(indexOut, (writer.out.getPos()-segmentStart)); + WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart)); indexOut.flush(); } } @@ -2179,24 +2306,6 @@ /** * Clones the attributes (like compression of the input file and creates a * corresponding Writer - * @param ignoredFileSys the (ignored) FileSystem object - * @param inputFile the path of the input file whose attributes should be - * cloned - * @param outputFile the path of the output file - * @param prog the Progressable to report status during the file write - * @return Writer - * @throws IOException - * @deprecated call #cloneFileAttributes(Path,Path,Progressable) instead - */ - public Writer cloneFileAttributes(FileSystem ignoredFileSys, - Path inputFile, Path outputFile, Progressable prog) - throws IOException { - return cloneFileAttributes(inputFile, outputFile, prog); - } - - /** - * Clones the attributes (like compression of the input file and creates a - * corresponding Writer * @param inputFile the path of the input file whose attributes should be * cloned * @param outputFile the path of the output file * @param prog the Progressable to report status during the file write * @return Writer * @throws IOException */ public Writer cloneFileAttributes(Path inputFile, Path outputFile, - Progressable prog) throws IOException { + Progressable prog) + throws IOException { FileSystem srcFileSys = inputFile.getFileSystem(conf); - Reader reader = new Reader(srcFileSys, inputFile, 4096, conf); + Reader reader = new Reader(srcFileSys, inputFile, 4096, conf, true); boolean compress = reader.isCompressed(); boolean blockCompress = reader.isBlockCompressed(); CompressionCodec codec = reader.getCompressionCodec(); reader.close(); - - FileSystem dstFileSys = outputFile.getFileSystem(conf); - FSDataOutputStream out; - if (prog != null) - out = dstFileSys.create(outputFile, true, - conf.getInt("io.file.buffer.size", 4096), prog); - else - out = dstFileSys.create(outputFile, true, - conf.getInt("io.file.buffer.size", 4096)); - Writer writer = createWriter(conf, out, keyClass, valClass, compress, - blockCompress, codec); + + Writer writer = createWriter(outputFile.getFileSystem(conf), conf, + outputFile, keyClass, valClass, compress, + blockCompress, codec, prog, + new Metadata()); return writer; } @@ -2457,7 +2561,7 @@ Path outputFile = lDirAlloc.getLocalPathForWrite( tmpFilename.toString(), approxOutputSize, conf); - LOG.info("writing intermediate results to " + outputFile); + LOG.debug("writing intermediate results to " + outputFile); Writer writer = cloneFileAttributes( fs.makeQualified(segmentsToMerge.get(0).segmentPathName), fs.makeQualified(outputFile), null); @@ -2590,7 +2694,7 @@ } Reader reader = new Reader(fs, segmentPathName, bufferSize, segmentOffset, - segmentLength, conf); + 
segmentLength, conf, false); //sometimes we ignore syncs especially for temp merge files if (ignoreSync) reader.sync = null; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java?view=diff&rev=548505&r1=548504&r2=548505 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java Mon Jun 18 14:59:36 2007 @@ -29,19 +29,79 @@ public interface CompressionCodec { /** - * Create a stream compressor that will write to the given output stream. + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream}. + * * @param out the location for the final output stream - * @return a stream the user can write uncompressed data to + * @return a stream the user can write uncompressed data to have it compressed + * @throws IOException */ CompressionOutputStream createOutputStream(OutputStream out) - throws IOException; + throws IOException; + + /** + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream} with the given {@link Compressor}. + * + * @param out the location for the final output stream + * @param compressor compressor to use + * @return a stream the user can write uncompressed data to have it compressed + * @throws IOException + */ + CompressionOutputStream createOutputStream(OutputStream out, + Compressor compressor) + throws IOException; + + /** + * Get the type of {@link Compressor} needed by this {@link CompressionCodec}. + * + * @return the type of compressor needed by this codec. + */ + Class getCompressorType(); + + /** + * Create a new {@link Compressor} for use by this {@link CompressionCodec}. + * + * @return a new compressor for use by this codec + */ + Compressor createCompressor(); /** * Create a stream decompressor that will read from the given input stream. + * * @param in the stream to read compressed bytes from * @return a stream to read uncompressed bytes from + * @throws IOException */ CompressionInputStream createInputStream(InputStream in) throws IOException; + + /** + * Create a {@link CompressionInputStream} that will read from the given + * {@link InputStream} with the given {@link Decompressor}. + * + * @param in the stream to read compressed bytes from + * @param decompressor decompressor to use + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + CompressionInputStream createInputStream(InputStream in, + Decompressor decompressor) + throws IOException; + + + /** + * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}. + * + * @return the type of decompressor needed by this codec. + */ + Class getDecompressorType(); + + /** + * Create a new {@link Decompressor} for use by this {@link CompressionCodec}. + * + * @return a new decompressor for use by this codec + */ + Decompressor createDecompressor(); /** * Get the default filename extension for this kind of compression. 
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java?view=diff&rev=548505&r1=548504&r2=548505 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java Mon Jun 18 14:59:36 2007 @@ -29,7 +29,7 @@ public class DefaultCodec implements Configurable, CompressionCodec { Configuration conf; - + public void setConf(Configuration conf) { this.conf = conf; } @@ -38,33 +38,50 @@ return conf; } - /** - * Create a stream compressor that will write to the given output stream. - * @param out the location for the final output stream - * @return a stream the user can write uncompressed data to - */ public CompressionOutputStream createOutputStream(OutputStream out) - throws IOException { - return new CompressorStream(out, ZlibFactory.getZlibCompressor(), + throws IOException { + return new CompressorStream(out, createCompressor(), conf.getInt("io.file.buffer.size", 4*1024)); } - - /** - * Create a stream decompressor that will read from the given input stream. - * @param in the stream to read compressed bytes from - * @return a stream to read uncompressed bytes from - */ + + public CompressionOutputStream createOutputStream(OutputStream out, + Compressor compressor) + throws IOException { + return new CompressorStream(out, compressor, + conf.getInt("io.file.buffer.size", 4*1024)); + } + + public Class getCompressorType() { + return ZlibFactory.getZlibCompressorType(); + } + + public Compressor createCompressor() { + return ZlibFactory.getZlibCompressor(); + } + public CompressionInputStream createInputStream(InputStream in) - throws IOException { - return new DecompressorStream(in, ZlibFactory.getZlibDecompressor(), + throws IOException { + return new DecompressorStream(in, createDecompressor(), + conf.getInt("io.file.buffer.size", 4*1024)); + } + + public CompressionInputStream createInputStream(InputStream in, + Decompressor decompressor) + throws IOException { + return new DecompressorStream(in, decompressor, conf.getInt("io.file.buffer.size", 4*1024)); } + + public Class getDecompressorType() { + return ZlibFactory.getZlibDecompressorType(); + } + + public Decompressor createDecompressor() { + return ZlibFactory.getZlibDecompressor(); + } - /** - * Get the default filename extension for this kind of compression. - * @return the extension including the '.' - */ public String getDefaultExtension() { return ".deflate"; } + } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java?view=diff&rev=548505&r1=548504&r2=548505 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java Mon Jun 18 14:59:36 2007 @@ -135,58 +135,68 @@ } } - /** - * Create a stream compressor that will write to the given output stream. 
- * @param out the location for the final output stream - * @return a stream the user can write uncompressed data to - */ public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { - CompressionOutputStream compOutStream = null; - - if (ZlibFactory.isNativeZlibLoaded()) { - Compressor compressor = - new ZlibCompressor(ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION, - ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY, - ZlibCompressor.CompressionHeader.GZIP_FORMAT, - 64*1024); - - compOutStream = new CompressorStream(out, compressor, - conf.getInt("io.file.buffer.size", 4*1024)); - } else { - compOutStream = new GzipOutputStream(out); - } - - return compOutStream; + return (ZlibFactory.isNativeZlibLoaded()) ? + new CompressorStream(out, createCompressor(), + conf.getInt("io.file.buffer.size", 4*1024)) : + new GzipOutputStream(out); } - /** - * Create a stream decompressor that will read from the given input stream. - * @param in the stream to read compressed bytes from - * @return a stream to read uncompressed bytes from - */ + public CompressionOutputStream createOutputStream(OutputStream out, + Compressor compressor) + throws IOException { + return (compressor != null) ? + new CompressorStream(out, compressor, + conf.getInt("io.file.buffer.size", + 4*1024)) : + createOutputStream(out); + + } + + public Compressor createCompressor() { + return (ZlibFactory.isNativeZlibLoaded()) ? + new ZlibCompressor(ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION, + ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY, + ZlibCompressor.CompressionHeader.GZIP_FORMAT, + 64*1024) : + null; + } + + public Class getCompressorType() { + return ZlibFactory.getZlibCompressorType(); + } + public CompressionInputStream createInputStream(InputStream in) - throws IOException { - CompressionInputStream compInStream = null; - - if (ZlibFactory.isNativeZlibLoaded()) { - Decompressor decompressor = - new ZlibDecompressor(ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB, - 64*1024); - - compInStream = new DecompressorStream(in, decompressor, - conf.getInt("io.file.buffer.size", 4*1024)); - } else { - compInStream = new GzipInputStream(in); - } - - return compInStream; + throws IOException { + return (ZlibFactory.isNativeZlibLoaded()) ? + new DecompressorStream(in, createDecompressor(), + conf.getInt("io.file.buffer.size", + 4*1024)) : + new GzipInputStream(in); } - - /** - * Get the default filename extension for this kind of compression. - * @return the extension including the '.' - */ + + public CompressionInputStream createInputStream(InputStream in, + Decompressor decompressor) + throws IOException { + return (decompressor != null) ? + new DecompressorStream(in, decompressor, + conf.getInt("io.file.buffer.size", + 4*1024)) : + createInputStream(in); + } + + public Decompressor createDecompressor() { + return (ZlibFactory.isNativeZlibLoaded()) ? 
+ new ZlibDecompressor(ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB, + 64*1024) : + null; + } + + public Class getDecompressorType() { + return ZlibFactory.getZlibDecompressorType(); + } + public String getDefaultExtension() { return ".gz"; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java?view=diff&rev=548505&r1=548504&r2=548505 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java Mon Jun 18 14:59:36 2007 @@ -41,7 +41,7 @@ private static final Log LOG = LogFactory.getLog(LzoCodec.class.getName()); private Configuration conf; - + public void setConf(Configuration conf) { this.conf = conf; } @@ -81,7 +81,7 @@ throws IOException { // Ensure native-lzo library is loaded & initialized if (!isNativeLzoLoaded()) { - throw new IOException("native-lzo library not available"); + throw new RuntimeException("native-lzo library not available"); } /** @@ -119,12 +119,66 @@ } else { compressionOverhead = (int)(((bufferSize - (128 + 3)) * 8.0) / 9.0); } - + return new BlockCompressorStream(out, new LzoCompressor(strategy, bufferSize), bufferSize, compressionOverhead); } + public CompressionOutputStream createOutputStream(OutputStream out, + Compressor compressor) + throws IOException { + // Ensure native-lzo library is loaded & initialized + if (!isNativeLzoLoaded()) { + throw new RuntimeException("native-lzo library not available"); + } + + LzoCompressor.CompressionStrategy strategy = + LzoCompressor.CompressionStrategy.valueOf( + conf.get("io.compression.codec.lzo.compressor", + LzoCompressor.CompressionStrategy.LZO1X_1.name() + ) + ); + int bufferSize = conf.getInt("io.compression.codec.lzo.buffersize", + 64*1024); + int compressionOverhead = 0; + if (strategy.name().contains("LZO1")) { + compressionOverhead = (int)(((bufferSize - (64 + 3)) * 16.0) / 17.0); + } else { + compressionOverhead = (int)(((bufferSize - (128 + 3)) * 8.0) / 9.0); + } + + return new BlockCompressorStream(out, compressor, bufferSize, + compressionOverhead); + } + + public Class getCompressorType() { + // Ensure native-lzo library is loaded & initialized + if (!isNativeLzoLoaded()) { + throw new RuntimeException("native-lzo library not available"); + } + + return LzoCompressor.class; + } + + public Compressor createCompressor() { + // Ensure native-lzo library is loaded & initialized + if (!isNativeLzoLoaded()) { + throw new RuntimeException("native-lzo library not available"); + } + + LzoCompressor.CompressionStrategy strategy = + LzoCompressor.CompressionStrategy.valueOf( + conf.get("io.compression.codec.lzo.compressor", + LzoCompressor.CompressionStrategy.LZO1X_1.name() + ) + ); + int bufferSize = conf.getInt("io.compression.codec.lzo.buffersize", + 64*1024); + + return new LzoCompressor(strategy, bufferSize); + } + public CompressionInputStream createInputStream(InputStream in) throws IOException { // Ensure native-lzo library is loaded & initialized @@ -146,7 +200,47 @@ new LzoDecompressor(strategy, bufferSize), bufferSize); } - + + public CompressionInputStream createInputStream(InputStream in, + Decompressor decompressor) + throws IOException { + // Ensure native-lzo library is loaded & initialized + if (!isNativeLzoLoaded()) { + throw new RuntimeException("native-lzo library not 
available"); + } + + return new BlockDecompressorStream(in, decompressor, + conf.getInt("io.compression.codec.lzo.buffersize", + 64*1024)); + } + + public Class getDecompressorType() { + // Ensure native-lzo library is loaded & initialized + if (!isNativeLzoLoaded()) { + throw new RuntimeException("native-lzo library not available"); + } + + return LzoDecompressor.class; + } + + public Decompressor createDecompressor() { + // Ensure native-lzo library is loaded & initialized + if (!isNativeLzoLoaded()) { + throw new RuntimeException("native-lzo library not available"); + } + + LzoDecompressor.CompressionStrategy strategy = + LzoDecompressor.CompressionStrategy.valueOf( + conf.get("io.compression.codec.lzo.decompressor", + LzoDecompressor.CompressionStrategy.LZO1X.name() + ) + ); + int bufferSize = conf.getInt("io.compression.codec.lzo.buffersize", + 64*1024); + + return new LzoDecompressor(strategy, bufferSize); + } + /** * Get the default filename extension for this kind of compression. * @return the extension including the '.' Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java?view=diff&rev=548505&r1=548504&r2=548505 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java Mon Jun 18 14:59:36 2007 @@ -22,8 +22,6 @@ import java.nio.Buffer; import java.nio.ByteBuffer; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.util.NativeCodeLoader; @@ -35,7 +33,7 @@ * @author Arun C Murthy */ public class ZlibCompressor implements Compressor { - private static final int DEFAULT_DIRECT_BUFFER_SIZE = 1*1024; + private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024; private long stream; private CompressionLevel level; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java?view=diff&rev=548505&r1=548504&r2=548505 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java Mon Jun 18 14:59:36 2007 @@ -22,8 +22,6 @@ import java.nio.Buffer; import java.nio.ByteBuffer; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.util.NativeCodeLoader; @@ -35,7 +33,7 @@ * @author Arun C Murthy */ public class ZlibDecompressor implements Decompressor { - private static final int DEFAULT_DIRECT_BUFFER_SIZE = 1*1024; + private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024; private long stream; private CompressionHeader header; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java?view=diff&rev=548505&r1=548504&r2=548505 ============================================================================== --- 
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java Mon Jun 18 14:59:36 2007 @@ -23,6 +23,7 @@ import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.util.NativeCodeLoader; +import org.apache.hadoop.util.StringUtils; /** * A collection of factories to create the right @@ -60,21 +61,53 @@ } /** + * Return the appropriate type of the zlib compressor. + * + * @return the appropriate type of the zlib compressor. + */ + public static Class getZlibCompressorType() { + return (nativeZlibLoaded) ? + ZlibCompressor.class : BuiltInZlibDeflater.class; + } + + /** * Return the appropriate implementation of the zlib compressor. * * @return the appropriate implementation of the zlib compressor. */ public static Compressor getZlibCompressor() { + LOG.info("Creating a new ZlibCompressor"); + try { + throw new Exception(); + } catch (Exception e) { + e.printStackTrace(); + } return (nativeZlibLoaded) ? new ZlibCompressor() : new BuiltInZlibDeflater(); } /** + * Return the appropriate type of the zlib decompressor. + * + * @return the appropriate type of the zlib decompressor. + */ + public static Class getZlibDecompressorType() { + return (nativeZlibLoaded) ? + ZlibDecompressor.class : BuiltInZlibInflater.class; + } + + /** * Return the appropriate implementation of the zlib decompressor. * * @return the appropriate implementation of the zlib decompressor. */ public static Decompressor getZlibDecompressor() { + LOG.info("Creating a new ZlibDecompressor"); + try { + throw new Exception(); + } catch (Exception e) { + e.printStackTrace(); + } return (nativeZlibLoaded) ? new ZlibDecompressor() : new BuiltInZlibInflater(); } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=548505&r1=548504&r2=548505 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon Jun 18 14:59:36 2007 @@ -42,6 +42,7 @@ import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.SequenceFile.Sorter; +import org.apache.hadoop.io.SequenceFile.Writer; import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator; import org.apache.hadoop.io.SequenceFile.Sorter.SegmentDescriptor; import org.apache.hadoop.io.compress.CompressionCodec; @@ -332,10 +333,10 @@ job.getMapOutputValueClass(), compressionType, codec); } private void endPartition(int partNumber) throws IOException { - //Need to write syncs especially if block compression is in use + //Need to close the file, especially if block compression is in use //We also update the index file to contain the part offsets per //spilled file - writer.sync(); + writer.close(); indexOut.writeLong(segmentStart); //we also store 0 length key/val segments to make the merge phase easier. 
indexOut.writeLong(out.getPos()-segmentStart); @@ -529,11 +530,13 @@ //create dummy files for (int i = 0; i < partitions; i++) { segmentStart = finalOut.getPos(); - SequenceFile.createWriter(job, finalOut, - job.getMapOutputKeyClass(), job.getMapOutputValueClass(), - compressionType, codec); + Writer writer = SequenceFile.createWriter(job, finalOut, + job.getMapOutputKeyClass(), + job.getMapOutputValueClass(), + compressionType, codec); finalIndexOut.writeLong(segmentStart); finalIndexOut.writeLong(finalOut.getPos() - segmentStart); + writer.close(); } finalOut.close(); finalIndexOut.close(); @@ -560,14 +563,14 @@ segmentList.add(i, s); } segmentStart = finalOut.getPos(); + RawKeyValueIterator kvIter = sorter.merge(segmentList, new Path(getTaskId())); SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut, job.getMapOutputKeyClass(), job.getMapOutputValueClass(), compressionType, codec); - sorter.writeFile(sorter.merge(segmentList, new Path(getTaskId())), - writer); - //add a sync block - required esp. for block compression to ensure + sorter.writeFile(kvIter, writer); + //close the file - required esp. for block compression to ensure //partition data don't span partition boundaries - writer.sync(); + writer.close(); //when we write the offset/length to the final index file, we write //longs for both. This helps us to reliably seek directly to the //offset/length for a partition when we start serving the byte-ranges Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java?view=diff&rev=548505&r1=548504&r2=548505 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java Mon Jun 18 14:59:36 2007 @@ -18,6 +18,7 @@ package org.apache.hadoop.io.compress; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.*; @@ -39,14 +40,44 @@ return conf; } - public CompressionOutputStream createOutputStream(OutputStream out) { + public CompressionOutputStream createOutputStream(OutputStream out) + throws IOException { return null; } - public CompressionInputStream createInputStream(InputStream in) { + public Class getCompressorType() { return null; } - + + public Compressor createCompressor() { + return null; + } + + public CompressionInputStream createInputStream(InputStream in, + Decompressor decompressor) + throws IOException { + return null; + } + + public CompressionInputStream createInputStream(InputStream in) + throws IOException { + return null; + } + + public CompressionOutputStream createOutputStream(OutputStream out, + Compressor compressor) + throws IOException { + return null; + } + + public Class getDecompressorType() { + return null; + } + + public Decompressor createDecompressor() { + return null; + } + public String getDefaultExtension() { return ".base"; } Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/BigMapOutput.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/BigMapOutput.java?view=diff&rev=548505&r1=548504&r2=548505 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/BigMapOutput.java (original) +++ 
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/BigMapOutput.java Mon Jun 18 14:59:36 2007 @@ -1,42 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.mapred; import java.io.*; -import java.net.URI; -import java.net.URISyntaxException; import java.util.*; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.mapred.lib.*; +import org.apache.hadoop.util.ToolBase; import org.apache.hadoop.fs.*; -public class BigMapOutput { +public class BigMapOutput extends ToolBase { + public static final Log LOG = + LogFactory.getLog(BigMapOutput.class.getName()); + private static Random random = new Random(); + + private static void randomizeBytes(byte[] data, int offset, int length) { + for(int i=offset + length - 1; i >= offset; --i) { + data[i] = (byte) random.nextInt(256); + } + } + + private static void createBigMapInputFile(Configuration conf, FileSystem fs, + Path dir, long fileSizeInMB) + throws IOException { + // Check if the input path exists and is non-empty + if (fs.exists(dir)) { + Path[] list = fs.listPaths(dir); + if (list != null && list.length > 0) { + throw new IOException("Input path: " + dir + " already exists... "); + } + } + + Path file = new Path(dir, "part-0"); + SequenceFile.Writer writer = + SequenceFile.createWriter(fs, conf, file, + BytesWritable.class, BytesWritable.class, + CompressionType.NONE); + long numBytesToWrite = fileSizeInMB * 1024 * 1024; + int minKeySize = conf.getInt("test.bmo.min_key", 10);; + int keySizeRange = + conf.getInt("test.bmo.max_key", 1000) - minKeySize; + int minValueSize = conf.getInt("test.bmo.min_value", 0); + int valueSizeRange = + conf.getInt("test.bmo.max_value", 20000) - minValueSize; + BytesWritable randomKey = new BytesWritable(); + BytesWritable randomValue = new BytesWritable(); + + LOG.info("Writing " + numBytesToWrite + " bytes to " + file + " with " + + "minKeySize: " + minKeySize + " keySizeRange: " + keySizeRange + + " minValueSize: " + minValueSize + " valueSizeRange: " + valueSizeRange); + long start = System.currentTimeMillis(); + while (numBytesToWrite > 0) { + int keyLength = minKeySize + + (keySizeRange != 0 ? 
random.nextInt(keySizeRange) : 0); + randomKey.setSize(keyLength); + randomizeBytes(randomKey.get(), 0, randomKey.getSize()); + int valueLength = minValueSize + + (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0); + randomValue.setSize(valueLength); + randomizeBytes(randomValue.get(), 0, randomValue.getSize()); + writer.append(randomKey, randomValue); + numBytesToWrite -= keyLength + valueLength; + } + writer.close(); + long end = System.currentTimeMillis(); - public static void main(String[] args) throws IOException { - if (args.length != 4) { //input-dir should contain a huge file ( > 2GB) - System.err.println("BigMapOutput " + - "-input <input-dir> -output <output-dir>"); - System.exit(1); + LOG.info("Created " + file + " of size: " + fileSizeInMB + "MB in " + + (end-start)/1000 + "secs"); + } + + private static void usage() { + System.err.println("BigMapOutput -input <input-dir> -output <output-dir> " + + "[-create <filesize in MB>]"); + System.exit(1); + } + public int run(String[] args) throws Exception { + if (args.length < 4) { //input-dir should contain a huge file ( > 2GB) + usage(); } Path bigMapInput = null; Path outputPath = null; + boolean createInput = false; + long fileSizeInMB = 3 * 1024; // default of 3GB (>2GB) for(int i=0; i < args.length; ++i) { if ("-input".equals(args[i])){ bigMapInput = new Path(args[++i]); } else if ("-output".equals(args[i])){ outputPath = new Path(args[++i]); + } else if ("-create".equals(args[i])) { + createInput = true; + fileSizeInMB = Long.parseLong(args[++i]); + } else { + usage(); } } - Configuration defaults = new Configuration(); - FileSystem fs = FileSystem.get(defaults); - JobConf jobConf = new JobConf(defaults, BigMapOutput.class); + FileSystem fs = FileSystem.get(conf); + JobConf jobConf = new JobConf(conf, BigMapOutput.class); jobConf.setJobName("BigMapOutput"); jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class); @@ -50,12 +133,23 @@ jobConf.setReducerClass(IdentityReducer.class); jobConf.setOutputKeyClass(BytesWritable.class); jobConf.setOutputValueClass(BytesWritable.class); - + + if (createInput) { + createBigMapInputFile(jobConf, fs, bigMapInput, fileSizeInMB); + } + Date startTime = new Date(); System.out.println("Job started: " + startTime); JobClient.runJob(jobConf); Date end_time = new Date(); System.out.println("Job ended: " + end_time); - + + return 0; } + + public static void main(String argv[]) throws Exception { + int res = new BigMapOutput().doMain(new Configuration(), argv); + System.exit(res); + } + }
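To see how the new two-argument CompressionCodec methods are meant to combine with pooling, here is a hedged caller-side sketch. It reuses the CodecPool sketch from the top of this message; the names PooledCompressionExample, writeCompressed and compressorPool are hypothetical, while getCompressorType(), createCompressor(), createOutputStream(OutputStream, Compressor) and finish() are the interface methods shown in the diff above.

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;

// Hypothetical caller; SequenceFile.Writer keeps its own private static pool.
public class PooledCompressionExample {
  private static final CodecPool<Compressor> compressorPool =
    new CodecPool<Compressor>();

  /** Compress 'data' onto 'out', reusing a pooled Compressor when one is available. */
  public static void writeCompressed(CompressionCodec codec, OutputStream out,
                                     byte[] data) throws IOException {
    Compressor compressor = compressorPool.getCodec(codec.getCompressorType());
    if (compressor == null) {
      // First use of this codec type. GzipCodec returns null here when the
      // native zlib library is unavailable, and the two-argument overload
      // then falls back to its pure-Java stream.
      compressor = codec.createCompressor();
    }
    CompressionOutputStream cOut = codec.createOutputStream(out, compressor);
    try {
      cOut.write(data, 0, data.length);
      cOut.finish();
    } finally {
      cOut.close();
      compressorPool.returnCodec(compressor);  // returnCodec(null) is a no-op
    }
  }
}

Reusing the compressor this way is what avoids repeatedly allocating native zlib/LZO state, which is the leak behind the OutOfMemoryException mentioned in the CHANGES.txt entry.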