[
https://issues.apache.org/jira/browse/FLUME-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14740499#comment-14740499
]
Hari Sekhon edited comment on FLUME-2790 at 9/11/15 9:52 AM:
-------------------------------------------------------------
I'm aware of the benefits of using a distribution (I used to work for Cloudera),
but I used Flume 1.6 from Apache because the Flume user guide suggested that the
Kafka source/sink only became available in Flume 1.6. Trying the Flume 1.5.2
supplied as part of HDP 2.3 just now results in the original exception, which
implies that this particular dot release of Flume 1.5 in HDP 2.3 does indeed
include the Kafka source, albeit still out of date:
{code}15/09/11 08:20:02 INFO kafka.KafkaSource: Kafka source s1 started.
15/09/11 08:20:02 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441959602433-f74b0e46-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
java.lang.NullPointerException
        at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:312)
        at kafka.cluster.Broker.connectionString(Broker.scala:62)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
15/09/11 08:20:02 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1441959602525] Added fetcher for partitions ArrayBuffer()
15/09/11 08:20:03 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441959602433-f74b0e46-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
java.lang.NullPointerException
        at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:312)
        at kafka.cluster.Broker.connectionString(Broker.scala:62)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
{code}
Flume version in HDP 2.3:
{code}flume-ng version
Flume 1.5.2.2.3.0.0-2557
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 1bb1c91224dd57b205dae2fc40e9abdd33d00d77
Compiled by jenkins on Tue Jul 14 11:25:46 EDT 2015
From source with checksum ca29f22707f424d955d24bade16b31c2
{code}
So it seems that using Flume 1.6.0 with the latest Kafka libs integrated manually
is actually closer to working against the Kafka version supplied in HDP 2.3. The
remaining problem is that the Flume Kafka source has no setting for enabling
plaintextsasl when talking to a secured Kafka cluster (see the sketch below).
Since this jira mainly addresses the incompatibility between Kafka 0.8.2 and
Flume 1.6.0, I've raised a separate jira for the specific configuration
requirement for Flume Kafka Kerberos integration (FLUME-2792).
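Purely for illustration, here is a hypothetical sketch of the kind of setting one might expect for such a secured cluster, on the assumption that a kafka.-prefixed property could carry an HDP-style security.protocol value down to the consumer; no such supported setting exists in the Flume 1.6.0 Kafka source, which is exactly the gap FLUME-2792 asks to be filled:
{code}
# Hypothetical sketch only -- the security.protocol line below is NOT honoured by the
# Flume 1.6.0 Kafka source; it only illustrates the kind of pass-through consumer
# property a Kerberized (plaintextsasl) Kafka cluster would need.
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.channels = c1
a1.sources.s1.zookeeperConnect = myfqdn:2181
a1.sources.s1.topic = mytopic
a1.sources.s1.groupId = flume
# Assumed/hypothetical pass-through property, not supported in 1.6.0:
a1.sources.s1.kafka.security.protocol = PLAINTEXTSASL
{code}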
> Flume Kafka NullPointerException while trying to connect to Kafka topic as source (consumer.ConsumerFetcherManager LeaderFinderThread Failed to find leader for Set)
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLUME-2790
> URL: https://issues.apache.org/jira/browse/FLUME-2790
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: v1.6.0, 1.6
> Environment: HDP 2.3 fully kerberized including Kafka 0.8.2.2 +
> Apache Flume 1.6 downloaded from apache.org
> Reporter: Hari Sekhon
> Priority: Blocker
>
> I'm getting the following NullPointerException when trying to integrate Flume
> with Kafka as a source:
> {code}
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Cleared all relevant queues for this fetcher
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Cleared the data chunks in all the consumer message iterators
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Committing all offsets after clearing the fetcher queues
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Releasing partition ownership
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Consumer flume_<custom_scrubbed>-1441895965442-55ac2e21 rebalancing the following partitions: ArrayBuffer(0, 1) for topic <custom_scrubbed> with consumers: List(flume_<custom_scrubbed>-1441895965442-55ac2e21-0)
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 attempting to claim partition 0
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 attempting to claim partition 1
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 successfully owned partition 1 for topic <custom_scrubbed>
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], flume_<custom_scrubbed>-1441895965442-55ac2e21-0 successfully owned partition 0 for topic <custom_scrubbed>
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Updating the cache
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], Consumer flume_<custom_scrubbed>-1441895965442-55ac2e21 selected partitions : <custom_scrubbed>:0: fetched offset = -1: consumed offset = -1,<custom_scrubbed>:1: fetched offset = -1: consumed offset = -1
> 15/09/10 14:39:25 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Starting
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector: [flume_<custom_scrubbed>-1441895965442-55ac2e21], end rebalancing consumer flume_<custom_scrubbed>-1441895965442-55ac2e21 try #0
> 15/09/10 14:39:25 INFO kafka.KafkaSource: Kafka source s1 started.
> 15/09/10 14:39:25 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
> 15/09/10 14:39:25 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Verifying properties
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Property client.id is overridden to flume
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to null:-1
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
> 15/09/10 14:39:25 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
> java.lang.NullPointerException
>         at java.util.Hashtable.put(Hashtable.java:514)
>         at kafka.producer.ProducerPool$.createSyncProducer(ProducerPool.scala:35)
>         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:50)
>         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
>         at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 15/09/10 14:39:25 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1441895965538] Added fetcher for partitions ArrayBuffer()
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Verifying properties
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property client.id is overridden to flume
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to null:-1
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
> 15/09/10 14:39:26 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
> java.lang.NullPointerException
>         at java.util.Hashtable.put(Hashtable.java:514)
>         at kafka.producer.ProducerPool$.createSyncProducer(ProducerPool.scala:35)
>         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:50)
>         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
>         at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 15/09/10 14:39:26 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1441895965538] Added fetcher for partitions ArrayBuffer()
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Verifying properties
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property client.id is overridden to flume
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to null:-1
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
> 15/09/10 14:39:26 WARN consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
> java.lang.NullPointerException
> ...
> <repeats>
> {code}
> There is a single kafka broker which definitely has a leader and ISR as seen
> here:
> {code}
> Topic:<custom_scrubbed>   PartitionCount:2   ReplicationFactor:1   Configs:
>     Topic: <custom_scrubbed>   Partition: 0   Leader: 0   Replicas: 0   Isr: 0
>     Topic: <custom_scrubbed>   Partition: 1   Leader: 0   Replicas: 0   Isr: 0
> {code}
> The relevant bit of Flume config is:
> {code}
> a1.sources = s1
> a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
> a1.sources.s1.channels = c1
> a1.sources.s1.zookeeperConnect = myfqdn:2181
> a1.sources.s1.topic = mytopic
> a1.sources.s1.groupId = flume
> a1.sources.s1.kafka.consumer.timeout.ms = 100
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)