Repository: flume Updated Branches: refs/heads/trunk c7de4ba5c -> 9965dae7b
FLUME-2810. Add static Schema URL to AvroEventSerializer configuration Currently the only way to pass a schema to the avro event serializer is via header. This introduces an option to specify the location directly in the Flume configuration. (Jeff Holoman via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/dbf2e989 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/dbf2e989 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/dbf2e989 Branch: refs/heads/trunk Commit: dbf2e989744a6b2921076be017102f75323a69f4 Parents: c7de4ba Author: Jeff Holoman <[email protected]> Authored: Tue Jul 19 12:29:08 2016 -0700 Committer: Mike Percy <[email protected]> Committed: Tue Jul 19 14:43:33 2016 -0700 ---------------------------------------------------------------------- ...roEventSerializerConfigurationConstants.java | 5 ++ flume-ng-doc/sphinx/FlumeUserGuide.rst | 51 ++++++++++++++++++-- .../flume/sink/hdfs/AvroEventSerializer.java | 45 +++++++++++------ .../sink/hdfs/TestAvroEventSerializer.java | 47 ++++++++++++++---- 4 files changed, 117 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/dbf2e989/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java index cce6716..470fcea 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java @@ -35,4 +35,9 @@ public class 
AvroEventSerializerConfigurationConstants { public static final String COMPRESSION_CODEC = "compressionCodec"; public static final String DEFAULT_COMPRESSION_CODEC = "null"; // no codec + /** + * Avro static Schema URL + */ + public static final String STATIC_SCHEMA_URL = "schemaURL"; + public static final String DEFAULT_STATIC_SCHEMA_URL = null; } http://git-wip-us.apache.org/repos/asf/flume/blob/dbf2e989/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index f9ca1b2..3937514 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -3235,19 +3235,59 @@ Example for agent named a1: a1.sinks.k1.sink.serializer = text a1.sinks.k1.sink.serializer.appendNewline = false +"Flume Event" Avro Event Serializer +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Alias: ``avro_event``. + +This serializer serializes Flume events into an Avro container file. The schema used is the same schema used for +Flume events in the Avro RPC mechanism. + +This serializer inherits from the ``AbstractAvroEventSerializer`` class. + +Configuration options are as follows: + +========================== ================ =========================================================================== +Property Name Default Description +========================== ================ =========================================================================== +syncIntervalBytes 2048000 Avro sync interval, in approximate bytes. +compressionCodec null Avro compression codec. For supported codecs, see Avro's CodecFactory docs. +========================== ================ =========================================================================== + +Example for agent named a1: + +.. 
code-block:: properties + + a1.sinks.k1.type = hdfs + a1.sinks.k1.channel = c1 + a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S + a1.sinks.k1.serializer = avro_event + a1.sinks.k1.serializer.compressionCodec = snappy + Avro Event Serializer ~~~~~~~~~~~~~~~~~~~~~ -Alias: ``avro_event``. This interceptor serializes Flume events into an Avro -container file. The schema used is the same schema used for Flume events -in the Avro RPC mechanism. This serializers inherits from the -``AbstractAvroEventSerializer`` class. Configuration options are as follows: +Alias: This serializer does not have an alias, and must be specified using the fully-qualified class name. + +This serializer serializes Flume events into an Avro container file like the "Flume Event" Avro Event Serializer, however the +record schema is configurable. The record schema may be specified either as a Flume configuration property or passed in an event header. + +To pass the record schema as part of the Flume configuration, use the property ``schemaURL`` as listed below. + +To pass the record schema in an event header, specify either the event header ``flume.avro.schema.literal`` +containing a JSON-format representation of the schema or ``flume.avro.schema.url`` with a URL where +the schema may be found (``hdfs:/...`` URIs are supported). + +This serializer inherits from the ``AbstractAvroEventSerializer`` class. + +Configuration options are as follows: ========================== ================ =========================================================================== Property Name Default Description ========================== ================ =========================================================================== syncIntervalBytes 2048000 Avro sync interval, in approximate bytes. compressionCodec null Avro compression codec. For supported codecs, see Avro's CodecFactory docs. +schemaURL null Avro schema URL. Schemas specified in the header override this option. 
========================== ================ =========================================================================== Example for agent named a1: @@ -3257,8 +3297,9 @@ Example for agent named a1: a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S - a1.sinks.k1.serializer = avro_event + a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder a1.sinks.k1.serializer.compressionCodec = snappy + a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc Flume Interceptors http://git-wip-us.apache.org/repos/asf/flume/blob/dbf2e989/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java index fea6218..3231742 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java @@ -18,14 +18,6 @@ */ package org.apache.flume.sink.hdfs; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URL; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Locale; -import java.util.Map; import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; @@ -44,7 +36,21 @@ import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.*; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.nio.ByteBuffer; +import java.util.HashMap; 
+import java.util.Locale; +import java.util.Map; + +import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.COMPRESSION_CODEC; +import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_COMPRESSION_CODEC; +import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_STATIC_SCHEMA_URL; +import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_SYNC_INTERVAL_BYTES; +import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.STATIC_SCHEMA_URL; +import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.SYNC_INTERVAL_BYTES; /** * <p> @@ -76,6 +82,7 @@ public class AvroEventSerializer implements EventSerializer, Configurable { private int syncIntervalBytes; private String compressionCodec; private Map<String, Schema> schemaCache = new HashMap<String, Schema>(); + private String staticSchemaURL; private AvroEventSerializer(OutputStream out) { this.out = out; @@ -87,6 +94,7 @@ public class AvroEventSerializer implements EventSerializer, Configurable { context.getInteger(SYNC_INTERVAL_BYTES, DEFAULT_SYNC_INTERVAL_BYTES); compressionCodec = context.getString(COMPRESSION_CODEC, DEFAULT_COMPRESSION_CODEC); + staticSchemaURL = context.getString(STATIC_SCHEMA_URL, DEFAULT_STATIC_SCHEMA_URL); } @Override @@ -111,19 +119,24 @@ public class AvroEventSerializer implements EventSerializer, Configurable { private void initialize(Event event) throws IOException { Schema schema = null; String schemaUrl = event.getHeaders().get(AVRO_SCHEMA_URL_HEADER); - if (schemaUrl != null) { + String schemaString = event.getHeaders().get(AVRO_SCHEMA_LITERAL_HEADER); + + if (schemaUrl != null) { // if URL_HEADER is there then use it schema = schemaCache.get(schemaUrl); if (schema == null) { schema = loadFromUrl(schemaUrl); schemaCache.put(schemaUrl, schema); } - } - if (schema == null) { - String schemaString 
= event.getHeaders().get(AVRO_SCHEMA_LITERAL_HEADER); - if (schemaString == null) { - throw new FlumeException("Could not find schema for event " + event); - } + } else if (schemaString != null) { // fallback to LITERAL_HEADER if it was there schema = new Schema.Parser().parse(schemaString); + } else if (staticSchemaURL != null) { // fallback to static url if it was there + schema = schemaCache.get(staticSchemaURL); + if (schema == null) { + schema = loadFromUrl(staticSchemaURL); + schemaCache.put(staticSchemaURL, schema); + } + } else { // no other options so giving up + throw new FlumeException("Could not find schema for event " + event); } writer = new GenericDatumWriter<Object>(schema); http://git-wip-us.apache.org/repos/asf/flume/blob/dbf2e989/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java index 38af74d..6b38da2 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java @@ -39,10 +39,12 @@ import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.event.EventBuilder; +import org.apache.flume.serialization.AvroEventSerializerConfigurationConstants; import org.apache.flume.serialization.EventSerializer; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.After; public class TestAvroEventSerializer { @@ -53,39 +55,55 @@ public class TestAvroEventSerializer { file = File.createTempFile(getClass().getSimpleName(), ""); } + @After + public void tearDown() 
throws Exception { + file.delete(); + } + @Test public void testNoCompression() throws IOException { - createAvroFile(file, null, false); + createAvroFile(file, null, false, false); validateAvroFile(file); } @Test public void testNullCompression() throws IOException { - createAvroFile(file, "null", false); + createAvroFile(file, "null", false, false); validateAvroFile(file); } @Test public void testDeflateCompression() throws IOException { - createAvroFile(file, "deflate", false); + createAvroFile(file, "deflate", false, false); validateAvroFile(file); } @Test public void testSnappyCompression() throws IOException { - createAvroFile(file, "snappy", false); + createAvroFile(file, "snappy", false, false); validateAvroFile(file); } @Test public void testSchemaUrl() throws IOException { - createAvroFile(file, null, true); + createAvroFile(file, null, true, false); validateAvroFile(file); } - public void createAvroFile(File file, String codec, boolean useSchemaUrl) throws - IOException { + @Test + public void testStaticSchemaUrl() throws IOException { + createAvroFile(file,null,false, true); + validateAvroFile(file); + } + @Test + public void testBothUrls() throws IOException { + createAvroFile(file,null,true,true); + validateAvroFile(file); + } + + public void createAvroFile(File file, String codec, boolean useSchemaUrl, + boolean useStaticSchemaUrl) throws IOException { // serialize a few events using the reflection-based avro serializer OutputStream out = new FileOutputStream(file); @@ -100,11 +118,16 @@ public class TestAvroEventSerializer { })); GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema); File schemaFile = null; - if (useSchemaUrl) { + if (useSchemaUrl || useStaticSchemaUrl) { schemaFile = File.createTempFile(getClass().getSimpleName(), ".avsc"); Files.write(schema.toString(), schemaFile, Charsets.UTF_8); } + if (useStaticSchemaUrl) { + ctx.put(AvroEventSerializerConfigurationConstants.STATIC_SCHEMA_URL, + 
schemaFile.toURI().toURL().toExternalForm()); + } + EventSerializer.Builder builder = new AvroEventSerializer.Builder(); EventSerializer serializer = builder.build(ctx, out); @@ -112,10 +135,10 @@ public class TestAvroEventSerializer { for (int i = 0; i < 3; i++) { GenericRecord record = recordBuilder.set("message", "Hello " + i).build(); Event event = EventBuilder.withBody(serializeAvro(record, schema)); - if (schemaFile == null) { + if (schemaFile == null && !useSchemaUrl) { event.getHeaders().put(AvroEventSerializer.AVRO_SCHEMA_LITERAL_HEADER, schema.toString()); - } else { + } else if (useSchemaUrl) { event.getHeaders().put(AvroEventSerializer.AVRO_SCHEMA_URL_HEADER, schemaFile.toURI().toURL().toExternalForm()); } @@ -125,6 +148,10 @@ public class TestAvroEventSerializer { serializer.beforeClose(); out.flush(); out.close(); + if (schemaFile != null ) { + schemaFile.delete(); + } + } private byte[] serializeAvro(Object datum, Schema schema) throws IOException {
