Repository: flume Updated Branches: refs/heads/trunk fdc53f338 -> 83e25691d
FLUME-3100. Support arbitrary header substitution for topic of Kafka Sink This patch adds the ability to use header substitution in the Kafka Sink's kafka.topic configuration variable. This closes #137. Reviewers: Denes Arvay (Takafumi Saito via Denes Arvay) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/83e25691 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/83e25691 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/83e25691 Branch: refs/heads/trunk Commit: 83e25691dc5f32d020b122d679b6f124162e4aef Parents: fdc53f3 Author: stakafum <[email protected]> Authored: Thu May 25 15:23:44 2017 +0900 Committer: Denes Arvay <[email protected]> Committed: Tue Jun 13 12:01:27 2017 +0200 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 2 ++ .../org/apache/flume/sink/kafka/KafkaSink.java | 3 +- .../apache/flume/sink/kafka/TestConstants.java | 1 + .../apache/flume/sink/kafka/TestKafkaSink.java | 37 ++++++++++++++++++++ 4 files changed, 42 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/83e25691/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index a5d64f0..2cd5465 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -2727,6 +2727,8 @@ kafka.topic default-flume-topic The topic in Kafka to w messages will be published to this topic. If the event header contains a "topic" field, the event will be published to that topic overriding the topic configured here. + Arbitrary header substitution is supported, eg. %{header} is replaced with value of event header named "header".
+ (If using the substitution, it is recommended to set "auto.create.topics.enable" property of Kafka broker to true.) flumeBatchSize 100 How many messages to process in one batch. Larger batches improve throughput while adding latency. kafka.producer.acks 1 How many replicas must acknowledge a message before its considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) http://git-wip-us.apache.org/repos/asf/flume/blob/83e25691/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java index 68866c3..f18908b 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java @@ -32,6 +32,7 @@ import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.conf.ConfigurationException; import org.apache.flume.conf.LogPrivacyUtil; +import org.apache.flume.formatter.output.BucketPath; import org.apache.flume.instrumentation.kafka.KafkaSinkCounter; import org.apache.flume.sink.AbstractSink; import org.apache.flume.source.avro.AvroFlumeEvent; @@ -173,7 +174,7 @@ public class KafkaSink extends AbstractSink implements Configurable { eventTopic = headers.get(TOPIC_HEADER); if (eventTopic == null) { - eventTopic = topic; + eventTopic = BucketPath.escapeString(topic, event.getHeaders()); } eventKey = headers.get(KEY_HEADER); if (logger.isTraceEnabled()) { http://git-wip-us.apache.org/repos/asf/flume/blob/83e25691/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java 
---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java index 6d85700..8d6dce7 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java @@ -20,6 +20,7 @@ package org.apache.flume.sink.kafka; public class TestConstants { public static final String STATIC_TOPIC = "static-topic"; + public static final String HEADER_TOPIC = "%{header1}-topic"; public static final String CUSTOM_KEY = "custom-key"; public static final String CUSTOM_TOPIC = "custom-topic"; public static final String HEADER_1_VALUE = "test-avro-header"; http://git-wip-us.apache.org/repos/asf/flume/blob/83e25691/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java index 7c66420..975661d 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java @@ -91,6 +91,7 @@ public class TestKafkaSink { topics.add(DEFAULT_TOPIC); topics.add(TestConstants.STATIC_TOPIC); topics.add(TestConstants.CUSTOM_TOPIC); + topics.add(TestConstants.HEADER_1_VALUE + "-topic"); testUtil.initTopicList(topics); } @@ -239,6 +240,42 @@ public class TestKafkaSink { new String((byte[]) fetchedMsg.key(), "UTF-8")); } + @Test + public void testReplaceSubStringOfTopicWithHeaders() throws 
UnsupportedEncodingException { + Sink kafkaSink = new KafkaSink(); + Context context = prepareDefaultContext(); + context.put(TOPIC_CONFIG, TestConstants.HEADER_TOPIC); + Configurables.configure(kafkaSink, context); + Channel memoryChannel = new MemoryChannel(); + Configurables.configure(memoryChannel, context); + kafkaSink.setChannel(memoryChannel); + kafkaSink.start(); + + String msg = "test-replace-substring-of-topic-with-headers"; + Map<String, String> headers = new HashMap<>(); + headers.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE); + Transaction tx = memoryChannel.getTransaction(); + tx.begin(); + Event event = EventBuilder.withBody(msg.getBytes(), headers); + memoryChannel.put(event); + tx.commit(); + tx.close(); + + try { + Sink.Status status = kafkaSink.process(); + if (status == Sink.Status.BACKOFF) { + fail("Error Occurred"); + } + } catch (EventDeliveryException ex) { + // ignore + } + + String fetchedMsg = new String((byte[]) + testUtil.getNextMessageFromConsumer(TestConstants.HEADER_1_VALUE + "-topic").message()); + + assertEquals(msg, fetchedMsg); + } + @SuppressWarnings("rawtypes") @Test public void testAvroEvent() throws IOException {
