Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/3010#discussion_r219297604
--- Diff:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java
---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.queue.clustered.partition;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Returns remote partitions when queried for a partition; never returns the {@link LocalQueuePartition}.
+ */
+public class NonLocalPartitionPartitioner implements FlowFilePartitioner {
+ private final AtomicLong counter = new AtomicLong(0L);
+
+ @Override
+ public QueuePartition getPartition(final FlowFileRecord flowFile, final QueuePartition[] partitions, final QueuePartition localPartition) {
+ QueuePartition remotePartition = null;
+ for (int i = 0, numPartitions = partitions.length; i < numPartitions; i++) {
+ final long count = counter.getAndIncrement();
--- End diff --
I think the logic here is slightly off, because the counter is incremented
on every iteration and is shared across threads. Say we have 5 partitions
and many threads. The first iteration could get count = 3, which maps to the
local partition. Other threads increment the counter in the meantime, so the
next iteration could get count = 8, so index = 3, which is again the local
partition. The next iteration gets count = 13, so index = 3, then count = 18,
then count = 23. At this point we'd have pointed to the local partition 5
times in a row and thrown an IllegalStateException, without ever trying a
remote partition.
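To make the arithmetic concrete, here is a minimal sketch. It assumes the index is derived as count % numPartitions (which is what the numbers above imply) and that the local partition sits at index 3. `SharedCounterDemo` and `indexFor` are hypothetical names for illustration only, not code from the PR.

```java
public class SharedCounterDemo {

    // Hypothetical helper: maps a counter value to a partition index.
    // Assumption: the loop in the diff derives the index with a plain modulo.
    static int indexFor(final long count, final int numPartitions) {
        return (int) (count % numPartitions);
    }

    public static void main(final String[] args) {
        final int numPartitions = 5;
        final int localIndex = 3; // assumed position of the local partition

        // Counter values one unlucky thread might observe on successive
        // iterations while other threads increment the counter in between.
        final long[] observed = {3L, 8L, 13L, 18L, 23L};

        for (final long count : observed) {
            final int index = indexFor(count, numPartitions);
            System.out.println("count=" + count + " -> index=" + index
                    + (index == localIndex ? " (local)" : " (remote)"));
        }
    }
}
```

Every one of these counts lands on index 3, so the loop exhausts its 5 attempts on the local partition alone.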
Instead, I think we should do something like:
{code}
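// Read the shared counter once per call so other threads cannot shift the index mid-loop
final long startIndex = counter.getAndIncrement();
for (int i = 0; i < partitions.length; i++) {
    // Apply the modulo before narrowing to int so the index always stays in range
    final int index = (int) ((startIndex + i) % partitions.length);
    final QueuePartition partition = partitions[index];
    ...
}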
{code}
This way, we still get the round-robin effect across calls, but within a
single call we are guaranteed to try every partition once before throwing an
IllegalStateException.
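A self-contained sketch of that idea, using plain int indexes in place of QueuePartition objects so it can run on its own. `RoundRobinSketch`, `nextRemoteIndex`, and `localIndex` are hypothetical names chosen for illustration, and the exception message is made up. This is not the code from the PR.

```java
import java.util.concurrent.atomic.AtomicLong;

public class RoundRobinSketch {
    private final AtomicLong counter = new AtomicLong(0L);

    // Returns the index of the next non-local partition.
    // Assumption: partitions are identified by index, and localIndex marks the local one.
    int nextRemoteIndex(final int numPartitions, final int localIndex) {
        // One counter read per call: the starting point is fixed for the whole scan.
        final long startIndex = counter.getAndIncrement();
        for (int i = 0; i < numPartitions; i++) {
            // Modulo first, then narrow to int, so the index stays within bounds.
            final int index = (int) ((startIndex + i) % numPartitions);
            if (index != localIndex) {
                return index;
            }
        }
        // Reached only if every partition was inspected and all were local.
        throw new IllegalStateException("No remote partition available");
    }

    public static void main(final String[] args) {
        final RoundRobinSketch sketch = new RoundRobinSketch();
        for (int call = 0; call < 6; call++) {
            System.out.println(sketch.nextRemoteIndex(5, 3));
        }
    }
}
```

With 5 partitions and the local one at index 3, six consecutive single-threaded calls yield 0, 1, 2, 4, 4, 0. The partition right after the local one is picked twice per cycle, so the distribution is slightly skewed. That may or may not matter here, but it seems worth being aware of.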
---