[
https://issues.apache.org/jira/browse/FLUME-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14739112#comment-14739112
]
Hari Sekhon commented on FLUME-2790:
------------------------------------
OK, I just solved that by upgrading the ZooKeeper client in Flume as well, and am
on to the next error: there is no setting for Kafka SASL. I've just checked the
Flume docs for the Kafka source, and there is no setting to tell it that it's
talking to a secured Kafka broker:
{code}
15/09/10 16:51:22 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1441903874830] Added fetcher for partitions ArrayBuffer()
15/09/10 16:51:22 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441903874763-abdc98ec-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
kafka.common.BrokerEndPointNotAvailableException: End point PLAINTEXT not found for broker 0
    at kafka.cluster.Broker.getBrokerEndPoint(Broker.scala:140)
    at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:124)
    at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:124)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.utils.ZkUtils$.getAllBrokerEndPointsForChannel(ZkUtils.scala:124)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
{code}
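To illustrate why the consumer reports "End point PLAINTEXT not found", here is a
minimal Python sketch of an endpoint lookup by protocol name. It is not the Kafka
code itself, only an approximation of what the stack trace suggests
Broker.getBrokerEndPoint does. The registration JSON, the hostname, the port and
the PLAINTEXTSASL protocol name are all assumptions made for illustration; the
actual znode content on this cluster is not shown in this issue.

```python
import json

# Hypothetical broker registration, as it might appear in ZooKeeper under
# /brokers/ids/0 for a broker that exposes only a SASL listener.
# Hostname, port and protocol name are assumptions, not taken from the issue.
SAMPLE_REGISTRATION = json.dumps({
    "jmx_port": -1,
    "endpoints": ["PLAINTEXTSASL://myfqdn:6667"],
    "host": None,
    "port": -1,
    "version": 2,
})


class EndPointNotAvailable(Exception):
    """Raised when a broker advertises no endpoint for the requested protocol."""


def parse_endpoints(registration_json):
    """Return a dict mapping protocol name -> (host, port)."""
    data = json.loads(registration_json)
    endpoints = {}
    for uri in data.get("endpoints", []):
        # Split "PROTOCOL://host:port" into its three parts.
        protocol, _, address = uri.partition("://")
        host, _, port = address.rpartition(":")
        endpoints[protocol] = (host, int(port))
    return endpoints


def get_endpoint(registration_json, protocol, broker_id=0):
    """Look up the endpoint for one protocol, failing if it is not advertised."""
    endpoints = parse_endpoints(registration_json)
    if protocol not in endpoints:
        raise EndPointNotAvailable(
            "End point %s not found for broker %d" % (protocol, broker_id))
    return endpoints[protocol]
```

Under these assumptions, a client that can only ask for PLAINTEXT fails against a
broker that advertises only a secured listener, while a lookup for the secured
protocol succeeds. That would be consistent with the Kafka source needing a
security protocol setting that it does not currently expose.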
> Flume Kafka NullPointerException while trying to connect to Kafka topic as
> source (consumer.ConsumerFetcherManager LeaderFinderThread Failed to find
> leader for Set)
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLUME-2790
> URL: https://issues.apache.org/jira/browse/FLUME-2790
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: v1.6.0, 1.6
> Environment: HDP 2.3 fully kerberized including Kafka 0.8.2.2 +
> Apache Flume 1.6 downloaded from apache.org
> Reporter: Hari Sekhon
> Priority: Blocker
>
> I'm getting the following NullPointerException when trying to integrate Flume
> with Kafka as a source:
> {code}
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Cleared all relevant queues for this fetcher
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Cleared the data chunks in all the consumer message iterators
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Committing all offsets after clearing the fetcher queues
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Releasing partition ownership
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Consumer flume_<custom_scrubbed>-1441895965442-55ac2e21 rebalancing the following partitions: ArrayBuffer(0, 1) for topic <custom_scrubbed> with consumers: List(flume_<custom_scrubbed>-1441895965442-55ac2e21-0)
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 attempting to claim partition 0
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 attempting to claim partition 1
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 successfully owned partition 1 for topic <custom_scrubbed>
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 successfully owned partition 0 for topic <custom_scrubbed>
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Updating the cache
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Consumer flume_<custom_scrubbed>-1441895965442-55ac2e21 selected partitions : <custom_scrubbed>:0: fetched offset = -1: consumed offset = -1,<custom_scrubbed>:1: fetched offset = -1: consumed offset = -1
> 15/09/10 14:39:25 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Starting
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], end rebalancing consumer flume_<custom_scrubbed>-1441895965442-55ac2e21 try #0
> 15/09/10 14:39:25 INFO kafka.KafkaSource: Kafka source s1 started.
> 15/09/10 14:39:25 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
> 15/09/10 14:39:25 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Verifying properties
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Property client.id is overridden to flume
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to null:-1
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
> 15/09/10 14:39:25 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
> java.lang.NullPointerException
>     at java.util.Hashtable.put(Hashtable.java:514)
>     at kafka.producer.ProducerPool$.createSyncProducer(ProducerPool.scala:35)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:50)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
>     at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 15/09/10 14:39:25 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1441895965538] Added fetcher for partitions ArrayBuffer()
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Verifying properties
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property client.id is overridden to flume
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to null:-1
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
> 15/09/10 14:39:26 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
> java.lang.NullPointerException
>     at java.util.Hashtable.put(Hashtable.java:514)
>     at kafka.producer.ProducerPool$.createSyncProducer(ProducerPool.scala:35)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:50)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
>     at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 15/09/10 14:39:26 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1441895965538] Added fetcher for partitions ArrayBuffer()
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Verifying properties
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property client.id is overridden to flume
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to null:-1
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
> 15/09/10 14:39:26 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
> java.lang.NullPointerException
> ...
> <repeats>
> {code}
> There is a single Kafka broker, and both partitions definitely have a leader
> and an ISR, as seen here:
> {code}
> Topic:<custom_scrubbed> PartitionCount:2 ReplicationFactor:1 Configs:
> Topic: <custom_scrubbed> Partition: 0 Leader: 0 Replicas: 0 Isr: 0
> Topic: <custom_scrubbed> Partition: 1 Leader: 0 Replicas: 0 Isr: 0
> {code}
> The relevant bit of Flume config is:
> {code}
> a1.sources = s1
> a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
> a1.sources.s1.channels = c1
> a1.sources.s1.zookeeperConnect = myfqdn:2181
> a1.sources.s1.topic = mytopic
> a1.sources.s1.groupId = flume
> a1.sources.s1.kafka.consumer.timeout.ms = 100
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)