I looked at the Kafka client code, and this looks to me like a bug in the caller. A map is passed in that maps each topic to a number of consumers, and the client asserts that the number of consumers is greater than zero. In this case that assertion is failing. If I can repro the problem, I'll try to isolate it in the debugger and provide more details.
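
To make the suspected precondition concrete, here is a minimal sketch of a caller-side guard that rejects a zero or missing consumer count before the map is handed to the Kafka client. The class and method names (TopicCountCheck, validate) and the topic name are hypothetical. This is not the Kafka or NiFi code, only an illustration of the check as I understand it.

```java
import java.util.HashMap;
import java.util.Map;

public class TopicCountCheck {

    // Hypothetical helper: fails fast, naming the offending topic, if any
    // topic maps to a missing or non-positive consumer count.
    public static void validate(Map<String, Integer> topicCountMap) {
        for (Map.Entry<String, Integer> entry : topicCountMap.entrySet()) {
            Integer count = entry.getValue();
            if (count == null || count < 1) {
                throw new IllegalArgumentException(
                        "Topic '" + entry.getKey() + "' has invalid consumer count: " + count);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put("my-topic", 0); // illustrative bad value

        try {
            validate(topicCountMap);
        } catch (IllegalArgumentException e) {
            // The message names the topic, unlike a bare "assertion failed".
            System.out.println(e.getMessage());
        }
    }
}
```

A guard like this in the caller would turn the opaque AssertionError into a message that points at the topic and count involved.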
-------- Original message --------
From: Oleg Zhurakousky <[email protected]>
Date: 4/14/16 4:14 PM (GMT-05:00)
To: [email protected]
Subject: Re: GetKafka blowing up with assertion error in Kafka client code

Chris

That is correct, and for a change I am pretty happy to see this stack trace, as it clearly shows the problem and validates the approach we have. So here are more details.

The root failure is in Kafka (as you can see from the stack trace). All we are doing is encapsulating the interaction with Kafka in a cancelable Future so we can cancel if and when Kafka deadlocks (which we noticed happens rather often). When we execute Future.get(), it results in an ExecutionException that carries the original Kafka exception (AssertionError).

Now, I am not sure what that assertion error really means in the context of what you are trying to do, but it's clearly a problem that originated in Kafka. Could you share your config or whatever other details you have?

Cheers
Oleg

> On Apr 14, 2016, at 4:00 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <[email protected]> wrote:
>
> I’m running based off of 0.7.0 Snapshot. The GetKafka config is pretty generic. Batch size 1, 1 concurrent task.
>
> 2016-04-14 19:27:23,204 ERROR [Timer-Driven Process Thread-9] o.apache.nifi.processors.kafka.GetKafka
> java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>     at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:355) ~[na:na]
>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
> Caused by: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_45]
>     at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_45]
>     at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:348) ~[na:na]
>     ... 12 common frames omitted
> Caused by: java.lang.AssertionError: assertion failed
>     at scala.Predef$.assert(Predef.scala:165) ~[na:na]
>     at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:51) ~[na:na]
>     at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:49) ~[na:na]
>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ~[na:na]
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) ~[na:na]
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[na:na]
>     at kafka.consumer.TopicCount$.makeConsumerThreadIdsPerTopic(TopicCount.scala:49) ~[na:na]
>     at kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:113) ~[na:na]
>     at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:226) ~[na:na]
>     at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85) ~[na:na]
>     at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97) ~[na:na]
>     at org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:281) ~[na:na]
>     at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:343) ~[na:na]
>     at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:340) ~[na:na]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_45]
>     ... 3 common frames omitted
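
For anyone reading the trace, the pattern Oleg describes (running the Kafka call inside a cancelable Future, with Future.get() surfacing the original failure wrapped in an ExecutionException) can be sketched roughly as below. This is only an illustration using the standard java.util.concurrent API, not the actual GetKafka source. The class name CancelableCall, the helper names, and the timeout value are assumptions, and the failing task merely simulates the AssertionError.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CancelableCall {

    // Runs the task on the executor and waits up to timeoutMs for its result.
    // On timeout or interruption the Future is cancelled; a failure inside the
    // task is rethrown wrapped in IllegalStateException, keeping the cause chain.
    public static <T> T callWithTimeout(ExecutorService executor, Callable<T> task, long timeoutMs) {
        Future<T> future = executor.submit(task);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the stuck call
            throw new IllegalStateException("Timed out after " + timeoutMs + " ms", e);
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            // e.getCause() is whatever the task threw, Errors included.
            throw new IllegalStateException(e);
        }
    }

    // Walks the cause chain down to the innermost Throwable.
    public static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Simulated failure standing in for the Kafka client call.
            callWithTimeout(executor, () -> {
                throw new AssertionError("assertion failed");
            }, 1000);
        } catch (IllegalStateException e) {
            System.out.println(rootCause(e));
        } finally {
            executor.shutdownNow();
        }
    }
}
```

This produces the same nesting seen in the trace above: IllegalStateException, then ExecutionException, then the AssertionError as the root cause.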
