Need more information. It’s quite unfortunate that it all manifests itself 
through NiFi, but the real issue is with Kafka API and specifically this line 
in KafkaProducer

int partition = partitioner.partition(serializedRecord, metadata.fetch());

It uses some internal partitioner
    if (record.partition() < 0 || record.partition() >= numPartitions)
                throw new IllegalArgumentException("Invalid partition given 
with record: " + record.partition()
                                                   + " is not in the range 
[0..."
                                                   + numPartitions
                                                   + "].");

so, when partition count doesn’t match the partition size it burns. So it 
actually has nothing to do with RoundRobin

Cheers
Oleg


On Mar 28, 2016, at 4:02 PM, McDermott, Chris Kevin (MSDU - 
STaTS/StorefrontRemote) 
<chris.mcderm...@hpe.com<mailto:chris.mcderm...@hpe.com>> wrote:

I’m running off of the HEAD of master (or at least close to it - e4b7e4.)  It 
looks like the processor has lost its mind with the respect to the number of 
partitions.
Granted my cluster has been in rough shape mostly through my own abuse and I 
probably caused the problem myself.

Has anyone seen this?  Any suggestion for getting out of this situation?

Thanks,

Chris

2016-03-28 19:41:07,902 ERROR [Timer-Driven Process Thread-5] 
o.apache.nifi.processors.kafka.PutKafka
java.lang.IllegalArgumentException: Invalid partition given with record: 1952 
is not in the range [0...1].
       at 
org.apache.kafka.clients.producer.internals.Partitioner.partition(Partitioner.java:52)
 ~[na:na]
       at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:333) 
~[na:na]
       at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:248) 
~[na:na]
       at 
org.apache.nifi.processors.kafka.KafkaPublisher.toKafka(KafkaPublisher.java:201)
 ~[na:na]
       at 
org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:134)
 ~[na:na]
       at 
org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:299) ~[na:na]
       at 
org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1807)
 ~[nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
       at 
org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1778)
 ~[nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
       at 
org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:296) ~[na:na]
       at 
org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
 ~[nifi-api-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
       at 
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
 ~[nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
       at 
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
 [nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
       at 
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
 [nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
       at 
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
 [nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
       at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[na:1.8.0_45]
       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
[na:1.8.0_45]
       at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 [na:1.8.0_45]
       at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 [na:1.8.0_45]
       at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
[na:1.8.0_45]
       at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
[na:1.8.0_45]
       at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]

Reply via email to