Author: cutting
Date: Wed Jan 10 09:46:54 2007
New Revision: 494905

URL: http://svn.apache.org/viewvc?view=rev&rev=494905
Log:
HADOOP-851.  Add support for the LZO codec.  Contributed by Arun.
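The new codec plugs into the existing CompressionCodec interface, and its createOutputStream/createInputStream methods read two new configuration properties, io.compression.codec.lzo.compressor and io.compression.codec.lzo.buffersize. Below is a minimal round-trip sketch, assuming the native hadoop and lzo libraries are built and loadable; the class and property names come from this patch, while the LzoRoundTrip harness and its sample data are purely illustrative and not part of the commit.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.LzoCodec;

public class LzoRoundTrip {
  public static void main(String[] args) throws IOException {
    // The codec is only usable when both libhadoop and the lzo library load.
    if (!LzoCodec.isNativeLzoLoaded()) {
      System.err.println("native-lzo library not available");
      return;
    }

    // Defaults from the patch, set explicitly here only to show the knobs.
    Configuration conf = new Configuration();
    conf.set("io.compression.codec.lzo.compressor", "LZO1X_1");
    conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024);

    LzoCodec codec = new LzoCodec();
    codec.setConf(conf);

    byte[] original = "hello, lzo".getBytes();

    // Compress into an in-memory buffer.
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(compressed);
    out.write(original);
    out.close();

    // Decompress and check the round trip.
    CompressionInputStream in = codec.createInputStream(
        new ByteArrayInputStream(compressed.toByteArray()));
    byte[] restored = new byte[original.length];
    int read = 0;
    while (read < restored.length) {
      int n = in.read(restored, read, restored.length - read);
      if (n < 0) {
        break;
      }
      read += n;
    }
    in.close();

    System.out.println(new String(restored, 0, read));
  }
}

Without the native library, isNativeLzoLoaded() returns false and createOutputStream/createInputStream throw an IOException, which is why the sketch checks it up front.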
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockCompressorStream.java lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/ lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/LzoCompressor.c lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/LzoDecompressor.c lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/Makefile.am lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/Makefile.in lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/lzo/org_apache_hadoop_io_compress_lzo.h Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/build.xml lucene/hadoop/trunk/src/native/Makefile.am lucene/hadoop/trunk/src/native/Makefile.in lucene/hadoop/trunk/src/native/NEWS lucene/hadoop/trunk/src/native/config.h.in lucene/hadoop/trunk/src/native/configure lucene/hadoop/trunk/src/native/configure.ac lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/org_apache_hadoop_io_compress_zlib.h lucene/hadoop/trunk/src/native/src/org_apache_hadoop.h lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=494905&r1=494904&r2=494905 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Wed Jan 10 09:46:54 2007 @@ -32,6 +32,10 @@ 10. HADOOP-873. Pass java.library.path correctly to child processes. (omalley via cutting) +11. HADOOP-851. Add support for the LZO codec. This is much faster + than the default, zlib-based compression, but it is only available + when the native library is built. 
(Arun C Murthy via cutting) + Release 0.10.0 - 2007-01-05 Modified: lucene/hadoop/trunk/build.xml URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=494905&r1=494904&r2=494905 ============================================================================== --- lucene/hadoop/trunk/build.xml (original) +++ lucene/hadoop/trunk/build.xml Wed Jan 10 09:46:54 2007 @@ -194,6 +194,7 @@ <mkdir dir="${build.native}/lib"/> <mkdir dir="${build.native}/src/org/apache/hadoop/io/compress/zlib"/> + <mkdir dir="${build.native}/src/org/apache/hadoop/io/compress/lzo"/> <javah classpath="${build.classes}" @@ -203,6 +204,16 @@ > <class name="org.apache.hadoop.io.compress.zlib.ZlibCompressor" /> <class name="org.apache.hadoop.io.compress.zlib.ZlibDecompressor" /> + </javah> + + <javah + classpath="${build.classes}" + destdir="${build.native}/src/org/apache/hadoop/io/compress/lzo" + force="yes" + verbose="yes" + > + <class name="org.apache.hadoop.io.compress.lzo.LzoCompressor" /> + <class name="org.apache.hadoop.io.compress.lzo.LzoDecompressor" /> </javah> <exec dir="${build.native}" executable="sh" failonerror="true"> Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockCompressorStream.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockCompressorStream.java?view=auto&rev=494905 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockCompressorStream.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockCompressorStream.java Wed Jan 10 09:46:54 2007 @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * A [EMAIL PROTECTED] org.apache.hadoop.io.compress.CompressorStream} which works + * with 'block-based' based compression algorithms, as opposed to + * 'stream-based' compression algorithms. + * + * @author Arun C Murthy + */ +class BlockCompressorStream extends CompressorStream { + + // The 'maximum' size of input data to be compressed, to account + // for the overhead of the compression algorithm. + private final int MAX_INPUT_SIZE; + + /** + * Create a [EMAIL PROTECTED] BlockCompressorStream}. 
+ * + * @param out stream + * @param compressor compressor to be used + * @param bufferSize size of buffer + * @param compressionOverhead maximum 'overhead' of the compression + * algorithm with given bufferSize + */ + public BlockCompressorStream(OutputStream out, Compressor compressor, + int bufferSize, int compressionOverhead) { + super(out, compressor, bufferSize); + MAX_INPUT_SIZE = bufferSize - compressionOverhead; + } + + /** + * Create a [EMAIL PROTECTED] BlockCompressorStream} with given output-stream and + * compressor. + * Use default of 512 as bufferSize and compressionOverhead of + * (1% of bufferSize + 12 bytes) = 18 bytes (zlib algorithm). + * + * @param out stream + * @param compressor compressor to be used + */ + public BlockCompressorStream(OutputStream out, Compressor compressor) { + this(out, compressor, 512, 18); + } + + public void write(byte[] b, int off, int len) throws IOException { + // Sanity checks + if (compressor.finished()) { + throw new IOException("write beyond end of stream"); + } + if (b == null) { + throw new NullPointerException(); + } else if ((off < 0) || (off > b.length) || (len < 0) || + ((off + len) > b.length)) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return; + } + + // Write out the length of the original data + rawWriteInt(len); + + // Compress data + if (!compressor.finished()) { + do { + // Compress atmost 'maxInputSize' chunks at a time + int bufLen = Math.min(len, MAX_INPUT_SIZE); + + compressor.setInput(b, off, bufLen); + while (!compressor.needsInput()) { + compress(); + } + off += bufLen; + len -= bufLen; + } while (len > 0); + } + } + + void compress() throws IOException { + int len = compressor.compress(buffer, 0, buffer.length); + if (len > 0) { + // Write out the compressed chunk + rawWriteInt(len); + out.write(buffer, 0, len); + } + } + + private void rawWriteInt(int v) throws IOException { + out.write((v >>> 24) & 0xFF); + out.write((v >>> 16) & 0xFF); + out.write((v >>> 8) & 0xFF); + out.write((v >>> 0) & 0xFF); + } + +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java?view=auto&rev=494905 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java Wed Jan 10 09:46:54 2007 @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.io.compress; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +/** + * A [EMAIL PROTECTED] org.apache.hadoop.io.compress.DecompressorStream} which works + * with 'block-based' based compression algorithms, as opposed to + * 'stream-based' compression algorithms. + * + * @author Arun C Murthy + */ +class BlockDecompressorStream extends DecompressorStream { + private int originalBlockSize = 0; + private int noUncompressedBytes = 0; + + /** + * Create a [EMAIL PROTECTED] BlockDecompressorStream}. + * + * @param in input stream + * @param decompressor decompressor to use + * @param bufferSize size of buffer + */ + public BlockDecompressorStream(InputStream in, Decompressor decompressor, + int bufferSize) { + super(in, decompressor, bufferSize); + } + + /** + * Create a [EMAIL PROTECTED] BlockDecompressorStream}. + * + * @param in input stream + * @param decompressor decompressor to use + */ + public BlockDecompressorStream(InputStream in, Decompressor decompressor) { + super(in, decompressor); + } + + protected BlockDecompressorStream(InputStream in) { + super(in); + } + + int decompress(byte[] b, int off, int len) throws IOException { + // Check if we are the beginning of a block + if (noUncompressedBytes == originalBlockSize) { + // Get original data size + try { + originalBlockSize = rawReadInt(); + } catch (IOException ioe) { + return -1; + } + noUncompressedBytes = 0; + } + + int n = 0; + while ((n = decompressor.decompress(b, off, len)) == 0) { + if (decompressor.finished() || decompressor.needsDictionary()) { + if (noUncompressedBytes >= originalBlockSize) { + eof = true; + return -1; + } + } + if (decompressor.needsInput()) { + getCompressedData(); + } + } + + // Note the no. of decompressed bytes read from 'current' block + noUncompressedBytes += n; + + return n; + } + + void getCompressedData() throws IOException { + checkStream(); + + // Get the size of the compressed chunk + int len = rawReadInt(); + + // Read len bytes from underlying stream + if (len > buffer.length) { + buffer = new byte[len]; + } + int n = 0, off = 0; + while (n < len) { + int count = in.read(buffer, off + n, len - n); + if (count < 0) { + throw new EOFException(); + } + n += count; + } + + // Send the read data to the decompressor + decompressor.setInput(buffer, 0, len); + } + + public void resetState() throws IOException { + super.resetState(); + } + + private int rawReadInt() throws IOException { + int b1 = in.read(); + int b2 = in.read(); + int b3 = in.read(); + int b4 = in.read(); + if ((b1 | b2 | b3 | b4) < 0) + throw new EOFException(); + return ((b1 << 24) + (b2 << 16) + (b3 << 8) + (b4 << 0)); + } +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java?view=auto&rev=494905 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java Wed Jan 10 09:46:54 2007 @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.InputStream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.compress.lzo.*; +import org.apache.hadoop.util.NativeCodeLoader; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; + +/** + * A [EMAIL PROTECTED] org.apache.hadoop.io.compress.CompressionCodec} for a streaming + * <b>lzo</b> compression/decompression pair. + * http://www.oberhumer.com/opensource/lzo/ + * + * @author Arun C Murthy + */ +public class LzoCodec implements Configurable, CompressionCodec { + + private static final Log LOG = LogFactory.getLog(LzoCodec.class.getName()); + + private Configuration conf; + + public void setConf(Configuration conf) { + this.conf = conf; + } + + public Configuration getConf() { + return conf; + } + + private static boolean nativeLzoLoaded = false; + + static { + if (NativeCodeLoader.isNativeCodeLoaded()) { + nativeLzoLoaded = LzoCompressor.isNativeLzoLoaded() && + LzoDecompressor.isNativeLzoLoaded(); + + if (nativeLzoLoaded) { + LOG.info("Successfully loaded & initialized native-lzo library"); + } else { + LOG.error("Failed to load/initialize native-lzo library"); + } + } else { + LOG.error("Cannot load native-lzo without native-hadoop"); + } + } + + /** + * Check if native-lzo library is loaded & initialized. + * + * @return <code>true</code> if native-lzo library is loaded & initialized; + * else <code>false</code> + */ + public static boolean isNativeLzoLoaded() { + return nativeLzoLoaded; + } + + public CompressionOutputStream createOutputStream(OutputStream out) + throws IOException { + // Ensure native-lzo library is loaded & initialized + if (!isNativeLzoLoaded()) { + throw new IOException("native-lzo library not available"); + } + + /** + * <b>http://www.oberhumer.com/opensource/lzo/lzofaq.php</b> + * + * How much can my data expand during compression ? + * ================================================ + * LZO will expand incompressible data by a little amount. + * I still haven't computed the exact values, but I suggest using + * these formulas for a worst-case expansion calculation: + * + * Algorithm LZO1, LZO1A, LZO1B, LZO1C, LZO1F, LZO1X, LZO1Y, LZO1Z: + * ---------------------------------------------------------------- + * output_block_size = input_block_size + (input_block_size / 16) + 64 + 3 + * + * This is about 106% for a large block size. 
+ * + * Algorithm LZO2A: + * ---------------- + * output_block_size = input_block_size + (input_block_size / 8) + 128 + 3 + */ + + // Create the lzo output-stream + LzoCompressor.CompressionStrategy strategy = + LzoCompressor.CompressionStrategy.valueOf( + conf.get("io.compression.codec.lzo.compressor", + LzoCompressor.CompressionStrategy.LZO1X_1.name() + ) + ); + int bufferSize = conf.getInt("io.compression.codec.lzo.buffersize", + 64*1024); + int compressionOverhead = 0; + if (strategy.name().contains("LZO1")) { + compressionOverhead = (int)(((bufferSize - (64 + 3)) * 16.0) / 17.0); + } else { + compressionOverhead = (int)(((bufferSize - (128 + 3)) * 8.0) / 9.0); + } + + return new BlockCompressorStream(out, + new LzoCompressor(strategy, bufferSize), + bufferSize, compressionOverhead); + } + + public CompressionInputStream createInputStream(InputStream in) + throws IOException { + // Ensure native-lzo library is loaded & initialized + if (!isNativeLzoLoaded()) { + throw new IOException("native-lzo library not available"); + } + + // Create the lzo input-stream + LzoDecompressor.CompressionStrategy strategy = + LzoDecompressor.CompressionStrategy.valueOf( + conf.get("io.compression.codec.lzo.decompressor", + LzoDecompressor.CompressionStrategy.LZO1X.name() + ) + ); + int bufferSize = conf.getInt("io.compression.codec.lzo.buffersize", + 64*1024); + + return new BlockDecompressorStream(in, + new LzoDecompressor(strategy, bufferSize), + bufferSize); + } + + /** + * Get the default filename extension for this kind of compression. + * @return the extension including the '.' + */ + public String getDefaultExtension() { + return ".lzo"; + } +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java?view=auto&rev=494905 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java Wed Jan 10 09:46:54 2007 @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress.lzo; + +import java.io.IOException; +import java.nio.Buffer; +import java.nio.ByteBuffer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.util.NativeCodeLoader; + +/** + * A [EMAIL PROTECTED] Compressor} based on the lzo algorithm. 
+ * http://www.oberhumer.com/opensource/lzo/ + * + * @author Arun C Murthy + */ +public class LzoCompressor implements Compressor { + private static final Log LOG = + LogFactory.getLog(LzoCompressor.class.getName()); + + private int directBufferSize; + private byte[] userBuf = null; + private int userBufOff = 0, userBufLen = 0; + private Buffer uncompressedDirectBuf = null; + private int uncompressedDirectBufLen = 0; + private Buffer compressedDirectBuf = null; + private boolean finish, finished; + + private CompressionStrategy strategy; // The lzo compression algorithm. + private long lzoCompressor = 0; // The actual lzo compression function. + private int workingMemoryBufLen = 0; // The length of 'working memory' buf. + private Buffer workingMemoryBuf; // The 'working memory' for lzo. + + /** + * The compression algorithm for lzo library. + */ + public static enum CompressionStrategy { + /** + * lzo1 algorithms. + */ + LZO1 (0), + LZO1_99 (1), + + /** + * lzo1a algorithms. + */ + LZO1A (2), + LZO1A_99 (3), + + /** + * lzo1b algorithms. + */ + LZO1B (4), + LZO1B_BEST_COMPRESSION(5), + LZO1B_BEST_SPEED(6), + LZO1B_1 (7), + LZO1B_2 (8), + LZO1B_3 (9), + LZO1B_4 (10), + LZO1B_5 (11), + LZO1B_6 (12), + LZO1B_7 (13), + LZO1B_8 (14), + LZO1B_9 (15), + LZO1B_99 (16), + LZO1B_999 (17), + + /** + * lzo1c algorithms. + */ + LZO1C (18), + LZO1C_BEST_COMPRESSION(19), + LZO1C_BEST_SPEED(20), + LZO1C_1 (21), + LZO1C_2 (22), + LZO1C_3 (23), + LZO1C_4 (24), + LZO1C_5 (25), + LZO1C_6 (26), + LZO1C_7 (27), + LZO1C_8 (28), + LZO1C_9 (29), + LZO1C_99 (30), + LZO1C_999 (31), + + /** + * lzo1f algorithms. + */ + LZO1F_1 (32), + LZO1F_999 (33), + + /** + * lzo1x algorithms. + */ + LZO1X_1 (34), + LZO1X_11 (35), + LZO1X_12 (36), + LZO1X_15 (37), + LZO1X_999 (38), + + /** + * lzo1y algorithms. + */ + LZO1Y_1 (39), + LZO1Y_999 (40), + + /** + * lzo1z algorithms. + */ + LZO1Z_999 (41), + + /** + * lzo2a algorithms. + */ + LZO2A_999 (42); + + private final int compressor; + + private CompressionStrategy(int compressor) { + this.compressor = compressor; + } + + int getCompressor() { + return compressor; + } + }; // CompressionStrategy + + private static boolean nativeLzoLoaded = false; + + static { + if (NativeCodeLoader.isNativeCodeLoaded()) { + // Initialize the native library + initIDs(); + nativeLzoLoaded = true; + } else { + LOG.error("Cannot load " + LzoCompressor.class.getName() + + " without native-hadoop library!"); + } + } + + /** + * Check if lzo compressors are loaded and initialized. + * + * @return <code>true</code> if lzo compressors are loaded & initialized, + * else <code>false</code> + */ + public static boolean isNativeLzoLoaded() { + return nativeLzoLoaded; + } + + /** + * Creates a new compressor using the specified [EMAIL PROTECTED] CompressionStrategy}. + * + * @param strategy lzo compression algorithm to use + * @param directBufferSize size of the direct buffer to be used. 
+ */ + public LzoCompressor(CompressionStrategy strategy, int directBufferSize) { + this.strategy = strategy; + this.directBufferSize = directBufferSize; + uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + compressedDirectBuf.position(directBufferSize); + + /** + * Initialize [EMAIL PROTECTED] #lzoCompress} and [EMAIL PROTECTED] #workingMemoryBufLen} + */ + init(this.strategy.getCompressor()); + workingMemoryBuf = ByteBuffer.allocateDirect(workingMemoryBufLen); + } + + /** + * Creates a new compressor with the default lzo1x_1 compression. + */ + public LzoCompressor() { + this(CompressionStrategy.LZO1X_1, 64*1024); + } + + public synchronized void setInput(byte[] b, int off, int len) { + if (b== null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + this.userBuf = b; + this.userBufOff = off; + this.userBufLen = len; + + // Reinitialize lzo's output direct-buffer + compressedDirectBuf.limit(directBufferSize); + compressedDirectBuf.position(directBufferSize); + } + + synchronized void setInputFromSavedData() { + uncompressedDirectBufLen = userBufLen; + if (uncompressedDirectBufLen > directBufferSize) { + uncompressedDirectBufLen = directBufferSize; + } + + // Reinitialize lzo's input direct buffer + uncompressedDirectBuf.rewind(); + ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff, + uncompressedDirectBufLen); + + // Note how much data is being fed to lzo + userBufOff += uncompressedDirectBufLen; + userBufLen -= uncompressedDirectBufLen; + } + + public synchronized void setDictionary(byte[] b, int off, int len) { + // nop + } + + public boolean needsInput() { + // Consume remaining compressed data? 
+ if (compressedDirectBuf.remaining() > 0) { + return false; + } + + // Check if lzo has consumed all input + if (uncompressedDirectBufLen <= 0) { + // Check if we have consumed all user-input + if (userBufLen <= 0) { + return true; + } else { + setInputFromSavedData(); + } + } + + return false; + } + + public synchronized void finish() { + finish = true; + } + + public synchronized boolean finished() { + // Check if 'lzo' says its 'finished' and + // all compressed data has been consumed + return (finished && compressedDirectBuf.remaining() == 0); + } + + public synchronized int compress(byte[] b, int off, int len) + throws IOException { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + int n = 0; + + // Check if there is compressed data + n = compressedDirectBuf.remaining(); + if (n > 0) { + n = Math.min(n, len); + ((ByteBuffer)compressedDirectBuf).get(b, off, n); + return n; + } + + // Re-initialize the lzo's output direct-buffer + compressedDirectBuf.rewind(); + compressedDirectBuf.limit(directBufferSize); + + // Compress data + n = compressBytesDirect(strategy.getCompressor()); + compressedDirectBuf.limit(n); + + // Set 'finished' if lzo has consumed all user-data + if (userBufLen <= 0) { + finished = true; + } + + // Get atmost 'len' bytes + n = Math.min(n, len); + ((ByteBuffer)compressedDirectBuf).get(b, off, n); + + return n; + } + + public synchronized void reset() { + finish = false; + finished = false; + uncompressedDirectBuf.rewind(); + uncompressedDirectBufLen = 0; + compressedDirectBuf.limit(directBufferSize); + compressedDirectBuf.position(directBufferSize); + userBufOff = userBufLen = 0; + } + + public synchronized void end() { + // nop + } + + private native static void initIDs(); + private native void init(int compressor); + private native int compressBytesDirect(int compressor); +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java?view=auto&rev=494905 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java Wed Jan 10 09:46:54 2007 @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.io.compress.lzo; + +import java.io.IOException; +import java.nio.Buffer; +import java.nio.ByteBuffer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.util.NativeCodeLoader; + +/** + * A [EMAIL PROTECTED] Decompressor} based on the lzo algorithm. + * http://www.oberhumer.com/opensource/lzo/ + * + * @author Arun C Murthy + */ +public class LzoDecompressor implements Decompressor { + private static final Log LOG = + LogFactory.getLog(LzoDecompressor.class.getName()); + + private int directBufferSize; + private Buffer compressedDirectBuf = null; + private int compressedDirectBufLen; + private Buffer uncompressedDirectBuf = null; + private byte[] userBuf = null; + private int userBufOff = 0, userBufLen = 0; + private boolean finished; + + private CompressionStrategy strategy; + private long lzoDecompressor = 0; // The actual lzo decompression function. + + public static enum CompressionStrategy { + /** + * lzo1 algorithms. + */ + LZO1 (0), + + /** + * lzo1a algorithms. + */ + LZO1A (1), + + /** + * lzo1b algorithms. + */ + LZO1B (2), + LZO1B_SAFE(3), + + /** + * lzo1c algorithms. + */ + LZO1C (4), + LZO1C_SAFE(5), + LZO1C_ASM (6), + LZO1C_ASM_SAFE (7), + + /** + * lzo1f algorithms. + */ + LZO1F (8), + LZO1F_SAFE (9), + LZO1F_ASM_FAST (10), + LZO1F_ASM_FAST_SAFE (11), + + /** + * lzo1x algorithms. + */ + LZO1X (12), + LZO1X_SAFE (13), + LZO1X_ASM (14), + LZO1X_ASM_SAFE (15), + LZO1X_ASM_FAST (16), + LZO1X_ASM_FAST_SAFE (17), + + /** + * lzo1y algorithms. + */ + LZO1Y (18), + LZO1Y_SAFE (19), + LZO1Y_ASM (20), + LZO1Y_ASM_SAFE (21), + LZO1Y_ASM_FAST (22), + LZO1Y_ASM_FAST_SAFE (23), + + /** + * lzo1z algorithms. + */ + LZO1Z (24), + LZO1Z_SAFE (25), + + /** + * lzo2a algorithms. + */ + LZO2A (26), + LZO2A_SAFE (27); + + private final int decompressor; + + private CompressionStrategy(int decompressor) { + this.decompressor = decompressor; + } + + int getDecompressor() { + return decompressor; + } + }; // CompressionStrategy + + private static boolean nativeLzoLoaded = false; + + static { + if (NativeCodeLoader.isNativeCodeLoaded()) { + // Initialize the native library + initIDs(); + nativeLzoLoaded = true; + } else { + LOG.error("Cannot load " + LzoDecompressor.class.getName() + + " without native-hadoop library!"); + } + } + + /** + * Check if lzo decompressors are loaded and initialized. + * + * @return <code>true</code> if lzo decompressors are loaded & initialized, + * else <code>false</code> + */ + public static boolean isNativeLzoLoaded() { + return nativeLzoLoaded; + } + + /** + * Creates a new lzo decompressor. + * + * @param strategy lzo decompression algorithm + * @param directBufferSize size of the direct-buffer + */ + public LzoDecompressor(CompressionStrategy strategy, int directBufferSize) { + this.directBufferSize = directBufferSize; + this.strategy = strategy; + + compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + + /** + * Initialize [EMAIL PROTECTED] #lzoDecompress} + */ + init(this.strategy.getDecompressor()); + } + + /** + * Creates a new lzo decompressor. 
+ */ + public LzoDecompressor() { + this(CompressionStrategy.LZO1X, 64*1024); + } + + public synchronized void setInput(byte[] b, int off, int len) { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + this.userBuf = b; + this.userBufOff = off; + this.userBufLen = len; + + setInputFromSavedData(); + + // Reinitialize lzo's output direct-buffer + uncompressedDirectBuf.limit(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + } + + synchronized void setInputFromSavedData() { + compressedDirectBufLen = userBufLen; + if (compressedDirectBufLen > directBufferSize) { + compressedDirectBufLen = directBufferSize; + } + + // Reinitialize lzo's input direct-buffer + compressedDirectBuf.rewind(); + ((ByteBuffer)compressedDirectBuf).put(userBuf, userBufOff, + compressedDirectBufLen); + + // Note how much data is being fed to lzo + userBufOff += compressedDirectBufLen; + userBufLen -= compressedDirectBufLen; + } + + public synchronized void setDictionary(byte[] b, int off, int len) { + // nop + } + + public synchronized boolean needsInput() { + // Consume remanining compressed data? + if (uncompressedDirectBuf.remaining() > 0) { + return false; + } + + // Check if lzo has consumed all input + if (compressedDirectBufLen <= 0) { + // Check if we have consumed all user-input + if (userBufLen <= 0) { + return true; + } else { + setInputFromSavedData(); + } + } + + return false; + } + + public synchronized boolean needsDictionary() { + return false; + } + + public synchronized boolean finished() { + // Check if 'lzo' says its 'finished' and + // all uncompressed data has been consumed + return (finished && uncompressedDirectBuf.remaining() == 0); + } + + public synchronized int decompress(byte[] b, int off, int len) + throws IOException { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + int n = 0; + + // Check if there is uncompressed data + n = uncompressedDirectBuf.remaining(); + if(n > 0) { + n = Math.min(n, len); + ((ByteBuffer)uncompressedDirectBuf).get(b, off, n); + return n; + } + + // Check if there is data to decompress + if (compressedDirectBufLen <= 0) { + return 0; + } + + // Re-initialize the lzo's output direct-buffer + uncompressedDirectBuf.rewind(); + uncompressedDirectBuf.limit(directBufferSize); + + // Decompress data + n = decompressBytesDirect(strategy.getDecompressor()); + uncompressedDirectBuf.limit(n); + + // Set 'finished' if lzo has consumed all user-data + if (userBufLen <= 0) { + finished = true; + } + + // Return atmost 'len' bytes + n = Math.min(n, len); + ((ByteBuffer)uncompressedDirectBuf).get(b, off, n); + + return n; + } + + public synchronized void reset() { + finished = false; + compressedDirectBufLen = 0; + uncompressedDirectBuf.limit(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + userBufOff = userBufLen = 0; + } + + public synchronized void end() { + // nop + } + + protected void finalize() { + end(); + } + + private native static void initIDs(); + private native void init(int decompressor); + private native int decompressBytesDirect(int decompressor); +} Modified: lucene/hadoop/trunk/src/native/Makefile.am URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/native/Makefile.am?view=diff&rev=494905&r1=494904&r2=494905 ============================================================================== 
--- lucene/hadoop/trunk/src/native/Makefile.am (original) +++ lucene/hadoop/trunk/src/native/Makefile.am Wed Jan 10 09:46:54 2007 @@ -36,7 +36,7 @@ export PLATFORM = $(shell echo $$OS_NAME | tr [A-Z] [a-z]) # List the sub-directories here -SUBDIRS = src/org/apache/hadoop/io/compress/zlib lib +SUBDIRS = src/org/apache/hadoop/io/compress/zlib src/org/apache/hadoop/io/compress/lzo lib # The following export is needed to build libhadoop.so in the 'lib' directory export SUBDIRS Modified: lucene/hadoop/trunk/src/native/Makefile.in URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/native/Makefile.in?view=diff&rev=494905&r1=494904&r2=494905 ============================================================================== --- lucene/hadoop/trunk/src/native/Makefile.in (original) +++ lucene/hadoop/trunk/src/native/Makefile.in Wed Jan 10 09:46:54 2007 @@ -207,7 +207,7 @@ target_alias = @target_alias@ # List the sub-directories here -SUBDIRS = src/org/apache/hadoop/io/compress/zlib lib +SUBDIRS = src/org/apache/hadoop/io/compress/zlib src/org/apache/hadoop/io/compress/lzo lib all: config.h $(MAKE) $(AM_MAKEFLAGS) all-recursive Modified: lucene/hadoop/trunk/src/native/NEWS URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/native/NEWS?view=diff&rev=494905&r1=494904&r2=494905 ============================================================================== --- lucene/hadoop/trunk/src/native/NEWS (original) +++ lucene/hadoop/trunk/src/native/NEWS Wed Jan 10 09:46:54 2007 @@ -1,3 +1,5 @@ 2006-10-05 Arun C Murthy <[EMAIL PROTECTED]> * Initial version of libhadoop released +2007-01-03 Arun C Murthy <[EMAIL PROTECTED]> + * Added support for lzo compression library Modified: lucene/hadoop/trunk/src/native/config.h.in URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/native/config.h.in?view=diff&rev=494905&r1=494904&r2=494905 ============================================================================== --- lucene/hadoop/trunk/src/native/config.h.in (original) +++ lucene/hadoop/trunk/src/native/config.h.in Wed Jan 10 09:46:54 2007 @@ -1,5 +1,8 @@ /* config.h.in. Generated from configure.ac by autoheader. */ +/* The 'actual' dynamic-library for '-llzo2' */ +#undef HADOOP_LZO_LIBRARY + /* The 'actual' dynamic-library for '-lz' */ #undef HADOOP_ZLIB_LIBRARY @@ -18,8 +21,41 @@ /* Define to 1 if you have the `jvm' library (-ljvm). */ #undef HAVE_LIBJVM +/* Define to 1 if you have the `lzo2' library (-llzo2). */ +#undef HAVE_LIBLZO2 + /* Define to 1 if you have the `z' library (-lz). */ #undef HAVE_LIBZ + +/* Define to 1 if you have the <lzo/lzo1a.h> header file. */ +#undef HAVE_LZO_LZO1A_H + +/* Define to 1 if you have the <lzo/lzo1b.h> header file. */ +#undef HAVE_LZO_LZO1B_H + +/* Define to 1 if you have the <lzo/lzo1c.h> header file. */ +#undef HAVE_LZO_LZO1C_H + +/* Define to 1 if you have the <lzo/lzo1f.h> header file. */ +#undef HAVE_LZO_LZO1F_H + +/* Define to 1 if you have the <lzo/lzo1x.h> header file. */ +#undef HAVE_LZO_LZO1X_H + +/* Define to 1 if you have the <lzo/lzo1y.h> header file. */ +#undef HAVE_LZO_LZO1Y_H + +/* Define to 1 if you have the <lzo/lzo1z.h> header file. */ +#undef HAVE_LZO_LZO1Z_H + +/* Define to 1 if you have the <lzo/lzo1.h> header file. */ +#undef HAVE_LZO_LZO1_H + +/* Define to 1 if you have the <lzo/lzo2a.h> header file. */ +#undef HAVE_LZO_LZO2A_H + +/* Define to 1 if you have the <lzo/lzo_asm.h> header file. */ +#undef HAVE_LZO_LZO_ASM_H /* Define to 1 if you have the <memory.h> header file. */ #undef HAVE_MEMORY_H
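As a closing note on the stream classes added above: BlockCompressorStream writes each block as the big-endian length of the original data (rawWriteInt) followed by one or more pairs of compressed-chunk length and chunk bytes, and BlockDecompressorStream reads the same framing back (rawReadInt, getCompressedData). Below is a minimal sketch of just that framing, with placeholder bytes standing in for real lzo output; the writeInt/readInt helpers mirror the private methods in the patch, but the BlockFraming class itself is hypothetical and never calls the codec.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class BlockFraming {

  // Mirrors BlockCompressorStream.rawWriteInt(): big-endian 32-bit length.
  static void writeInt(OutputStream out, int v) throws IOException {
    out.write((v >>> 24) & 0xFF);
    out.write((v >>> 16) & 0xFF);
    out.write((v >>> 8) & 0xFF);
    out.write(v & 0xFF);
  }

  // Mirrors BlockDecompressorStream.rawReadInt().
  static int readInt(InputStream in) throws IOException {
    int b1 = in.read(), b2 = in.read(), b3 = in.read(), b4 = in.read();
    if ((b1 | b2 | b3 | b4) < 0) {
      throw new EOFException();
    }
    return (b1 << 24) + (b2 << 16) + (b3 << 8) + b4;
  }

  public static void main(String[] args) throws IOException {
    byte[] chunk = "pretend this is lzo output".getBytes();

    // Write one block: original data length, then a single compressed chunk.
    ByteArrayOutputStream framed = new ByteArrayOutputStream();
    writeInt(framed, 1024);          // length of the original (uncompressed) data
    writeInt(framed, chunk.length);  // length of the compressed chunk
    framed.write(chunk);

    // Read the framing back.
    ByteArrayInputStream in = new ByteArrayInputStream(framed.toByteArray());
    System.out.println("original block size = " + readInt(in));
    int compressedLen = readInt(in);
    byte[] restored = new byte[compressedLen];
    int n = in.read(restored, 0, compressedLen);
    System.out.println("compressed chunk    = " + new String(restored, 0, n));
  }
}

This framing is what lets the decompressor grow its buffer to the exact chunk size before handing the bytes to the native decompress call, as getCompressedData() does in the patch.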