Author: cutting
Date: Wed Apr 6 17:59:26 2011
New Revision: 1089550
URL: http://svn.apache.org/viewvc?rev=1089550&view=rev
Log:
AVRO-788. Java: Add Snappy compression.
Added:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
Modified:
avro/trunk/CHANGES.txt
avro/trunk/doc/src/content/xdocs/spec.xml
avro/trunk/lang/java/avro/pom.xml
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java
avro/trunk/lang/java/pom.xml
Modified: avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1089550&r1=1089549&r2=1089550&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed Apr 6 17:59:26 2011
@@ -17,6 +17,9 @@ Avro 1.5.1 (unreleased)
AVRO-533. Add a C# implementation.
(Jeremy Custenborder, Dona Alvarez and thiru)
+ AVRO-788. Java: Add Snappy compression for data files, including
+ MapReduce API support. (cutting)
+
IMPROVEMENTS
AVRO-785. Java: Squash a Velocity warning by upgrading to Velocity 1.7.
Modified: avro/trunk/doc/src/content/xdocs/spec.xml
URL:
http://svn.apache.org/viewvc/avro/trunk/doc/src/content/xdocs/spec.xml?rev=1089550&r1=1089549&r2=1089550&view=diff
==============================================================================
--- avro/trunk/doc/src/content/xdocs/spec.xml (original)
+++ avro/trunk/doc/src/content/xdocs/spec.xml Wed Apr 6 17:59:26 2011
@@ -695,7 +695,15 @@
</p>
</section>
</section>
-
+ <section>
+ <title>Optional Codecs</title>
+ <section>
+ <title>snappy</title>
+ <p>The "snappy" codec uses
+ Google's <a href="http://code.google.com/p/snappy/">Snappy</a>
+ compression library.</p>
+ </section>
+ </section>
</section>
<section>
Modified: avro/trunk/lang/java/avro/pom.xml
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/pom.xml?rev=1089550&r1=1089549&r2=1089550&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/pom.xml (original)
+++ avro/trunk/lang/java/avro/pom.xml Wed Apr 6 17:59:26 2011
@@ -39,6 +39,10 @@
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </dependency>
</dependencies>
<build>
Modified:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java?rev=1089550&r1=1089549&r2=1089550&view=diff
==============================================================================
---
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
(original)
+++
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
Wed Apr 6 17:59:26 2011
@@ -36,6 +36,11 @@ public abstract class CodecFactory {
return new DeflateCodec.Option(compressionLevel);
};
+ /** Snappy codec.*/
+ public static CodecFactory snappyCodec() {
+ return new SnappyCodec.Option();
+ };
+
/** Creates internal Codec. */
protected abstract Codec createInstance();
@@ -50,6 +55,7 @@ public abstract class CodecFactory {
static {
addCodec("null", nullCodec());
addCodec("deflate", deflateCodec(DEFAULT_DEFLATE_LEVEL));
+ addCodec("snappy", snappyCodec());
}
/** Maps a codec name into a CodecOption. */
Modified:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java?rev=1089550&r1=1089549&r2=1089550&view=diff
==============================================================================
---
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java
(original)
+++
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java
Wed Apr 6 17:59:26 2011
@@ -36,5 +36,6 @@ public class DataFileConstants {
public static final String CODEC = "avro.codec";
public static final String NULL_CODEC = "null";
public static final String DEFLATE_CODEC = "deflate";
+ public static final String SNAPPY_CODEC = "snappy";
}
Added:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java?rev=1089550&view=auto
==============================================================================
---
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
(added)
+++
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
Wed Apr 6 17:59:26 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.xerial.snappy.Snappy;
+import org.xerial.snappy.SnappyException;
+
+/** * Implements Snappy compression and decompression. */
+class SnappyCodec extends Codec {
+
+ private static final SnappyCodec INSTANCE = new SnappyCodec();
+
+ static class Option extends CodecFactory {
+ @Override
+ protected Codec createInstance() {
+ return INSTANCE;
+ }
+ }
+
+ private SnappyCodec() {}
+
+ @Override String getName() { return DataFileConstants.SNAPPY_CODEC; }
+
+ @Override
+ ByteBuffer compress(ByteBuffer in) throws IOException {
+ try {
+ ByteBuffer out =
+ ByteBuffer.allocate(Snappy.maxCompressedLength(in.remaining()));
+ int size = Snappy.compress(in.array(), in.position(), in.remaining(),
+ out.array(), 0);
+ out.limit(size);
+ return out;
+ } catch (SnappyException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ ByteBuffer decompress(ByteBuffer in) throws IOException {
+ try {
+ ByteBuffer out = ByteBuffer.allocate
+ (Snappy.uncompressedLength(in.array(), in.position(), in.remaining()));
+ int size = Snappy.uncompress(in.array(), in.position(), in.remaining(),
+ out.array(), 0);
+ out.limit(size);
+ return out;
+ } catch (SnappyException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override public int hashCode() { return getName().hashCode(); }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (getClass() != obj.getClass())
+ return false;
+ return true;
+ }
+
+}
Modified:
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java?rev=1089550&r1=1089549&r2=1089550&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
(original)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
Wed Apr 6 17:59:26 2011
@@ -61,6 +61,7 @@ public class TestDataFile {
r.add(new Object[] { CodecFactory.deflateCodec(1) });
r.add(new Object[] { CodecFactory.deflateCodec(9) });
r.add(new Object[] { CodecFactory.nullCodec() });
+ r.add(new Object[] { CodecFactory.snappyCodec() });
return r;
}
Modified:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java?rev=1089550&r1=1089549&r2=1089550&view=diff
==============================================================================
---
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java
(original)
+++
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java
Wed Apr 6 17:59:26 2011
@@ -42,6 +42,8 @@ public class AvroJob {
public static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
/** The configuration key for a job's output schema. */
public static final String OUTPUT_SCHEMA = "avro.output.schema";
+ /** The configuration key for a job's output compression codec. */
+ public static final String OUTPUT_CODEC = "avro.output.codec";
/** The configuration key prefix for a text output metadata. */
public static final String TEXT_PREFIX = "avro.meta.text.";
/** The configuration key prefix for a binary output metadata. */
@@ -82,6 +84,11 @@ public class AvroJob {
configureAvroOutput(job);
}
+ /** Configure a job's output compression codec. */
+ public static void setOutputCodec(JobConf job, String codec) {
+ job.set(OUTPUT_CODEC, codec);
+ }
+
/** Add metadata to job output files.*/
public static void setOutputMeta(JobConf job, String key, String value) {
job.set(TEXT_PREFIX+key, value);
Modified:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java?rev=1089550&r1=1089549&r2=1089550&view=diff
==============================================================================
---
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
(original)
+++
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
Wed Apr 6 17:59:26 2011
@@ -36,6 +36,7 @@ import org.apache.avro.reflect.ReflectDa
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.CodecFactory;
import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
+import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
/** An {@link org.apache.hadoop.mapred.OutputFormat} for Avro data files. */
public class AvroOutputFormat <T>
@@ -80,7 +81,11 @@ public class AvroOutputFormat <T>
if (FileOutputFormat.getCompressOutput(job)) {
int level = job.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
- writer.setCodec(CodecFactory.deflateCodec(level));
+ String codecName = job.get(AvroJob.OUTPUT_CODEC, DEFLATE_CODEC);
+ CodecFactory factory = codecName.equals(DEFLATE_CODEC)
+ ? CodecFactory.deflateCodec(level)
+ : CodecFactory.fromString(codecName);
+ writer.setCodec(factory);
}
writer.setSyncInterval(job.getInt(SYNC_INTERVAL_KEY,
DEFAULT_SYNC_INTERVAL));
Modified:
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java?rev=1089550&r1=1089549&r2=1089550&view=diff
==============================================================================
---
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java
(original)
+++
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java
Wed Apr 6 17:59:26 2011
@@ -33,6 +33,7 @@ import org.apache.avro.Schema.Type;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.file.DataFileReader;
+import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -123,6 +124,7 @@ public class TestWeather {
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
FileOutputFormat.setCompressOutput(job, true);
+ AvroJob.setOutputCodec(job, SNAPPY_CODEC);
JobClient.runJob(job);
Modified: avro/trunk/lang/java/pom.xml
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/pom.xml?rev=1089550&r1=1089549&r2=1089550&view=diff
==============================================================================
--- avro/trunk/lang/java/pom.xml (original)
+++ avro/trunk/lang/java/pom.xml Wed Apr 6 17:59:26 2011
@@ -45,6 +45,7 @@
<jetty-version>6.1.26</jetty-version>
<netty-version>3.2.4.Final</netty-version>
<jopt-simple-version>3.2</jopt-simple-version>
+ <snappy-version>1.0.1-rc3</snappy-version>
</properties>
<issueManagement>
@@ -339,6 +340,13 @@
<version>${hadoop-version}</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>${snappy-version}</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</dependencyManagement>