[
https://issues.apache.org/jira/browse/NIFI-1827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264569#comment-15264569
]
Joe Skora commented on NIFI-1827:
---------------------------------
[[email protected]], changing the "numberOfPartitions" definitely looks like
a possible cause. A threading race condition could also be a possible cause.
I always prefer
{code}
if (index <= numberOfPartitions) {"
{code}
for an increasing value check like that, it can prevent this kind of problem.
If the partitioning can be occasionally sloppy (A/B/C/A/A/B/C/D/A...), not a
strict round robin, that may be enough to prevent problems.
If the partitioning needs a clockwork-like rotation (A/B/C/D/E/A/B/...)
threading is an issue, you may need a more sophisticated data type for
{{_index_}} like {{AtomicInteger}} and/or protected check and set logic that
{{synchronized}} can provide.
> PutKafka attempts to write to non-existent partition.
> ------------------------------------------------------
>
> Key: NIFI-1827
> URL: https://issues.apache.org/jira/browse/NIFI-1827
> Project: Apache NiFi
> Issue Type: Bug
> Affects Versions: 0.7.0
> Reporter: Christopher McDermott
>
> PutKafka attempts to write to non-existent partition. I have not verified
> yet but I think the problem can be triggered by deleting a topic while the
> processors is running, and then recreating the topic with the same name.
> Since the problem has occurred I have not been able to make it go away. I've
> recreated the processor in the flow but the new processor exhibits the same
> behavior.
> 2016-04-29 12:00:53,550 ERROR [Timer-Driven Process Thread-1]
> o.apache.nifi.processors.kafka.PutKafka
> java.lang.IllegalArgumentException: Invalid partition given with record: 4 is
> not in the range [0...1].
> at
> org.apache.kafka.clients.producer.internals.Partitioner.partition(Partitioner.java:52)
> ~[na:na]
> at
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:333)
> ~[na:na]
> at
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:248)
> ~[na:na]
> at
> org.apache.nifi.processors.kafka.KafkaPublisher.toKafka(KafkaPublisher.java:203)
> ~[na:na]
> at
> org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:137)
> ~[na:na]
> at
> org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:300)
> ~[na:na]
> at
> org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1807)
> ~[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
> at
> org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1778)
> ~[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
> at
> org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:296)
> ~[na:na]
> at
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
> at
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059)
> ~[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
> at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
> at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
> at
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [na:1.8.0_45]
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> [na:1.8.0_45]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> [na:1.8.0_45]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> [na:1.8.0_45]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [na:1.8.0_45]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [na:1.8.0_45]
> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)