[ https://issues.apache.org/jira/browse/FLUME-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14740499#comment-14740499 ]

Hari Sekhon commented on FLUME-2790:
------------------------------------

I'm aware of the benefits of using a distribution (I used to work for Cloudera), 
but I used Flume 1.6 from Apache because the Flume user guide indicates that the 
Kafka source/sink only became available in Flume 1.6. I have just tried the 
Flume 1.5.2 supplied as part of HDP 2.3 and it produces the original exception, 
which implies that this specific dot release of Flume 1.5 in HDP 2.3 does indeed 
include the Kafka source, albeit a still-out-of-date one:
{code}15/09/11 08:20:02 INFO kafka.KafkaSource: Kafka source s1 started.
15/09/11 08:20:02 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441959602433-f74b0e46-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
java.lang.NullPointerException
        at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:312)
        at kafka.cluster.Broker.connectionString(Broker.scala:62)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
15/09/11 08:20:02 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1441959602525] Added fetcher for partitions ArrayBuffer()
15/09/11 08:20:03 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441959602433-f74b0e46-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
java.lang.NullPointerException
        at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:312)
        at kafka.cluster.Broker.connectionString(Broker.scala:62)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
{code}
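For what it's worth, a quick way to see how out of date the bundled client is, is 
to list the Kafka jars shipped with that Flume build; the path below is an 
assumption based on the standard HDP 2.3 layout and may differ on other nodes:
{code}
# Path is an assumption based on the usual HDP 2.3 layout; adjust if yours differs.
ls /usr/hdp/current/flume-server/lib/ | grep -i kafka
{code}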
Flume version in HDP 2.3:
{code}flume-ng version
Flume 1.5.2.2.3.0.0-2557
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 1bb1c91224dd57b205dae2fc40e9abdd33d00d77
Compiled by jenkins on Tue Jul 14 11:25:46 EDT 2015
From source with checksum ca29f22707f424d955d24bade16b31c2
{code}
So it seems the newer Flume 1.6.0, with its newer integrated Kafka libs, is 
closer to working. The remaining problem is that the Flume Kafka source has no 
setting with which to enable plaintextsasl for talking to a secured Kafka 
cluster. Since this jira mainly addresses the incompatibility between Kafka 
0.8.2 and Flume 1.6.0, I've raised another jira to cover the specific 
configuration requirement for Flume Kafka Kerberos integration 
(FLUME-2792).
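For context, the sort of knob FLUME-2792 asks for would look roughly like the 
sketch below. The kafka.security.protocol pass-through and the PLAINTEXTSASL 
value are my assumptions (based on the way the source already forwards 
kafka.-prefixed consumer properties), not something Flume 1.6.0 actually 
supports today:
{code}
# Hypothetical sketch only -- not supported by Flume 1.6.0 (see FLUME-2792).
# The security property name and value below are assumptions, not real Flume options.
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.channels = c1
a1.sources.s1.zookeeperConnect = myfqdn:2181
a1.sources.s1.topic = mytopic
a1.sources.s1.groupId = flume
a1.sources.s1.kafka.security.protocol = PLAINTEXTSASL
{code}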

> Flume Kafka NullPointerException while trying to connect to Kafka topic as source (consumer.ConsumerFetcherManager LeaderFinderThread Failed to find leader for Set)
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-2790
>                 URL: https://issues.apache.org/jira/browse/FLUME-2790
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.6.0, 1.6
>          Environment: HDP 2.3 fully kerberized including Kafka 0.8.2.2 + Apache Flume 1.6 downloaded from apache.org
>            Reporter: Hari Sekhon
>            Priority: Blocker
>
> I'm getting the following NullPointerException when trying to integrate Flume 
> with Kafka as a source:
> {code}
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Cleared all relevant queues for this fetcher
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Cleared the data chunks in all the consumer message iterators
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Committing all offsets after clearing the fetcher queues
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Releasing partition ownership
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Consumer flume_<custom_scrubbed>-1441895965442-55ac2e21 rebalancing the following partitions: ArrayBuffer(0, 1) for topic <custom_scrubbed> with consumers: List(flume_<custom_scrubbed>-1441895965442-55ac2e21-0)
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 attempting to claim partition 0
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 attempting to claim partition 1
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 successfully owned partition 1 for topic <custom_scrubbed>
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 successfully owned partition 0 for topic <custom_scrubbed>
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Updating the cache
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Consumer flume_<custom_scrubbed>-1441895965442-55ac2e21 selected partitions : <custom_scrubbed>:0: fetched offset = -1: consumed offset = -1,<custom_scrubbed>:1: fetched offset = -1: consumed offset = -1
> 15/09/10 14:39:25 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Starting
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], end rebalancing consumer flume_<custom_scrubbed>-1441895965442-55ac2e21 try #0
> 15/09/10 14:39:25 INFO kafka.KafkaSource: Kafka source s1 started.
> 15/09/10 14:39:25 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
> 15/09/10 14:39:25 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Verifying properties
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Property client.id is overridden to flume
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to null:-1
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
> 15/09/10 14:39:25 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
> java.lang.NullPointerException
>         at java.util.Hashtable.put(Hashtable.java:514)
>         at kafka.producer.ProducerPool$.createSyncProducer(ProducerPool.scala:35)
>         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:50)
>         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
>         at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 15/09/10 14:39:25 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1441895965538] Added fetcher for partitions ArrayBuffer()
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Verifying properties
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property client.id is overridden to flume
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to null:-1
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
> 15/09/10 14:39:26 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
> java.lang.NullPointerException
>         at java.util.Hashtable.put(Hashtable.java:514)
>         at kafka.producer.ProducerPool$.createSyncProducer(ProducerPool.scala:35)
>         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:50)
>         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
>         at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 15/09/10 14:39:26 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1441895965538] Added fetcher for partitions ArrayBuffer()
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Verifying properties
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property client.id is overridden to flume
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to null:-1
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
> 15/09/10 14:39:26 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
> java.lang.NullPointerException
> ...
> <repeats>
> {code}
> There is a single kafka broker which definitely has a leader and ISR as seen 
> here:
> {code}
> Topic:<custom_scrubbed>  PartitionCount:2        ReplicationFactor:1     Configs:
>         Topic: <custom_scrubbed> Partition: 0    Leader: 0       Replicas: 0     Isr: 0
>         Topic: <custom_scrubbed> Partition: 1    Leader: 0       Replicas: 0     Isr: 0
> {code}
> The relevant bit of Flume config is:
> {code}
> a1.sources = s1
> a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
> a1.sources.s1.channels = c1
> a1.sources.s1.zookeeperConnect = myfqdn:2181
> a1.sources.s1.topic = mytopic
> a1.sources.s1.groupId = flume
> a1.sources.s1.kafka.consumer.timeout.ms = 100
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
