Also, keep in mind that our RoundRobin will calculate the partition based on how many partitions are available, using the following code:

int partSize = this.producer.partitionsFor(topicName).size(); // this.producer being KafkaProducer

However, inside of Kafka internal Partitioner there is this code:

List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
int numPartitions = partitions.size();
if (record.partition() != null) {
    // they have given us a partition, use it
    if (record.partition() < 0 || record.partition() >= numPartitions)
        throw new IllegalArgumentException("Invalid partition given with record: " + record.partition()
                                           + " is not in the range [0..."
                                           + numPartitions
                                           + "].");

So Kafka seems to have several different ways of getting to the same number, but it appears to me that the number is not actually the same. I'll post the question on the Kafka list and will let you know once I get a response.

Oleg

On Mar 28, 2016, at 4:19 PM, Oleg Zhurakousky <[email protected]> wrote:

Need more information. It's quite unfortunate that it all manifests itself through NiFi, but the real issue is with the Kafka API, specifically this line in KafkaProducer:

int partition = partitioner.partition(serializedRecord, metadata.fetch());

It uses an internal partitioner, which performs this check:

    if (record.partition() < 0 || record.partition() >= numPartitions)
        throw new IllegalArgumentException("Invalid partition given with record: " + record.partition()
                                           + " is not in the range [0..."
                                           + numPartitions
                                           + "].");

So, when the record's partition number falls outside the actual partition count, it blows up. It actually has nothing to do with RoundRobin.
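For a concrete reproduction of that check in isolation (a hypothetical standalone class, not Kafka's actual Partitioner), the same range test with the same numbers as the NiFi error below produces the same message:

```java
// Hypothetical reproduction of the range check quoted above, for illustration.
class PartitionCheck {
    static int validate(int recordPartition, int numPartitions) {
        if (recordPartition < 0 || recordPartition >= numPartitions) {
            throw new IllegalArgumentException("Invalid partition given with record: "
                    + recordPartition + " is not in the range [0..." + numPartitions + "].");
        }
        return recordPartition;
    }

    public static void main(String[] args) {
        // A 1-partition topic only accepts partition 0
        System.out.println(validate(0, 1));
        try {
            validate(1952, 1); // same numbers as the PutKafka error below
        } catch (IllegalArgumentException e) {
            // prints: Invalid partition given with record: 1952 is not in the range [0...1].
            System.out.println(e.getMessage());
        }
    }
}
```

Note that the "[0...1]" in the error message means numPartitions was 1 at the time of the check, i.e. the producer believed the topic had a single partition.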

Cheers
Oleg


On Mar 28, 2016, at 4:02 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <[email protected]> wrote:

I'm running off of the HEAD of master (or at least close to it - e4b7e4). It looks like the processor has lost its mind with respect to the number of partitions.
Granted, my cluster has been in rough shape, mostly through my own abuse, so I probably caused the problem myself.

Has anyone seen this?  Any suggestion for getting out of this situation?

Thanks,

Chris

2016-03-28 19:41:07,902 ERROR [Timer-Driven Process Thread-5] o.apache.nifi.processors.kafka.PutKafka
java.lang.IllegalArgumentException: Invalid partition given with record: 1952 is not in the range [0...1].
       at org.apache.kafka.clients.producer.internals.Partitioner.partition(Partitioner.java:52) ~[na:na]
       at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:333) ~[na:na]
       at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:248) ~[na:na]
       at org.apache.nifi.processors.kafka.KafkaPublisher.toKafka(KafkaPublisher.java:201) ~[na:na]
       at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:134) ~[na:na]
       at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:299) ~[na:na]
       at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1807) ~[nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
       at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1778) ~[nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
       at org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:296) ~[na:na]
       at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
       at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057) ~[nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
       at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
       at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
       at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
       at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]

