Manohar created FLUME-2814:
------------------------------

             Summary: flume kafka sink does not write events to configured sink 
topic when source is also from other topic of kafka
                 Key: FLUME-2814
                 URL: https://issues.apache.org/jira/browse/FLUME-2814
             Project: Flume
          Issue Type: Bug
          Components: Sinks+Sources
    Affects Versions: v1.5.0
            Reporter: Manohar


I was testing a case when flume agent is reading from kafka source from topic 
'sourcetopic' and sink configured to kafkasink but to other topic 
'destinationtopic', 

tier1.sources  = source1
tier1.channels = channel1
tier1.sinks    = sink1

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.zookeeperConnect = localhost:2181
tier1.sources.source1.topic = sourcetopic

tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.topic = destinationtopic
tier1.sinks.sink1.brokerList = localhost:9092
tier1.sinks.sink1.channel = channel1

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

With this settings i noticed that event were not written to 'destinationtopic', 

After debugging the agent if found that kafka source puts in topic name in 
header. 

headers.put(KafkaSourceConstants.TOPIC, topic);

and in sink check is made to see if headers contain topic, if exists then we 
take topic name from header and write event that topic and there by discarding 
configured sink topic i ,e destinationtopic.

here is code snippet that does, even though variable topic as destinationtopic, 
since header had topic, kafka sink takes topic name from header and puts event 
to that topic i,e again to source topic

        if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
          eventTopic = topic;
        }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to