Repository: flume Updated Branches: refs/heads/flume-1.7 49680d6bc -> 8d06a72d9
FLUME-2852: Kafka Source/Sink should optionally read/write Flume records (Tristan Stevens via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/8d06a72d Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/8d06a72d Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/8d06a72d Branch: refs/heads/flume-1.7 Commit: 8d06a72d9dc660c28e1217891f9c5085b8192085 Parents: 49680d6 Author: Jarek Jarcec Cecho <[email protected]> Authored: Thu Apr 21 13:37:26 2016 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Thu Apr 21 13:38:11 2016 -0700 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 9 +++ .../org/apache/flume/sink/kafka/KafkaSink.java | 66 +++++++++++++++- .../flume/sink/kafka/KafkaSinkConstants.java | 4 +- .../apache/flume/sink/kafka/TestConstants.java | 2 + .../apache/flume/sink/kafka/TestKafkaSink.java | 68 ++++++++++++++++ .../apache/flume/source/kafka/KafkaSource.java | 77 ++++++++++++++++-- .../source/kafka/KafkaSourceConstants.java | 3 + .../source/kafka/KafkaSourceEmbeddedKafka.java | 19 +++-- .../flume/source/kafka/TestKafkaSource.java | 83 ++++++++++++++++++++ 9 files changed, 316 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/8d06a72d/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 5149ab5..9c11fe6 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1215,6 +1215,11 @@ backoffSleepIncrement 1000 Initial and incremental wait time maxBackoffSleep 5000 Maximum wait time that is triggered when a Kafka Topic appears to be empty. 
Five seconds is ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors. +useFlumeEventFormat false By default, events are taken as bytes from the Kafka topic directly into the event body. Set to + true to read events in the Flume Avro binary format. Used in conjunction with the same property + on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel, this will preserve + any Flume headers sent on the producing side. + Other Kafka Consumer Properties -- These properties are used to configure the Kafka Consumer. Any consumer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.consumer``. For example: kafka.consumer.auto.offset.reset @@ -2515,6 +2520,10 @@ flumeBatchSize 100 How many messages to proce kafka.producer.acks 1 How many replicas must acknowledge a message before it is considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas). Set this to -1 to avoid data loss in some cases of leader failure. +useFlumeEventFormat false By default, events are put as bytes onto the Kafka topic directly from the event body. Set to + true to store events in the Flume Avro binary format. Used in conjunction with the same property + on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel, this will preserve + any Flume headers sent on the producing side. Other Kafka Producer Properties -- These properties are used to configure the Kafka Producer. Any producer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.producer``.
http://git-wip-us.apache.org/repos/asf/flume/blob/8d06a72d/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java index 2e2140e..7bef7f3 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java @@ -18,7 +18,13 @@ package org.apache.flume.sink.kafka; +import com.google.common.base.Optional; import com.google.common.base.Throwables; + +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; @@ -28,6 +34,7 @@ import org.apache.flume.conf.Configurable; import org.apache.flume.conf.ConfigurationException; import org.apache.flume.instrumentation.kafka.KafkaSinkCounter; import org.apache.flume.sink.AbstractSink; +import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -36,6 +43,10 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -85,6 +96,7 @@ import static org.apache.flume.sink.kafka.KafkaSinkConstants.MESSAGE_SERIALIZER_ * improve throughput while adding latency. 
* requiredAcks -- 0 (unsafe), 1 (accepted by at least one broker, default), * -1 (accepted by all brokers) + * useFlumeEventFormat - preserves event headers when serializing onto Kafka * <p/> * header properties (per event): * topic @@ -101,6 +113,17 @@ public class KafkaSink extends AbstractSink implements Configurable { private int batchSize; private List<Future<RecordMetadata>> kafkaFutures; private KafkaSinkCounter counter; + private boolean useAvroEventFormat; + private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer = + Optional.absent(); + private Optional<SpecificDatumReader<AvroFlumeEvent>> reader = + Optional.absent(); + private Optional<ByteArrayOutputStream> tempOutStream = Optional + .absent(); + + //Fine to use null for initial value, Avro will create new ones if this + // is null + private BinaryEncoder encoder = null; //For testing @@ -160,8 +183,13 @@ public class KafkaSink extends AbstractSink implements Configurable { // create a message and add to buffer long startTime = System.currentTimeMillis(); - kafkaFutures.add(producer.send(new ProducerRecord<String, byte[]> (eventTopic, eventKey, eventBody), + + try { + kafkaFutures.add(producer.send(new ProducerRecord<String, byte[]> (eventTopic, eventKey, serializeEvent(event, useAvroEventFormat)), new SinkCallback(startTime))); + } catch (IOException ex) { + throw new EventDeliveryException("Could not serialize event", ex); + } } //Prevent linger.ms from holding the batch @@ -255,6 +283,12 @@ public class KafkaSink extends AbstractSink implements Configurable { logger.debug("Using batch size: {}", batchSize); } + useAvroEventFormat = context.getBoolean(KafkaSinkConstants.AVRO_EVENT, KafkaSinkConstants.DEFAULT_AVRO_EVENT); + + if (logger.isDebugEnabled()) { + logger.debug(KafkaSinkConstants.AVRO_EVENT + " set to: {}", useAvroEventFormat); + } + kafkaFutures = new LinkedList<Future<RecordMetadata>>(); String bootStrapServers = context.getString(BOOTSTRAP_SERVERS_CONFIG); @@ -342,6 +376,36 @@ public 
class KafkaSink extends AbstractSink implements Configurable { protected Properties getKafkaProps() { return kafkaProps; } + + private byte[] serializeEvent(Event event, boolean useAvroEventFormat) throws IOException { + byte[] bytes; + if (useAvroEventFormat) { + if (!tempOutStream.isPresent()) { + tempOutStream = Optional.of(new ByteArrayOutputStream()); + } + if (!writer.isPresent()) { + writer = Optional.of(new SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class)); + } + tempOutStream.get().reset(); + AvroFlumeEvent e = new AvroFlumeEvent(toCharSeqMap(event.getHeaders()), ByteBuffer.wrap(event.getBody())); + encoder = EncoderFactory.get().directBinaryEncoder(tempOutStream.get(), encoder); + writer.get().write(e, encoder); + encoder.flush(); + bytes = tempOutStream.get().toByteArray(); + } else { + bytes = event.getBody(); + } + return bytes; + } + + private static Map<CharSequence, CharSequence> toCharSeqMap(Map<String, String> stringMap) { + Map<CharSequence, CharSequence> charSeqMap = new HashMap<CharSequence, CharSequence>(); + for (Map.Entry<String, String> entry : stringMap.entrySet()) { + charSeqMap.put(entry.getKey(), entry.getValue()); + } + return charSeqMap; + } + } class SinkCallback implements Callback { http://git-wip-us.apache.org/repos/asf/flume/blob/8d06a72d/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java index c84dec0..6b64bc1 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java @@ -34,6 +34,9 @@ public class KafkaSinkConstants { public 
static final String KEY_HEADER = "key"; public static final String TOPIC_HEADER = "topic"; + public static final String AVRO_EVENT = "useFlumeEventFormat"; + public static final boolean DEFAULT_AVRO_EVENT = false; + public static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; public static final String DEFAULT_VALUE_SERIAIZER = "org.apache.kafka.common.serialization.ByteArraySerializer"; @@ -42,7 +45,6 @@ public class KafkaSinkConstants { public static final String DEFAULT_ACKS = "1"; - /* Old Properties */ /* Properties */ http://git-wip-us.apache.org/repos/asf/flume/blob/8d06a72d/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java index f99be53..6d85700 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java @@ -22,4 +22,6 @@ public class TestConstants { public static final String STATIC_TOPIC = "static-topic"; public static final String CUSTOM_KEY = "custom-key"; public static final String CUSTOM_TOPIC = "custom-topic"; + public static final String HEADER_1_VALUE = "test-avro-header"; + public static final String HEADER_1_KEY = "header1"; } http://git-wip-us.apache.org/repos/asf/flume/blob/8d06a72d/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java index 1852099..f577e98 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java @@ -19,6 +19,11 @@ package org.apache.flume.sink.kafka; import kafka.message.MessageAndMetadata; + +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.util.Utf8; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; @@ -29,12 +34,17 @@ import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; import org.apache.flume.sink.kafka.util.TestUtil; +import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.ProducerConfig; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import com.google.common.base.Charsets; + +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.HashMap; @@ -208,6 +218,64 @@ public class TestKafkaSink { } + @SuppressWarnings("rawtypes") + @Test + public void testAvroEvent() throws IOException { + + + Sink kafkaSink = new KafkaSink(); + Context context = prepareDefaultContext(); + context.put(AVRO_EVENT, "true"); + Configurables.configure(kafkaSink, context); + Channel memoryChannel = new MemoryChannel(); + Configurables.configure(memoryChannel, context); + kafkaSink.setChannel(memoryChannel); + kafkaSink.start(); + + String msg = "test-avro-event"; + + Map<String, String> headers = new HashMap<String, String>(); + headers.put("topic", TestConstants.CUSTOM_TOPIC); + 
headers.put("key", TestConstants.CUSTOM_KEY); + headers.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE); + Transaction tx = memoryChannel.getTransaction(); + tx.begin(); + Event event = EventBuilder.withBody(msg.getBytes(), headers); + memoryChannel.put(event); + tx.commit(); + tx.close(); + + try { + Sink.Status status = kafkaSink.process(); + if (status == Sink.Status.BACKOFF) { + fail("Error Occurred"); + } + } catch (EventDeliveryException ex) { + // ignore + } + + MessageAndMetadata fetchedMsg = + testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC); + + ByteArrayInputStream in = + new ByteArrayInputStream((byte[])fetchedMsg.message()); + BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null); + SpecificDatumReader<AvroFlumeEvent> reader = new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class); + + AvroFlumeEvent avroevent = reader.read(null, decoder); + + String eventBody = new String(avroevent.getBody().array(), Charsets.UTF_8); + Map<CharSequence, CharSequence> eventHeaders = avroevent.getHeaders(); + + assertEquals(msg, eventBody); + assertEquals(TestConstants.CUSTOM_KEY, + new String((byte[]) fetchedMsg.key(), "UTF-8")); + + assertEquals(TestConstants.HEADER_1_VALUE, eventHeaders.get(new Utf8(TestConstants.HEADER_1_KEY)).toString()); + assertEquals(TestConstants.CUSTOM_KEY, eventHeaders.get(new Utf8("key")).toString()); + + } + @Test public void testEmptyChannel() throws UnsupportedEncodingException, EventDeliveryException { http://git-wip-us.apache.org/repos/asf/flume/blob/8d06a72d/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index db806cc..84fef52 100644 --- 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -16,6 +16,7 @@ */ package org.apache.flume.source.kafka; +import java.io.ByteArrayInputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -28,6 +29,9 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificDatumReader; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; @@ -37,7 +41,7 @@ import org.apache.flume.conf.ConfigurationException; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.kafka.KafkaSourceCounter; import org.apache.flume.source.AbstractPollableSource; - +import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -48,6 +52,8 @@ import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Optional; + /** * A Source for Kafka which reads messages from kafka topics. * @@ -69,6 +75,8 @@ import org.slf4j.LoggerFactory; * <p> * <tt>kafka.consumer.*: </tt> Any property starting with "kafka.consumer" will be * passed to the kafka consumer So you can use any configuration supported by Kafka 0.9.0.X + * <tt>useFlumeEventFormat: </tt> Reads events from Kafka Topic as an Avro FlumeEvent. 
Used + * in conjunction with useFlumeEventFormat (Kafka Sink) or parseAsFlumeEvent (Kafka Channel) * <p> */ public class KafkaSource extends AbstractPollableSource @@ -87,6 +95,11 @@ public class KafkaSource extends AbstractPollableSource private Map<String, String> headers; + private Optional<SpecificDatumReader<AvroFlumeEvent>> reader = Optional.absent(); + private BinaryDecoder decoder = null; + + private boolean useAvroEventFormat; + private int batchUpperLimit; private int maxBatchDurationMillis; @@ -139,6 +152,7 @@ public class KafkaSource extends AbstractPollableSource byte[] kafkaMessage; String kafkaKey; Event event; + byte[] eventBody; try { // prepare time variables for new batch @@ -178,11 +192,41 @@ public class KafkaSource extends AbstractPollableSource kafkaKey = message.key(); kafkaMessage = message.value(); - headers.clear(); - // Add headers to event (timestamp, topic, partition, key) - headers.put(KafkaSourceConstants.TIMESTAMP_HEADER, String.valueOf(System.currentTimeMillis())); - headers.put(KafkaSourceConstants.TOPIC_HEADER, message.topic()); - headers.put(KafkaSourceConstants.PARTITION_HEADER, String.valueOf(message.partition())); + if (useAvroEventFormat) { + //Assume the event is in Avro format using the AvroFlumeEvent schema + //Will need to catch the exception if it is not + ByteArrayInputStream in = + new ByteArrayInputStream(message.value()); + decoder = DecoderFactory.get().directBinaryDecoder(in, decoder); + if (!reader.isPresent()) { + reader = Optional.of( + new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class)); + } + //This may throw an exception but it will be caught by the + //exception handler below and logged at error + AvroFlumeEvent avroevent = reader.get().read(null, decoder); + + eventBody = avroevent.getBody().array(); + headers = toStringMap(avroevent.getHeaders()); + } else { + eventBody = message.value(); + headers.clear(); + headers = new HashMap<String, String>(4); + } + + // Add headers to event (timestamp, 
topic, partition, key) only if they don't exist + if (!headers.containsKey(KafkaSourceConstants.TIMESTAMP_HEADER)) { + headers.put(KafkaSourceConstants.TIMESTAMP_HEADER, + String.valueOf(System.currentTimeMillis())); + } + if (!headers.containsKey(KafkaSourceConstants.TOPIC_HEADER)) { + headers.put(KafkaSourceConstants.TOPIC_HEADER, message.topic()); + } + if (!headers.containsKey(KafkaSourceConstants.PARTITION_HEADER)) { + headers.put(KafkaSourceConstants.PARTITION_HEADER, + String.valueOf(message.partition())); + } + if (kafkaKey != null) { headers.put(KafkaSourceConstants.KEY_HEADER, kafkaKey); } @@ -191,10 +235,10 @@ public class KafkaSource extends AbstractPollableSource log.debug("Topic: {} Partition: {} Message: {}", new String[]{ message.topic(), String.valueOf(message.partition()), - new String(kafkaMessage)}); + new String(eventBody)}); } - event = EventBuilder.withBody(kafkaMessage, headers); + event = EventBuilder.withBody(eventBody, headers); eventList.add(event); if (log.isDebugEnabled()) { @@ -275,6 +319,12 @@ public class KafkaSource extends AbstractPollableSource maxBatchDurationMillis = context.getInteger(KafkaSourceConstants.BATCH_DURATION_MS, KafkaSourceConstants.DEFAULT_BATCH_DURATION); + useAvroEventFormat = context.getBoolean(KafkaSourceConstants.AVRO_EVENT, KafkaSourceConstants.DEFAULT_AVRO_EVENT); + + if (log.isDebugEnabled()) { + log.debug(KafkaSourceConstants.AVRO_EVENT + " set to: {}", useAvroEventFormat); + } + String bootstrapServers = context.getString(KafkaSourceConstants.BOOTSTRAP_SERVERS); if (bootstrapServers == null || bootstrapServers.isEmpty()) { throw new ConfigurationException("Bootstrap Servers must be specified"); @@ -334,6 +384,17 @@ public class KafkaSource extends AbstractPollableSource return kafkaProps; } + /** + * Helper function to convert a map of CharSequence to a map of String. 
+ */ + private static Map<String, String> toStringMap(Map<CharSequence, CharSequence> charSeqMap) { + Map<String, String> stringMap = new HashMap<String, String>(); + for (Map.Entry<CharSequence, CharSequence> entry : charSeqMap.entrySet()) { + stringMap.put(entry.getKey().toString(), entry.getValue().toString()); + } + return stringMap; + } + <T> Subscriber<T> getSubscriber() { return subscriber; } http://git-wip-us.apache.org/repos/asf/flume/blob/8d06a72d/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java index 2999cf2..9f20f61 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java @@ -34,6 +34,9 @@ public class KafkaSourceConstants { public static final int DEFAULT_BATCH_DURATION = 1000; public static final String DEFAULT_GROUP_ID = "flume"; + public static final String AVRO_EVENT = "useFlumeEventFormat"; + public static final boolean DEFAULT_AVRO_EVENT = false; + /* Old Properties */ public static final String TOPIC = "topic"; http://git-wip-us.apache.org/repos/asf/flume/blob/8d06a72d/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java index 46d545f..affac03 100644 --- 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java @@ -24,6 +24,7 @@ import org.I0Itec.zkclient.ZkClient; import org.apache.commons.io.FileUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.io.File; @@ -52,7 +53,7 @@ public class KafkaSourceEmbeddedKafka { int zkPort = 21818; // none-standard int serverPort = 18922; - KafkaProducer<String, String> producer; + KafkaProducer<String, byte[]> producer; File dir; public KafkaSourceEmbeddedKafka(Properties properties) { @@ -95,12 +96,16 @@ public class KafkaSourceEmbeddedKafka { Properties props = new Properties(); props.put("bootstrap.servers", HOST + ":" + serverPort); props.put("acks", "1"); - producer = new KafkaProducer<String,String>(props, - new StringSerializer(), new StringSerializer()); + producer = new KafkaProducer<String,byte[]>(props, + new StringSerializer(), new ByteArraySerializer()); } public void produce(String topic, String k, String v) { - ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topic, k, v); + produce(topic, k, v.getBytes()); + } + + public void produce(String topic, String k, byte[] v) { + ProducerRecord<String, byte[]> rec = new ProducerRecord<String, byte[]>(topic, k, v); try { producer.send(rec).get(); } catch (InterruptedException e) { @@ -111,7 +116,11 @@ public class KafkaSourceEmbeddedKafka { } public void produce(String topic, int partition, String k, String v) { - ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topic, partition, k, v); + produce(topic,partition,k,v.getBytes()); + } + + public void produce(String topic, int partition, String k, byte[] v) 
{ + ProducerRecord<String, byte[]> rec = new ProducerRecord<String, byte[]>(topic, partition, k, v); try { producer.send(rec).get(); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/flume/blob/8d06a72d/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java index 8e04da8..b4250de 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java @@ -22,7 +22,12 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.regex.Pattern; @@ -31,9 +36,13 @@ import com.google.common.collect.Lists; import junit.framework.Assert; import kafka.common.TopicExistsException; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumWriter; import org.apache.flume.*; import org.apache.flume.PollableSource.Status; import org.apache.flume.channel.ChannelProcessor; +import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.junit.After; import org.junit.Before; @@ -505,6 +514,80 @@ public class TestKafkaSource { Assert.assertFalse(subscriber.get().matcher("topic").find()); } + @Test + public void testAvroEvent() throws InterruptedException, EventDeliveryException, 
IOException { + SpecificDatumWriter<AvroFlumeEvent> writer; + ByteArrayOutputStream tempOutStream; + BinaryEncoder encoder; + byte[] bytes; + + context.put(TOPICS, topic0); + context.put(BATCH_SIZE, "1"); + context.put(AVRO_EVENT, "true"); + kafkaSource.configure(context); + kafkaSource.start(); + + Thread.sleep(500L); + + tempOutStream = new ByteArrayOutputStream(); + writer = new SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class); + + Map<CharSequence, CharSequence> headers = new HashMap<CharSequence,CharSequence>(); + headers.put("header1", "value1"); + headers.put("header2", "value2"); + + AvroFlumeEvent e = new AvroFlumeEvent(headers, ByteBuffer.wrap("hello, world".getBytes())); + encoder = EncoderFactory.get().directBinaryEncoder(tempOutStream, null); + writer.write(e, encoder); + encoder.flush(); + bytes = tempOutStream.toByteArray(); + + kafkaServer.produce(topic0, "", bytes); + + String currentTimestamp = Long.toString(System.currentTimeMillis()); + + headers.put(TIMESTAMP_HEADER, currentTimestamp); + headers.put(PARTITION_HEADER, "1"); + headers.put(TOPIC_HEADER, "topic0"); + + e = new AvroFlumeEvent(headers, ByteBuffer.wrap("hello, world2".getBytes())); + tempOutStream.reset(); + encoder = EncoderFactory.get().directBinaryEncoder(tempOutStream, null); + writer.write(e, encoder); + encoder.flush(); + bytes = tempOutStream.toByteArray(); + + kafkaServer.produce(topic0, "", bytes); + + Thread.sleep(500L); + Assert.assertEquals(Status.READY, kafkaSource.process()); + Assert.assertEquals(Status.READY, kafkaSource.process()); + Assert.assertEquals(Status.BACKOFF, kafkaSource.process()); + + Assert.assertEquals(2, events.size()); + + Event event = events.get(0); + + Assert.assertEquals("hello, world", new String(event.getBody(), + Charsets.UTF_8)); + + Assert.assertEquals("value1", e.getHeaders().get("header1")); + Assert.assertEquals("value2", e.getHeaders().get("header2")); + + + event = events.get(1); + + Assert.assertEquals("hello, world2", new 
String(event.getBody(), + Charsets.UTF_8)); + + Assert.assertEquals("value1", e.getHeaders().get("header1")); + Assert.assertEquals("value2", e.getHeaders().get("header2")); + Assert.assertEquals(currentTimestamp, e.getHeaders().get(TIMESTAMP_HEADER)); + Assert.assertEquals(e.getHeaders().get(PARTITION_HEADER), "1"); + Assert.assertEquals(e.getHeaders().get(TOPIC_HEADER),"topic0"); + + } + ChannelProcessor createGoodChannel() { ChannelProcessor channelProcessor = mock(ChannelProcessor.class);
