Repository: flume
Updated Branches:
  refs/heads/trunk 4a3f3c76f -> 54e2728a8


FLUME-3046. Kafka Sink and Source Configuration Improvements

This patch fixes the infinite loop that can occur when a Kafka source and a
Kafka sink are used together, by introducing the following configuration
parameters in those components:
- topicHeader in Kafka source: the name of the header in which the source
  stores the name of the topic the event was read from.
- setTopicHeader in Kafka source: controls whether the topic name is stored
  in that header.
- topicHeader in Kafka sink: the name of the header the sink reads to
  decide which topic to send the event to.
- allowTopicOverride in Kafka sink: controls whether the target topic's name
  can be overridden by that header.
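
For illustration, the interaction of the four parameters can be sketched in
plain Java. This is a minimal sketch under stated assumptions, not the patched
Flume code: the class TopicHeaderSketch, the methods stampTopic and
resolveTopic, and the topic names are hypothetical, and the sink's
BucketPath-based escaping of the configured topic is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the header handling described above; not Flume code.
final class TopicHeaderSketch {

  // Source side: record the originating topic in the header named by
  // topicHeader, unless setTopicHeader is false or the header already exists.
  static void stampTopic(Map<String, String> headers, String sourceTopic,
                         boolean setTopicHeader, String topicHeader) {
    if (setTopicHeader && !headers.containsKey(topicHeader)) {
      headers.put(topicHeader, sourceTopic);
    }
  }

  // Sink side: use the header value as the target topic only when overriding
  // is allowed and the header is present; otherwise use the configured topic.
  static String resolveTopic(Map<String, String> headers, String configuredTopic,
                             boolean allowTopicOverride, String topicHeader) {
    if (!allowTopicOverride) {
      return configuredTopic;
    }
    String fromHeader = headers.get(topicHeader);
    return fromHeader != null ? fromHeader : configuredTopic;
  }

  public static void main(String[] args) {
    // Defaults on both sides: the sink sends the event back to the source topic.
    Map<String, String> headers = new HashMap<>();
    stampTopic(headers, "source-topic", true, "topic");
    System.out.println(resolveTopic(headers, "sink-topic", true, "topic"));  // source-topic

    // allowTopicOverride = false on the sink: the header is ignored.
    System.out.println(resolveTopic(headers, "sink-topic", false, "topic")); // sink-topic

    // A different topicHeader on the source: the sink finds no "topic" header.
    Map<String, String> renamed = new HashMap<>();
    stampTopic(renamed, "source-topic", true, "originTopic");
    System.out.println(resolveTopic(renamed, "sink-topic", true, "topic"));  // sink-topic
  }
}
```

With the defaults the event is routed back to the topic it was read from.
Disabling the override on the sink, disabling the header on the source, or
giving the two components different header names each breaks that loop.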

This closes #105

Reviewers: Attila Simon

(Tristan Stevens via Denes Arvay)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/54e2728a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/54e2728a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/54e2728a

Branch: refs/heads/trunk
Commit: 54e2728a8e141ee63704018c4497bbe083c0f75f
Parents: 4a3f3c7
Author: Tristan Stevens <[email protected]>
Authored: Tue Sep 12 21:42:57 2017 +0200
Committer: Denes Arvay <[email protected]>
Committed: Tue Sep 12 21:58:05 2017 +0200

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  9 ++
 .../org/apache/flume/sink/kafka/KafkaSink.java  | 25 ++++-
 .../flume/sink/kafka/KafkaSinkConstants.java    |  5 +-
 .../apache/flume/sink/kafka/TestKafkaSink.java  | 88 +++++++++++++++++-
 .../apache/flume/source/kafka/KafkaSource.java  | 13 ++-
 .../source/kafka/KafkaSourceConstants.java      |  6 +-
 .../flume/source/kafka/TestKafkaSource.java     | 96 +++++++++++++++++++-
 7 files changed, 230 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index fd64749..8e9efcf 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1284,6 +1284,12 @@ useFlumeEventFormat                 false        By default events are taken as
                                                  true to read events as the Flume Avro binary format. Used in conjunction with the same property
                                                  on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel this will preserve
                                                  any Flume headers sent on the producing side.
+setTopicHeader                      true         When set to true, stores the topic of the retrieved message into a header, defined by the
+                                                 ``topicHeader`` property.
+topicHeader                         topic        Defines the name of the header in which to store the name of the topic the message was received
+                                                 from, if the ``setTopicHeader`` property is set to ``true``. Care should be taken if combining
+                                                 with the Kafka Sink ``topicHeader`` property so as to avoid sending the message back to the same
+                                                 topic in a loop.
 migrateZookeeperOffsets             true         When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka.
                                                  This should be true to support seamless Kafka client migration from older versions of Flume.
                                                  Once migrated this can be set to false, though that should generally not be required.
@@ -2785,6 +2791,9 @@ partitionIdHeader                   --                   When set, the sink will
                                                          from the event header and send the message to the specified partition of the topic. If the
                                                          value represents an invalid partition, an EventDeliveryException will be thrown. If the header value
                                                          is present then this setting overrides ``defaultPartitionId``.
+allowTopicOverride                  true                 When set, the sink will allow a message to be produced into a topic specified by the ``topicHeader`` property (if provided).
+topicHeader                         topic                When set in conjunction with ``allowTopicOverride`` will produce a message into the value of the header named using the value of this property.
+                                                         Care should be taken when using in conjunction with the Kafka Source ``topicHeader`` property to avoid creating a loopback.
 kafka.producer.security.protocol    PLAINTEXT            Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
 *more producer security props*                           If using SASL_PLAINTEXT, SASL_SSL or SSL refer to `Kafka security <http://kafka.apache.org/documentation.html#security>`_ for additional
                                                          properties that need to be set on producer.

http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index f18908b..d60d67e 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -67,7 +67,6 @@ import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_HEADER;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_HEADER;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_SERIALIZER_KEY;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.MESSAGE_SERIALIZER_KEY;
 
@@ -117,6 +116,9 @@ public class KafkaSink extends AbstractSink implements Configurable {
   private boolean useAvroEventFormat;
   private String partitionHeader = null;
   private Integer staticPartitionId = null;
+  private boolean allowTopicOverride;
+  private String topicHeader = null;
+
   private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer =
           Optional.absent();
   private Optional<SpecificDatumReader<AvroFlumeEvent>> reader =
@@ -172,10 +174,19 @@ public class KafkaSink extends AbstractSink implements Configurable {
         byte[] eventBody = event.getBody();
         Map<String, String> headers = event.getHeaders();
 
-        eventTopic = headers.get(TOPIC_HEADER);
-        if (eventTopic == null) {
-          eventTopic = BucketPath.escapeString(topic, event.getHeaders());
+        if (allowTopicOverride) {
+          eventTopic = headers.get(topicHeader);
+          if (eventTopic == null) {
+            eventTopic = BucketPath.escapeString(topic, event.getHeaders());
+            logger.debug("{} was set to true but header {} was null. Producing to {}" + 
+                " topic instead.",
+                new Object[]{KafkaSinkConstants.ALLOW_TOPIC_OVERRIDE_HEADER, 
+                    topicHeader, eventTopic});
+          }
+        } else {
+          eventTopic = topic;
         }
+
         eventKey = headers.get(KEY_HEADER);
         if (logger.isTraceEnabled()) {
           if (LogPrivacyUtil.allowLogRawData()) {
@@ -317,6 +328,12 @@ public class KafkaSink extends AbstractSink implements Configurable {
     partitionHeader = context.getString(KafkaSinkConstants.PARTITION_HEADER_NAME);
     staticPartitionId = context.getInteger(KafkaSinkConstants.STATIC_PARTITION_CONF);
 
+    allowTopicOverride = context.getBoolean(KafkaSinkConstants.ALLOW_TOPIC_OVERRIDE_HEADER,
+                                          KafkaSinkConstants.DEFAULT_ALLOW_TOPIC_OVERRIDE_HEADER);
+
+    topicHeader = context.getString(KafkaSinkConstants.TOPIC_OVERRIDE_HEADER,
+                                    KafkaSinkConstants.DEFAULT_TOPIC_OVERRIDE_HEADER);
+
     if (logger.isDebugEnabled()) {
       logger.debug(KafkaSinkConstants.AVRO_EVENT + " set to: {}", useAvroEventFormat);
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
index 7c819f5..ffca3df 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
@@ -33,7 +33,10 @@ public class KafkaSinkConstants {
       KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 
   public static final String KEY_HEADER = "key";
-  public static final String TOPIC_HEADER = "topic";
+  public static final String DEFAULT_TOPIC_OVERRIDE_HEADER = "topic";
+  public static final String TOPIC_OVERRIDE_HEADER = "topicHeader";
+  public static final String ALLOW_TOPIC_OVERRIDE_HEADER = "allowTopicOverride";
+  public static final boolean DEFAULT_ALLOW_TOPIC_OVERRIDE_HEADER = true;
 
   public static final String AVRO_EVENT = "useFlumeEventFormat";
   public static final boolean DEFAULT_AVRO_EVENT = false;

http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index 975661d..d92c71f 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -240,6 +240,92 @@ public class TestKafkaSink {
                  new String((byte[]) fetchedMsg.key(), "UTF-8"));
   }
 
+  /**
+   * Tests that a message will be produced to a topic as specified by a
+   * custom topicHeader parameter (FLUME-3046).
+   * @throws UnsupportedEncodingException
+   */
+  @Test
+  public void testTopicFromConfHeader() throws UnsupportedEncodingException {
+    String customTopicHeader = "customTopicHeader";
+    Sink kafkaSink = new KafkaSink();
+    Context context = prepareDefaultContext();
+    context.put(KafkaSinkConstants.TOPIC_OVERRIDE_HEADER, customTopicHeader);
+    Configurables.configure(kafkaSink, context);
+    Channel memoryChannel = new MemoryChannel();
+    Configurables.configure(memoryChannel, context);
+    kafkaSink.setChannel(memoryChannel);
+    kafkaSink.start();
+
+    String msg = "test-topic-from-config-header";
+    Map<String, String> headers = new HashMap<String, String>();
+    headers.put(customTopicHeader, TestConstants.CUSTOM_TOPIC);
+    Transaction tx = memoryChannel.getTransaction();
+    tx.begin();
+    Event event = EventBuilder.withBody(msg.getBytes(), headers);
+    memoryChannel.put(event);
+    tx.commit();
+    tx.close();
+
+    try {
+      Sink.Status status = kafkaSink.process();
+      if (status == Sink.Status.BACKOFF) {
+        fail("Error Occurred");
+      }
+    } catch (EventDeliveryException ex) {
+      // ignore
+    }
+
+    MessageAndMetadata<?, ?> fetchedMsg =
+        testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC);
+
+    assertEquals(msg, new String((byte[]) fetchedMsg.message(), "UTF-8"));
+  }
+
+  /**
+   * Tests that the topicHeader parameter will be ignored if the allowTopicHeader
+   * parameter is set to false (FLUME-3046).
+   * @throws UnsupportedEncodingException
+   */
+  @Test
+  public void testTopicNotFromConfHeader() throws UnsupportedEncodingException {
+    Sink kafkaSink = new KafkaSink();
+    Context context = prepareDefaultContext();
+    context.put(KafkaSinkConstants.ALLOW_TOPIC_OVERRIDE_HEADER, "false");
+    context.put(KafkaSinkConstants.TOPIC_OVERRIDE_HEADER, "foo");
+
+    Configurables.configure(kafkaSink, context);
+    Channel memoryChannel = new MemoryChannel();
+    Configurables.configure(memoryChannel, context);
+    kafkaSink.setChannel(memoryChannel);
+    kafkaSink.start();
+
+    String msg = "test-topic-from-config-header";
+    Map<String, String> headers = new HashMap<String, String>();
+    headers.put(KafkaSinkConstants.DEFAULT_TOPIC_OVERRIDE_HEADER, TestConstants.CUSTOM_TOPIC);
+    headers.put("foo", TestConstants.CUSTOM_TOPIC);
+    Transaction tx = memoryChannel.getTransaction();
+    tx.begin();
+    Event event = EventBuilder.withBody(msg.getBytes(), headers);
+    memoryChannel.put(event);
+    tx.commit();
+    tx.close();
+
+    try {
+      Sink.Status status = kafkaSink.process();
+      if (status == Sink.Status.BACKOFF) {
+        fail("Error Occurred");
+      }
+    } catch (EventDeliveryException ex) {
+      // ignore
+    }
+
+    MessageAndMetadata<?, ?> fetchedMsg =
+        testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC);
+
+    assertEquals(msg, new String((byte[]) fetchedMsg.message(), "UTF-8"));
+  }
+
   @Test
   public void testReplaceSubStringOfTopicWithHeaders() throws UnsupportedEncodingException {
     Sink kafkaSink = new KafkaSink();
@@ -612,4 +698,4 @@ public class TestKafkaSink {
     return newTopic;
   }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index d381850..ffdc96e 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -127,6 +127,8 @@ public class KafkaSource extends AbstractPollableSource
   private String bootstrapServers;
   private String groupId = DEFAULT_GROUP_ID;
   private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
+  private String topicHeader = null;
+  private boolean setTopicHeader;
 
   /**
    * This class is a helper to subscribe for topics by using
@@ -250,8 +252,9 @@ public class KafkaSource extends AbstractPollableSource
           headers.put(KafkaSourceConstants.TIMESTAMP_HEADER,
               String.valueOf(System.currentTimeMillis()));
         }
-        if (!headers.containsKey(KafkaSourceConstants.TOPIC_HEADER)) {
-          headers.put(KafkaSourceConstants.TOPIC_HEADER, message.topic());
+        // Only set the topic header if setTopicHeader and it isn't already populated
+        if (setTopicHeader && !headers.containsKey(topicHeader)) {
+          headers.put(topicHeader, message.topic());
         }
         if (!headers.containsKey(KafkaSourceConstants.PARTITION_HEADER)) {
           headers.put(KafkaSourceConstants.PARTITION_HEADER,
@@ -400,6 +403,12 @@ public class KafkaSource extends AbstractPollableSource
       log.info("Group ID was not specified. Using {} as the group id.", groupId);
     }
 
+    setTopicHeader = context.getBoolean(KafkaSourceConstants.SET_TOPIC_HEADER,
+                                        KafkaSourceConstants.DEFAULT_SET_TOPIC_HEADER);
+
+    topicHeader = context.getString(KafkaSourceConstants.TOPIC_HEADER,
+                                    KafkaSourceConstants.DEFAULT_TOPIC_HEADER);
+
     setConsumerProps(context);
 
     if (log.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {

http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index bf1a19d..474a143 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -49,9 +49,13 @@ public class KafkaSourceConstants {
   public static final String OLD_GROUP_ID = "groupId";
 
   // flume event headers
-  public static final String TOPIC_HEADER = "topic";
+  public static final String DEFAULT_TOPIC_HEADER = "topic";
   public static final String KEY_HEADER = "key";
   public static final String TIMESTAMP_HEADER = "timestamp";
   public static final String PARTITION_HEADER = "partition";
 
+  public static final String SET_TOPIC_HEADER = "setTopicHeader";
+  public static final boolean DEFAULT_SET_TOPIC_HEADER = true;
+  public static final String TOPIC_HEADER = "topicHeader";
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index d1daceb..7804fa2 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -76,7 +76,7 @@ import static org.apache.flume.source.kafka.KafkaSourceConstants.TIMESTAMP_HEADE
 import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -556,7 +556,7 @@ public class TestKafkaSource {
 
     headers.put(TIMESTAMP_HEADER, currentTimestamp);
     headers.put(PARTITION_HEADER, "1");
-    headers.put(TOPIC_HEADER, "topic0");
+    headers.put(DEFAULT_TOPIC_HEADER, "topic0");
 
     e = new AvroFlumeEvent(headers, ByteBuffer.wrap("hello, world2".getBytes()));
     tempOutStream.reset();
@@ -590,7 +590,7 @@ public class TestKafkaSource {
     Assert.assertEquals("value2", e.getHeaders().get("header2"));
     Assert.assertEquals(currentTimestamp, e.getHeaders().get(TIMESTAMP_HEADER));
     Assert.assertEquals(e.getHeaders().get(PARTITION_HEADER), "1");
-    Assert.assertEquals(e.getHeaders().get(TOPIC_HEADER),"topic0");
+    Assert.assertEquals(e.getHeaders().get(DEFAULT_TOPIC_HEADER),"topic0");
 
   }
 
@@ -655,6 +655,96 @@ public class TestKafkaSource {
     Assert.assertNull(kafkaSource.getConsumerProps().getProperty(sampleConsumerProp));
   }
 
+  /**
+   * Tests the availability of the topic header in the output events,
+   * based on the configuration parameters added in FLUME-3046
+   * @throws InterruptedException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testTopicHeaderSet() throws InterruptedException, EventDeliveryException {
+    context.put(TOPICS, topic0);
+    kafkaSource.configure(context);
+    kafkaSource.start();
+
+    Thread.sleep(500L);
+
+    kafkaServer.produce(topic0, "", "hello, world");
+
+    Thread.sleep(500L);
+
+    Status status = kafkaSource.process();
+    assertEquals(Status.READY, status);
+    Assert.assertEquals("hello, world", new String(events.get(0).getBody(),
+            Charsets.UTF_8));
+
+    Assert.assertEquals(topic0, events.get(0).getHeaders().get("topic"));
+
+    kafkaSource.stop();
+    events.clear();
+  }
+
+  /**
+   * Tests the availability of the custom topic header in the output events,
+   * based on the configuration parameters added in FLUME-3046
+   * @throws InterruptedException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testTopicCustomHeaderSet() throws InterruptedException, EventDeliveryException {
+    context.put(TOPICS, topic0);
+    context.put(KafkaSourceConstants.TOPIC_HEADER, "customTopicHeader");
+    kafkaSource.configure(context);
+
+    kafkaSource.start();
+
+    Thread.sleep(500L);
+
+    kafkaServer.produce(topic0, "", "hello, world2");
+
+    Thread.sleep(500L);
+
+    Status status = kafkaSource.process();
+    assertEquals(Status.READY, status);
+    Assert.assertEquals("hello, world2", new String(events.get(0).getBody(),
+            Charsets.UTF_8));
+
+    Assert.assertEquals(topic0, events.get(0).getHeaders().get("customTopicHeader"));
+
+    kafkaSource.stop();
+    events.clear();
+  }
+
+  /**
+   * Tests the unavailability of the topic header in the output events,
+   * based on the configuration parameters added in FLUME-3046
+   * @throws InterruptedException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testTopicCustomHeaderNotSet() throws InterruptedException, EventDeliveryException {
+    context.put(TOPICS, topic0);
+    context.put(KafkaSourceConstants.SET_TOPIC_HEADER, "false");
+    kafkaSource.configure(context);
+
+    kafkaSource.start();
+
+    Thread.sleep(500L);
+
+    kafkaServer.produce(topic0, "", "hello, world3");
+
+    Thread.sleep(500L);
+
+    Status status = kafkaSource.process();
+    assertEquals(Status.READY, status);
+    Assert.assertEquals("hello, world3", new String(events.get(0).getBody(),
+            Charsets.UTF_8));
+
+    Assert.assertNull(events.get(0).getHeaders().get("customTopicHeader"));
+
+    kafkaSource.stop();
+  }
+
   public void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets, boolean hasKafkaOffsets,
                                             String group) throws Exception {
     // create a topic with 1 partition for simplicity
