Repository: flume
Updated Branches:
  refs/heads/flume-1.7 49680d6bc -> 8d06a72d9


FLUME-2852: Kafka Source/Sink should optionally read/write Flume records

(Tristan Stevens via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/8d06a72d
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/8d06a72d
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/8d06a72d

Branch: refs/heads/flume-1.7
Commit: 8d06a72d9dc660c28e1217891f9c5085b8192085
Parents: 49680d6
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Thu Apr 21 13:37:26 2016 -0700
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Thu Apr 21 13:38:11 2016 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  9 +++
 .../org/apache/flume/sink/kafka/KafkaSink.java  | 66 +++++++++++++++-
 .../flume/sink/kafka/KafkaSinkConstants.java    |  4 +-
 .../apache/flume/sink/kafka/TestConstants.java  |  2 +
 .../apache/flume/sink/kafka/TestKafkaSink.java  | 68 ++++++++++++++++
 .../apache/flume/source/kafka/KafkaSource.java  | 77 ++++++++++++++++--
 .../source/kafka/KafkaSourceConstants.java      |  3 +
 .../source/kafka/KafkaSourceEmbeddedKafka.java  | 19 +++--
 .../flume/source/kafka/TestKafkaSource.java     | 83 ++++++++++++++++++++
 9 files changed, 316 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/8d06a72d/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 5149ab5..9c11fe6 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1215,6 +1215,11 @@ backoffSleepIncrement            1000         Initial 
and incremental wait time
 maxBackoffSleep                  5000         Maximum wait time that is 
triggered when a Kafka Topic appears to be empty.  Five seconds is
                                               ideal for ingestion use cases 
but a lower value may be required for low latency operations
                                               with interceptors.
+useFlumeEventFormat              false        By default events are taken as 
bytes from the Kafka topic directly into the event body. Set to
+                                              true to read events as the Flume 
Avro binary format. Used in conjunction with the same property
+                                              on the KafkaSink or with the 
parseAsFlumeEvent property on the Kafka Channel this will preserve
+                                              any Flume headers sent on the 
producing side.
+
 Other Kafka Consumer Properties  --           These properties are used to 
configure the Kafka Consumer. Any consumer property supported
                                               by Kafka can be used. The only 
requirement is to prepend the property name with the prefix ``kafka.consumer``.
                                               For example: 
kafka.consumer.auto.offset.reset
@@ -2515,6 +2520,10 @@ flumeBatchSize                   100                  
How many messages to proce
 kafka.producer.acks              1                    How many replicas must 
acknowledge a message before it's considered successfully written.
                                                       Accepted values are 0 
(Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all 
replicas)
                                                       Set this to -1 to avoid 
data loss in some cases of leader failure.
+useFlumeEventFormat              false                By default events are 
put as bytes onto the Kafka topic directly from the event body. Set to
+                                                      true to store events as 
the Flume Avro binary format. Used in conjunction with the same property
+                                                      on the KafkaSource or 
with the parseAsFlumeEvent property on the Kafka Channel this will preserve
+                                                      any Flume headers from 
the producing side.
 Other Kafka Producer Properties  --                   These properties are 
used to configure the Kafka Producer. Any producer property supported
                                                       by Kafka can be used. 
The only requirement is to prepend the property name with the prefix
                                                       ``kafka.producer``.

http://git-wip-us.apache.org/repos/asf/flume/blob/8d06a72d/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index 2e2140e..7bef7f3 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -18,7 +18,13 @@
 
 package org.apache.flume.sink.kafka;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
+
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -28,6 +34,7 @@ import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.instrumentation.kafka.KafkaSinkCounter;
 import org.apache.flume.sink.AbstractSink;
+import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -36,6 +43,10 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -85,6 +96,7 @@ import static 
org.apache.flume.sink.kafka.KafkaSinkConstants.MESSAGE_SERIALIZER_
  * improve throughput while adding latency.
  * requiredAcks -- 0 (unsafe), 1 (accepted by at least one broker, default),
  * -1 (accepted by all brokers)
+ * useFlumeEventFormat - preserves event headers when serializing onto Kafka
  * <p/>
  * header properties (per event):
  * topic
@@ -101,6 +113,17 @@ public class KafkaSink extends AbstractSink implements 
Configurable {
   private int batchSize;
   private List<Future<RecordMetadata>> kafkaFutures;
   private KafkaSinkCounter counter;
+  private boolean useAvroEventFormat;
+  private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer =
+          Optional.absent();
+  private Optional<SpecificDatumReader<AvroFlumeEvent>> reader =
+          Optional.absent();
+  private Optional<ByteArrayOutputStream> tempOutStream = Optional
+          .absent();
+
+  //Fine to use null for initial value, Avro will create new ones if this
+  // is null
+  private BinaryEncoder encoder = null;
 
 
   //For testing
@@ -160,8 +183,13 @@ public class KafkaSink extends AbstractSink implements 
Configurable {
 
         // create a message and add to buffer
         long startTime = System.currentTimeMillis();
-        kafkaFutures.add(producer.send(new ProducerRecord<String, byte[]> 
(eventTopic, eventKey, eventBody),
+
+        try {
+          kafkaFutures.add(producer.send(new ProducerRecord<String, byte[]> 
(eventTopic, eventKey, serializeEvent(event, useAvroEventFormat)),
                                          new SinkCallback(startTime)));
+        } catch (IOException ex) {
+          throw new EventDeliveryException("Could not serialize event", ex);
+        }
       }
 
       //Prevent linger.ms from holding the batch
@@ -255,6 +283,12 @@ public class KafkaSink extends AbstractSink implements 
Configurable {
       logger.debug("Using batch size: {}", batchSize);
     }
 
+    useAvroEventFormat = context.getBoolean(KafkaSinkConstants.AVRO_EVENT, 
KafkaSinkConstants.DEFAULT_AVRO_EVENT);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug(KafkaSinkConstants.AVRO_EVENT + " set to: {}", 
useAvroEventFormat);
+    }
+
     kafkaFutures = new LinkedList<Future<RecordMetadata>>();
 
     String bootStrapServers = context.getString(BOOTSTRAP_SERVERS_CONFIG);
@@ -342,6 +376,36 @@ public class KafkaSink extends AbstractSink implements 
Configurable {
   protected Properties getKafkaProps() {
     return kafkaProps;
   }
+
+  private byte[] serializeEvent(Event event, boolean useAvroEventFormat) 
throws IOException {
+    byte[] bytes;
+    if (useAvroEventFormat) {
+      if (!tempOutStream.isPresent()) {
+        tempOutStream = Optional.of(new ByteArrayOutputStream());
+      }
+      if (!writer.isPresent()) {
+        writer = Optional.of(new 
SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class));
+      }
+      tempOutStream.get().reset();
+      AvroFlumeEvent e = new AvroFlumeEvent(toCharSeqMap(event.getHeaders()), 
ByteBuffer.wrap(event.getBody()));
+      encoder = EncoderFactory.get().directBinaryEncoder(tempOutStream.get(), 
encoder);
+      writer.get().write(e, encoder);
+      encoder.flush();
+      bytes = tempOutStream.get().toByteArray();
+    } else {
+      bytes = event.getBody();
+    }
+    return bytes;
+  }
+
+  private static Map<CharSequence, CharSequence> toCharSeqMap(Map<String, 
String> stringMap) {
+    Map<CharSequence, CharSequence> charSeqMap = new HashMap<CharSequence, 
CharSequence>();
+    for (Map.Entry<String, String> entry : stringMap.entrySet()) {
+      charSeqMap.put(entry.getKey(), entry.getValue());
+    }
+    return charSeqMap;
+  }
+
 }
 
 class SinkCallback implements Callback {

http://git-wip-us.apache.org/repos/asf/flume/blob/8d06a72d/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
index c84dec0..6b64bc1 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
@@ -34,6 +34,9 @@ public class KafkaSinkConstants {
   public static final String KEY_HEADER = "key";
   public static final String TOPIC_HEADER = "topic";
 
+  public static final String AVRO_EVENT = "useFlumeEventFormat";
+  public static final boolean DEFAULT_AVRO_EVENT = false;
+
   public static final String DEFAULT_KEY_SERIALIZER = 
"org.apache.kafka.common.serialization.StringSerializer";
   public static final String DEFAULT_VALUE_SERIAIZER = 
"org.apache.kafka.common.serialization.ByteArraySerializer";
 
@@ -42,7 +45,6 @@ public class KafkaSinkConstants {
   public static final String DEFAULT_ACKS = "1";
 
 
-
   /* Old Properties */
 
    /* Properties */

http://git-wip-us.apache.org/repos/asf/flume/blob/8d06a72d/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
index f99be53..6d85700 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
@@ -22,4 +22,6 @@ public class TestConstants {
   public static final String STATIC_TOPIC = "static-topic";
   public static final String CUSTOM_KEY = "custom-key";
   public static final String CUSTOM_TOPIC = "custom-topic";
+  public static final String HEADER_1_VALUE = "test-avro-header";
+  public static final String HEADER_1_KEY = "header1";
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/8d06a72d/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index 1852099..f577e98 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -19,6 +19,11 @@
 package org.apache.flume.sink.kafka;
 
 import kafka.message.MessageAndMetadata;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.util.Utf8;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -29,12 +34,17 @@ import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.sink.kafka.util.TestUtil;
+import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.base.Charsets;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -208,6 +218,64 @@ public class TestKafkaSink {
 
   }
 
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testAvroEvent() throws IOException {
+
+
+    Sink kafkaSink = new KafkaSink();
+    Context context = prepareDefaultContext();
+    context.put(AVRO_EVENT, "true");
+    Configurables.configure(kafkaSink, context);
+    Channel memoryChannel = new MemoryChannel();
+    Configurables.configure(memoryChannel, context);
+    kafkaSink.setChannel(memoryChannel);
+    kafkaSink.start();
+
+    String msg = "test-avro-event";
+
+    Map<String, String> headers = new HashMap<String, String>();
+    headers.put("topic", TestConstants.CUSTOM_TOPIC);
+    headers.put("key", TestConstants.CUSTOM_KEY);
+    headers.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE);
+    Transaction tx = memoryChannel.getTransaction();
+    tx.begin();
+    Event event = EventBuilder.withBody(msg.getBytes(), headers);
+    memoryChannel.put(event);
+    tx.commit();
+    tx.close();
+
+    try {
+      Sink.Status status = kafkaSink.process();
+      if (status == Sink.Status.BACKOFF) {
+        fail("Error Occurred");
+      }
+    } catch (EventDeliveryException ex) {
+      // ignore
+    }
+
+    MessageAndMetadata fetchedMsg =
+      testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC);
+
+    ByteArrayInputStream in =
+        new ByteArrayInputStream((byte[])fetchedMsg.message());
+    BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
+    SpecificDatumReader<AvroFlumeEvent> reader = new 
SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class);
+
+    AvroFlumeEvent avroevent = reader.read(null, decoder);
+
+    String eventBody = new String(avroevent.getBody().array(), Charsets.UTF_8);
+    Map<CharSequence, CharSequence> eventHeaders = avroevent.getHeaders();
+
+    assertEquals(msg, eventBody);
+    assertEquals(TestConstants.CUSTOM_KEY,
+      new String((byte[]) fetchedMsg.key(), "UTF-8"));
+
+    assertEquals(TestConstants.HEADER_1_VALUE, eventHeaders.get(new 
Utf8(TestConstants.HEADER_1_KEY)).toString());
+    assertEquals(TestConstants.CUSTOM_KEY, eventHeaders.get(new 
Utf8("key")).toString());
+
+  }
+
   @Test
   public void testEmptyChannel() throws UnsupportedEncodingException,
           EventDeliveryException {

http://git-wip-us.apache.org/repos/asf/flume/blob/8d06a72d/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index db806cc..84fef52 100644
--- 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -16,6 +16,7 @@
  */
 package org.apache.flume.source.kafka;
 
+import java.io.ByteArrayInputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -28,6 +29,9 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
@@ -37,7 +41,7 @@ import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.kafka.KafkaSourceCounter;
 import org.apache.flume.source.AbstractPollableSource;
-
+import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -48,6 +52,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Optional;
+
 /**
  * A Source for Kafka which reads messages from kafka topics.
  *
@@ -69,6 +75,8 @@ import org.slf4j.LoggerFactory;
  * <p>
  * <tt>kafka.consumer.*: </tt> Any property starting with "kafka.consumer" 
will be
  * passed to the kafka consumer So you can use any configuration supported by 
Kafka 0.9.0.X
+ * <tt>useFlumeEventFormat: </tt> Reads events from Kafka Topic as an Avro 
FlumeEvent. Used
+ * in conjunction with useFlumeEventFormat (Kafka Sink) or parseAsFlumeEvent 
(Kafka Channel)
  * <p>
  */
 public class KafkaSource extends AbstractPollableSource
@@ -87,6 +95,11 @@ public class KafkaSource extends AbstractPollableSource
 
   private Map<String, String> headers;
 
+  private Optional<SpecificDatumReader<AvroFlumeEvent>> reader = 
Optional.absent();
+  private BinaryDecoder decoder = null;
+
+  private boolean useAvroEventFormat;
+
   private int batchUpperLimit;
   private int maxBatchDurationMillis;
 
@@ -139,6 +152,7 @@ public class KafkaSource extends AbstractPollableSource
     byte[] kafkaMessage;
     String kafkaKey;
     Event event;
+    byte[] eventBody;
 
     try {
       // prepare time variables for new batch
@@ -178,11 +192,41 @@ public class KafkaSource extends AbstractPollableSource
         kafkaKey = message.key();
         kafkaMessage = message.value();
 
-        headers.clear();
-        // Add headers to event (timestamp, topic, partition, key)
-        headers.put(KafkaSourceConstants.TIMESTAMP_HEADER, 
String.valueOf(System.currentTimeMillis()));
-        headers.put(KafkaSourceConstants.TOPIC_HEADER, message.topic());
-        headers.put(KafkaSourceConstants.PARTITION_HEADER, 
String.valueOf(message.partition()));
+        if (useAvroEventFormat) {
+          //Assume the event is in Avro format using the AvroFlumeEvent schema
+          //Will need to catch the exception if it is not
+          ByteArrayInputStream in =
+                  new ByteArrayInputStream(message.value());
+          decoder = DecoderFactory.get().directBinaryDecoder(in, decoder);
+          if (!reader.isPresent()) {
+            reader = Optional.of(
+                    new 
SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class));
+          }
+          //This may throw an exception but it will be caught by the
+          //exception handler below and logged at error
+          AvroFlumeEvent avroevent = reader.get().read(null, decoder);
+
+          eventBody = avroevent.getBody().array();
+          headers = toStringMap(avroevent.getHeaders());
+        } else {
+          eventBody = message.value();
+          headers.clear();
+          headers = new HashMap<String, String>(4);
+        }
+
+        // Add headers to event (timestamp, topic, partition, key) only if 
they don't exist
+        if (!headers.containsKey(KafkaSourceConstants.TIMESTAMP_HEADER)) {
+          headers.put(KafkaSourceConstants.TIMESTAMP_HEADER,
+              String.valueOf(System.currentTimeMillis()));
+        }
+        if (!headers.containsKey(KafkaSourceConstants.TOPIC_HEADER)) {
+          headers.put(KafkaSourceConstants.TOPIC_HEADER, message.topic());
+        }
+        if (!headers.containsKey(KafkaSourceConstants.PARTITION_HEADER)) {
+          headers.put(KafkaSourceConstants.PARTITION_HEADER,
+              String.valueOf(message.partition()));
+        }
+
         if (kafkaKey != null) {
           headers.put(KafkaSourceConstants.KEY_HEADER, kafkaKey);
         }
@@ -191,10 +235,10 @@ public class KafkaSource extends AbstractPollableSource
           log.debug("Topic: {} Partition: {} Message: {}", new String[]{
                   message.topic(),
                   String.valueOf(message.partition()),
-                  new String(kafkaMessage)});
+                  new String(eventBody)});
         }
 
-        event = EventBuilder.withBody(kafkaMessage, headers);
+        event = EventBuilder.withBody(eventBody, headers);
         eventList.add(event);
 
         if (log.isDebugEnabled()) {
@@ -275,6 +319,12 @@ public class KafkaSource extends AbstractPollableSource
     maxBatchDurationMillis = 
context.getInteger(KafkaSourceConstants.BATCH_DURATION_MS,
             KafkaSourceConstants.DEFAULT_BATCH_DURATION);
 
+    useAvroEventFormat = context.getBoolean(KafkaSourceConstants.AVRO_EVENT, 
KafkaSourceConstants.DEFAULT_AVRO_EVENT);
+
+    if (log.isDebugEnabled()) {
+      log.debug(KafkaSourceConstants.AVRO_EVENT + " set to: {}", 
useAvroEventFormat);
+    }
+
     String bootstrapServers = 
context.getString(KafkaSourceConstants.BOOTSTRAP_SERVERS);
     if (bootstrapServers == null || bootstrapServers.isEmpty()) {
       throw new ConfigurationException("Bootstrap Servers must be specified");
@@ -334,6 +384,17 @@ public class KafkaSource extends AbstractPollableSource
     return kafkaProps;
   }
 
+  /**
+   * Helper function to convert a map of CharSequence to a map of String.
+   */
+  private static Map<String, String> toStringMap(Map<CharSequence, 
CharSequence> charSeqMap) {
+    Map<String, String> stringMap = new HashMap<String, String>();
+    for (Map.Entry<CharSequence, CharSequence> entry : charSeqMap.entrySet()) {
+      stringMap.put(entry.getKey().toString(), entry.getValue().toString());
+    }
+    return stringMap;
+  }
+
   <T> Subscriber<T> getSubscriber() {
     return subscriber;
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/8d06a72d/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index 2999cf2..9f20f61 100644
--- 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -34,6 +34,9 @@ public class KafkaSourceConstants {
   public static final int DEFAULT_BATCH_DURATION = 1000;
   public static final String DEFAULT_GROUP_ID = "flume";
 
+  public static final String AVRO_EVENT = "useFlumeEventFormat";
+  public static final boolean DEFAULT_AVRO_EVENT = false;
+
   /* Old Properties */
 
   public static final String TOPIC = "topic";

http://git-wip-us.apache.org/repos/asf/flume/blob/8d06a72d/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
index 46d545f..affac03 100644
--- 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
+++ 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
@@ -24,6 +24,7 @@ import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.io.FileUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.io.File;
@@ -52,7 +53,7 @@ public class KafkaSourceEmbeddedKafka {
   int zkPort = 21818; // none-standard
   int serverPort = 18922;
 
-  KafkaProducer<String, String> producer;
+  KafkaProducer<String, byte[]> producer;
   File dir;
 
   public KafkaSourceEmbeddedKafka(Properties properties) {
@@ -95,12 +96,16 @@ public class KafkaSourceEmbeddedKafka {
     Properties props = new Properties();
     props.put("bootstrap.servers", HOST + ":" + serverPort);
     props.put("acks", "1");
-    producer = new KafkaProducer<String,String>(props,
-            new StringSerializer(), new StringSerializer());
+    producer = new KafkaProducer<String,byte[]>(props,
+            new StringSerializer(), new ByteArraySerializer());
   }
 
   public void produce(String topic, String k, String v) {
-    ProducerRecord<String, String> rec = new ProducerRecord<String, 
String>(topic, k, v);
+    produce(topic, k, v.getBytes());
+  }
+
+  public void produce(String topic, String k, byte[] v) {
+    ProducerRecord<String, byte[]> rec = new ProducerRecord<String, 
byte[]>(topic, k, v);
     try {
       producer.send(rec).get();
     } catch (InterruptedException e) {
@@ -111,7 +116,11 @@ public class KafkaSourceEmbeddedKafka {
   }
 
   public void produce(String topic, int partition, String k, String v) {
-    ProducerRecord<String, String> rec = new ProducerRecord<String, 
String>(topic, partition, k, v);
+    produce(topic,partition,k,v.getBytes());
+  }
+
+  public void produce(String topic, int partition, String k, byte[] v) {
+    ProducerRecord<String, byte[]> rec = new ProducerRecord<String, 
byte[]>(topic, partition, k, v);
     try {
       producer.send(rec).get();
     } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/flume/blob/8d06a72d/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index 8e04da8..b4250de 100644
--- 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -22,7 +22,12 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.*;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
@@ -31,9 +36,13 @@ import com.google.common.collect.Lists;
 import junit.framework.Assert;
 import kafka.common.TopicExistsException;
 
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.flume.*;
 import org.apache.flume.PollableSource.Status;
 import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.junit.After;
 import org.junit.Before;
@@ -505,6 +514,80 @@ public class TestKafkaSource {
     Assert.assertFalse(subscriber.get().matcher("topic").find());
   }
 
+  @Test
+  public void testAvroEvent() throws InterruptedException, 
EventDeliveryException, IOException {
+    SpecificDatumWriter<AvroFlumeEvent> writer;
+    ByteArrayOutputStream tempOutStream;
+    BinaryEncoder encoder;
+    byte[] bytes;
+
+    context.put(TOPICS, topic0);
+    context.put(BATCH_SIZE, "1");
+    context.put(AVRO_EVENT, "true");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+
+    Thread.sleep(500L);
+
+    tempOutStream = new ByteArrayOutputStream();
+    writer = new SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class);
+
+    Map<CharSequence, CharSequence> headers = new 
HashMap<CharSequence,CharSequence>();
+    headers.put("header1", "value1");
+    headers.put("header2", "value2");
+
+    AvroFlumeEvent e = new AvroFlumeEvent(headers, ByteBuffer.wrap("hello, 
world".getBytes()));
+    encoder = EncoderFactory.get().directBinaryEncoder(tempOutStream, null);
+    writer.write(e, encoder);
+    encoder.flush();
+    bytes = tempOutStream.toByteArray();
+
+    kafkaServer.produce(topic0, "", bytes);
+
+    String currentTimestamp = Long.toString(System.currentTimeMillis());
+
+    headers.put(TIMESTAMP_HEADER, currentTimestamp);
+    headers.put(PARTITION_HEADER, "1");
+    headers.put(TOPIC_HEADER, "topic0");
+
+    e = new AvroFlumeEvent(headers, ByteBuffer.wrap("hello, 
world2".getBytes()));
+    tempOutStream.reset();
+    encoder = EncoderFactory.get().directBinaryEncoder(tempOutStream, null);
+    writer.write(e, encoder);
+    encoder.flush();
+    bytes = tempOutStream.toByteArray();
+
+    kafkaServer.produce(topic0, "", bytes);
+
+    Thread.sleep(500L);
+    Assert.assertEquals(Status.READY, kafkaSource.process());
+    Assert.assertEquals(Status.READY, kafkaSource.process());
+    Assert.assertEquals(Status.BACKOFF, kafkaSource.process());
+
+    Assert.assertEquals(2, events.size());
+
+    Event event = events.get(0);
+
+    Assert.assertEquals("hello, world", new String(event.getBody(),
+            Charsets.UTF_8));
+
+    Assert.assertEquals("value1", e.getHeaders().get("header1"));
+    Assert.assertEquals("value2", e.getHeaders().get("header2"));
+
+
+    event = events.get(1);
+
+    Assert.assertEquals("hello, world2", new String(event.getBody(),
+            Charsets.UTF_8));
+
+    Assert.assertEquals("value1", e.getHeaders().get("header1"));
+    Assert.assertEquals("value2", e.getHeaders().get("header2"));
+    Assert.assertEquals(currentTimestamp, 
e.getHeaders().get(TIMESTAMP_HEADER));
+    Assert.assertEquals(e.getHeaders().get(PARTITION_HEADER), "1");
+    Assert.assertEquals(e.getHeaders().get(TOPIC_HEADER),"topic0");
+
+  }
+
   ChannelProcessor createGoodChannel() {
 
     ChannelProcessor channelProcessor = mock(ChannelProcessor.class);

Reply via email to