[
https://issues.apache.org/jira/browse/NIFI-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15224641#comment-15224641
]
ASF subversion and git services commented on NIFI-1684:
-------------------------------------------------------
Commit 89567ebfba3b6d4cf42b04aa981085b1f0e3564e in nifi's branch
refs/heads/support/nifi-0.6.x from [~ozhurakousky]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=89567eb ]
NIFI-1684 This closes #302. fixed random partitioner initialization
Signed-off-by: joewitt <[email protected]>
> PutKafka tuning past refactoring
> --------------------------------
>
> Key: NIFI-1684
> URL: https://issues.apache.org/jira/browse/NIFI-1684
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 0.6.0
> Reporter: Joseph Witt
> Assignee: Oleg Zhurakousky
> Fix For: 0.7.0, 0.6.1
>
>
> The Kafka broker and zookeeper instance being used were replaced while NiFi
> was running.
> During that time PutKafka had NullPointerExceptions such as:
> {quote}
> 2016-03-24 05:12:58,427 WARN [Timer-Driven Process Thread-5]
> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
> PutKafka[id=f8b2f669-fec5-3b26-ad2b-bca
> dff0c6543] due to uncaught Exception: java.lang.NullPointerException
> 2016-03-24 05:12:58,429 WARN [Timer-Driven Process Thread-5]
> o.a.n.c.t.ContinuallyRunProcessorTask
> java.lang.NullPointerException: null
> at java.lang.String.<init>(String.java:566) ~[na:1.8.0_65]
> at
> org.apache.nifi.processors.kafka.SplittableMessageContext.getKeyBytesAsString(SplittableMessageContext.java:105)
> ~[na:na]
> at
> org.apache.nifi.processors.kafka.PutKafka.buildFailedFlowFileAttributes(PutKafka.java:395)
> ~[na:na]
> at
> org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:308)
> ~[na:na]
> at
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> ~[nifi-api-0.6.0.jar:0.6.0]
> at
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
> ~[nifi-framework-core-0.6.0.jar:0.6.0]
> at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
> [nifi-framework-core-0.6.0.jar:0.6.0]
> at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> [nifi-framework-core-0.6.0.jar:0.6.0]
> at
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
> [nifi-framework-core-0.6.0.jar:0.6.0]
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [na:1.8.0_65]
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> [na:1.8.0_65]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> [na:1.8.0_65]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> [na:1.8.0_65]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [na:1.8.0_65]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [na:1.8.0_65]
> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
> {quote}
> But, GetKafka had no errors, stopped functioning, and became unresponsive to
> attempts to stop it. The 30 sec invoke quietly mechanism didn't seem to
> address the issue either presumably because the thread is stuck on some
> object monitor. I tried to stop it so I could restart it but in attempting
> to start it NiFi blocked me saying it was in STOPPING state. So stack dump
> taken and this appears relevant:
> {quote}
> "StandardProcessScheduler Thread-8" Id=142 BLOCKED on
> java.lang.Object@17820e22
> at
> kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:295)
> at
> kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:111)
> at
> org.apache.nifi.processors.kafka.GetKafka.shutdownConsumer(GetKafka.java:296)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:137)
> at
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:125)
> at
> org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:233)
> at
> org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:85)
> at
> org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1330)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Number of Locked Synchronizers: 1
> - java.util.concurrent.ThreadPoolExecutor$Worker@1f4d2630
> {quote}
> Then here i see the reference to that object in another stack:
> {quote}
> "kafka-consumer-scheduler-0" Id=213 TIMED_WAITING on
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@5e93adf
> at sun.misc.Unsafe.park(Native Method)
> at
> java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:256)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2120)
> at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
> at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
> at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
> at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
> at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
> at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
> at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:326)
> at
> kafka.consumer.ZookeeperConsumerConnector.commitOffsetToZooKeeper(ZookeeperConsumerConnector.scala:283)
> at
> kafka.consumer.ZookeeperConsumerConnector$$anonfun$5.apply(ZookeeperConsumerConnector.scala:304)
> at
> kafka.consumer.ZookeeperConsumerConnector$$anonfun$5.apply(ZookeeperConsumerConnector.scala:303)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> at
> kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:303)
> - waiting on java.lang.Object@17820e22
> at
> kafka.consumer.ZookeeperConsumerConnector.autoCommit(ZookeeperConsumerConnector.scala:271)
> at
> kafka.consumer.ZookeeperConsumerConnector$$anonfun$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:134)
> at
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:99)
> at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Number of Locked Synchronizers: 1
> - java.util.concurrent.ThreadPoolExecutor$Worker@4a713b18
> {quote}
> Which is waiting on
> {quote}
> "9e31cdb4-9685-4b04-9164-1d737edd5f31_52.90.60.74-1458794505915-6302e1bb-leader-finder-thread"
> Id=233 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSyn
> chronizer$ConditionObject@5e93adf
> at sun.misc.Unsafe.park(Native Method)
> at
> java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:256)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2120)
> at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
> at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
> at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
> at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:413)
> at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:409)
> at
> kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:469)
> at kafka.utils.ZkUtils$.getAllBrokersInCluster(ZkUtils.scala:81)
> at
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> Number of Locked Synchronizers: 1
> - java.util.concurrent.locks.ReentrantLock$NonfairSync@50bc97c7
> {quote}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)