Thanks Chris Indeed let us know if/when/how to reproduce it so we can evaluate and see if it is something we can validate/handle in NiFi before it is passed to Kafka (e.g., validation etc)
Cheers Oleg > On Apr 14, 2016, at 8:25 PM, McDermott, Chris Kevin (MSDU - > STaTS/StorefrontRemote) <[email protected]> wrote: > > I looked at the Kafka client code and it seemed to me to be a bug in the > caller. There is a map passed that maps topics to number of consumers. In > this case it asserting that the number of consumers is greater than zero. If > I can repro the problem I'll try to isolate it in the debugger and provide > more details. > > > > Sent from my Verizon, Samsung Galaxy smartphone > > > -------- Original message -------- > From: Oleg Zhurakousky <[email protected]> > Date: 4/14/16 4:14 PM (GMT-05:00) > To: [email protected] > Subject: Re: GetKafka blowing up with assertion error in Kafka client code > > Chris > That is correct and for a change I am pretty happy to see this stack trace as > it clearly shows the problem and validates the approach we have. > So here are more details. . . > > The root failure is in Kafka (as you can see from the stack trace). All we > are doing is encapsulating interaction with Kafka into cancelable Future so > we can cancel if and when Kafka deadlocks (which we noticed happens rather > often) > When we execute Future.get() it results in ExecutionException which caries > the original Kafka exception (AssertionError). > Now I am not sure what that assertion error really means in the context of > what you are trying to do but its clearly a problem originated in Kafka. > Could you share your config or whatever other details? > > Cheers > Oleg > >> On Apr 14, 2016, at 4:00 PM, McDermott, Chris Kevin (MSDU - >> STaTS/StorefrontRemote) <[email protected]> wrote: >> >> I’m running based of of 0.7.0 Snapshot. The GetKafka config is pretty >> generic. Batch size 1, 1 concurrent task. >> >> >> 2016-04-14 19:27:23,204 ERROR [Timer-Driven Process Thread-9] >> o.apache.nifi.processors.kafka.GetKafka >> java.lang.IllegalStateException: java.util.concurrent.ExecutionException: >> java.lang.AssertionError: assertion failed >> at >> org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:355) >> ~[na:na] >> at >> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) >> ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT] >> at >> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059) >> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT] >> at >> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) >> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT] >> at >> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) >> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT] >> at >> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) >> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT] >> at >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >> [na:1.8.0_45] >> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) >> [na:1.8.0_45] >> at >> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) >> [na:1.8.0_45] >> at >> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) >> [na:1.8.0_45] >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >> [na:1.8.0_45] >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >> [na:1.8.0_45] >> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45] >> Caused by: java.util.concurrent.ExecutionException: >> java.lang.AssertionError: assertion failed >> at java.util.concurrent.FutureTask.report(FutureTask.java:122) >> [na:1.8.0_45] >> at java.util.concurrent.FutureTask.get(FutureTask.java:206) >> [na:1.8.0_45] >> at >> org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:348) >> ~[na:na] >> ... 12 common frames omitted >> Caused by: java.lang.AssertionError: assertion failed >> at scala.Predef$.assert(Predef.scala:165) ~[na:na] >> at >> kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:51) >> ~[na:na] >> at >> kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:49) >> ~[na:na] >> at >> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) >> ~[na:na] >> at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) ~[na:na] >> at >> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) >> ~[na:na] >> at >> kafka.consumer.TopicCount$.makeConsumerThreadIdsPerTopic(TopicCount.scala:49) >> ~[na:na] >> at >> kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:113) >> ~[na:na] >> at >> kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:226) >> ~[na:na] >> at >> kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85) >> ~[na:na] >> at >> kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97) >> ~[na:na] >> at >> org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:281) >> ~[na:na] >> at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:343) >> ~[na:na] >> at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:340) >> ~[na:na] >> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> [na:1.8.0_45] >> ... 3 common frames omitted > >
