[
https://issues.apache.org/jira/browse/FLUME-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14739148#comment-14739148
]
Gonzalo Herreros commented on FLUME-2790:
-----------------------------------------
Why don't you use the version of Flume included in HDP?. The tools in the
distribution will work with each other.
Notice that kafka 0.8.2.2 hasn't been officially been released, so you cannot
expect Apache Flume to upgrade before that happens.
You can always extend the KafkaSource yourself to work with the new version if
you don't want to wait.
> Flume Kafka NullPointerException while trying to connect to Kafka topic as
> source (consumer.ConsumerFetcherManager LeaderFinderThread Failed to find
> leader for Set)
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLUME-2790
> URL: https://issues.apache.org/jira/browse/FLUME-2790
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: v1.6.0, 1.6
> Environment: HDP 2.3 fully kerberized including Kafka 0.8.2.2 +
> Apache Flume 1.6 downloaded from apache.org
> Reporter: Hari Sekhon
> Priority: Blocker
>
> I'm getting the following NullPointerException when trying to integrate Flume
> with Kafka as a source:
> {code}
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21], Cleared all relevant queues
> for this fetcher
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21], Cleared the data chunks in
> all the consumer message iterators
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21], Committing all offsets
> after clearing the fetcher queues
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21], Releasing partition
> ownership
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21], Consumer
> flume_<custom_scrubbed>-1441895965442-55ac2e21 rebalancing the following
> partitions: ArrayBuffer(0, 1) for topic <custom_scrubbed> with consumers:
> List(flume_<custom_scrubbed>-1441895965442-55ac2e21-0)
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21],
> flume_<custom_scrubbed>-1441895965442-55ac2e21-0 attempting to claim
> partition 0
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21],
> flume_<custom_scrubbed>-1441895965442-55ac2e21-0 attempting to claim
> partition 1
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21],
> flume_<custom_scrubbed>-1441895965442-55ac2e21-0 successfully owned partition
> 1 for topic <custom_scrubbed>
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21],
> flume_<custom_scrubbed>-1441895965442-55ac2e21-0 successfully owned partition
> 0 for topic <custom_scrubbed>
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21], Updating the cache
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21], Consumer
> flume_<custom_scrubbed>-1441895965442-55ac2e21 selected partitions :
> <custom_scrubbed>:0: fetched offset = -1: consumed offset =
> -1,<custom_scrubbed>:1: fetched offset = -1: consumed offset = -1
> 15/09/10 14:39:25 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread],
> Starting
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21], end rebalancing consumer
> flume_<custom_scrubbed>-1441895965442-55ac2e21 try #0
> 15/09/10 14:39:25 INFO kafka.KafkaSource: Kafka source s1 started.
> 15/09/10 14:39:25 INFO instrumentation.MonitoredCounterGroup: Monitored
> counter group for type: SOURCE, name: s1: Successfully registered new MBean.
> 15/09/10 14:39:25 INFO instrumentation.MonitoredCounterGroup: Component type:
> SOURCE, name: s1 started
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Verifying properties
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Property client.id is
> overridden to flume
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Property
> metadata.broker.list is overridden to null:-1
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Property
> request.timeout.ms is overridden to 30000
> 15/09/10 14:39:25 WARN consumer.ConsumerFetcherManager$LeaderFinderThread:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed
> to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
> java.lang.NullPointerException
> at java.util.Hashtable.put(Hashtable.java:514)
> at
> kafka.producer.ProducerPool$.createSyncProducer(ProducerPool.scala:35)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:50)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
> at
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 15/09/10 14:39:25 INFO consumer.ConsumerFetcherManager:
> [ConsumerFetcherManager-1441895965538] Added fetcher for partitions
> ArrayBuffer()
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Verifying properties
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property client.id is
> overridden to flume
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property
> metadata.broker.list is overridden to null:-1
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property
> request.timeout.ms is overridden to 30000
> 15/09/10 14:39:26 WARN consumer.ConsumerFetcherManager$LeaderFinderThread:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed
> to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
> java.lang.NullPointerException
> at java.util.Hashtable.put(Hashtable.java:514)
> at
> kafka.producer.ProducerPool$.createSyncProducer(ProducerPool.scala:35)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:50)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
> at
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 15/09/10 14:39:26 INFO consumer.ConsumerFetcherManager:
> [ConsumerFetcherManager-1441895965538] Added fetcher for partitions
> ArrayBuffer()
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Verifying properties
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property client.id is
> overridden to flume
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property
> metadata.broker.list is overridden to null:-1
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property
> request.timeout.ms is overridden to 30000
> 15/09/10 14:39:26 WARN consumer.ConsumerFetcherManager$LeaderFinderThread:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed
> to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
> java.lang.NullPointerException
> ...
> <repeats>
> {code}
> There is a single kafka broker which definitely has a leader and ISR as seen
> here:
> {code}
> Topic:<custom_scrubbed> PartitionCount:2 ReplicationFactor:1
> Configs:
> Topic: <custom_scrubbed> Partition: 0 Leader: 0 Replicas: 0
> Isr: 0
> Topic: <custom_scrubbed> Partition: 1 Leader: 0 Replicas: 0
> Isr: 0
> {code}
> The relevant bit of Flume config is:
> {code}
> a1.sources = s1
> a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
> a1.sources.s1.channels = c1
> a1.sources.s1.zookeeperConnect = myfqdn:2181
> a1.sources.s1.topic = mytopic
> a1.sources.s1.groupId = flume
> a1.sources.s1.kafka.consumer.timeout.ms = 100
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)