Updated Branches:
  refs/heads/flume-1.4 523b6b446 -> 5efc2d62f
FLUME-1709: HDFS CompressedDataStream doesn't support serializer parameter
(Cameron Gandevia via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5efc2d62
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5efc2d62
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5efc2d62

Branch: refs/heads/flume-1.4
Commit: 5efc2d62f49a9aea9a006b74bbb27cdea56219af
Parents: 523b6b4
Author: Brock Noland <[email protected]>
Authored: Fri Nov 30 13:26:31 2012 -0600
Committer: Brock Noland <[email protected]>
Committed: Fri Nov 30 13:26:46 2012 -0600

----------------------------------------------------------------------
 .../flume/sink/hdfs/HDFSCompressedDataStream.java |  38 +++++-
 .../sink/hdfs/TestHDFSCompressedDataStream.java   | 101 ++++++++++++--
 2 files changed, 120 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/5efc2d62/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
index 80341ef..afcd9d6 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
@@ -21,6 +21,8 @@ package org.apache.flume.sink.hdfs;
 import java.io.IOException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.serialization.EventSerializer;
+import org.apache.flume.serialization.EventSerializerFactory;
 import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -42,9 +44,15 @@ public class HDFSCompressedDataStream implements HDFSWriter {
   private CompressionOutputStream cmpOut;
   private boolean isFinished = false;
 
+  private String serializerType;
+  private Context serializerContext;
+  private EventSerializer serializer;
+
   @Override
   public void configure(Context context) {
-    // no-op
+    serializerType = context.getString("serializer", "TEXT");
+    serializerContext = new Context(
+        context.getSubProperties(EventSerializer.CTX_PREFIX));
   }
 
   @Override
@@ -61,13 +69,28 @@ public class HDFSCompressedDataStream implements HDFSWriter {
     Path dstPath = new Path(filePath);
     FileSystem hdfs = dstPath.getFileSystem(conf);
 
+    boolean appending = false;
     if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
             (dstPath)) {
       fsOut = hdfs.append(dstPath);
+      appending = true;
     } else {
       fsOut = hdfs.create(dstPath);
     }
     cmpOut = codec.createOutputStream(fsOut);
+    serializer = EventSerializerFactory.getInstance(serializerType,
+        serializerContext, cmpOut);
+    if (appending && !serializer.supportsReopen()) {
+      cmpOut.close();
+      serializer = null;
+      throw new IOException("serializer (" + serializerType
+          + ") does not support append");
+    }
+    if (appending) {
+      serializer.afterReopen();
+    } else {
+      serializer.afterCreate();
+    }
     isFinished = false;
   }
 
@@ -77,8 +100,7 @@ public class HDFSCompressedDataStream implements HDFSWriter {
       cmpOut.resetState();
       isFinished = false;
     }
-    byte[] bValue = fmt.getBytes(e);
-    cmpOut.write(bValue);
+    serializer.write(e);
   }
 
   @Override
@@ -88,6 +110,7 @@ public class HDFSCompressedDataStream implements HDFSWriter {
     // Also, since resetState() writes headers, avoid calling it without an
     // additional write/append operation.
     // Note: There are bugs in Hadoop & JDK w/ pure-java gzip; see HADOOP-8522.
+    serializer.flush();
     if (!isFinished) {
       cmpOut.finish();
       isFinished = true;
@@ -98,7 +121,14 @@ public class HDFSCompressedDataStream implements HDFSWriter {
 
   @Override
   public void close() throws IOException {
-    sync();
+    serializer.flush();
+    serializer.beforeClose();
+    if (!isFinished) {
+      cmpOut.finish();
+      isFinished = true;
+    }
+    fsOut.flush();
+    fsOut.sync();
     cmpOut.close();
   }
 
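For reference, with this change a CompressedStream HDFS sink honors the
standard serializer parameter instead of the removed FlumeFormatter byte
path. A minimal sketch of an agent configuration that exercises the new
code path; the agent/sink names (a1, k1) and the HDFS path are
illustrative placeholders, not taken from this commit:

  a1.sinks.k1.type = hdfs
  a1.sinks.k1.hdfs.path = hdfs://namenode/flume/events
  a1.sinks.k1.hdfs.fileType = CompressedStream
  a1.sinks.k1.hdfs.codeC = gzip
  # previously ignored by CompressedStream; the default remains TEXT
  a1.sinks.k1.serializer = AVRO_EVENT

Properties under the "serializer." prefix are passed through to the
serializer itself via EventSerializer.CTX_PREFIX, as done in the new
configure() above.
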
http://git-wip-us.apache.org/repos/asf/flume/blob/5efc2d62/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java
index f537732..cfa946a 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java
@@ -18,51 +18,74 @@
 package org.apache.flume.sink.hdfs;
 
-import com.google.common.base.Charsets;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.util.Arrays;
+import java.util.List;
 import java.util.zip.GZIPInputStream;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
 public class TestHDFSCompressedDataStream {
 
   private static final Logger logger =
       LoggerFactory.getLogger(TestHDFSCompressedDataStream.class);
 
-  // make sure the data makes it to disk if we sync() the data stream
-  @Test
-  public void testGzipDurability() throws IOException {
-    File file = new File("target/test/data/foo.gz");
-    String fileURI = file.getAbsoluteFile().toURI().toString();
+  private File file;
+  private String fileURI;
+  private CompressionCodecFactory factory;
+  private FlumeFormatter fmt;
+
+  @Before
+  public void init() throws Exception {
+    this.file = new File("target/test/data/foo.gz");
+    this.fileURI = file.getAbsoluteFile().toURI().toString();
     logger.info("File URI: {}", fileURI);
     Configuration conf = new Configuration();
     // local FS must be raw in order to be Syncable
     conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
     Path path = new Path(fileURI);
-    FileSystem fs = path.getFileSystem(conf); // get FS with our conf cached
-    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+    path.getFileSystem(conf); // get FS with our conf cached
+    this.factory = new CompressionCodecFactory(conf);
+    this.fmt = new HDFSTextFormatter();
+  }
+
+  // make sure the data makes it to disk if we sync() the data stream
+  @Test
+  public void testGzipDurability() throws Exception {
+    Context context = new Context();
     HDFSCompressedDataStream writer = new HDFSCompressedDataStream();
-    FlumeFormatter fmt = new HDFSTextFormatter();
+    writer.configure(context);
     writer.open(fileURI, factory.getCodec(new Path(fileURI)),
         SequenceFile.CompressionType.BLOCK, fmt);
-    String body = "yarf!";
-    Event evt = EventBuilder.withBody(body, Charsets.UTF_8);
-    writer.append(evt, fmt);
-    writer.sync();
+
+    String[] bodies = { "yarf!" };
+    writeBodies(writer, bodies);
 
     byte[] buf = new byte[256];
     GZIPInputStream cmpIn = new GZIPInputStream(new FileInputStream(file));
@@ -70,7 +93,55 @@ public class TestHDFSCompressedDataStream {
     String result = new String(buf, 0, len, Charsets.UTF_8);
     result = result.trim(); // HDFSTextFormatter adds a newline
 
-    Assert.assertEquals("input and output must match", body, result);
+    Assert.assertEquals("input and output must match", bodies[0], result);
+  }
+
+  @Test
+  public void testGzipDurabilityWithSerializer() throws Exception {
+    Context context = new Context();
+    context.put("serializer", "AVRO_EVENT");
+
+    HDFSCompressedDataStream writer = new HDFSCompressedDataStream();
+    writer.configure(context);
+
+    FlumeFormatter fmt = new HDFSTextFormatter();
+    writer.open(fileURI, factory.getCodec(new Path(fileURI)),
+        SequenceFile.CompressionType.BLOCK, fmt);
+
+    String[] bodies = { "yarf!", "yarfing!" };
+    writeBodies(writer, bodies);
+
+    int found = 0;
+    int expected = bodies.length;
+    List<String> expectedBodies = Lists.newArrayList(bodies);
+
+    GZIPInputStream cmpIn = new GZIPInputStream(new FileInputStream(file));
+    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
+    DataFileStream<GenericRecord> avroStream =
+        new DataFileStream<GenericRecord>(cmpIn, reader);
+    GenericRecord record = new GenericData.Record(avroStream.getSchema());
+    while (avroStream.hasNext()) {
+      avroStream.next(record);
+      CharsetDecoder decoder = Charsets.UTF_8.newDecoder();
+      String bodyStr = decoder.decode((ByteBuffer) record.get("body"))
+          .toString();
+      expectedBodies.remove(bodyStr);
+      found++;
+    }
+    avroStream.close();
+    cmpIn.close();
+
+    Assert.assertTrue("Found = " + found + ", Expected = " + expected
+        + ", Left = " + expectedBodies.size() + " " + expectedBodies,
+        expectedBodies.size() == 0);
   }
 
+  private void writeBodies(HDFSCompressedDataStream writer, String... bodies)
+      throws Exception {
+    for (String body : bodies) {
+      Event evt = EventBuilder.withBody(body, Charsets.UTF_8);
+      writer.append(evt, fmt);
+    }
+    writer.sync();
+  }
 }
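As a standalone usage sketch (illustration only, not part of this commit),
the calls below mirror the new test and drive the writer end-to-end against
the raw local filesystem. The class name and file path are hypothetical;
it assumes the same package as the sink so the test-only HDFSTextFormatter
resolves.

  package org.apache.flume.sink.hdfs;

  import java.io.File;

  import org.apache.flume.Context;
  import org.apache.flume.Event;
  import org.apache.flume.event.EventBuilder;
  import org.apache.flume.sink.FlumeFormatter;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.SequenceFile;
  import org.apache.hadoop.io.compress.CompressionCodecFactory;

  import com.google.common.base.Charsets;

  public class CompressedStreamSerializerDemo { // hypothetical class

    public static void main(String[] args) throws Exception {
      String fileURI = new File("target/demo/events.gz")
          .getAbsoluteFile().toURI().toString();

      Configuration conf = new Configuration();
      // local FS must be raw in order to be Syncable, as in the test
      conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
      new Path(fileURI).getFileSystem(conf); // cache a FS with our conf
      CompressionCodecFactory factory = new CompressionCodecFactory(conf);

      // the new configure() reads "serializer" and its sub-properties
      Context ctx = new Context();
      ctx.put("serializer", "AVRO_EVENT");

      HDFSCompressedDataStream writer = new HDFSCompressedDataStream();
      writer.configure(ctx);

      FlumeFormatter fmt = new HDFSTextFormatter(); // unused by append() now
      writer.open(fileURI, factory.getCodec(new Path(fileURI)),
          SequenceFile.CompressionType.BLOCK, fmt);

      Event e = EventBuilder.withBody("yarf!", Charsets.UTF_8);
      writer.append(e, fmt); // delegates to serializer.write(e)
      writer.sync();         // serializer.flush() before finishing the codec
      writer.close();        // flush, beforeClose, finish, fsOut.sync()
    }
  }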
