[ https://issues.apache.org/jira/browse/NIFI-5194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16475576#comment-16475576 ]

ASF subversion and git services commented on NIFI-5194:
-------------------------------------------------------

Commit 620d44686041a24ce03eb5d864b128c0e61e9dc5 in nifi's branch 
refs/heads/master from [~markap14]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=620d446 ]

NIFI-5194: Ensure that even if calling KafkaConsumer.resume() throws an 
Exception, that we still release the lock that we are holding

Signed-off-by: Pierre Villard <[email protected]>

This closes #2701.
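
For readers unfamiliar with the pattern the commit message describes, the sketch below shows one common way to guarantee that a lock is released even when the guarded call throws: wrap the call in try/finally. It is not the code from the commit. The class LockReleaseSketch, the ResumeAction interface, and the field pollingLock are hypothetical names made up for illustration; ResumeAction merely stands in for a call such as KafkaConsumer.resume().

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only; the names here are hypothetical and not taken from NiFi.
class LockReleaseSketch {

    // Hypothetical stand-in for the lock that guards access to the consumer.
    private final ReentrantLock pollingLock = new ReentrantLock();

    /** Hypothetical stand-in for a call such as KafkaConsumer.resume(), which may throw. */
    public interface ResumeAction {
        void resume();
    }

    /** Runs the action while holding the lock and always releases the lock afterwards. */
    public void resumeUnderLock(ResumeAction action) {
        pollingLock.lock();
        try {
            action.resume();
        } finally {
            // Runs whether resume() returned normally or threw, so a later
            // lock() call from another thread cannot block forever.
            pollingLock.unlock();
        }
    }

    /** Reports whether any thread currently holds the lock. */
    public boolean isLockHeld() {
        return pollingLock.isLocked();
    }

    public static void main(String[] args) {
        LockReleaseSketch sketch = new LockReleaseSketch();
        try {
            sketch.resumeUnderLock(() -> {
                throw new IllegalStateException("simulated failure in resume()");
            });
        } catch (IllegalStateException e) {
            System.out.println("resume() threw: " + e.getMessage());
        }
        System.out.println("lock still held: " + sketch.isLockHeld());
    }
}
```

Without the finally block, an exception from resume() would leave the lock held, and the next thread to call lock() would park indefinitely, which is consistent with the WAITING state in the thread dump quoted below.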


> ConsumeKafka processor can occasionally hang
> --------------------------------------------
>
>                 Key: NIFI-5194
>                 URL: https://issues.apache.org/jira/browse/NIFI-5194
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>            Priority: Major
>             Fix For: 1.7.0
>
>
> When using ConsumeKafka, we can occasionally get into a situation where the 
> threads will "hang" and stopping the processor won't help. In such a case, we 
> can see the following stack trace in thread dumps:
> {code:java}
> "Timer-Driven Process Thread-42" Id=201 WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@502555e6
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:147)
> at org.apache.nifi.processors.kafka.pubsub.ConsumeKafka.onTrigger(ConsumeKafka.java:367)
> at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
> at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
> at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Number of Locked Synchronizers: 1
> - java.util.concurrent.ThreadPoolExecutor$Worker@6759e7f6
> {code}
>  
> Note that this issue affects only ConsumeKafka and not ConsumeKafka_0_10, 
> ConsumeKafka_0_11, ConsumeKafka_1_0, or any ConsumeKafkaRecord* processors. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
