Oleg, I’m almost positive this is a concurrency issue. The problem started
when I dialed the number of concurrent tasks in PutKafka up from 1 to 5. I
didn’t realize this until I dialed it back to 1 to make debugging in the IDE
easier; when I did, the problem went away. When I cranked it back up to 5
the problem returned, and back at 1 it disappeared again. I’m leaning toward
pointing the finger at the Kafka client code that returns the number of
partitions, since the NiFi code looks thread safe to me, although I think it
could be hardened a bit.
/**
 * {@link Partitioner} that implements a 'round-robin' mechanism which evenly
 * distributes load between all available partitions.
 */
public static class RoundRobinPartitioner implements Partitioner {
    private volatile int index;

    @Override
    public int partition(Object key, int numberOfPartitions) {
        return this.next(numberOfPartitions);
    }

    private int next(int numberOfPartitions) {
        if (index == numberOfPartitions) {
            index = 0;
        }
        return index++;
    }
}
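By “hardened” I mean something along these lines — just a sketch of my own, not code from the NiFi tree, with the class name and signature made up for illustration. An AtomicInteger collapses the read-check-write sequence into a single atomic operation, so no interleaving of threads can produce an index outside [0, numberOfPartitions):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative hardened round-robin partitioner (not the NiFi code).
// AtomicInteger.getAndIncrement() is atomic, and Math.floorMod keeps the
// result in [0, numberOfPartitions) even after the counter overflows
// Integer.MAX_VALUE and goes negative.
public class HardenedRoundRobinPartitioner {
    private final AtomicInteger index = new AtomicInteger(0);

    public int partition(Object key, int numberOfPartitions) {
        return Math.floorMod(index.getAndIncrement(), numberOfPartitions);
    }
}
```

With the original volatile int, two threads can both read index, both miss the reset check, and both increment past numberOfPartitions; once index is out of range, the `==` test never resets it, which would explain an index like 1952 against a 2-partition topic.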
On 3/28/16, 5:14 PM, "Oleg Zhurakousky" <[email protected]> wrote:
>Also, keep in mind that our RoundRobin will calculate the partition based on
>how many partitions are available, using the following code:
>
>int partSize = this.producer.partitionsFor(topicName).size(); // (this.producer being KafkaProducer)
>
>However, inside of Kafka internal Partitioner there is this code:
>
>List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
>int numPartitions = partitions.size();
>    if (record.partition() != null) {
>        // they have given us a partition, use it
>        if (record.partition() < 0 || record.partition() >= numPartitions)
>            throw new IllegalArgumentException("Invalid partition given with record: "
>                    + record.partition() + " is not in the range [0..." + numPartitions + "].");
>
>So Kafka seems to have many different ways to get to the same number, but it
>appears to me that the number is not the same. I’ll post the question on the
>Kafka list and let you know once I get a response.
>
>Oleg
>
>On Mar 28, 2016, at 4:19 PM, Oleg Zhurakousky
><[email protected]<mailto:[email protected]>> wrote:
>
>Need more information. It’s quite unfortunate that it all manifests itself
>through NiFi, but the real issue is with Kafka API and specifically this line
>in KafkaProducer
>
>int partition = partitioner.partition(serializedRecord, metadata.fetch());
>
>It uses an internal partitioner, which contains this check:
>        if (record.partition() < 0 || record.partition() >= numPartitions)
>            throw new IllegalArgumentException("Invalid partition given with record: "
>                    + record.partition() + " is not in the range [0..." + numPartitions + "].");
>
>so, when the partition count doesn’t match the partition size, it burns. So it
>actually has nothing to do with RoundRobin.
>
>Cheers
>Oleg
>
>
>On Mar 28, 2016, at 4:02 PM, McDermott, Chris Kevin (MSDU -
>STaTS/StorefrontRemote)
><[email protected]<mailto:[email protected]>> wrote:
>
>I’m running off of the HEAD of master (or at least close to it - e4b7e4). It
>looks like the processor has lost its mind with respect to the number of
>partitions.
>Granted my cluster has been in rough shape mostly through my own abuse and I
>probably caused the problem myself.
>
>Has anyone seen this? Any suggestion for getting out of this situation?
>
>Thanks,
>
>Chris
>
>2016-03-28 19:41:07,902 ERROR [Timer-Driven Process Thread-5]
>o.apache.nifi.processors.kafka.PutKafka
>java.lang.IllegalArgumentException: Invalid partition given with record: 1952
>is not in the range [0...1].
>        at org.apache.kafka.clients.producer.internals.Partitioner.partition(Partitioner.java:52) ~[na:na]
>        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:333) ~[na:na]
>        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:248) ~[na:na]
>        at org.apache.nifi.processors.kafka.KafkaPublisher.toKafka(KafkaPublisher.java:201) ~[na:na]
>        at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:134) ~[na:na]
>        at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:299) ~[na:na]
>        at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1807) ~[nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
>        at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1778) ~[nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
>        at org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:296) ~[na:na]
>        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
>        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057) ~[nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
>        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
>        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
>        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.6.0-SNAPSHOT.jar:0.6.0-SNAPSHOT]
>        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
>        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
>        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
>        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
>        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
>        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>
>