I looked at the Kafka client code, and it seems to me to be a bug in the 
caller. The caller passes a map from topic to number of consumers, and in this 
case Kafka is asserting that the number of consumers is greater than zero. If I 
can reproduce the problem, I'll try to isolate it in the debugger and provide 
more details.
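
A minimal sketch of the kind of guard a caller could apply before handing the 
topic-to-count map to the Kafka consumer. This is not the GetKafka code; the 
class name, method name, and topic name below are hypothetical, and the only 
thing taken from the trace is that a non-positive count appears to trip the 
assertion.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TopicCountCheck {

    // Hypothetical helper: rejects any topic whose requested consumer count
    // is missing or not greater than zero.
    public static void requirePositiveCounts(Map<String, Integer> topicCountMap) {
        for (Map.Entry<String, Integer> entry : topicCountMap.entrySet()) {
            Integer count = entry.getValue();
            if (count == null || count <= 0) {
                throw new IllegalArgumentException(
                        "Topic '" + entry.getKey()
                        + "' needs at least one consumer, got " + count);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> topicCountMap = new LinkedHashMap<>();
        // Assumed failure case: a zero count for some topic.
        topicCountMap.put("my-topic", 0);
        try {
            requirePositiveCounts(topicCountMap);
        } catch (IllegalArgumentException ex) {
            System.out.println("Rejected before reaching Kafka: " + ex.getMessage());
        }
    }
}
```

Failing early with a descriptive message would make the bad input visible in 
the caller rather than as a bare "assertion failed" deep in the client.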





-------- Original message --------
From: Oleg Zhurakousky <[email protected]>
Date: 4/14/16 4:14 PM (GMT-05:00)
To: [email protected]
Subject: Re: GetKafka blowing up with assertion error in Kafka client code

Chris
That is correct, and for a change I am pretty happy to see this stack trace, as 
it clearly shows the problem and validates the approach we have.
So here are more details...

The root failure is in Kafka (as you can see from the stack trace). All we are 
doing is encapsulating the interaction with Kafka in a cancelable Future so we 
can cancel if and when Kafka deadlocks (which we noticed happens rather often).
When we execute Future.get(), it results in an ExecutionException which carries 
the original Kafka exception (AssertionError).
Now, I am not sure what that assertion error really means in the context of what 
you are trying to do, but it's clearly a problem that originated in Kafka.
Could you share your config or any other details?
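
For illustration, the pattern described above can be sketched roughly as 
follows. This is not the actual GetKafka code: the class name, method name, and 
timeout are hypothetical, and the thrown AssertionError merely stands in for 
the failing Kafka call.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CancelableCall {

    // Hypothetical helper: runs the task in a Future so it can be cancelled
    // if it hangs, and rethrows any failure as an IllegalStateException.
    public static <T> T callWithTimeout(Callable<T> task, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<T> future = executor.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the stuck call
            throw new IllegalStateException("Call timed out and was cancelled", e);
        } catch (ExecutionException e) {
            // The original failure is available via e.getCause().
            throw new IllegalStateException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        try {
            // Stand-in for the Kafka call that fails its assertion.
            callWithTimeout(() -> {
                throw new AssertionError("assertion failed");
            }, 1000L);
        } catch (IllegalStateException e) {
            // IllegalStateException -> ExecutionException -> AssertionError
            Throwable root = e.getCause().getCause();
            System.out.println("Root cause: " + root);
        }
    }
}
```

The nesting of exceptions here mirrors the order of the "Caused by" entries in 
the stack trace below.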

Cheers
Oleg

> On Apr 14, 2016, at 4:00 PM, McDermott, Chris Kevin (MSDU - 
> STaTS/StorefrontRemote) <[email protected]> wrote:
>
> I’m running a build based off of the 0.7.0 snapshot. The GetKafka config is 
> pretty generic: batch size 1, 1 concurrent task.
>
>
> 2016-04-14 19:27:23,204 ERROR [Timer-Driven Process Thread-9] 
> o.apache.nifi.processors.kafka.GetKafka
> java.lang.IllegalStateException: java.util.concurrent.ExecutionException: 
> java.lang.AssertionError: assertion failed
>        at 
> org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:355) 
> ~[na:na]
>        at 
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>  ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>        at 
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059)
>  [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>        at 
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
>  [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>        at 
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>  [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>        at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
>  [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>        at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [na:1.8.0_45]
>        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
> [na:1.8.0_45]
>        at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  [na:1.8.0_45]
>        at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  [na:1.8.0_45]
>        at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_45]
>        at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_45]
>        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
> Caused by: java.util.concurrent.ExecutionException: java.lang.AssertionError: 
> assertion failed
>        at java.util.concurrent.FutureTask.report(FutureTask.java:122) 
> [na:1.8.0_45]
>        at java.util.concurrent.FutureTask.get(FutureTask.java:206) 
> [na:1.8.0_45]
>        at 
> org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:348) 
> ~[na:na]
>        ... 12 common frames omitted
> Caused by: java.lang.AssertionError: assertion failed
>        at scala.Predef$.assert(Predef.scala:165) ~[na:na]
>        at 
> kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:51)
>  ~[na:na]
>        at 
> kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:49)
>  ~[na:na]
>        at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>  ~[na:na]
>        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) ~[na:na]
>        at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>  ~[na:na]
>        at 
> kafka.consumer.TopicCount$.makeConsumerThreadIdsPerTopic(TopicCount.scala:49) 
> ~[na:na]
>        at 
> kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:113)
>  ~[na:na]
>        at 
> kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:226)
>  ~[na:na]
>        at 
> kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85)
>  ~[na:na]
>        at 
> kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97)
>  ~[na:na]
>        at 
> org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:281) 
> ~[na:na]
>        at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:343) 
> ~[na:na]
>        at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:340) 
> ~[na:na]
>        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [na:1.8.0_45]
>        ... 3 common frames omitted
