I’ve created a JIRA[1] for this.  

Chris

[1] https://issues.apache.org/jira/browse/NIFI-1827





On 4/28/16, 1:36 PM, "Oleg Zhurakousky" <[email protected]> wrote:

>Chris
>
>Thanks for looking into this and describing the problem. Indeed we have seen 
>similar symptoms but would need to further investigate and see if there is an 
>option to stop the internal to Kafka reconnect thread. It appears there are 
>configuration properties in the new API to do that, while I am not sure about 
>the old at the moment. 
>As I said, will investigate further and let you know 
>
>Thanks again for looking into this
>
>Oleg 
>
>Sent from my iPhone
>
>> On Apr 28, 2016, at 18:41, McDermott, Chris Kevin (MSDU - 
>> STaTS/StorefrontRemote) <[email protected]> wrote:
>> 
>> Oleg,
>> 
>> I have reproduced the problem.  Its pretty easy to do. Just delete and 
>> recreate the topic while the processor is running.  I think I saw a similar 
>> problem when I increased the partitions in the topic.  That problem resolved 
>> itself when I restarted the GetKafka processors.  However, to resolve this 
>> problem restarting the processor does not work. It must be that something is 
>> being stored in Zookeeper.  I am guessing that deleting and recreating the 
>> processor will do the trick.  Is there any debugging information which I can 
>> provide to you?
>> 
>> Thanks,
>> Chris
>> 
>> 
>> 
>>> On 4/14/16, 8:32 PM, "Oleg Zhurakousky" <[email protected]> 
>>> wrote:
>>> 
>>> Thanks Chris
>>> 
>>> Indeed let us know if/when/how to reproduce it so we can evaluate and see 
>>> if it is something we can validate/handle in NiFi before it is passed to 
>>> Kafka (e.g., validation etc)
>>> 
>>> Cheers
>>> Oleg
>>> 
>>>> On Apr 14, 2016, at 8:25 PM, McDermott, Chris Kevin (MSDU - 
>>>> STaTS/StorefrontRemote) <[email protected]> wrote:
>>>> 
>>>> I looked at the Kafka client code and it seemed to me to be a bug in the 
>>>> caller. There is a map passed that maps topics to number of consumers. In 
>>>> this case it asserting that the number of consumers is greater than zero. 
>>>> If I can repro the problem I'll try to isolate it in the debugger and 
>>>> provide more details.
>>>> 
>>>> 
>>>> 
>>>> Sent from my Verizon, Samsung Galaxy smartphone
>>>> 
>>>> 
>>>> -------- Original message --------
>>>> From: Oleg Zhurakousky <[email protected]>
>>>> Date: 4/14/16 4:14 PM (GMT-05:00)
>>>> To: [email protected]
>>>> Subject: Re: GetKafka blowing up with assertion error in Kafka client code
>>>> 
>>>> Chris
>>>> That is correct and for a change I am pretty happy to see this stack trace 
>>>> as it clearly shows the problem and validates the approach we have.
>>>> So here are more details. . .
>>>> 
>>>> The root failure is in Kafka (as you can see from the stack trace). All we 
>>>> are doing is encapsulating interaction with Kafka into cancelable Future 
>>>> so we can cancel if and when Kafka deadlocks (which we noticed happens 
>>>> rather often)
>>>> When we execute Future.get() it results in ExecutionException which caries 
>>>> the original Kafka exception (AssertionError).
>>>> Now I am not sure what that assertion error really means in the context of 
>>>> what you are trying to do but its clearly a problem originated in Kafka.
>>>> Could you share your config or whatever other details?
>>>> 
>>>> Cheers
>>>> Oleg
>>>> 
>>>>> On Apr 14, 2016, at 4:00 PM, McDermott, Chris Kevin (MSDU - 
>>>>> STaTS/StorefrontRemote) <[email protected]> wrote:
>>>>> 
>>>>> I’m running based of of 0.7.0 Snapshot.  The GetKafka config is pretty 
>>>>> generic.  Batch size 1, 1 concurrent task.
>>>>> 
>>>>> 
>>>>> 2016-04-14 19:27:23,204 ERROR [Timer-Driven Process Thread-9] 
>>>>> o.apache.nifi.processors.kafka.GetKafka
>>>>> java.lang.IllegalStateException: java.util.concurrent.ExecutionException: 
>>>>> java.lang.AssertionError: assertion failed
>>>>>      at 
>>>>> org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:355) 
>>>>> ~[na:na]
>>>>>      at 
>>>>> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>>>>>  ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>>      at 
>>>>> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059)
>>>>>  [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>>      at 
>>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
>>>>>  [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>>      at 
>>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>>>>>  [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>>      at 
>>>>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
>>>>>  [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>>      at 
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
>>>>> [na:1.8.0_45]
>>>>>      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
>>>>> [na:1.8.0_45]
>>>>>      at 
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>>>  [na:1.8.0_45]
>>>>>      at 
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>>  [na:1.8.0_45]
>>>>>      at 
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>  [na:1.8.0_45]
>>>>>      at 
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>  [na:1.8.0_45]
>>>>>      at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>>>>> Caused by: java.util.concurrent.ExecutionException: 
>>>>> java.lang.AssertionError: assertion failed
>>>>>      at java.util.concurrent.FutureTask.report(FutureTask.java:122) 
>>>>> [na:1.8.0_45]
>>>>>      at java.util.concurrent.FutureTask.get(FutureTask.java:206) 
>>>>> [na:1.8.0_45]
>>>>>      at 
>>>>> org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:348) 
>>>>> ~[na:na]
>>>>>      ... 12 common frames omitted
>>>>> Caused by: java.lang.AssertionError: assertion failed
>>>>>      at scala.Predef$.assert(Predef.scala:165) ~[na:na]
>>>>>      at 
>>>>> kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:51)
>>>>>  ~[na:na]
>>>>>      at 
>>>>> kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:49)
>>>>>  ~[na:na]
>>>>>      at 
>>>>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>>>>  ~[na:na]
>>>>>      at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) 
>>>>> ~[na:na]
>>>>>      at 
>>>>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>>>>  ~[na:na]
>>>>>      at 
>>>>> kafka.consumer.TopicCount$.makeConsumerThreadIdsPerTopic(TopicCount.scala:49)
>>>>>  ~[na:na]
>>>>>      at 
>>>>> kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:113)
>>>>>  ~[na:na]
>>>>>      at 
>>>>> kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:226)
>>>>>  ~[na:na]
>>>>>      at 
>>>>> kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85)
>>>>>  ~[na:na]
>>>>>      at 
>>>>> kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97)
>>>>>  ~[na:na]
>>>>>      at 
>>>>> org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:281)
>>>>>  ~[na:na]
>>>>>      at 
>>>>> org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:343) 
>>>>> ~[na:na]
>>>>>      at 
>>>>> org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:340) 
>>>>> ~[na:na]
>>>>>      at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
>>>>> [na:1.8.0_45]
>>>>>      ... 3 common frames omitted
>>> 

Reply via email to