[
https://issues.apache.org/jira/browse/NIFI-1722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Oleg Zhurakousky updated NIFI-1722:
-----------------------------------
Fix Version/s: (was: 0.7.0)
> Intermittent deadlocks in Kafka API when when stopping GetKafka
> ---------------------------------------------------------------
>
> Key: NIFI-1722
> URL: https://issues.apache.org/jira/browse/NIFI-1722
> Project: Apache NiFi
> Issue Type: Bug
> Reporter: Oleg Zhurakousky
> Assignee: Oleg Zhurakousky
>
> It appears that Kafka gets in the state of deadlock when
> _ConsumerConnector.commitOffsets(..)_ is executed during stop call in
> GetKafka. Looking at the thread dump it may be related to the fact that we
> are shutting down consumer when onTrigger is still executing. But It also
> appears that onTrigger is in the deadlock state as well.
> Below are the relevant thread dump segments
> {code}
> "StandardProcessScheduler Thread-7" Id=115 BLOCKED on
> java.lang.Object@2baae51
> at
> kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:333)
> at
> kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:324)
> "ConsumerFetcherThread-74993895-50e9-3962-90e3-97af1fed7294_daves-nifi-cluster-2-1459431214410-d3b5143c-0-1001"
> Id=25120 WAITING on
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@23dc03a2
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
> at
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> at
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:53)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(AbstractFetcherThread.scala:142)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(AbstractFetcherThread.scala:126)
> at scala.Option.foreach(Option.scala:236)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:126)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:123)
> at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:123)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:123)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:123)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
> at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:122)
> at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> Number of Locked Synchronizers: 1
> - java.util.concurrent.locks.ReentrantLock$NonfairSync@6b99259f
> "ConsumerFetcherThread-33156eec-156c-4b32-9598-d5fc3ca460ce_daves-nifi-cluster-2-1459519432175-5beddff4-0-1001"
> Id=35266 RUNNABLE (in native code)
> at sun.nio.ch.Net.poll(Native Method)
> at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954)
> - waiting on java.lang.Object@3e91ba2b
> at
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:204)
> - waiting on java.lang.Object@494070b0
> at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> - waiting on sun.nio.ch.SocketAdaptor$SocketInputStream@3ade0aef
> at
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
> - waiting on java.lang.Object@7d49c744
> at kafka.utils.CoreUtils$.read(CoreUtils.scala:192)
> at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:78)
> - waiting on java.lang.Object@4eba7a24
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:122)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:121)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:120)
> at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
> at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> "Timer-Driven Process Thread-10" Id=92 TIMED_WAITING on
> java.util.concurrent.CountDownLatch$Sync@7a46b8e8
> at sun.misc.Unsafe.park(Native Method)
> at
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
> at
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:67)
> at
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:48)
> at
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
> at
> org.apache.nifi.processors.kafka.KafkaPublisher.processAcks(KafkaPublisher.java:152)
> at
> org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:141)
> at
> org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:299)
> at
> org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1807)
> at
> org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1778)
> at
> org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:295)
> at
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> at
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059)
> at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
> at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> at
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Number of Locked Synchronizers: 1
> - java.util.concurrent.ThreadPoolExecutor$Worker@2bc3a748
> "StandardProcessScheduler Thread-7" Id=115 BLOCKED on
> java.lang.Object@2baae51
> at
> kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:333)
> at
> kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:324)
> at
> kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:110)
> at
> org.apache.nifi.processors.kafka.GetKafka.shutdownConsumer(GetKafka.java:297)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:137)
> at
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:125)
> at
> org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:233)
> at
> org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:85)
> at
> org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1332)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Number of Locked Synchronizers: 1
> - java.util.concurrent.ThreadPoolExecutor$Worker@36a2bc42
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)