[ 
https://issues.apache.org/jira/browse/NIFI-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15209784#comment-15209784
 ] 

Joseph Witt edited comment on NIFI-1684 at 3/24/16 5:35 AM:
------------------------------------------------------------

I should add that in the case of PutKafka, once the broker came back up things 
automatically restored themselves and got moving again.  While it was down and 
these NPEs were hitting, it administratively yielded as desired.  So it is really 
just a matter of better handling the case and giving the user something other 
than an NPE to ponder.
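
As a rough illustration of "better handling": the trace in the issue goes 
through a String constructor called from getKeyBytesAsString, which suggests a 
null byte array was passed in.  The sketch below is a hypothetical helper, not 
the actual SplittableMessageContext code; the class name, method name and the 
choice of UTF-8 are all assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration; not the real NiFi class. */
final class KeyBytes {
    private KeyBytes() {}

    /**
     * Decodes the message key, or returns null when no key was set.
     * UTF-8 is an assumption; the real processor may use another charset.
     */
    static String asString(byte[] keyBytes) {
        // Passing a null array to a String constructor throws
        // NullPointerException, so check for a missing key first.
        return keyBytes == null ? null : new String(keyBytes, StandardCharsets.UTF_8);
    }
}
```

A caller building failure attributes could then skip the key attribute when 
asString returns null instead of letting the exception escape onTrigger.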

For GetKafka, though, the behavior is more problematic.  Given where these 
issues appear to live, it is unclear what we can do for such a case.
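
One option that might be worth exploring is bounding the shutdown call on the 
processor side so the caller is not held forever.  The following is only a 
sketch under assumptions: BoundedShutdown and runWithTimeout are hypothetical 
names, and the Runnable stands in for whatever call blocks (for example the 
consumer's commitOffsets).  Note the limitation: a worker thread that is BLOCKED 
on an object monitor does not respond to interruption, so this frees the caller 
but does not unstick the worker.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper for illustration; not part of NiFi. */
final class BoundedShutdown {
    private BoundedShutdown() {}

    /**
     * Runs a potentially blocking step on a daemon thread and waits at most
     * timeoutMillis for it.  Returns true only if the step completed normally
     * within the timeout.
     */
    static boolean runWithTimeout(Runnable step, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-shutdown");
            t.setDaemon(true); // a stuck worker must not keep the JVM alive
            return t;
        });
        Future<?> future = executor.submit(step);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Attempts an interrupt; this has no effect on a thread that is
            // blocked waiting to enter a synchronized section.
            future.cancel(true);
            return false;
        } catch (ExecutionException e) {
            return false; // the step itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

A stop hook could call something like runWithTimeout(consumer::commitOffsets, 
5000) and log a warning on false, where consumer and the 5 second value are 
placeholders.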


was (Author: joewitt):
I should add that once the broker came back up things automatically restored 
themselves and got moving again.  While it was down and these NPEs were hitting, 
it administratively yielded as desired.  So it is really just a matter of better 
handling the case and giving the user something other than an NPE to ponder.

> When the Kafka broker and Zookeeper instance used are replaced behavior is 
> poor
> -------------------------------------------------------------------------------
>
>                 Key: NIFI-1684
>                 URL: https://issues.apache.org/jira/browse/NIFI-1684
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 0.6.0
>            Reporter: Joseph Witt
>
> The Kafka broker and zookeeper instance being used were replaced while NiFi 
> was running.
> During that time PutKafka had NullPointerExceptions such as:
> {quote}
> 2016-03-24 05:12:58,427 WARN [Timer-Driven Process Thread-5] 
> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding 
> PutKafka[id=f8b2f669-fec5-3b26-ad2b-bcadff0c6543] due to uncaught Exception: 
> java.lang.NullPointerException
> 2016-03-24 05:12:58,429 WARN [Timer-Driven Process Thread-5] 
> o.a.n.c.t.ContinuallyRunProcessorTask
> java.lang.NullPointerException: null
>         at java.lang.String.<init>(String.java:566) ~[na:1.8.0_65]
>         at 
> org.apache.nifi.processors.kafka.SplittableMessageContext.getKeyBytesAsString(SplittableMessageContext.java:105)
>  ~[na:na]
>         at 
> org.apache.nifi.processors.kafka.PutKafka.buildFailedFlowFileAttributes(PutKafka.java:395)
>  ~[na:na]
>         at 
> org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:308) 
> ~[na:na]
>         at 
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>  ~[nifi-api-0.6.0.jar:0.6.0]
>         at 
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
>  ~[nifi-framework-core-0.6.0.jar:0.6.0]
>         at 
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
>  [nifi-framework-core-0.6.0.jar:0.6.0]
>         at 
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>  [nifi-framework-core-0.6.0.jar:0.6.0]
>         at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
>  [nifi-framework-core-0.6.0.jar:0.6.0]
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [na:1.8.0_65]
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
> [na:1.8.0_65]
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  [na:1.8.0_65]
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  [na:1.8.0_65]
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_65]
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_65]
>         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
> {quote}
> GetKafka, however, had no errors, stopped functioning, and became unresponsive 
> to attempts to stop it.  The 30 sec "invoke quietly" mechanism didn't seem to 
> address the issue either, presumably because the thread is stuck on some 
> object monitor.  I tried to stop it so I could restart it, but when I 
> attempted to start it NiFi blocked me, saying it was in STOPPING state.  So I 
> took a stack dump, and this appears relevant:
> {quote}
> "StandardProcessScheduler Thread-8" Id=142 BLOCKED  on 
> java.lang.Object@17820e22
>         at 
> kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:295)
>         at 
> kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:111)
>         at 
> org.apache.nifi.processors.kafka.GetKafka.shutdownConsumer(GetKafka.java:296)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at 
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:137)
>         at 
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:125)
>         at 
> org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:233)
>         at 
> org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:85)
>         at 
> org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1330)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>         Number of Locked Synchronizers: 1
>         - java.util.concurrent.ThreadPoolExecutor$Worker@1f4d2630
> {quote}
> Then here I see the reference to that object in another stack:
> {quote}
> "kafka-consumer-scheduler-0" Id=213 TIMED_WAITING  on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@5e93adf
>         at sun.misc.Unsafe.park(Native Method)
>         at 
> java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:256)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2120)
>         at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
>         at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
>         at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
>         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
>         at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>         at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>         at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>         at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:326)
>         at 
> kafka.consumer.ZookeeperConsumerConnector.commitOffsetToZooKeeper(ZookeeperConsumerConnector.scala:283)
>         at 
> kafka.consumer.ZookeeperConsumerConnector$$anonfun$5.apply(ZookeeperConsumerConnector.scala:304)
>         at 
> kafka.consumer.ZookeeperConsumerConnector$$anonfun$5.apply(ZookeeperConsumerConnector.scala:303)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>         at 
> kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:303)
>         - waiting on java.lang.Object@17820e22
>         at 
> kafka.consumer.ZookeeperConsumerConnector.autoCommit(ZookeeperConsumerConnector.scala:271)
>         at 
> kafka.consumer.ZookeeperConsumerConnector$$anonfun$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:134)
>         at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:99)
>         at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>         Number of Locked Synchronizers: 1
>         - java.util.concurrent.ThreadPoolExecutor$Worker@4a713b18
> {quote}
> Which in turn appears to be waiting on:
> {quote}
> "9e31cdb4-9685-4b04-9164-1d737edd5f31_52.90.60.74-1458794505915-6302e1bb-leader-finder-thread"
>  Id=233 TIMED_WAITING  on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@5e93adf
>         at sun.misc.Unsafe.park(Native Method)
>         at 
> java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:256)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2120)
>         at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
>         at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
>         at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
>         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
>         at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:413)
>         at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:409)
>         at 
> kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:469)
>         at kafka.utils.ZkUtils$.getAllBrokersInCluster(ZkUtils.scala:81)
>         at 
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>         Number of Locked Synchronizers: 1
>         - java.util.concurrent.locks.ReentrantLock$NonfairSync@50bc97c7
> {quote}



