Repository: flume
Updated Branches:
  refs/heads/trunk fdc53f338 -> 83e25691d


FLUME-3100. Support arbitrary header substitution for topic of Kafka Sink

This patch adds the ability to use header substitution in the Kafka Sink's
kafka.topic configuration variable.

This closes #137.

Reviewers: Denes Arvay

(Takafumi Saito via Denes Arvay)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/83e25691
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/83e25691
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/83e25691

Branch: refs/heads/trunk
Commit: 83e25691dc5f32d020b122d679b6f124162e4aef
Parents: fdc53f3
Author: stakafum <[email protected]>
Authored: Thu May 25 15:23:44 2017 +0900
Committer: Denes Arvay <[email protected]>
Committed: Tue Jun 13 12:01:27 2017 +0200

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  2 ++
 .../org/apache/flume/sink/kafka/KafkaSink.java  |  3 +-
 .../apache/flume/sink/kafka/TestConstants.java  |  1 +
 .../apache/flume/sink/kafka/TestKafkaSink.java  | 37 ++++++++++++++++++++
 4 files changed, 42 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/83e25691/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index a5d64f0..2cd5465 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2727,6 +2727,8 @@ kafka.topic                         default-flume-topic  
The topic in Kafka to w
                                                          messages will be 
published to this topic.
                                                          If the event header 
contains a "topic" field, the event will be published to that topic
                                                          overriding the topic 
configured here.
+                                                         Arbitrary header 
substitution is supported, e.g. %{header} is replaced with the value of the event header 
named "header".
+                                                         (If using the 
substitution, it is recommended to set the "auto.create.topics.enable" property of 
the Kafka broker to true.)
 flumeBatchSize                      100                  How many messages to 
process in one batch. Larger batches improve throughput while adding latency.
 kafka.producer.acks                 1                    How many replicas 
must acknowledge a message before its considered successfully written.
                                                          Accepted values are 0 
(Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all 
replicas)

http://git-wip-us.apache.org/repos/asf/flume/blob/83e25691/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index 68866c3..f18908b 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -32,6 +32,7 @@ import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.conf.LogPrivacyUtil;
+import org.apache.flume.formatter.output.BucketPath;
 import org.apache.flume.instrumentation.kafka.KafkaSinkCounter;
 import org.apache.flume.sink.AbstractSink;
 import org.apache.flume.source.avro.AvroFlumeEvent;
@@ -173,7 +174,7 @@ public class KafkaSink extends AbstractSink implements 
Configurable {
 
         eventTopic = headers.get(TOPIC_HEADER);
         if (eventTopic == null) {
-          eventTopic = topic;
+          eventTopic = BucketPath.escapeString(topic, event.getHeaders());
         }
         eventKey = headers.get(KEY_HEADER);
         if (logger.isTraceEnabled()) {

http://git-wip-us.apache.org/repos/asf/flume/blob/83e25691/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
index 6d85700..8d6dce7 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
@@ -20,6 +20,7 @@ package org.apache.flume.sink.kafka;
 
 public class TestConstants {
   public static final String STATIC_TOPIC = "static-topic";
+  public static final String HEADER_TOPIC = "%{header1}-topic";
   public static final String CUSTOM_KEY = "custom-key";
   public static final String CUSTOM_TOPIC = "custom-topic";
   public static final String HEADER_1_VALUE = "test-avro-header";

http://git-wip-us.apache.org/repos/asf/flume/blob/83e25691/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index 7c66420..975661d 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -91,6 +91,7 @@ public class TestKafkaSink {
     topics.add(DEFAULT_TOPIC);
     topics.add(TestConstants.STATIC_TOPIC);
     topics.add(TestConstants.CUSTOM_TOPIC);
+    topics.add(TestConstants.HEADER_1_VALUE + "-topic");
     testUtil.initTopicList(topics);
   }
 
@@ -239,6 +240,42 @@ public class TestKafkaSink {
                  new String((byte[]) fetchedMsg.key(), "UTF-8"));
   }
 
+  @Test
+  public void testReplaceSubStringOfTopicWithHeaders() throws 
UnsupportedEncodingException {
+    Sink kafkaSink = new KafkaSink();
+    Context context = prepareDefaultContext();
+    context.put(TOPIC_CONFIG, TestConstants.HEADER_TOPIC);
+    Configurables.configure(kafkaSink, context);
+    Channel memoryChannel = new MemoryChannel();
+    Configurables.configure(memoryChannel, context);
+    kafkaSink.setChannel(memoryChannel);
+    kafkaSink.start();
+
+    String msg = "test-replace-substring-of-topic-with-headers";
+    Map<String, String> headers = new HashMap<>();
+    headers.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE);
+    Transaction tx = memoryChannel.getTransaction();
+    tx.begin();
+    Event event = EventBuilder.withBody(msg.getBytes(), headers);
+    memoryChannel.put(event);
+    tx.commit();
+    tx.close();
+
+    try {
+      Sink.Status status = kafkaSink.process();
+      if (status == Sink.Status.BACKOFF) {
+        fail("Error Occurred");
+      }
+    } catch (EventDeliveryException ex) {
+      // ignore
+    }
+
+    String fetchedMsg = new String((byte[])
+        testUtil.getNextMessageFromConsumer(TestConstants.HEADER_1_VALUE + 
"-topic").message());
+
+    assertEquals(msg, fetchedMsg);
+  }
+
   @SuppressWarnings("rawtypes")
   @Test
   public void testAvroEvent() throws IOException {

Reply via email to