I’ve created a JIRA[1] for this.

Chris
[1] https://issues.apache.org/jira/browse/NIFI-1827

On 4/28/16, 1:36 PM, "Oleg Zhurakousky" <[email protected]> wrote:

>Chris
>
>Thanks for looking into this and describing the problem. Indeed, we have seen
>similar symptoms, but we would need to investigate further to see whether there
>is an option to stop Kafka's internal reconnect thread. It appears there are
>configuration properties in the new API to do that, while I am not sure about
>the old API at the moment.
>As I said, I will investigate further and let you know.
>
>Thanks again for looking into this
>
>Oleg
>
>> On Apr 28, 2016, at 18:41, McDermott, Chris Kevin (MSDU -
>> STaTS/StorefrontRemote) <[email protected]> wrote:
>>
>> Oleg,
>>
>> I have reproduced the problem. It's pretty easy to do: just delete and
>> recreate the topic while the processor is running. I think I saw a similar
>> problem when I increased the partitions in the topic. That problem resolved
>> itself when I restarted the GetKafka processors. However, restarting the
>> processor does not resolve this problem. It must be that something is
>> being stored in ZooKeeper. I am guessing that deleting and recreating the
>> processor will do the trick. Is there any debugging information I can
>> provide?
>>
>> Thanks,
>> Chris
>>
>>> On 4/14/16, 8:32 PM, "Oleg Zhurakousky" <[email protected]>
>>> wrote:
>>>
>>> Thanks Chris
>>>
>>> Indeed, let us know if/when/how to reproduce it so we can evaluate whether
>>> it is something we can validate/handle in NiFi before it is passed to
>>> Kafka (e.g., validation, etc.).
>>>
>>> Cheers
>>> Oleg
>>>
>>>> On Apr 14, 2016, at 8:25 PM, McDermott, Chris Kevin (MSDU -
>>>> STaTS/StorefrontRemote) <[email protected]> wrote:
>>>>
>>>> I looked at the Kafka client code and it seemed to me to be a bug in the
>>>> caller. A map is passed in that maps topics to the number of consumers. In
>>>> this case it is asserting that the number of consumers is greater than zero.
>>>> If I can repro the problem, I'll try to isolate it in the debugger and
>>>> provide more details.
>>>>
>>>> -------- Original message --------
>>>> From: Oleg Zhurakousky <[email protected]>
>>>> Date: 4/14/16 4:14 PM (GMT-05:00)
>>>> To: [email protected]
>>>> Subject: Re: GetKafka blowing up with assertion error in Kafka client code
>>>>
>>>> Chris
>>>> That is correct, and for a change I am pretty happy to see this stack trace,
>>>> as it clearly shows the problem and validates the approach we have.
>>>> So here are more details...
>>>>
>>>> The root failure is in Kafka (as you can see from the stack trace). All we
>>>> are doing is encapsulating interaction with Kafka in a cancelable Future
>>>> so we can cancel it if and when Kafka deadlocks (which we noticed happens
>>>> rather often).
>>>> When we execute Future.get(), it results in an ExecutionException, which
>>>> carries the original Kafka exception (AssertionError).
>>>> Now, I am not sure what that assertion error really means in the context of
>>>> what you are trying to do, but it's clearly a problem that originated in Kafka.
>>>> Could you share your config or any other details?
>>>>
>>>> Cheers
>>>> Oleg
>>>>
>>>>> On Apr 14, 2016, at 4:00 PM, McDermott, Chris Kevin (MSDU -
>>>>> STaTS/StorefrontRemote) <[email protected]> wrote:
>>>>>
>>>>> I’m running a build based off of 0.7.0-SNAPSHOT. The GetKafka config is
>>>>> pretty generic: batch size 1, 1 concurrent task.
>>>>>
>>>>> 2016-04-14 19:27:23,204 ERROR [Timer-Driven Process Thread-9] o.apache.nifi.processors.kafka.GetKafka
>>>>> java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>>>>>     at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:355) ~[na:na]
>>>>>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
>>>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
>>>>>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_45]
>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_45]
>>>>>     at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:348) ~[na:na]
>>>>>     ... 12 common frames omitted
>>>>> Caused by: java.lang.AssertionError: assertion failed
>>>>>     at scala.Predef$.assert(Predef.scala:165) ~[na:na]
>>>>>     at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:51) ~[na:na]
>>>>>     at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:49) ~[na:na]
>>>>>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ~[na:na]
>>>>>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) ~[na:na]
>>>>>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[na:na]
>>>>>     at kafka.consumer.TopicCount$.makeConsumerThreadIdsPerTopic(TopicCount.scala:49) ~[na:na]
>>>>>     at kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:113) ~[na:na]
>>>>>     at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:226) ~[na:na]
>>>>>     at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85) ~[na:na]
>>>>>     at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97) ~[na:na]
>>>>>     at org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:281) ~[na:na]
>>>>>     at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:343) ~[na:na]
>>>>>     at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:340) ~[na:na]
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_45]
>>>>>     ... 3 common frames omitted
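A minimal Java sketch of the two ideas in this thread, under stated assumptions: running the Kafka call inside a cancelable Future and unwrapping the ExecutionException to reach the original AssertionError (the pattern Oleg describes), plus the kind of NiFi-side validation he suggests, rejecting any topic mapped to fewer than one consumer before the map reaches Kafka. The class and method names (GuardedConsumerCreation, validateTopicCountMap, callWithTimeout), the timeout, and the simulated client call are hypothetical; this is not GetKafka's actual code and it does not call the real Kafka client. That a deleted topic yields a zero count is also an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class GuardedConsumerCreation {

    /** Hypothetical pre-check: every topic must map to at least one consumer stream. */
    public static void validateTopicCountMap(Map<String, Integer> topicCountMap) {
        for (Map.Entry<String, Integer> e : topicCountMap.entrySet()) {
            Integer n = e.getValue();
            if (n == null || n < 1) {
                throw new IllegalArgumentException(
                        "Topic '" + e.getKey() + "' must map to at least 1 consumer stream, got " + n);
            }
        }
    }

    /**
     * Runs a possibly blocking call in a cancelable Future. On timeout the task is
     * cancelled (interrupting its thread); on failure the ExecutionException is
     * unwrapped so the caller sees the original Throwable, e.g. an AssertionError.
     */
    public static <T> T callWithTimeout(ExecutorService executor, Callable<T> call, long timeoutMs)
            throws Throwable {
        Future<T> future = executor.submit(call);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // give up on a stuck call and interrupt its worker thread
            throw e;
        } catch (ExecutionException e) {
            throw e.getCause(); // the exception actually thrown inside the call
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Map<String, Integer> topicCountMap = new HashMap<>();
            topicCountMap.put("my-topic", 0); // assumed: count derived from a deleted topic

            // Stand-in for the real client call; it fails the way Scala's assert does.
            Callable<String> createStreams = () -> {
                if (topicCountMap.get("my-topic") < 1) {
                    throw new AssertionError("assertion failed");
                }
                return "streams created";
            };

            try {
                callWithTimeout(executor, createStreams, 1000);
            } catch (Throwable t) {
                System.out.println("unwrapped: " + t);
            }

            try {
                validateTopicCountMap(topicCountMap);
            } catch (IllegalArgumentException e) {
                System.out.println("rejected up front: " + e.getMessage());
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

The design point is that the Future only changes where the failure surfaces: FutureTask catches any Throwable, including Errors, and rethrows it wrapped from get(). Validating the map before the call turns an opaque "assertion failed" into a message that names the offending topic.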
