Author: cutting
Date: Fri Jun 6 21:13:27 2014
New Revision: 1601014
URL: http://svn.apache.org/r1601014
Log:
AVRO-1522. Java: Add support for compression codecs to SortedKeyValueFile.
Contributed by Steven Willis.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/SortedKeyValueFile.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestSortedKeyValueFile.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1601014&r1=1601013&r2=1601014&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Jun 6 21:13:27 2014
@@ -15,6 +15,9 @@ Trunk (not yet released)
including a subtype of bytes and fixed for decimal values.
(tomwhite & Ryan Blue via cutting)
+ AVRO-1522. Java: Add support for compression codecs to SortedKeyValueFile.
+ (Steven Willis via cutting)
+
OPTIMIZATIONS
AVRO-1455. Deep copy does not need to create new instances for primitives.
Modified:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/SortedKeyValueFile.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/SortedKeyValueFile.java?rev=1601014&r1=1601013&r2=1601014&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/SortedKeyValueFile.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/SortedKeyValueFile.java Fri Jun  6 21:13:27 2014
@@ -32,6 +32,7 @@ import org.apache.avro.generic.GenericRe
import org.apache.avro.specific.SpecificData;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
+import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.hadoop.util.AvroCharSequenceComparator;
@@ -389,6 +390,9 @@ public class SortedKeyValueFile {
/** The model for the data. */
private GenericData model = SpecificData.get();
+ /** The compression codec for the data. */
+ private CodecFactory codec = CodecFactory.nullCodec();
+
/**
* Sets the key schema.
*
@@ -502,6 +506,23 @@ public class SortedKeyValueFile {
public GenericData getDataModel() {
return model;
}
+
+ /** Set the compression codec. */
+ public Options withCodec(String codec) {
+ this.codec = CodecFactory.fromString(codec);
+ return this;
+ }
+
+ /** Set the compression codec. */
+ public Options withCodec(CodecFactory codec) {
+ this.codec = codec;
+ return this;
+ }
+
+ /** Return the compression codec. */
+ public CodecFactory getCodec() {
+ return this.codec;
+ }
}
/**
@@ -549,6 +570,7 @@ public class SortedKeyValueFile {
mDataFileWriter = new DataFileWriter<GenericRecord>(datumWriter)
.setSyncInterval(1 << 20) // Set the auto-sync interval sufficiently large, since
                          // we will manually sync every mIndexInterval records.
+ .setCodec(options.getCodec())
.create(mRecordSchema, dataOutputStream);
// Open a writer for the index file.
Modified:
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestSortedKeyValueFile.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestSortedKeyValueFile.java?rev=1601014&r1=1601013&r2=1601014&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestSortedKeyValueFile.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestSortedKeyValueFile.java Fri Jun  6 21:13:27 2014
@@ -25,12 +25,16 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
import org.apache.avro.hadoop.io.AvroKeyValue;
+import org.apache.avro.mapred.FsInput;
import org.apache.avro.io.DatumReader;
+import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.util.Utf8;
@@ -74,6 +78,83 @@ public class TestSortedKeyValueFile {
}
@Test
+ public void testNamedCodecs() throws IOException {
+ Configuration conf = new Configuration();
+ Path myfile = new Path(mTempDir.getRoot().getPath(), "myfile");
+ Schema key = Schema.create(Schema.Type.STRING);
+ Schema value = Schema.create(Schema.Type.STRING);
+ Schema recordSchema = AvroKeyValue.getSchema(key, value);
+ DatumReader<GenericRecord> datumReader = SpecificData.get().createDatumReader(recordSchema);
+ DataFileReader<GenericRecord> reader;
+
+ SortedKeyValueFile.Writer.Options options = new SortedKeyValueFile.Writer.Options()
+ .withKeySchema(key)
+ .withValueSchema(value)
+ .withConfiguration(conf)
+ .withPath(myfile);
+
+ SortedKeyValueFile.Writer<CharSequence, CharSequence> writer;
+
+ for(String codec : new String[]{"null", "deflate", "snappy", "bzip2"}) {
+ LOG.debug("Using " + codec + "codec for a SortedKeyValueFile...");
+
+ options.withCodec(codec);
+
+ writer = new SortedKeyValueFile.Writer<CharSequence, CharSequence>(options);
+ writer.close();
+
+ reader = new DataFileReader<GenericRecord>(
+ new FsInput(new Path(myfile,SortedKeyValueFile.DATA_FILENAME), conf),
+ datumReader);
+
+ assertEquals(codec, reader.getMetaString("avro.codec"));
+ reader.close();
+ }
+ }
+
+ @Test
+ public void testDeflateClassCodec() throws IOException {
+ Configuration conf = new Configuration();
+ Path myfile = new Path(mTempDir.getRoot().getPath(), "myfile");
+ Schema key = Schema.create(Schema.Type.STRING);
+ Schema value = Schema.create(Schema.Type.STRING);
+ Schema recordSchema = AvroKeyValue.getSchema(key, value);
+ DatumReader<GenericRecord> datumReader = SpecificData.get().createDatumReader(recordSchema);
+ DataFileReader<GenericRecord> reader;
+
+ LOG.debug("Using CodecFactory.deflateCodec() for a SortedKeyValueFile...");
+ SortedKeyValueFile.Writer.Options options = new SortedKeyValueFile.Writer.Options()
+ .withKeySchema(key)
+ .withValueSchema(value)
+ .withConfiguration(conf)
+ .withPath(myfile)
+ .withCodec(CodecFactory.deflateCodec(9));
+
+ SortedKeyValueFile.Writer<CharSequence, CharSequence> writer =
+ new SortedKeyValueFile.Writer<CharSequence, CharSequence>(options);
+ writer.close();
+
+ reader = new DataFileReader<GenericRecord>(
+ new FsInput(new Path(myfile,SortedKeyValueFile.DATA_FILENAME), conf),
+ datumReader);
+
+ assertEquals("deflate", reader.getMetaString("avro.codec"));
+ reader.close();
+ }
+
+ @Test
+ public void testBadCodec() throws IOException {
+ LOG.debug("Using a bad codec for a SortedKeyValueFile...");
+
+ try {
+ SortedKeyValueFile.Writer.Options options =
+ new SortedKeyValueFile.Writer.Options().withCodec("foobar");
+ } catch (AvroRuntimeException e) {
+ assertEquals("Unrecognized codec: foobar", e.getMessage());
+ }
+ }
+
+ @Test
public void testWriter() throws IOException {
LOG.debug("Writing some records to a SortedKeyValueFile...");