Hari Sekhon created FLUME-2790:
----------------------------------

             Summary: Flume Kafka NullPointerException while trying to connect to Kafka topic as source (consumer.ConsumerFetcherManager LeaderFinderThread Failed to find leader for Set)
                 Key: FLUME-2790
                 URL: https://issues.apache.org/jira/browse/FLUME-2790
             Project: Flume
          Issue Type: Bug
          Components: Sinks+Sources
    Affects Versions: 1.6, v1.6.0
         Environment: HDP 2.3 fully kerberized including Kafka 0.8.2.2 + Apache Flume 1.6 downloaded from apache.org
            Reporter: Hari Sekhon
            Priority: Critical


I'm getting the following NullPointerException when trying to integrate Flume 
with Kafka as a source:
{code}
15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: 
[flume_<custom_scrubbed>-1441895965442-55ac2e21], Cleared all relevant queues 
for this fetcher
15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: 
[flume_<custom_scrubbed>-1441895965442-55ac2e21], Cleared the data chunks in 
all the consumer message iterators
15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: 
[flume_<custom_scrubbed>-1441895965442-55ac2e21], Committing all offsets after 
clearing the fetcher queues
15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: 
[flume_<custom_scrubbed>-1441895965442-55ac2e21], Releasing partition ownership
15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: 
[flume_<custom_scrubbed>-1441895965442-55ac2e21], Consumer 
flume_<custom_scrubbed>-1441895965442-55ac2e21 rebalancing the following 
partitions: ArrayBuffer(0, 1) for topic <custom_scrubbed> with consumers: 
List(flume_<custom_scrubbed>-1441895965442-55ac2e21-0)
15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: 
[flume_<custom_scrubbed>-1441895965442-55ac2e21], 
flume_<custom_scrubbed>-1441895965442-55ac2e21-0 attempting to claim partition 0
15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: 
[flume_<custom_scrubbed>-1441895965442-55ac2e21], 
flume_<custom_scrubbed>-1441895965442-55ac2e21-0 attempting to claim partition 1
15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: 
[flume_<custom_scrubbed>-1441895965442-55ac2e21], 
flume_<custom_scrubbed>-1441895965442-55ac2e21-0 successfully owned partition 1 
for topic <custom_scrubbed>
15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: 
[flume_<custom_scrubbed>-1441895965442-55ac2e21], 
flume_<custom_scrubbed>-1441895965442-55ac2e21-0 successfully owned partition 0 
for topic <custom_scrubbed>
15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: 
[flume_<custom_scrubbed>-1441895965442-55ac2e21], Updating the cache
15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: 
[flume_<custom_scrubbed>-1441895965442-55ac2e21], Consumer 
flume_<custom_scrubbed>-1441895965442-55ac2e21 selected partitions : 
<custom_scrubbed>:0: fetched offset = -1: consumed offset = 
-1,<custom_scrubbed>:1: fetched offset = -1: consumed offset = -1
15/09/10 14:39:25 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: 
[flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Starting
15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: 
[flume_<custom_scrubbed>-1441895965442-55ac2e21], end rebalancing consumer 
flume_<custom_scrubbed>-1441895965442-55ac2e21 try #0
15/09/10 14:39:25 INFO kafka.KafkaSource: Kafka source s1 started.
15/09/10 14:39:25 INFO instrumentation.MonitoredCounterGroup: Monitored counter 
group for type: SOURCE, name: s1: Successfully registered new MBean.
15/09/10 14:39:25 INFO instrumentation.MonitoredCounterGroup: Component type: 
SOURCE, name: s1 started
15/09/10 14:39:25 INFO utils.VerifiableProperties: Verifying properties
15/09/10 14:39:25 INFO utils.VerifiableProperties: Property client.id is 
overridden to flume
15/09/10 14:39:25 INFO utils.VerifiableProperties: Property 
metadata.broker.list is overridden to null:-1
15/09/10 14:39:25 INFO utils.VerifiableProperties: Property request.timeout.ms 
is overridden to 30000
15/09/10 14:39:25 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: 
[flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed 
to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
java.lang.NullPointerException
        at java.util.Hashtable.put(Hashtable.java:514)
        at kafka.producer.ProducerPool$.createSyncProducer(ProducerPool.scala:35)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:50)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
15/09/10 14:39:25 INFO consumer.ConsumerFetcherManager: 
[ConsumerFetcherManager-1441895965538] Added fetcher for partitions 
ArrayBuffer()
15/09/10 14:39:26 INFO utils.VerifiableProperties: Verifying properties
15/09/10 14:39:26 INFO utils.VerifiableProperties: Property client.id is 
overridden to flume
15/09/10 14:39:26 INFO utils.VerifiableProperties: Property 
metadata.broker.list is overridden to null:-1
15/09/10 14:39:26 INFO utils.VerifiableProperties: Property request.timeout.ms 
is overridden to 30000
15/09/10 14:39:26 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: 
[flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed 
to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
java.lang.NullPointerException
        at java.util.Hashtable.put(Hashtable.java:514)
        at kafka.producer.ProducerPool$.createSyncProducer(ProducerPool.scala:35)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:50)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
15/09/10 14:39:26 INFO consumer.ConsumerFetcherManager: 
[ConsumerFetcherManager-1441895965538] Added fetcher for partitions 
ArrayBuffer()
15/09/10 14:39:26 INFO utils.VerifiableProperties: Verifying properties
15/09/10 14:39:26 INFO utils.VerifiableProperties: Property client.id is 
overridden to flume
15/09/10 14:39:26 INFO utils.VerifiableProperties: Property 
metadata.broker.list is overridden to null:-1
15/09/10 14:39:26 INFO utils.VerifiableProperties: Property request.timeout.ms 
is overridden to 30000
15/09/10 14:39:26 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: 
[flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed 
to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
java.lang.NullPointerException
...
<repeats>
{code}
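The top frame of each trace is java.util.Hashtable.put, which throws a NullPointerException when given a null key or value, and just before each failure the log shows metadata.broker.list being set to null:-1. One possible reading, offered as an assumption rather than a confirmed diagnosis, is that the broker host came back as null and was then put into a Properties object. The sketch below only reproduces that failure mode in isolation; the class name, the method names, and the port 9092 are hypothetical and are not taken from Kafka or from this setup:

```java
import java.util.Properties;

public class NullBrokerHostDemo {
    // Hypothetical helper: builds client properties from a broker host and port.
    // Properties extends Hashtable, so a null value is rejected by put().
    static Properties brokerProps(String host, int port) {
        Properties props = new Properties();
        props.put("host", host); // throws NullPointerException when host is null
        props.put("port", Integer.toString(port));
        return props;
    }

    // Returns true if building the properties fails with a NullPointerException.
    static boolean failsWithNpe(String host, int port) {
        try {
            brokerProps(host, port);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A resolvable host (placeholder values) builds the properties normally.
        System.out.println("valid host fails: " + failsWithNpe("myfqdn", 9092));
        // A null host with port -1 mirrors the "null:-1" value seen in the log.
        System.out.println("null host fails:  " + failsWithNpe(null, -1));
    }
}
```

If this reading is right, the place to look would be the host and port that the consumer obtains for the broker, rather than the partition leadership itself.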
There is a single Kafka broker, and both partitions of the topic definitely 
have a leader and an ISR, as seen here:
{code}
Topic:<custom_scrubbed>  PartitionCount:2        ReplicationFactor:1     Configs:
        Topic: <custom_scrubbed> Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: <custom_scrubbed> Partition: 1    Leader: 0       Replicas: 0     Isr: 0
{code}
The relevant bit of Flume config is:
{code}
a1.sources = s1
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.channels = c1
a1.sources.s1.zookeeperConnect = myfqdn:2181
a1.sources.s1.topic = mytopic
a1.sources.s1.groupId = flume
a1.sources.s1.kafka.consumer.timeout.ms = 100
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
