This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 97115f21bc NIFI-14959 fix consistent hash function for partition-by-attribute load balance strategy
97115f21bc is described below
commit 97115f21bc33190886a554e859404fe204f33a22
Author: Mike Moser <[email protected]>
AuthorDate: Mon Sep 22 20:11:38 2025 +0000
NIFI-14959 fix consistent hash function for partition-by-attribute load balance strategy
Signed-off-by: Pierre Villard <[email protected]>
This closes #10336.
---
.../partition/CorrelationAttributePartitioner.java | 16 +++++++---------
.../partition/CorrelationAttributePartitionerTest.java | 12 +++++++++---
2 files changed, 16 insertions(+), 12 deletions(-)
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
index 6335446fbc..894b60e043 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
@@ -72,8 +72,13 @@ public class CorrelationAttributePartitioner implements FlowFilePartitioner {
return false;
}
+ /*
+ * Method implementation based on Google Guava com.google.common.hash.Hashing.consistentHash()
+ *
+ * Assigns an index number in the range [0 - (partitions-1)] with a uniform distribution across partitions
+ * while minimizing redistribution if the number of partitions changes.
+ */
private int findIndex(final long hash, final int partitions) {
- // Method implementation based on Google Guava com.google.common.hash.Hashing.consistentHash()
final LinearCongruentialGenerator generator = new LinearCongruentialGenerator(hash);
int candidate = 0;
@@ -84,14 +89,7 @@ public class CorrelationAttributePartitioner implements FlowFilePartitioner {
if (next >= 0 && next < partitions) {
candidate = next;
} else {
- final int index;
- if (candidate == 0) {
- index = candidate;
- } else {
- // Adjust index when handling more than one partition
- index = candidate - INDEX_OFFSET;
- }
- return index;
+ return candidate;
}
}
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitionerTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitionerTest.java
index 6ac55bfd36..1a61868b81 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitionerTest.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitionerTest.java
@@ -34,9 +34,9 @@ import static org.mockito.Mockito.when;
class CorrelationAttributePartitionerTest {
private static final String PARTITIONING_ATTRIBUTE = "group";
- private static final String FIRST_ATTRIBUTE = "1";
-
- private static final String SECOND_ATTRIBUTE = "2";
+ private static final String FIRST_ATTRIBUTE = "4"; // value chosen so its hash places it in partition 1
+ private static final String SECOND_ATTRIBUTE = "1"; // value chosen so its hash places it in partition 2
+ private static final String THIRD_ATTRIBUTE = "2"; // value chosen so its hash places it in partition 3
@Mock
private FlowFileRecord flowFileRecord;
@@ -112,5 +112,11 @@ class CorrelationAttributePartitionerTest {
final QueuePartition fourthSelected = partitioner.getPartition(flowFileRecord, partitions, localPartition);
assertEquals(firstPartition, fourthSelected);
+
+ // Set Third Attribute for partitioning
+ when(flowFileRecord.getAttribute(eq(PARTITIONING_ATTRIBUTE))).thenReturn(THIRD_ATTRIBUTE);
+
+ final QueuePartition fifthSelected = partitioner.getPartition(flowFileRecord, partitions, localPartition);
+ assertEquals(thirdPartition, fifthSelected);
}
}