This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b84dd475e NIFI-14236 Implement consistent hash for Attribute Partitioner (#9709)
9b84dd475e is described below

commit 9b84dd475ee6561b2fdf2b0c95c2c182ed3f0fb9
Author: David Handermann <[email protected]>
AuthorDate: Mon Feb 10 12:19:57 2025 -0600

    NIFI-14236 Implement consistent hash for Attribute Partitioner (#9709)
    
    * NIFI-14236 Implemented consistent hash for Attribute Partitioner
    - Used implementation from com.google.common.hash.Hashing.consistentHash method
---
 nifi-assembly/NOTICE                               |   7 ++
 .../src/main/resources/META-INF/NOTICE             |   7 ++
 .../partition/CorrelationAttributePartitioner.java |  42 +++++++-
 .../CorrelationAttributePartitionerTest.java       | 116 +++++++++++++++++++++
 4 files changed, 169 insertions(+), 3 deletions(-)

diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index c7732beba6..8f85ffe722 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -69,6 +69,13 @@ This includes derived works from Dropwizard Metrics available under Apache Softw
     nifi-commons/nifi-metrics/src/main/java/org/apache/nifi/metrics/jvm/JvmMetrics.java
     nifi-commons/nifi-metrics/src/main/java/org/apache/nifi/metrics/jvm/JmxJvmMetrics.java
 
+This product includes derived works from Google Guava (ASLv2 licensed)
+  Copyright 2011 The Guava Authors
+  The derived work is adapted from the class com.google.common.hash.Hashing:
+    https://github.com/google/guava/blob/master/guava/src/com/google/common/hash/Hashing.java
+  The derived work can be found in:
+    org.apache.nifi.controller.queue.clustered.partition.CorrelationAttributePartitioner
+
 ===========================================
 Apache Software License v2
 ===========================================
diff --git a/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE b/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE
index 01f6bc95e4..c1c3d8b951 100644
--- a/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE
@@ -4,6 +4,13 @@ Copyright 2014-2024 The Apache Software Foundation
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
 
+This product includes derived works from Google Guava (ASLv2 licensed)
+  Copyright 2011 The Guava Authors
+  The derived work is adapted from the class com.google.common.hash.Hashing:
+    https://github.com/google/guava/blob/master/guava/src/com/google/common/hash/Hashing.java
+  The derived work can be found in:
+    org.apache.nifi.controller.queue.clustered.partition.CorrelationAttributePartitioner
+
 ******************
 Apache Software License v2
 ******************
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
index 54f01e61d5..6335446fbc 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
@@ -24,9 +24,13 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Random;
 
 public class CorrelationAttributePartitioner implements FlowFilePartitioner {
+    private static final int INDEX_OFFSET = 1;
+
+    // Multiplier from 
com.google.common.hash.Hashing.LinearCongruentialGenerator
+    private static final long LCG_MULTIPLIER = 2862933555777941757L;
+
     private static final Logger logger = 
LoggerFactory.getLogger(CorrelationAttributePartitioner.class);
 
     private final String partitioningAttribute;
@@ -69,7 +73,39 @@ public class CorrelationAttributePartitioner implements 
FlowFilePartitioner {
     }
 
     private int findIndex(final long hash, final int partitions) {
-        final Random random = new Random(hash);
-        return random.nextInt(partitions);
+        // Method implementation based on Google Guava 
com.google.common.hash.Hashing.consistentHash()
+        final LinearCongruentialGenerator generator = new 
LinearCongruentialGenerator(hash);
+        int candidate = 0;
+
+        while (true) {
+            final double nextGenerated = generator.nextDouble();
+            final int nextCandidate = candidate + INDEX_OFFSET;
+            final int next = (int) (nextCandidate / nextGenerated);
+            if (next >= 0 && next < partitions) {
+                candidate = next;
+            } else {
+                final int index;
+                if (candidate == 0) {
+                    index = candidate;
+                } else {
+                    // Adjust index when handling more than one partition
+                    index = candidate - INDEX_OFFSET;
+                }
+                return index;
+            }
+        }
+    }
+
+    private static final class LinearCongruentialGenerator {
+        private long state;
+
+        private LinearCongruentialGenerator(final long seed) {
+            this.state = seed;
+        }
+
+        private double nextDouble() {
+            state = LCG_MULTIPLIER * state + INDEX_OFFSET;
+            return ((double) ((int) (state >>> 33) + 1)) / 0x1.0p31;
+        }
     }
 }
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitionerTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitionerTest.java
new file mode 100644
index 0000000000..6ac55bfd36
--- /dev/null
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitionerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.partition;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class CorrelationAttributePartitionerTest {
+    private static final String PARTITIONING_ATTRIBUTE = "group";
+
+    private static final String FIRST_ATTRIBUTE = "1";
+
+    private static final String SECOND_ATTRIBUTE = "2";
+
+    @Mock
+    private FlowFileRecord flowFileRecord;
+
+    @Mock
+    private QueuePartition localPartition;
+
+    @Mock
+    private QueuePartition firstPartition;
+
+    @Mock
+    private QueuePartition secondPartition;
+
+    @Mock
+    private QueuePartition thirdPartition;
+
+    private CorrelationAttributePartitioner partitioner;
+
+    @BeforeEach
+    void setPartitioner() {
+        partitioner = new 
CorrelationAttributePartitioner(PARTITIONING_ATTRIBUTE);
+    }
+
+    @Test
+    void testRebalanceOnClusterResize() {
+        assertTrue(partitioner.isRebalanceOnClusterResize());
+    }
+
+    @Test
+    void testRebalanceOnFailure() {
+        assertFalse(partitioner.isRebalanceOnFailure());
+    }
+
+    @Test
+    void testGetPartitionOnePartitionNullAttribute() {
+        final QueuePartition[] partitions = new 
QueuePartition[]{firstPartition};
+
+        final QueuePartition partition = 
partitioner.getPartition(flowFileRecord, partitions, localPartition);
+
+        assertEquals(firstPartition, partition);
+    }
+
+    @Test
+    void testGetPartitionTwoPartitionsNullAttribute() {
+        final QueuePartition[] partitions = new 
QueuePartition[]{firstPartition, secondPartition};
+
+        final QueuePartition partition = 
partitioner.getPartition(flowFileRecord, partitions, localPartition);
+
+        assertEquals(firstPartition, partition);
+    }
+
+    @Test
+    void testGetPartitionThreePartitionsAttributeDefined() {
+        final QueuePartition[] partitions = new 
QueuePartition[]{firstPartition, secondPartition, thirdPartition};
+
+        // Set First Attribute for partitioning
+        
when(flowFileRecord.getAttribute(eq(PARTITIONING_ATTRIBUTE))).thenReturn(FIRST_ATTRIBUTE);
+
+        final QueuePartition firstSelected = 
partitioner.getPartition(flowFileRecord, partitions, localPartition);
+        assertEquals(firstPartition, firstSelected);
+
+        // Set Second Attribute for partitioning
+        
when(flowFileRecord.getAttribute(eq(PARTITIONING_ATTRIBUTE))).thenReturn(SECOND_ATTRIBUTE);
+
+        final QueuePartition secondSelected = 
partitioner.getPartition(flowFileRecord, partitions, localPartition);
+        assertEquals(secondPartition, secondSelected);
+
+        final QueuePartition thirdSelected = 
partitioner.getPartition(flowFileRecord, partitions, localPartition);
+        assertEquals(secondPartition, thirdSelected);
+
+        // Reset to First Attribute for partitioning
+        
when(flowFileRecord.getAttribute(eq(PARTITIONING_ATTRIBUTE))).thenReturn(FIRST_ATTRIBUTE);
+
+        final QueuePartition fourthSelected = 
partitioner.getPartition(flowFileRecord, partitions, localPartition);
+        assertEquals(firstPartition, fourthSelected);
+    }
+}

Reply via email to