Manohar created FLUME-2814:
------------------------------
Summary: flume kafka sink does not write events to configured sink
topic when source is also from other topic of kafka
Key: FLUME-2814
URL: https://issues.apache.org/jira/browse/FLUME-2814
Project: Flume
Issue Type: Bug
Components: Sinks+Sources
Affects Versions: v1.5.0
Reporter: Manohar
I was testing a case when flume agent is reading from kafka source from topic
'sourcetopic' and sink configured to kafkasink but to other topic
'destinationtopic',
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.zookeeperConnect = localhost:2181
tier1.sources.source1.topic = sourcetopic
tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.topic = destinationtopic
tier1.sinks.sink1.brokerList = localhost:9092
tier1.sinks.sink1.channel = channel1
tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
With this settings i noticed that event were not written to 'destinationtopic',
After debugging the agent if found that kafka source puts in topic name in
header.
headers.put(KafkaSourceConstants.TOPIC, topic);
and in sink check is made to see if headers contain topic, if exists then we
take topic name from header and write event that topic and there by discarding
configured sink topic i ,e destinationtopic.
here is code snippet that does, even though variable topic as destinationtopic,
since header had topic, kafka sink takes topic name from header and puts event
to that topic i,e again to source topic
if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
eventTopic = topic;
}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)