Updated Branches: refs/heads/master 7971c86d6 -> c12ef8109
Fix to compress Avro output

Signed-off-by: jwills <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/c12ef810
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/c12ef810
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/c12ef810

Branch: refs/heads/master
Commit: c12ef81090bbc142ed865592bb1a497699376c24
Parents: 7971c86
Author: Joseph Adler <[email protected]>
Authored: Tue Aug 7 15:52:03 2012 -0700
Committer: jwills <[email protected]>
Committed: Tue Aug 7 21:19:58 2012 -0700

----------------------------------------------------------------------
 .../apache/crunch/types/avro/AvroOutputFormat.java | 21 ++++++++++++++-
 1 files changed, 20 insertions(+), 1 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/c12ef810/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
index 9a5a073..2582cc2 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
@@ -20,12 +20,14 @@ package org.apache.crunch.types.avro;
 import java.io.IOException;
 
 import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.mapred.AvroJob;
 import org.apache.avro.mapred.AvroWrapper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -49,9 +51,26 @@ public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWr
     ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
     final DataFileWriter<T> WRITER = new DataFileWriter<T>(factory.<T> getWriter());
 
+    JobConf jc = new JobConf(conf);
+    /* copied from org.apache.avro.mapred.AvroOutputFormat */
+
+    if (org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(jc)) {
+      int level = conf.getInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY,
+          org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
+      String codecName = conf.get(AvroJob.OUTPUT_CODEC,
+          org.apache.avro.file.DataFileConstants.DEFLATE_CODEC);
+      CodecFactory codec = codecName.equals(org.apache.avro.file.DataFileConstants.DEFLATE_CODEC)
+          ? CodecFactory.deflateCodec(level)
+          : CodecFactory.fromString(codecName);
+      WRITER.setCodec(codec);
+    }
+
+    WRITER.setSyncInterval(jc.getInt(org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY,
+        org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL));
+
     Path path = getDefaultWorkFile(context, org.apache.avro.mapred.AvroOutputFormat.EXT);
     WRITER.create(schema, path.getFileSystem(context.getConfiguration()).create(path));
-
+
     return new RecordWriter<AvroWrapper<T>, NullWritable>() {
       @Override
       public void write(AvroWrapper<T> wrapper, NullWritable ignore) throws IOException {
