[
https://issues.apache.org/jira/browse/FLUME-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14739093#comment-14739093
]
Hari Sekhon commented on FLUME-2790:
------------------------------------
I've just tried replacing the Kafka jars in Flume's lib directory as you
suggested, but got the exception below. I can't find the
org.I0Itec.zkclient.ZkClient class (whose createEphemeral method it complains
about) in any of the jars under the Kafka installation:
{code}
15/09/10 16:32:59 ERROR lifecycle.LifecycleSupervisor: Unable to start
PollableSourceRunner: {
source:org.apache.flume.source.kafka.KafkaSource{name:s1,state:IDLE}
counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchMethodError: org.I0Itec.zkclient.ZkClient.createEphemeral(Ljava/lang/String;Ljava/lang/Object;Ljava/util/List;)V
at kafka.utils.ZkPath$.createEphemeral(ZkUtils.scala:919)
at kafka.utils.ZkUtils$.createEphemeralPath(ZkUtils.scala:313)
at kafka.utils.ZkUtils$.createEphemeralPathExpectConflict(ZkUtils.scala:328)
at kafka.utils.ZkUtils$.createEphemeralPathExpectConflictHandleZKBug(ZkUtils.scala:366)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$registerConsumerInZK(ZookeeperConsumerConnector.scala:275)
at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:260)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:84)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:96)
at org.apache.flume.source.kafka.KafkaSource.start(KafkaSource.java:216)
at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:74)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/09/10 16:32:59 ERROR lifecycle.LifecycleSupervisor: Unsuccessful attempt to
shutdown component: {} due to missing dependencies. Please shutdown the agentor
disable this component, or the agent will bein an undefined state.
java.lang.NullPointerException
at org.apache.flume.source.PollableSourceRunner$PollingRunner.access$200(PollableSourceRunner.java:125)
at org.apache.flume.source.PollableSourceRunner.stop(PollableSourceRunner.java:93)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:259)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
{code}
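The descriptor in the NoSuchMethodError, (Ljava/lang/String;Ljava/lang/Object;Ljava/util/List;)V,
means the caller expects a void createEphemeral(String, Object, List) method
on ZkClient. One way to check what the Flume classpath actually provides is a
small reflection probe like the sketch below. The class name MethodProbe and
its helper methods are hypothetical names made up for this illustration; it
would need to be run with the same classpath as the Flume agent to be
meaningful.

```java
import java.security.CodeSource;
import java.util.List;

public class MethodProbe {
    /** True if the class is loadable and exposes a public method with exactly these parameter types. */
    static boolean hasMethod(String className, String methodName, Class<?>... paramTypes) {
        try {
            // initialize=false: only load the class, do not run its static initializers
            Class<?> cls = Class.forName(className, false, MethodProbe.class.getClassLoader());
            cls.getMethod(methodName, paramTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            // LinkageError covers classes whose own dependencies are missing
            return false;
        }
    }

    /** Location (jar or directory) the class was loaded from, or a short explanation. */
    static String loadedFrom(String className) {
        try {
            Class<?> cls = Class.forName(className, false, MethodProbe.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            return src == null ? "unknown (bootstrap class loader)" : src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "class not found on classpath";
        }
    }

    public static void main(String[] args) {
        String cls = "org.I0Itec.zkclient.ZkClient";
        System.out.println("loaded from: " + loadedFrom(cls));
        System.out.println("has createEphemeral(String, Object, List): "
                + hasMethod(cls, "createEphemeral", String.class, Object.class, List.class));
    }
}
```

If the probe reports a jar location but no matching method, the ZkClient
class on the classpath is probably a different version from the one the
replaced Kafka jars were compiled against.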
> Flume Kafka NullPointerException while trying to connect to Kafka topic as
> source (consumer.ConsumerFetcherManager LeaderFinderThread Failed to find
> leader for Set)
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLUME-2790
> URL: https://issues.apache.org/jira/browse/FLUME-2790
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: v1.6.0, 1.6
> Environment: HDP 2.3 fully kerberized including Kafka 0.8.2.2 +
> Apache Flume 1.6 downloaded from apache.org
> Reporter: Hari Sekhon
> Priority: Blocker
>
> I'm getting the following NullPointerException when trying to integrate Flume
> with Kafka as a source:
> {code}
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21], Cleared all relevant queues
> for this fetcher
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21], Cleared the data chunks in
> all the consumer message iterators
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21], Committing all offsets
> after clearing the fetcher queues
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21], Releasing partition
> ownership
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21], Consumer
> flume_<custom_scrubbed>-1441895965442-55ac2e21 rebalancing the following
> partitions: ArrayBuffer(0, 1) for topic <custom_scrubbed> with consumers:
> List(flume_<custom_scrubbed>-1441895965442-55ac2e21-0)
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21],
> flume_<custom_scrubbed>-1441895965442-55ac2e21-0 attempting to claim
> partition 0
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21],
> flume_<custom_scrubbed>-1441895965442-55ac2e21-0 attempting to claim
> partition 1
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21],
> flume_<custom_scrubbed>-1441895965442-55ac2e21-0 successfully owned partition
> 1 for topic <custom_scrubbed>
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21],
> flume_<custom_scrubbed>-1441895965442-55ac2e21-0 successfully owned partition
> 0 for topic <custom_scrubbed>
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21], Updating the cache
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21], Consumer
> flume_<custom_scrubbed>-1441895965442-55ac2e21 selected partitions :
> <custom_scrubbed>:0: fetched offset = -1: consumed offset =
> -1,<custom_scrubbed>:1: fetched offset = -1: consumed offset = -1
> 15/09/10 14:39:25 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread],
> Starting
> 15/09/10 14:39:25 INFO consumer.ZookeeperConsumerConnector:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21], end rebalancing consumer
> flume_<custom_scrubbed>-1441895965442-55ac2e21 try #0
> 15/09/10 14:39:25 INFO kafka.KafkaSource: Kafka source s1 started.
> 15/09/10 14:39:25 INFO instrumentation.MonitoredCounterGroup: Monitored
> counter group for type: SOURCE, name: s1: Successfully registered new MBean.
> 15/09/10 14:39:25 INFO instrumentation.MonitoredCounterGroup: Component type:
> SOURCE, name: s1 started
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Verifying properties
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Property client.id is
> overridden to flume
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Property
> metadata.broker.list is overridden to null:-1
> 15/09/10 14:39:25 INFO utils.VerifiableProperties: Property
> request.timeout.ms is overridden to 30000
> 15/09/10 14:39:25 WARN consumer.ConsumerFetcherManager$LeaderFinderThread:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed
> to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
> java.lang.NullPointerException
> at java.util.Hashtable.put(Hashtable.java:514)
> at kafka.producer.ProducerPool$.createSyncProducer(ProducerPool.scala:35)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:50)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
> at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 15/09/10 14:39:25 INFO consumer.ConsumerFetcherManager:
> [ConsumerFetcherManager-1441895965538] Added fetcher for partitions
> ArrayBuffer()
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Verifying properties
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property client.id is
> overridden to flume
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property
> metadata.broker.list is overridden to null:-1
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property
> request.timeout.ms is overridden to 30000
> 15/09/10 14:39:26 WARN consumer.ConsumerFetcherManager$LeaderFinderThread:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed
> to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
> java.lang.NullPointerException
> at java.util.Hashtable.put(Hashtable.java:514)
> at kafka.producer.ProducerPool$.createSyncProducer(ProducerPool.scala:35)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:50)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
> at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 15/09/10 14:39:26 INFO consumer.ConsumerFetcherManager:
> [ConsumerFetcherManager-1441895965538] Added fetcher for partitions
> ArrayBuffer()
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Verifying properties
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property client.id is
> overridden to flume
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property
> metadata.broker.list is overridden to null:-1
> 15/09/10 14:39:26 INFO utils.VerifiableProperties: Property
> request.timeout.ms is overridden to 30000
> 15/09/10 14:39:26 WARN consumer.ConsumerFetcherManager$LeaderFinderThread:
> [flume_<custom_scrubbed>-1441895965442-55ac2e21-leader-finder-thread], Failed
> to find leader for Set([<custom_scrubbed>,0], [<custom_scrubbed>,1])
> java.lang.NullPointerException
> ...
> <repeats>
> {code}
> There is a single Kafka broker, and both partitions of the topic definitely
> have a leader and ISR, as seen here:
> {code}
> Topic:<custom_scrubbed> PartitionCount:2 ReplicationFactor:1 Configs:
> Topic: <custom_scrubbed> Partition: 0 Leader: 0 Replicas: 0 Isr: 0
> Topic: <custom_scrubbed> Partition: 1 Leader: 0 Replicas: 0 Isr: 0
> {code}
> The relevant bit of Flume config is:
> {code}
> a1.sources = s1
> a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
> a1.sources.s1.channels = c1
> a1.sources.s1.zookeeperConnect = myfqdn:2181
> a1.sources.s1.topic = mytopic
> a1.sources.s1.groupId = flume
> a1.sources.s1.kafka.consumer.timeout.ms = 100
> {code}
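
In the quoted log, metadata.broker.list is reported as null:-1 just before
the NullPointerException at java.util.Hashtable.put. Hashtable, and
java.util.Properties which extends it, rejects null keys and null values, so
a null broker host being stored in a Properties object would fail in exactly
this way. The sketch below only demonstrates that JDK behaviour; that the null
value is the broker host, and the key name "host", are assumptions made for
illustration and are not confirmed by the log.

```java
import java.util.Properties;

public class NullPropertyDemo {
    /** True if storing this key/value pair in a Properties object throws NullPointerException. */
    static boolean putThrowsNpe(String key, String value) {
        Properties props = new Properties();
        try {
            props.put(key, value); // Properties does not permit null keys or values
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // "host" is a hypothetical key chosen for the example
        System.out.println(putThrowsNpe("host", null));     // prints "true"
        System.out.println(putThrowsNpe("host", "myfqdn")); // prints "false"
    }
}
```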