Thanks Chris

Indeed, let us know if and when you can reproduce it, and how, so we can 
evaluate whether it is something we can validate or handle in NiFi before it 
is passed to Kafka.
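
As a rough illustration of that kind of up-front check, here is a minimal sketch. It assumes, based only on Chris's reading of the client code, that Kafka asserts each topic's consumer count is greater than zero. The class and method names (TopicCountValidator, validate) are hypothetical and are not part of NiFi or Kafka.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not NiFi's actual code: checks the topic -> consumer
// count map before it would be handed to the Kafka consumer connector.
final class TopicCountValidator {

    // Throws a descriptive exception instead of letting a bad count reach
    // Kafka, where (per the assumption above) it would surface only as a
    // bare "assertion failed".
    static void validate(Map<String, Integer> topicCountMap) {
        if (topicCountMap == null || topicCountMap.isEmpty()) {
            throw new IllegalArgumentException("At least one topic must be configured");
        }
        for (Map.Entry<String, Integer> entry : topicCountMap.entrySet()) {
            Integer count = entry.getValue();
            if (count == null || count <= 0) {
                throw new IllegalArgumentException(
                        "Consumer count for topic '" + entry.getKey()
                                + "' must be greater than zero, but was " + count);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put("events", 0); // deliberately invalid
        try {
            validate(topicCountMap);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected before reaching Kafka: " + e.getMessage());
        }
    }
}
```

A check like this would at least name the offending topic and value, which the raw assertion does not.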

Cheers
Oleg

> On Apr 14, 2016, at 8:25 PM, McDermott, Chris Kevin (MSDU - 
> STaTS/StorefrontRemote) <[email protected]> wrote:
> 
> I looked at the Kafka client code, and it seems to me to be a bug in the 
> caller. A map is passed that maps topics to the number of consumers, and in 
> this case the client is asserting that the number of consumers is greater 
> than zero. If I can reproduce the problem, I'll try to isolate it in the 
> debugger and provide more details.
> 
> 
> 
> -------- Original message --------
> From: Oleg Zhurakousky <[email protected]>
> Date: 4/14/16 4:14 PM (GMT-05:00)
> To: [email protected]
> Subject: Re: GetKafka blowing up with assertion error in Kafka client code
> 
> Chris
> That is correct, and for a change I am pretty happy to see this stack trace, 
> as it clearly shows the problem and validates the approach we have.
> So here are more details.
> 
> The root failure is in Kafka (as you can see from the stack trace). All we 
> are doing is encapsulating the interaction with Kafka in a cancelable Future 
> so we can cancel it if and when Kafka deadlocks (which we have noticed 
> happens rather often).
> When we call Future.get(), it throws an ExecutionException that carries 
> the original Kafka exception (AssertionError).
> I am not sure what that assertion error really means in the context of 
> what you are trying to do, but it's clearly a problem that originated in 
> Kafka.
> Could you share your config and any other details?
> 
> Cheers
> Oleg
> 
>> On Apr 14, 2016, at 4:00 PM, McDermott, Chris Kevin (MSDU - 
>> STaTS/StorefrontRemote) <[email protected]> wrote:
>> 
>> I'm running a build based off of the 0.7.0 snapshot. The GetKafka config is 
>> pretty generic: batch size 1, 1 concurrent task.
>> 
>> 
>> 2016-04-14 19:27:23,204 ERROR [Timer-Driven Process Thread-9] 
>> o.apache.nifi.processors.kafka.GetKafka
>> java.lang.IllegalStateException: java.util.concurrent.ExecutionException: 
>> java.lang.AssertionError: assertion failed
>>       at 
>> org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:355) 
>> ~[na:na]
>>       at 
>> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>>  ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>       at 
>> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059)
>>  [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>       at 
>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
>>  [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>       at 
>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>>  [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>       at 
>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
>>  [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>       at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
>> [na:1.8.0_45]
>>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
>> [na:1.8.0_45]
>>       at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>  [na:1.8.0_45]
>>       at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>  [na:1.8.0_45]
>>       at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>  [na:1.8.0_45]
>>       at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>  [na:1.8.0_45]
>>       at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>> Caused by: java.util.concurrent.ExecutionException: 
>> java.lang.AssertionError: assertion failed
>>       at java.util.concurrent.FutureTask.report(FutureTask.java:122) 
>> [na:1.8.0_45]
>>       at java.util.concurrent.FutureTask.get(FutureTask.java:206) 
>> [na:1.8.0_45]
>>       at 
>> org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:348) 
>> ~[na:na]
>>       ... 12 common frames omitted
>> Caused by: java.lang.AssertionError: assertion failed
>>       at scala.Predef$.assert(Predef.scala:165) ~[na:na]
>>       at 
>> kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:51)
>>  ~[na:na]
>>       at 
>> kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:49)
>>  ~[na:na]
>>       at 
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>  ~[na:na]
>>       at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) ~[na:na]
>>       at 
>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>  ~[na:na]
>>       at 
>> kafka.consumer.TopicCount$.makeConsumerThreadIdsPerTopic(TopicCount.scala:49)
>>  ~[na:na]
>>       at 
>> kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:113)
>>  ~[na:na]
>>       at 
>> kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:226)
>>  ~[na:na]
>>       at 
>> kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85)
>>  ~[na:na]
>>       at 
>> kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97)
>>  ~[na:na]
>>       at 
>> org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:281) 
>> ~[na:na]
>>       at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:343) 
>> ~[na:na]
>>       at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:340) 
>> ~[na:na]
>>       at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
>> [na:1.8.0_45]
>>       ... 3 common frames omitted
> 
> 
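
The cancelable-Future wrapping described in the quoted message, and the way an AssertionError ends up nested inside an ExecutionException inside an IllegalStateException, can be sketched roughly as follows. This is only an illustration under assumptions: the class name CancelableCall, the run method, and the timeout value are hypothetical, and this is not the actual GetKafka code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of wrapping a potentially blocking call in a Future.
final class CancelableCall {

    // Runs the task on the executor and waits up to timeoutMillis for it.
    // A hung task is cancelled; a failed task's Throwable (including Errors
    // such as AssertionError) arrives wrapped in an ExecutionException.
    static <T> T run(ExecutorService executor, Callable<T> task, long timeoutMillis) {
        Future<T> future = executor.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the stuck call
            throw new IllegalStateException("Call timed out and was cancelled", e);
        } catch (ExecutionException e) {
            // Same nesting as in the trace: IllegalStateException -> ExecutionException -> cause
            throw new IllegalStateException(e);
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt(); // preserve interrupt status
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Stand-in for the Kafka call that fails its internal assertion.
        Callable<String> failing = () -> {
            throw new AssertionError("assertion failed");
        };
        try {
            run(executor, failing, 1000);
        } catch (IllegalStateException e) {
            Throwable root = e.getCause().getCause();
            System.out.println("Root cause: " + root);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

The root cause recovered here is the original AssertionError, which is why the trace above ends in the Kafka frames rather than in the calling code.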
