Also, keep in mind that our RoundRobin will calculate the partition based on
how many partitions available using the following code:
int partSize = this.producer.partitionsFor(topicName).size(); //(this.producer
being KafkaProducer)
However, inside of Kafka internal Partitioner there is this code:
List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
int numPartitions = partitions.size();
if (record.partition() != null) {
// they have given us a partition, use it
if (record.partition() < 0 || record.partition() >= numPartitions)
throw new IllegalArgumentException("Invalid partition given
with record: " + record.partition()
+ " is not in the range
[0..."
+ numPartitions
+ "].”);
So Kafka seem to have many different ways to get to the same number but it
appears to me that the number is not the same. I’ll post the question on Kafka
list and will let you know once I get a response.
Oleg
On Mar 28, 2016, at 4:19 PM, Oleg Zhurakousky
<[email protected]<mailto:[email protected]>> wrote:
Need more information. It’s quite unfortunate that it all manifests itself
through NiFi, but the real issue is with Kafka API and specifically this line
in KafkaProducer
int partition = partitioner.partition(serializedRecord, metadata.fetch());
It uses some internal partitioner
if (record.partition() < 0 || record.partition() >= numPartitions)
throw new IllegalArgumentException("Invalid partition given
with record: " + record.partition()
+ " is not in the range
[0..."
+ numPartitions
+ "].");
so, when partition count doesn’t match the partition size it burns. So it
actually has nothing to do with RoundRobin
Cheers
Oleg
On Mar 28, 2016, at 4:02 PM, McDermott, Chris Kevin (MSDU -
STaTS/StorefrontRemote)
<[email protected]<mailto:[email protected]>> wrote:
I’m running off of the HEAD of master (or at least close to it - e4b7e4.) It
looks like the processor has lost its mind with the respect to the number of
partitions.
Granted my cluster has been in rough shape mostly through my own abuse and I
probably caused the problem myself.
Has anyone seen this? Any suggestion for getting out of this situation?
Thanks,
Chris
2016-03-28 19:41:07,902 ERROR [Timer-Driven Process Thread-5]
o.apache.nifi.processors.kafka.PutKafka
java.lang.IllegalArgumentException: Invalid partition given with record: 1952
is not in the range [0...1].
at
org.apache.kafka.clients.producer.internals.Partitioner.partition(Partitioner.java:52)
~[na:na]
at
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:333)
~[na:na]
at
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:248)
~[na:na]
at
org.apache.nifi.processors.kafka.KafkaPublisher.toKafka(KafkaPublisher.java:201)
~[na:na]
at
org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:134)
~[na:na]
at
org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:299) ~[na:na]
at
org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1807)
~[nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
at
org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1778)
~[nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
at
org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:296) ~[na:na]
at
org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
~[nifi-api-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
at
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
~[nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
at
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
[nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
at
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
[nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
at
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
[nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[na:1.8.0_45]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
[na:1.8.0_45]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
[na:1.8.0_45]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
[na:1.8.0_45]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[na:1.8.0_45]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[na:1.8.0_45]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]