This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 97115f21bc NIFI-14959 fix consistent hash function for 
partition-by-attribute load balance strategy
97115f21bc is described below

commit 97115f21bc33190886a554e859404fe204f33a22
Author: Mike Moser <[email protected]>
AuthorDate: Mon Sep 22 20:11:38 2025 +0000

    NIFI-14959 fix consistent hash function for partition-by-attribute load 
balance strategy
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #10336.
---
 .../partition/CorrelationAttributePartitioner.java       | 16 +++++++---------
 .../partition/CorrelationAttributePartitionerTest.java   | 12 +++++++++---
 2 files changed, 16 insertions(+), 12 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
index 6335446fbc..894b60e043 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
@@ -72,8 +72,13 @@ public class CorrelationAttributePartitioner implements 
FlowFilePartitioner {
         return false;
     }
 
+    /*
+     * Method implementation based on Google Guava 
com.google.common.hash.Hashing.consistentHash()
+     *
+     * Assigns an index number in the range [0 - (partitions-1)] with a 
uniform distribution across partitions
+     * while minimizing redistribution if the number of partitions changes.
+     */
     private int findIndex(final long hash, final int partitions) {
-        // Method implementation based on Google Guava 
com.google.common.hash.Hashing.consistentHash()
         final LinearCongruentialGenerator generator = new 
LinearCongruentialGenerator(hash);
         int candidate = 0;
 
@@ -84,14 +89,7 @@ public class CorrelationAttributePartitioner implements 
FlowFilePartitioner {
             if (next >= 0 && next < partitions) {
                 candidate = next;
             } else {
-                final int index;
-                if (candidate == 0) {
-                    index = candidate;
-                } else {
-                    // Adjust index when handling more than one partition
-                    index = candidate - INDEX_OFFSET;
-                }
-                return index;
+                return candidate;
             }
         }
     }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitionerTest.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitionerTest.java
index 6ac55bfd36..1a61868b81 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitionerTest.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitionerTest.java
@@ -34,9 +34,9 @@ import static org.mockito.Mockito.when;
 class CorrelationAttributePartitionerTest {
     private static final String PARTITIONING_ATTRIBUTE = "group";
 
-    private static final String FIRST_ATTRIBUTE = "1";
-
-    private static final String SECOND_ATTRIBUTE = "2";
+    private static final String FIRST_ATTRIBUTE = "4";  // value chosen so its 
hash places it in partition 1
+    private static final String SECOND_ATTRIBUTE = "1"; // value chosen so its 
hash places it in partition 2
+    private static final String THIRD_ATTRIBUTE = "2";  // value chosen so its 
hash places it in partition 3
 
     @Mock
     private FlowFileRecord flowFileRecord;
@@ -112,5 +112,11 @@ class CorrelationAttributePartitionerTest {
 
         final QueuePartition fourthSelected = 
partitioner.getPartition(flowFileRecord, partitions, localPartition);
         assertEquals(firstPartition, fourthSelected);
+
+        // Set Third Attribute for partitioning
+        
when(flowFileRecord.getAttribute(eq(PARTITIONING_ATTRIBUTE))).thenReturn(THIRD_ATTRIBUTE);
+
+        final QueuePartition fifthSelected = 
partitioner.getPartition(flowFileRecord, partitions, localPartition);
+        assertEquals(thirdPartition, fifthSelected);
     }
 }

Reply via email to