Chris,

Thanks for looking into this and describing the problem. We have indeed seen similar symptoms, but we need to investigate further to see whether there is an option to stop Kafka's internal reconnect thread. There appear to be configuration properties for that in the new API; I am not sure about the old API at the moment. As I said, I will investigate further and let you know.

Thanks again for looking into this
Oleg

Sent from my iPhone

> On Apr 28, 2016, at 18:41, McDermott, Chris Kevin (MSDU -
> STaTS/StorefrontRemote) <[email protected]> wrote:
>
> Oleg,
>
> I have reproduced the problem. It's pretty easy to do. Just delete and
> recreate the topic while the processor is running. I think I saw a similar
> problem when I increased the partitions in the topic. That problem resolved
> itself when I restarted the GetKafka processors. However, restarting the
> processor does not resolve this problem. It must be that something is
> being stored in Zookeeper. I am guessing that deleting and recreating the
> processor will do the trick. Is there any debugging information which I can
> provide to you?
>
> Thanks,
> Chris
>
>> On 4/14/16, 8:32 PM, "Oleg Zhurakousky" <[email protected]> wrote:
>>
>> Thanks Chris
>>
>> Indeed let us know if/when/how to reproduce it so we can evaluate and see if
>> it is something we can validate/handle in NiFi before it is passed to Kafka
>> (e.g., validation etc)
>>
>> Cheers
>> Oleg
>>
>>> On Apr 14, 2016, at 8:25 PM, McDermott, Chris Kevin (MSDU -
>>> STaTS/StorefrontRemote) <[email protected]> wrote:
>>>
>>> I looked at the Kafka client code and it seemed to me to be a bug in the
>>> caller. There is a map passed that maps topics to number of consumers. In
>>> this case it is asserting that the number of consumers is greater than zero.
>>> If I can repro the problem I'll try to isolate it in the debugger and
>>> provide more details.
>>>
>>> Sent from my Verizon, Samsung Galaxy smartphone
>>>
>>> -------- Original message --------
>>> From: Oleg Zhurakousky <[email protected]>
>>> Date: 4/14/16 4:14 PM (GMT-05:00)
>>> To: [email protected]
>>> Subject: Re: GetKafka blowing up with assertion error in Kafka client code
>>>
>>> Chris
>>> That is correct and for a change I am pretty happy to see this stack trace
>>> as it clearly shows the problem and validates the approach we have.
>>> So here are more details...
>>>
>>> The root failure is in Kafka (as you can see from the stack trace). All we
>>> are doing is encapsulating interaction with Kafka into a cancelable Future so
>>> we can cancel if and when Kafka deadlocks (which we noticed happens rather
>>> often).
>>> When we execute Future.get() it results in an ExecutionException which carries
>>> the original Kafka exception (AssertionError).
>>> Now I am not sure what that assertion error really means in the context of
>>> what you are trying to do, but it's clearly a problem that originated in Kafka.
>>> Could you share your config or whatever other details?
>>>
>>> Cheers
>>> Oleg
>>>
>>>> On Apr 14, 2016, at 4:00 PM, McDermott, Chris Kevin (MSDU -
>>>> STaTS/StorefrontRemote) <[email protected]> wrote:
>>>>
>>>> I'm running based off of 0.7.0 Snapshot. The GetKafka config is pretty
>>>> generic. Batch size 1, 1 concurrent task.
>>>>
>>>>
>>>> 2016-04-14 19:27:23,204 ERROR [Timer-Driven Process Thread-9] o.apache.nifi.processors.kafka.GetKafka
>>>> java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>>>>     at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:355) ~[na:na]
>>>>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
>>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
>>>>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_45]
>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_45]
>>>>     at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:348) ~[na:na]
>>>>     ... 12 common frames omitted
>>>> Caused by: java.lang.AssertionError: assertion failed
>>>>     at scala.Predef$.assert(Predef.scala:165) ~[na:na]
>>>>     at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:51) ~[na:na]
>>>>     at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:49) ~[na:na]
>>>>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ~[na:na]
>>>>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) ~[na:na]
>>>>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[na:na]
>>>>     at kafka.consumer.TopicCount$.makeConsumerThreadIdsPerTopic(TopicCount.scala:49) ~[na:na]
>>>>     at kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:113) ~[na:na]
>>>>     at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:226) ~[na:na]
>>>>     at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85) ~[na:na]
>>>>     at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97) ~[na:na]
>>>>     at org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:281) ~[na:na]
>>>>     at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:343) ~[na:na]
>>>>     at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:340) ~[na:na]
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_45]
>>>>     ... 3 common frames omitted
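
For anyone following the thread: the pattern described in the quoted message from Apr 14 (wrapping the Kafka call in a cancelable Future, and getting the original error back from Future.get() inside an ExecutionException) might look roughly like the sketch below. This is not the GetKafka source. The class name CancelableCallSketch, the method runWithTimeout, and the timeout values are made up for illustration, and the task simply throws an AssertionError as a stand-in for the real Kafka call.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; names and timeouts are assumptions.
public class CancelableCallSketch {

    // Runs the task on its own thread so the caller can give up on it.
    // Returns a short description of what happened.
    static String runWithTimeout(Callable<String> task, long timeoutMillis)
            throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(task);
        try {
            return "ok: " + future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The call appears stuck: interrupt it instead of waiting forever.
            future.cancel(true);
            return "cancelled after timeout";
        } catch (ExecutionException e) {
            // Anything thrown inside the task (including an Error such as
            // AssertionError) is delivered as the cause of ExecutionException.
            Throwable cause = e.getCause();
            return "failed: " + cause.getClass().getName() + ": " + cause.getMessage();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a client call that fails an internal assertion.
        System.out.println(runWithTimeout(() -> {
            throw new AssertionError("assertion failed");
        }, 1000));

        // Stand-in for a client call that hangs.
        System.out.println(runWithTimeout(() -> {
            Thread.sleep(60_000);
            return "never returned";
        }, 100));
    }
}
```

The first call reports the AssertionError as the cause, which matches the "Caused by" chain in the stack trace above; the second is cancelled once the timeout expires. Note that cancel(true) only interrupts the worker thread, so it helps only if the blocked call responds to interruption.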
