[
https://issues.apache.org/jira/browse/FLUME-2789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Li Ye updated FLUME-2789:
-------------------------
Attachment: FLUME-2789-1.patch
An optional property called ignoreTopicInHeader is added for Kafka Sink. Its
default value is false, so the behavior stays compatible with Flume 1.6.0. If
you want to ignore the topic in the event header and write events to the topic
specified in the properties file, set ignoreTopicInHeader to true.
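As a rough illustration of the intended behavior (not the code in the attached
patch), the topic selection could look like the sketch below. The class and
method names are hypothetical, and the header key "topic" is assumed to match
TOPIC_HDR in KafkaSink.java.

```java
import java.util.HashMap;
import java.util.Map;

class TopicResolutionSketch {
    // Assumption: same header key that KafkaSink.java refers to as TOPIC_HDR.
    static final String TOPIC_HDR = "topic";

    // Hypothetical helper: picks the topic an event should be written to.
    static String resolveTopic(Map<String, String> headers,
                               String configuredTopic,
                               boolean ignoreTopicInHeader) {
        if (ignoreTopicInHeader) {
            // Always use the topic from the sink's properties file.
            return configuredTopic;
        }
        // Flume 1.6.0 behavior: the header wins, the configured topic is the fallback.
        String eventTopic = headers.get(TOPIC_HDR);
        return eventTopic != null ? eventTopic : configuredTopic;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put(TOPIC_HDR, "A"); // header as set by an upstream Kafka source
        System.out.println(resolveTopic(headers, "B", false));
        System.out.println(resolveTopic(headers, "B", true));
    }
}
```

With the flag left at false, an event carrying the header still goes to the
header's topic; with the flag set to true it goes to the configured topic.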
Besides, three optional properties, topicHeader, keyHeader and timestampHeader,
are added for Kafka Sink. They are similar to fileHeader and basenameHeader for
Spooling Directory Source.
Their default values are true, so they are compatible with Flume 1.6.0. If you
do not want to add headers storing the topic, key or timestamp, set them to
false.
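A minimal sketch of how three such boolean switches might gate the headers is
shown below. It does not reproduce the attached patch: the class name, the
method, and the header keys "topic", "key" and "timestamp" are assumptions made
for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

class HeaderToggleSketch {
    // Hypothetical helper: adds each header only when its switch is enabled.
    static Map<String, String> buildHeaders(String topic, String key, long timestampMillis,
                                            boolean topicHeader,
                                            boolean keyHeader,
                                            boolean timestampHeader) {
        Map<String, String> headers = new HashMap<>();
        if (topicHeader) {
            headers.put("topic", topic);
        }
        if (keyHeader && key != null) {
            // Messages without a key simply get no key header.
            headers.put("key", key);
        }
        if (timestampHeader) {
            headers.put("timestamp", String.valueOf(timestampMillis));
        }
        return headers;
    }
}
```

With all three switches at their default of true, every available header is
added; setting topicHeader to false leaves the topic header out of the event.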
> Kafka sink sends messages to the source topic by mistake
> ---------------------------------------------------------
>
> Key: FLUME-2789
> URL: https://issues.apache.org/jira/browse/FLUME-2789
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: v1.6.0
> Environment: Flume 1.6
> Reporter: Alex Tian
> Attachments: FLUME-2789-1.patch
>
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> In my scenario, I need to send messages from a Kafka source to a Kafka sink,
> in other words, transferring messages from topic A to another topic B.
> As a result, I find that my Kafka sink always sends messages back to topic A.
> The cause is in the code:
> KafkaSink.java:
> ****************************************************************************************
> if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
>     eventTopic = topic;
> }
> ****************************************************************************************
> If the source is a Kafka source, eventTopic won't be null because it is the
> source topic taken from the message header. Therefore, the Kafka sink will
> send the message back to its source topic by mistake.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)