Author: omalley
Date: Fri Mar 4 03:50:52 2011
New Revision: 1077196
URL: http://svn.apache.org/viewvc?rev=1077196&view=rev
Log:
commit 2b897f5aff5fa2e1c7f2a3e0360b0ae9f2f31498
Author: Hemanth Yamijala <[email protected]>
Date: Tue Feb 9 12:07:45 2010 +0530
HADOOP-5879 from
http://issues.apache.org/jira/secure/attachment/12435254/hadoop-5879-yahoo-0.20-v1.0.patch
+++ b/YAHOO-CHANGES.txt
+ HADOOP-5879. Read compression level and strategy from Configuration for
+ gzip compression. (He Yongqiang via cdouglas)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/CodecPool.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Compressor.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibFactory.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/CodecPool.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/CodecPool.java?rev=1077196&r1=1077195&r2=1077196&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/CodecPool.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/CodecPool.java
Fri Mar 4 03:50:52 2011
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
/**
@@ -91,20 +92,26 @@ public class CodecPool {
*
* @param codec the <code>CompressionCodec</code> for which to get the
* <code>Compressor</code>
+ * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor
* @return <code>Compressor</code> for the given
* <code>CompressionCodec</code> from the pool or a new one
*/
- public static Compressor getCompressor(CompressionCodec codec) {
- public static Compressor getCompressor(CompressionCodec codec) {
+ public static Compressor getCompressor(CompressionCodec codec, Configuration conf) {
Compressor compressor = borrow(compressorPool, codec.getCompressorType());
if (compressor == null) {
compressor = codec.createCompressor();
LOG.info("Got brand-new compressor");
} else {
+ compressor.reinit(conf);
LOG.debug("Got recycled compressor");
}
return compressor;
}
+ public static Compressor getCompressor(CompressionCodec codec) {
+ return getCompressor(codec, null);
+ }
+
/**
* Get a {@link Decompressor} for the given {@link CompressionCodec} from the
* pool or a new one.
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Compressor.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Compressor.java?rev=1077196&r1=1077195&r2=1077196&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Compressor.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Compressor.java
Fri Mar 4 03:50:52 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.io.compress;
import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Specification of a stream-based 'compressor' which can be
* plugged into a {@link CompressionOutputStream} to compress data.
@@ -102,5 +104,13 @@ public interface Compressor {
/**
* Closes the compressor and discards any unprocessed input.
*/
- public void end();
+ public void end();
+
+ /**
+ * Prepare the compressor to be used in a new stream with settings defined in
+ * the given Configuration
+ *
+ * @param conf Configuration from which new setting are fetched
+ */
+ public void reinit(Configuration conf);
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java?rev=1077196&r1=1077195&r2=1077196&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java
Fri Mar 4 03:50:52 2011
@@ -22,8 +22,11 @@ import java.io.*;
import java.util.zip.GZIPOutputStream;
import java.util.zip.GZIPInputStream;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.zlib.*;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
/**
* This class creates gzip compressors/decompressors.
@@ -154,7 +157,7 @@ public class GzipCodec extends DefaultCo
public Compressor createCompressor() {
return (ZlibFactory.isNativeZlibLoaded(conf))
- ? new GzipZlibCompressor()
+ ? new GzipZlibCompressor(conf)
: null;
}
@@ -205,6 +208,13 @@ public class GzipCodec extends DefaultCo
ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY,
ZlibCompressor.CompressionHeader.GZIP_FORMAT, 64*1024);
}
+
+ public GzipZlibCompressor(Configuration conf) {
+ super(ZlibFactory.getCompressionLevel(conf),
+ ZlibFactory.getCompressionStrategy(conf),
+ ZlibCompressor.CompressionHeader.GZIP_FORMAT,
+ 64 * 1024);
+ }
}
static final class GzipZlibDecompressor extends ZlibDecompressor {
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java?rev=1077196&r1=1077195&r2=1077196&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java
Fri Mar 4 03:50:52 2011
@@ -2,6 +2,7 @@ package org.apache.hadoop.io.compress.bz
import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor;
/**
@@ -59,4 +60,9 @@ public class BZip2DummyCompressor implem
throw new UnsupportedOperationException();
}
+ @Override
+ public void reinit(Configuration conf) {
+ // do nothing
+ }
+
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java?rev=1077196&r1=1077195&r2=1077196&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java
Fri Mar 4 03:50:52 2011
@@ -21,7 +21,9 @@ package org.apache.hadoop.io.compress.zl
import java.io.IOException;
import java.util.zip.Deflater;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor;
+import org.mortbay.log.Log;
/**
* A wrapper around java.util.zip.Deflater to make it conform
@@ -46,4 +48,30 @@ public class BuiltInZlibDeflater extends
throws IOException {
return super.deflate(b, off, len);
}
+
+ /**
+ * reinit the compressor with the given configuration. It will reset the
+ * compressor's compression level and compression strategy. Different from
+ * <tt>ZlibCompressor</tt>, <tt>BuiltInZlibDeflater</tt> only support three
+ * kind of compression strategy: FILTERED, HUFFMAN_ONLY and DEFAULT_STRATEGY.
+ * It will use DEFAULT_STRATEGY as default if the configured compression
+ * strategy is not supported.
+ */
+ @Override
+ public void reinit(Configuration conf) {
+ reset();
+ if (conf == null) {
+ return;
+ }
+ setLevel(ZlibFactory.getCompressionLevel(conf).compressionLevel());
+ final ZlibCompressor.CompressionStrategy strategy =
+ ZlibFactory.getCompressionStrategy(conf);
+ try {
+ setStrategy(strategy.compressionStrategy());
+ } catch (IllegalArgumentException ill) {
+ Log.warn(strategy + " not supported by BuiltInZlibDeflater.");
+ setStrategy(DEFAULT_STRATEGY);
+ }
+ Log.debug("Reinit compressor with new compression configuration");
+ }
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java?rev=1077196&r1=1077195&r2=1077196&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
Fri Mar 4 03:50:52 2011
@@ -22,8 +22,10 @@ import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.util.NativeCodeLoader;
+import org.mortbay.log.Log;
/**
* A {@link Compressor} based on the popular
@@ -40,7 +42,7 @@ public class ZlibCompressor implements C
private long stream;
private CompressionLevel level;
private CompressionStrategy strategy;
- private CompressionHeader windowBits;
+ private final CompressionHeader windowBits;
private int directBufferSize;
private byte[] userBuf = null;
private int userBufOff = 0, userBufLen = 0;
@@ -178,6 +180,31 @@ public class ZlibCompressor implements C
return nativeZlibLoaded;
}
+ protected final void construct(CompressionLevel level, CompressionStrategy strategy,
+ CompressionHeader header, int directBufferSize) {
+ }
+
+ /**
+ * Creates a new compressor with the default compression level.
+ * Compressed data will be generated in ZLIB format.
+ */
+ public ZlibCompressor() {
+ this(CompressionLevel.DEFAULT_COMPRESSION,
+ CompressionStrategy.DEFAULT_STRATEGY,
+ CompressionHeader.DEFAULT_HEADER,
+ DEFAULT_DIRECT_BUFFER_SIZE);
+ }
+
+ /**
+ * Creates a new compressor, taking settings from the configuration.
+ */
+ public ZlibCompressor(Configuration conf) {
+ this(ZlibFactory.getCompressionLevel(conf),
+ ZlibFactory.getCompressionStrategy(conf),
+ CompressionHeader.DEFAULT_HEADER,
+ DEFAULT_DIRECT_BUFFER_SIZE);
+ }
+
/**
* Creates a new compressor using the specified compression level.
* Compressed data will be generated in ZLIB format.
@@ -192,28 +219,38 @@ public class ZlibCompressor implements C
this.level = level;
this.strategy = strategy;
this.windowBits = header;
+ stream = init(this.level.compressionLevel(),
+ this.strategy.compressionStrategy(),
+ this.windowBits.windowBits());
+
this.directBufferSize = directBufferSize;
-
uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
compressedDirectBuf.position(directBufferSize);
-
- stream = init(this.level.compressionLevel(),
- this.strategy.compressionStrategy(),
- this.windowBits.windowBits());
}
-
+
/**
- * Creates a new compressor with the default compression level.
- * Compressed data will be generated in ZLIB format.
+ * Prepare the compressor to be used in a new stream with settings defined in
+ * the given Configuration. It will reset the compressor's compression level
+ * and compression strategy.
+ *
+ * @param conf Configuration storing new settings
*/
- public ZlibCompressor() {
- this(CompressionLevel.DEFAULT_COMPRESSION,
- CompressionStrategy.DEFAULT_STRATEGY,
- CompressionHeader.DEFAULT_HEADER,
- DEFAULT_DIRECT_BUFFER_SIZE);
+ @Override
+ public synchronized void reinit(Configuration conf) {
+ reset();
+ if (conf == null) {
+ return;
+ }
+ end(stream);
+ level = ZlibFactory.getCompressionLevel(conf);
+ strategy = ZlibFactory.getCompressionStrategy(conf);
+ stream = init(level.compressionLevel(),
+ strategy.compressionStrategy(),
+ windowBits.windowBits());
+ Log.debug("Reinit compressor with new compression configuration");
}
-
+
public synchronized void setInput(byte[] b, int off, int len) {
if (b== null) {
throw new NullPointerException();
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibFactory.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibFactory.java?rev=1077196&r1=1077195&r2=1077196&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibFactory.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibFactory.java
Fri Mar 4 03:50:52 2011
@@ -23,6 +23,8 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
import org.apache.hadoop.util.NativeCodeLoader;
/**
@@ -106,5 +108,25 @@ public class ZlibFactory {
return (isNativeZlibLoaded(conf)) ?
new ZlibDecompressor() : new BuiltInZlibInflater();
}
-
+
+ public static void setCompressionStrategy(Configuration conf,
+ CompressionStrategy strategy) {
+ conf.setEnum("zlib.compress.strategy", strategy);
+ }
+
+ public static CompressionStrategy getCompressionStrategy(Configuration conf) {
+ return conf.getEnum("zlib.compress.strategy",
+ CompressionStrategy.DEFAULT_STRATEGY);
+ }
+
+ public static void setCompressionLevel(Configuration conf,
+ CompressionLevel level) {
+ conf.setEnum("zlib.compress.level", level);
+ }
+
+ public static CompressionLevel getCompressionLevel(Configuration conf) {
+ return conf.getEnum("zlib.compress.level",
+ CompressionLevel.DEFAULT_COMPRESSION);
+ }
+
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java?rev=1077196&r1=1077195&r2=1077196&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java
Fri Mar 4 03:50:52 2011
@@ -19,9 +19,11 @@ package org.apache.hadoop.io.compress;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Random;
import junit.framework.TestCase;
@@ -41,6 +43,8 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
public class TestCodec extends TestCase {
@@ -67,6 +71,14 @@ public class TestCodec extends TestCase
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");
}
+ public void testGzipCodecWithParam() throws IOException {
+ Configuration conf = new Configuration(this.conf);
+ ZlibFactory.setCompressionLevel(conf, CompressionLevel.BEST_COMPRESSION);
+ ZlibFactory.setCompressionStrategy(conf, CompressionStrategy.HUFFMAN_ONLY);
+ codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.GzipCodec");
+ codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec");
+ }
+
private static void codecTest(Configuration conf, int seed, int count,
String codecClass)
throws IOException {
@@ -149,6 +161,53 @@ public class TestCodec extends TestCase
assertTrue("Got mismatched ZlibCompressor", c2 !=
CodecPool.getCompressor(gzc));
}
+ private static void gzipReinitTest(Configuration conf, CompressionCodec codec)
+ throws IOException {
+ // Add codec to cache
+ ZlibFactory.setCompressionLevel(conf, CompressionLevel.BEST_COMPRESSION);
+ ZlibFactory.setCompressionStrategy(conf,
+ CompressionStrategy.DEFAULT_STRATEGY);
+ Compressor c1 = CodecPool.getCompressor(codec);
+ CodecPool.returnCompressor(c1);
+ // reset compressor's compression level to perform no compression
+ ZlibFactory.setCompressionLevel(conf, CompressionLevel.NO_COMPRESSION);
+ Compressor c2 = CodecPool.getCompressor(codec, conf);
+ // ensure same compressor placed earlier
+ assertTrue("Got mismatched ZlibCompressor", c1 == c2);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ CompressionOutputStream cos = null;
+ // write trivially compressable data
+ byte[] b = new byte[1 << 15];
+ Arrays.fill(b, (byte) 43);
+ try {
+ cos = codec.createOutputStream(bos, c2);
+ cos.write(b);
+ } finally {
+ if (cos != null) {
+ cos.close();
+ }
+ CodecPool.returnCompressor(c2);
+ }
+ byte[] outbytes = bos.toByteArray();
+ // verify data were not compressed
+ assertTrue("Compressed bytes contrary to configuration",
+ outbytes.length >= b.length);
+ }
+
+ public void testCodecPoolCompressorReinit() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setBoolean("hadoop.native.lib", true);
+ if (ZlibFactory.isNativeZlibLoaded(conf)) {
+ GzipCodec gzc = ReflectionUtils.newInstance(GzipCodec.class, conf);
+ gzipReinitTest(conf, gzc);
+ } else {
+ LOG.warn("testCodecPoolCompressorReinit skipped: native libs not loaded");
+ }
+ conf.setBoolean("hadoop.native.lib", false);
+ DefaultCodec dfc = ReflectionUtils.newInstance(DefaultCodec.class, conf);
+ gzipReinitTest(conf, dfc);
+ }
+
public void testSequenceFileDefaultCodec() throws IOException,
ClassNotFoundException,
InstantiationException, IllegalAccessException {
sequenceFileCodecTest(conf, 100,
"org.apache.hadoop.io.compress.DefaultCodec", 100);