[ 
https://issues.apache.org/jira/browse/FLUME-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hari Sekhon updated FLUME-2790:
-------------------------------
    Environment: HDP 2.3 fully kerberized including Kafka 0.8.2.2, using either 
the HDP 2.3 version of Flume 1.5.2 or Apache Flume 1.6 downloaded from 
apache.org  (was: HDP 2.3 fully kerberized including Kafka 0.8.2.2 + Apache 
Flume 1.6 downloaded from apache.org)

> Flume Kafka NullPointerException while trying to connect to Kafka topic as 
> source (consumer.ConsumerFetcherManager LeaderFinderThread Failed to find 
> leader for Set)
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-2790
>                 URL: https://issues.apache.org/jira/browse/FLUME-2790
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.6.0
>         Environment: HDP 2.3 fully kerberized including Kafka 0.8.2.2, using 
> either the HDP 2.3 version of Flume 1.5.2 or Apache Flume 1.6 downloaded from 
> apache.org
>            Reporter: Hari Sekhon
>            Priority: Blocker
>
> I'm getting the following NullPointerException when trying to integrate Flume 
> with Kafka as a source:
> {code}
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Cleared all relevant queues for this fetcher
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Cleared the data chunks in all the consumer message iterators
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Committing all offsets after clearing the fetcher queues
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Releasing partition ownership
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Consumer flume_<custom_scrubbed>-1441895965442-55ac2e21 rebalancing the following partitions: ArrayBuffer(0, 1) for topic <custom_scrubbed> with consumers: List(flume_<custom_scrubbed>-1441895965442-55ac2e21-0)
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 attempting to claim partition 0
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 attempting to claim partition 1
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 successfully owned partition 1 for topic <custom_scrubbed>
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 successfully owned partition 0 for topic <custom_scrubbed>
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Updating the cache
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Consumer flume_<custom_scrubbed>-1441895965442-55ac2e21 selected partitions : <custom_scrubbed>:0: fetched offset = -1: consumed offset = -1,<custom_scrubbed>:1: fetched offset = -1: consumed offset = -1
> 15/09/10 14:39:25 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Starting
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], end rebalancing consumer flume_<custom_scrubbed>-1441895965442-55ac2e21 try #0
> 15/09/10 14:39:25 INFO kafka.KafkaSource: Kafka source s1 started.
> 15/09/10 14:39:25 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
> 15/09/10 14:39:25 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Verifying properties
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Property client.id is overridden to flume
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to null:-1
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
> 15/09/10 14:39:25 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
> java.lang.NullPointerException
>         at java.util.Hashtable.put(Hashtable.java:514)
>         at kafka.producer.ProducerPool$.createSyncProducer(ProducerPool.scala:35)
>         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:50)
>         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
>         at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 15/09/10 14:39:25 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1441895965538] Added fetcher for partitions ArrayBuffer()
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Verifying properties
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property client.id is overridden to flume
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to null:-1
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
> 15/09/10 14:39:26 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
> java.lang.NullPointerException
>         at java.util.Hashtable.put(Hashtable.java:514)
>         at kafka.producer.ProducerPool$.createSyncProducer(ProducerPool.scala:35)
>         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:50)
>         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
>         at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 15/09/10 14:39:26 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1441895965538] Added fetcher for partitions ArrayBuffer()
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Verifying properties
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property client.id is overridden to flume
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to null:-1
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
> 15/09/10 14:39:26 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
> java.lang.NullPointerException
> ...
> <repeats>
> {code}
> There is a single Kafka broker, and both partitions of the topic have a 
> leader and an ISR, as seen here:
> {code}
> Topic:<custom_scrubbed>  PartitionCount:2        ReplicationFactor:1     Configs:
>         Topic: <custom_scrubbed> Partition: 0    Leader: 0       Replicas: 0     Isr: 0
>         Topic: <custom_scrubbed> Partition: 1    Leader: 0       Replicas: 0     Isr: 0
> {code}
> The relevant bit of Flume config is:
> {code}
> a1.sources = s1
> a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
> a1.sources.s1.channels = c1
> a1.sources.s1.zookeeperConnect = myfqdn:2181
> a1.sources.s1.topic = mytopic
> a1.sources.s1.groupId = flume
> a1.sources.s1.kafka.consumer.timeout.ms = 100
> {code}
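
A note on the trace in the description: it ends in Hashtable.put, called from ProducerPool.createSyncProducer, and the log just before it shows metadata.broker.list resolved to null:-1. One reading, offered as an assumption rather than a confirmed diagnosis, is that the broker host came back as null and was then stored into a java.util.Properties object, which rejects null values. The sketch below only reproduces that JDK behaviour in isolation. The class and method names are hypothetical, and the port 6667 is an illustrative value that does not appear in the report.

```java
import java.util.Properties;

// Hypothetical demo class; it is not part of Kafka or Flume.
final class NullBrokerHostSketch {

    // Copies a broker host and port into a Properties object, the way a
    // client might when building a producer config (an assumption here).
    // Returns true if the copy throws NullPointerException.
    static boolean putThrowsNpe(String host, int port) {
        Properties props = new Properties();
        try {
            props.put("host", host); // Properties/Hashtable reject null values
            props.put("port", Integer.toString(port));
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Mirrors the "null:-1" broker seen in the log.
        System.out.println(putThrowsNpe(null, -1));       // prints "true"
        // A resolvable host (illustrative values) does not trigger the NPE.
        System.out.println(putThrowsNpe("myfqdn", 6667)); // prints "false"
    }
}
```

If that reading holds, the place to look would be wherever the consumer obtains the broker host and port, rather than the topic's leader assignment, which the output above shows is present.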



