Author: tomwhite
Date: Mon Jun 20 16:32:27 2011
New Revision: 1137690
URL: http://svn.apache.org/viewvc?rev=1137690&view=rev
Log:
HADOOP-7206. Integrate Snappy compression. Contributed by T Jake Luciani.
Added:
hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java
(with props)
hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/
hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
(with props)
hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
(with props)
Modified:
hadoop/common/trunk/common/CHANGES.txt
hadoop/common/trunk/common/ivy.xml
hadoop/common/trunk/common/ivy/hadoop-common-template.xml
hadoop/common/trunk/common/ivy/libraries.properties
hadoop/common/trunk/common/src/java/core-default.xml
hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java
Modified: hadoop/common/trunk/common/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/common/CHANGES.txt?rev=1137690&r1=1137689&r2=1137690&view=diff
==============================================================================
--- hadoop/common/trunk/common/CHANGES.txt (original)
+++ hadoop/common/trunk/common/CHANGES.txt Mon Jun 20 16:32:27 2011
@@ -47,6 +47,8 @@ Trunk (unreleased changes)
HADOOP-7379. Add the ability to serialize and deserialize protocol buffers
in ObjectWritable. (todd)
+ HADOOP-7206. Integrate Snappy compression. (T Jake Luciani via tomwhite)
+
IMPROVEMENTS
HADOOP-7042. Updates to test-patch.sh to include failed test names and
Modified: hadoop/common/trunk/common/ivy.xml
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/common/ivy.xml?rev=1137690&r1=1137689&r2=1137690&view=diff
==============================================================================
--- hadoop/common/trunk/common/ivy.xml (original)
+++ hadoop/common/trunk/common/ivy.xml Mon Jun 20 16:32:27 2011
@@ -327,5 +327,9 @@
name="protobuf-java"
rev="${protobuf.version}"
conf="common->default"/>
+ <dependency org="org.xerial.snappy"
+ name="snappy-java"
+ rev="${snappy-java.version}"
+ conf="common->default"/>
</dependencies>
</ivy-module>
Modified: hadoop/common/trunk/common/ivy/hadoop-common-template.xml
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/common/ivy/hadoop-common-template.xml?rev=1137690&r1=1137689&r2=1137690&view=diff
==============================================================================
--- hadoop/common/trunk/common/ivy/hadoop-common-template.xml (original)
+++ hadoop/common/trunk/common/ivy/hadoop-common-template.xml Mon Jun 20 16:32:27 2011
@@ -155,5 +155,10 @@
<artifactId>protobuf-java</artifactId>
<version>2.4.0a</version>
</dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId> <!-- must match the dependency name used in ivy.xml -->
+ <version>1.0.3-rc2</version>
+ </dependency>
</dependencies>
</project>
Modified: hadoop/common/trunk/common/ivy/libraries.properties
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/common/ivy/libraries.properties?rev=1137690&r1=1137689&r2=1137690&view=diff
==============================================================================
--- hadoop/common/trunk/common/ivy/libraries.properties (original)
+++ hadoop/common/trunk/common/ivy/libraries.properties Mon Jun 20 16:32:27 2011
@@ -74,6 +74,7 @@ rats-lib.version=0.6
servlet.version=4.0.6
servlet-api-2.5.version=6.1.14
servlet-api.version=2.5
+snappy-java.version=1.0.3-rc2
slf4j-api.version=1.5.11
slf4j-log4j12.version=1.5.11
Modified: hadoop/common/trunk/common/src/java/core-default.xml
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/core-default.xml?rev=1137690&r1=1137689&r2=1137690&view=diff
==============================================================================
--- hadoop/common/trunk/common/src/java/core-default.xml (original)
+++ hadoop/common/trunk/common/src/java/core-default.xml Mon Jun 20 16:32:27 2011
@@ -174,7 +174,7 @@
<property>
<name>io.compression.codecs</name>
- <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec</value>
+ <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.DeflateCodec</value>
<description>A list of the compression codec classes that can be used
for compression/decompression.</description>
</property>
Added:
hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java?rev=1137690&view=auto
==============================================================================
---
hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java
(added)
+++
hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java
Mon Jun 20 16:32:27 2011
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
+import org.xerial.snappy.Snappy;
+import org.xerial.snappy.SnappyError;
+
+/**
+ * A {@link CompressionCodec} that creates Snappy block streams, compressors
+ * and decompressors on top of the snappy-java bindings.
+ *
+ * <p>The native library is probed once, when this class is loaded. Every
+ * factory method throws a {@link RuntimeException} if that probe did not
+ * succeed.
+ *
+ * <p>The working buffer size is read from {@link #SNAPPY_BUFFER_SIZE_KEY} and
+ * defaults to {@link #DEFAULT_SNAPPY_BUFFER_SIZE}. The default is also used
+ * when no {@link Configuration} has been set on the codec.
+ */
+public class SnappyCodec implements Configurable, CompressionCodec {
+  private static final Log logger = LogFactory.getLog(SnappyCodec.class
+      .getName());
+  // Written only by the static initializer below.
+  private static boolean nativeSnappyLoaded = false;
+  private Configuration conf;
+
+  /** Configuration key holding the codec buffer size, in bytes. */
+  public static final String SNAPPY_BUFFER_SIZE_KEY =
+      "io.compression.codec.snappy.buffersize";
+  /** Buffer size used when {@link #SNAPPY_BUFFER_SIZE_KEY} is not set. */
+  public static final int DEFAULT_SNAPPY_BUFFER_SIZE = 256 * 1024;
+
+  /** Creates a codec without a configuration; see {@link #setConf}. */
+  public SnappyCodec() {
+
+  }
+
+  /**
+   * Creates a codec that reads its settings from the given configuration.
+   *
+   * @param conf configuration to read the buffer size from
+   */
+  public SnappyCodec(Configuration conf) {
+    setConf(conf);
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  static {
+    try {
+      // Query the version once and reuse it for both the check and the log.
+      String version = Snappy.getNativeLibraryVersion();
+      if (version != null) {
+        logger.info("Successfully loaded & initialized native-snappy library "
+            + "[snappy-java rev " + version + "]");
+
+        nativeSnappyLoaded = true;
+      } else {
+        logger.info("Failed to load native-snappy library");
+      }
+
+    } catch (SnappyError e) {
+      logger.error("Native Snappy load error: ", e);
+    }
+  }
+
+  /**
+   * Reports whether the native Snappy library was loaded at class-load time.
+   *
+   * @param conf unused; the result does not depend on the configuration
+   * @return true if the load-time probe succeeded
+   */
+  public static boolean isNativeSnappyLoaded(Configuration conf) {
+    return nativeSnappyLoaded;
+  }
+
+  /**
+   * Fails fast when the native library is unavailable.
+   *
+   * @throws RuntimeException if the load-time probe did not succeed
+   */
+  private void checkNativeSnappyLoaded() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native-snappy library not available");
+    }
+  }
+
+  /**
+   * Resolves the buffer size to use for streams and (de)compressors.
+   *
+   * @return the configured size, or the default when the key is absent or no
+   *         configuration has been set
+   */
+  private int getBufferSize() {
+    if (conf == null) {
+      // The no-arg constructor leaves conf unset; use the documented default
+      // rather than failing with a NullPointerException.
+      return DEFAULT_SNAPPY_BUFFER_SIZE;
+    }
+    return conf.getInt(SNAPPY_BUFFER_SIZE_KEY, DEFAULT_SNAPPY_BUFFER_SIZE);
+  }
+
+  /**
+   * Wraps {@code out} in a compressing stream using a newly created
+   * compressor.
+   *
+   * @throws RuntimeException if the native library is not available
+   */
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out)
+      throws IOException {
+    return createOutputStream(out, createCompressor());
+  }
+
+  /**
+   * Wraps {@code out} in a {@link BlockCompressorStream} driven by the given
+   * compressor.
+   *
+   * @param out stream receiving the compressed data
+   * @param compressor compressor to use
+   * @throws RuntimeException if the native library is not available
+   */
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out,
+      Compressor compressor) throws IOException {
+
+    checkNativeSnappyLoaded();
+
+    int bufferSize = getBufferSize();
+
+    // Worst-case growth of one buffer's worth of input when compressed.
+    int compressionOverhead =
+        Snappy.maxCompressedLength(bufferSize) - bufferSize;
+
+    return new BlockCompressorStream(out, compressor, bufferSize,
+        compressionOverhead);
+  }
+
+  /**
+   * @return {@link SnappyCompressor}'s class
+   * @throws RuntimeException if the native library is not available
+   */
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    checkNativeSnappyLoaded();
+    return SnappyCompressor.class;
+  }
+
+  /**
+   * @return a new {@link SnappyCompressor} sized from the configuration
+   * @throws RuntimeException if the native library is not available
+   */
+  @Override
+  public Compressor createCompressor() {
+    checkNativeSnappyLoaded();
+
+    return new SnappyCompressor(getBufferSize());
+  }
+
+  /**
+   * Wraps {@code in} in a decompressing stream using a newly created
+   * decompressor.
+   *
+   * @throws RuntimeException if the native library is not available
+   */
+  @Override
+  public CompressionInputStream createInputStream(InputStream in)
+      throws IOException {
+    return createInputStream(in, createDecompressor());
+  }
+
+  /**
+   * Wraps {@code in} in a {@link BlockDecompressorStream} driven by the given
+   * decompressor.
+   *
+   * @param in stream supplying the compressed data
+   * @param decompressor decompressor to use
+   * @throws RuntimeException if the native library is not available
+   */
+  @Override
+  public CompressionInputStream createInputStream(InputStream in,
+      Decompressor decompressor) throws IOException {
+    checkNativeSnappyLoaded();
+    return new BlockDecompressorStream(in, decompressor, getBufferSize());
+  }
+
+  /**
+   * @return {@link SnappyDecompressor}'s class
+   * @throws RuntimeException if the native library is not available
+   */
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    checkNativeSnappyLoaded();
+    return SnappyDecompressor.class;
+  }
+
+  /**
+   * @return a new {@link SnappyDecompressor} sized from the configuration
+   * @throws RuntimeException if the native library is not available
+   */
+  @Override
+  public Decompressor createDecompressor() {
+    checkNativeSnappyLoaded();
+
+    return new SnappyDecompressor(getBufferSize());
+  }
+
+  /** @return the file extension associated with this codec */
+  @Override
+  public String getDefaultExtension() {
+    return ".snappy";
+  }
+}
Propchange:
hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java?rev=1137690&view=auto
==============================================================================
---
hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
(added)
+++
hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
Mon Jun 20 16:32:27 2011
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress.snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.Compressor;
+import org.xerial.snappy.Snappy;
+import org.xerial.snappy.SnappyException;
+
+/**
+ * {@link Compressor} that accumulates raw input in a direct buffer and
+ * compresses everything buffered so far with a single call to
+ * {@code Snappy.compress}. The compressed bytes are then handed out through
+ * {@link #compress(byte[], int, int)}, possibly over several calls.
+ */
+public class SnappyCompressor implements Compressor {
+ private static final Log logger = LogFactory.getLog(SnappyCompressor.class
+ .getName());
+
+ // finish: set by finish(), cleared by reset().
+ // finished: set by compress() when there is nothing (left) to hand out,
+ // cleared by setInput() and reset().
+ private boolean finish, finished;
+ // Despite the name, this buffer holds the raw (uncompressed) input.
+ private ByteBuffer outBuf;
+ // Holds the output of Snappy.compress until compress() has copied it out.
+ private ByteBuffer compressedBuf;
+
+ private long bytesRead = 0L;
+ private long bytesWritten = 0L;
+
+ /**
+ * Allocates the input buffer with {@code bufferSize} bytes and the output
+ * buffer with {@code Snappy.maxCompressedLength(bufferSize)} bytes.
+ *
+ * @param bufferSize capacity of the raw-input buffer, in bytes
+ */
+ public SnappyCompressor(int bufferSize) {
+ outBuf = ByteBuffer.allocateDirect(bufferSize);
+ compressedBuf = ByteBuffer.allocateDirect(Snappy
+ .maxCompressedLength(bufferSize));
+
+ reset();
+ }
+
+ /**
+ * Appends {@code len} bytes of {@code b}, starting at {@code off}, to the
+ * buffered input and clears the finished flag.
+ *
+ * @throws NullPointerException if {@code b} is null
+ * @throws ArrayIndexOutOfBoundsException if the range is outside {@code b}
+ */
+ public synchronized void setInput(byte[] b, int off, int len) {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ finished = false;
+
+ // NOTE(review): there is no capacity check here; ByteBuffer.put throws
+ // BufferOverflowException when len exceeds the space left in outBuf.
+ outBuf.put(b, off, len);
+
+ bytesRead += len;
+ }
+
+ /** Dictionaries are not used by this compressor; the call is ignored. */
+ public synchronized void setDictionary(byte[] b, int off, int len) {
+ // do nothing
+ }
+
+ /**
+ * Returns false only while the compressed buffer is partially drained
+ * (position greater than zero with bytes remaining before the limit), and
+ * true in every other state.
+ */
+ public synchronized boolean needsInput() {
+ // needs input if compressed data was consumed
+ if (compressedBuf.position() > 0
+ && compressedBuf.limit() > compressedBuf.position())
+ return false;
+
+ return true;
+ }
+
+ /** Records that the caller has no more input to supply. */
+ public synchronized void finish() {
+ finish = true;
+ }
+
+ /**
+ * Returns true once {@link #finish()} has been called and compress() has
+ * marked the current output as fully handed out.
+ */
+ public synchronized boolean finished() {
+ // Check if all compressed data has been consumed
+ return (finish && finished);
+ }
+
+ /**
+ * Copies up to {@code len} compressed bytes into {@code b} at {@code off}.
+ * On the first call after input was buffered, the whole input is compressed
+ * before any bytes are copied.
+ *
+ * @return the number of bytes copied; 0 when nothing is buffered, when the
+ * compressor is already finished, or when no bytes could be copied
+ * @throws NullPointerException if {@code b} is null
+ * @throws ArrayIndexOutOfBoundsException if the range is outside {@code b}
+ * @throws IOException wrapping any SnappyException raised while compressing
+ */
+ public synchronized int compress(byte[] b, int off, int len)
+ throws IOException {
+
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+
+ // Nothing buffered (input position still 0) or already drained.
+ if (finished || outBuf.position() == 0) {
+ finished = true;
+ return 0;
+ }
+
+ // Only need to do this once: a position of 0 in compressedBuf is used as
+ // the marker that the buffered input has not been compressed yet.
+ if (compressedBuf.position() == 0) {
+ try {
+ // Expose exactly the bytes written so far: [0, position).
+ outBuf.limit(outBuf.position());
+ outBuf.rewind();
+
+ // The return value is used as the length of the compressed data.
+ int lim = Snappy.compress(outBuf, compressedBuf);
+
+ compressedBuf.limit(lim);
+ compressedBuf.rewind();
+ } catch (SnappyException e) {
+ throw new IOException(e);
+ }
+ }
+
+ // n = min(bytes still available in compressedBuf, len)
+ int n = (compressedBuf.limit() - compressedBuf.position()) > len ? len
+ : (compressedBuf.limit() - compressedBuf.position());
+
+ // Also reached when len is 0: the compressor is then marked finished
+ // without any bytes having been copied.
+ if (n == 0) {
+ finished = true;
+ return 0;
+ }
+
+ compressedBuf.get(b, off, n);
+
+ bytesWritten += n;
+
+ // Set 'finished' once every compressed byte has been copied out, and put
+ // both buffers back to position 0 with their full capacity as limit.
+ if (compressedBuf.position() == compressedBuf.limit()) {
+ finished = true;
+
+ outBuf.limit(outBuf.capacity());
+ outBuf.rewind();
+
+ compressedBuf.limit(compressedBuf.capacity());
+ compressedBuf.rewind();
+
+ }
+
+ return n;
+ }
+
+ /**
+ * Clears both flags and both byte counters, and puts both buffers back to
+ * position 0 with their full capacity as limit.
+ */
+ public synchronized void reset() {
+ finish = false;
+ finished = false;
+
+ outBuf.limit(outBuf.capacity());
+ outBuf.rewind();
+
+ compressedBuf.limit(compressedBuf.capacity());
+ compressedBuf.rewind();
+
+ bytesRead = bytesWritten = 0L;
+ }
+
+ /** Equivalent to {@link #reset()}; the configuration is not consulted. */
+ public synchronized void reinit(Configuration conf) {
+ reset();
+ }
+
+ /**
+ * Return number of bytes given to this compressor since last reset.
+ */
+ public synchronized long getBytesRead() {
+ return bytesRead;
+ }
+
+ /**
+ * Return number of bytes consumed by callers of compress since last reset.
+ */
+ public synchronized long getBytesWritten() {
+ return bytesWritten;
+ }
+
+ /** No-op in this implementation. */
+ public synchronized void end() {
+ }
+
+}
Propchange:
hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java?rev=1137690&view=auto
==============================================================================
---
hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
(added)
+++
hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
Mon Jun 20 16:32:27 2011
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress.snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.compress.Decompressor;
+import org.xerial.snappy.Snappy;
+import org.xerial.snappy.SnappyException;
+
+/**
+ * {@link Decompressor} that buffers one compressed chunk in a direct buffer,
+ * inflates it with a single call to {@code Snappy.uncompress}, and hands the
+ * result out through {@link #decompress(byte[], int, int)}, possibly over
+ * several calls.
+ *
+ * <p>Both internal buffers start at the size passed to the constructor and
+ * grow on demand, so a chunk larger than that size can still be read.
+ */
+public class SnappyDecompressor implements Decompressor {
+
+  private static final Log logger = LogFactory.getLog(SnappyDecompressor.class
+      .getName());
+
+  /** Set by decompress() when there is nothing (left) to hand out. */
+  private boolean finished;
+  /** Despite the name, this buffer holds the compressed input. */
+  private ByteBuffer outBuf;
+  /** Holds the inflated data until decompress() has copied it out. */
+  private ByteBuffer uncompressedBuf;
+
+  private long bytesRead = 0L;
+  private long bytesWritten = 0L;
+
+  /**
+   * Allocates the input and output buffers with {@code bufferSize} bytes each.
+   *
+   * @param bufferSize initial capacity of both buffers, in bytes
+   */
+  public SnappyDecompressor(int bufferSize) {
+    outBuf = ByteBuffer.allocateDirect(bufferSize);
+    uncompressedBuf = ByteBuffer.allocateDirect(bufferSize);
+
+    reset();
+  }
+
+  /**
+   * Appends {@code len} bytes of {@code b}, starting at {@code off}, to the
+   * buffered compressed input and clears the finished flag. The input buffer
+   * is enlarged first if the bytes would not fit.
+   *
+   * @throws NullPointerException if {@code b} is null
+   * @throws ArrayIndexOutOfBoundsException if the range is outside {@code b}
+   */
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    finished = false;
+
+    ensureInputCapacity(len);
+    outBuf.put(b, off, len);
+
+    bytesRead += len;
+  }
+
+  /**
+   * Replaces the input buffer with a larger one, keeping what is already
+   * buffered, when fewer than {@code len} bytes of space remain. Without this
+   * a chunk larger than the initial buffer size would make
+   * {@code ByteBuffer.put} throw BufferOverflowException.
+   */
+  private void ensureInputCapacity(int len) {
+    if (len <= outBuf.remaining()) {
+      return;
+    }
+    ByteBuffer grown = ByteBuffer.allocateDirect(outBuf.position() + len);
+    // Expose the bytes written so far, [0, position), and carry them over.
+    outBuf.flip();
+    grown.put(outBuf);
+    outBuf = grown;
+  }
+
+  /** Dictionaries are not used by this decompressor; the call is ignored. */
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    // do nothing
+  }
+
+  /**
+   * Returns false only while the output buffer is partially drained (position
+   * greater than zero with bytes remaining before the limit), and true in
+   * every other state.
+   */
+  public synchronized boolean needsInput() {
+    // needs input if the uncompressed data was consumed
+    if (uncompressedBuf.position() > 0
+        && uncompressedBuf.limit() > uncompressedBuf.position()) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /** Always false: this decompressor never uses a dictionary. */
+  public synchronized boolean needsDictionary() {
+    return false;
+  }
+
+  /** Returns true once decompress() has nothing (left) to hand out. */
+  public synchronized boolean finished() {
+    return finished;
+  }
+
+  /**
+   * Copies up to {@code len} inflated bytes into {@code b} at {@code off}.
+   * On the first call after input was buffered, the whole input is inflated
+   * before any bytes are copied.
+   *
+   * @return the number of bytes copied; 0 when nothing is buffered, when the
+   *         decompressor is already finished, or when no bytes could be
+   *         copied
+   * @throws NullPointerException if {@code b} is null
+   * @throws ArrayIndexOutOfBoundsException if the range is outside {@code b}
+   * @throws IOException wrapping any SnappyException raised while inflating
+   */
+  public synchronized int decompress(byte[] b, int off, int len)
+      throws IOException {
+
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    // nothing to decompress
+    if ((outBuf.position() == 0 && uncompressedBuf.position() == 0)
+        || finished) {
+      reset();
+      finished = true;
+
+      return 0;
+    }
+
+    // only needs to do this once per input: a position of 0 in
+    // uncompressedBuf marks input that has not been inflated yet
+    if (uncompressedBuf.position() == 0) {
+      try {
+        // Expose exactly the bytes written so far: [0, position).
+        outBuf.limit(outBuf.position());
+        outBuf.rewind();
+
+        int neededLen = Snappy.uncompressedLength(outBuf);
+        outBuf.rewind();
+
+        // Grow the output buffer when the chunk inflates beyond it.
+        if (neededLen > uncompressedBuf.capacity()) {
+          uncompressedBuf = ByteBuffer.allocateDirect(neededLen);
+        }
+
+        // The return value is used as the length of the inflated data.
+        int lim = Snappy.uncompress(outBuf, uncompressedBuf);
+
+        uncompressedBuf.limit(lim);
+        uncompressedBuf.rewind();
+      } catch (SnappyException e) {
+        throw new IOException(e);
+      }
+    }
+
+    int n = Math.min(uncompressedBuf.limit() - uncompressedBuf.position(),
+        len);
+
+    if (n == 0) {
+      reset();
+      finished = true;
+      return 0;
+    }
+
+    uncompressedBuf.get(b, off, n);
+
+    bytesWritten += n;
+
+    // Set 'finished' once every inflated byte has been copied out
+    if (uncompressedBuf.position() == uncompressedBuf.limit()) {
+      reset();
+      finished = true;
+    }
+
+    return n;
+  }
+
+  /** Always returns 0 in this implementation. */
+  public synchronized int getRemaining() {
+    // Never use this function in BlockDecompressorStream.
+    return 0;
+  }
+
+  /**
+   * Clears the finished flag and both byte counters, and puts both buffers
+   * back to position 0 with their full capacity as limit.
+   */
+  public synchronized void reset() {
+    finished = false;
+
+    uncompressedBuf.limit(uncompressedBuf.capacity());
+    uncompressedBuf.rewind();
+
+    outBuf.limit(outBuf.capacity());
+    outBuf.rewind();
+
+    bytesRead = bytesWritten = 0L;
+  }
+
+  /** No-op in this implementation. */
+  public synchronized void end() {
+    // do nothing
+  }
+
+  /** Calls {@link #end()} when the object is finalized. */
+  protected void finalize() {
+    end();
+  }
+
+}
Propchange:
hadoop/common/trunk/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java?rev=1137690&r1=1137689&r2=1137690&view=diff
==============================================================================
---
hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java
(original)
+++
hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java
Mon Jun 20 16:32:27 2011
@@ -102,6 +102,12 @@ public class TestCodec {
codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.DeflateCodec");
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.DeflateCodec");
}
+
+  /**
+   * Runs the shared codecTest helper against SnappyCodec, once with a record
+   * count of zero and once with the fixture's default count.
+   */
+  @Test
+  public void testSnappyCodec() throws IOException {
+    final String codecClass = "org.apache.hadoop.io.compress.SnappyCodec";
+    codecTest(conf, seed, 0, codecClass);
+    codecTest(conf, seed, count, codecClass);
+  }
@Test
public void testGzipCodecWithParam() throws IOException {