This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 9b84dd475e NIFI-14236 Implement consistent hash for Attribute Partitioner (#9709)
9b84dd475e is described below
commit 9b84dd475ee6561b2fdf2b0c95c2c182ed3f0fb9
Author: David Handermann <[email protected]>
AuthorDate: Mon Feb 10 12:19:57 2025 -0600
NIFI-14236 Implement consistent hash for Attribute Partitioner (#9709)
* NIFI-14236 Implemented consistent hash for Attribute Partitioner
- Used implementation from com.google.common.hash.Hashing.consistentHash method
---
nifi-assembly/NOTICE | 7 ++
.../src/main/resources/META-INF/NOTICE | 7 ++
.../partition/CorrelationAttributePartitioner.java | 42 +++++++-
.../CorrelationAttributePartitionerTest.java | 116 +++++++++++++++++++++
4 files changed, 169 insertions(+), 3 deletions(-)
diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index c7732beba6..8f85ffe722 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -69,6 +69,13 @@ This includes derived works from Dropwizard Metrics
available under Apache Softw
nifi-commons/nifi-metrics/src/main/java/org/apache/nifi/metrics/jvm/JvmMetrics.java
nifi-commons/nifi-metrics/src/main/java/org/apache/nifi/metrics/jvm/JmxJvmMetrics.java
+This product includes derived works from Google Guava (ASLv2 licensed)
+ Copyright 2011 The Guava Authors
+ The derived work is adapted from the class com.google.common.hash.Hashing:
+ https://github.com/google/guava/blob/master/guava/src/com/google/common/hash/Hashing.java
+ The derived work can be found in:
+ org.apache.nifi.controller.queue.clustered.partition.CorrelationAttributePartitioner
+
===========================================
Apache Software License v2
===========================================
diff --git
a/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE
b/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE
index 01f6bc95e4..c1c3d8b951 100644
---
a/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE
+++
b/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE
@@ -4,6 +4,13 @@ Copyright 2014-2024 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
+This product includes derived works from Google Guava (ASLv2 licensed)
+ Copyright 2011 The Guava Authors
+ The derived work is adapted from the class com.google.common.hash.Hashing:
+ https://github.com/google/guava/blob/master/guava/src/com/google/common/hash/Hashing.java
+ The derived work can be found in:
+ org.apache.nifi.controller.queue.clustered.partition.CorrelationAttributePartitioner
+
******************
Apache Software License v2
******************
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
index 54f01e61d5..6335446fbc 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitioner.java
@@ -24,9 +24,13 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
-import java.util.Random;
public class CorrelationAttributePartitioner implements FlowFilePartitioner {
+ private static final int INDEX_OFFSET = 1;
+
+ // Multiplier from
com.google.common.hash.Hashing.LinearCongruentialGenerator
+ private static final long LCG_MULTIPLIER = 2862933555777941757L;
+
private static final Logger logger =
LoggerFactory.getLogger(CorrelationAttributePartitioner.class);
private final String partitioningAttribute;
@@ -69,7 +73,39 @@ public class CorrelationAttributePartitioner implements
FlowFilePartitioner {
}
private int findIndex(final long hash, final int partitions) {
- final Random random = new Random(hash);
- return random.nextInt(partitions);
+ // Method implementation based on Google Guava
com.google.common.hash.Hashing.consistentHash()
+ final LinearCongruentialGenerator generator = new
LinearCongruentialGenerator(hash);
+ int candidate = 0;
+
+ while (true) {
+ final double nextGenerated = generator.nextDouble();
+ final int nextCandidate = candidate + INDEX_OFFSET;
+ final int next = (int) (nextCandidate / nextGenerated);
+ if (next >= 0 && next < partitions) {
+ candidate = next;
+ } else {
+ final int index;
+ if (candidate == 0) {
+ index = candidate;
+ } else {
+ // Adjust index when handling more than one partition
+ index = candidate - INDEX_OFFSET;
+ }
+ return index;
+ }
+ }
+ }
+
+ private static final class LinearCongruentialGenerator {
+ private long state;
+
+ private LinearCongruentialGenerator(final long seed) {
+ this.state = seed;
+ }
+
+ private double nextDouble() {
+ state = LCG_MULTIPLIER * state + INDEX_OFFSET;
+ return ((double) ((int) (state >>> 33) + 1)) / 0x1.0p31;
+ }
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitionerTest.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitionerTest.java
new file mode 100644
index 0000000000..6ac55bfd36
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/partition/CorrelationAttributePartitionerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.partition;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class CorrelationAttributePartitionerTest {
+ private static final String PARTITIONING_ATTRIBUTE = "group";
+
+ private static final String FIRST_ATTRIBUTE = "1";
+
+ private static final String SECOND_ATTRIBUTE = "2";
+
+ @Mock
+ private FlowFileRecord flowFileRecord;
+
+ @Mock
+ private QueuePartition localPartition;
+
+ @Mock
+ private QueuePartition firstPartition;
+
+ @Mock
+ private QueuePartition secondPartition;
+
+ @Mock
+ private QueuePartition thirdPartition;
+
+ private CorrelationAttributePartitioner partitioner;
+
+ @BeforeEach
+ void setPartitioner() {
+ partitioner = new
CorrelationAttributePartitioner(PARTITIONING_ATTRIBUTE);
+ }
+
+ @Test
+ void testRebalanceOnClusterResize() {
+ assertTrue(partitioner.isRebalanceOnClusterResize());
+ }
+
+ @Test
+ void testRebalanceOnFailure() {
+ assertFalse(partitioner.isRebalanceOnFailure());
+ }
+
+ @Test
+ void testGetPartitionOnePartitionNullAttribute() {
+ final QueuePartition[] partitions = new
QueuePartition[]{firstPartition};
+
+ final QueuePartition partition =
partitioner.getPartition(flowFileRecord, partitions, localPartition);
+
+ assertEquals(firstPartition, partition);
+ }
+
+ @Test
+ void testGetPartitionTwoPartitionsNullAttribute() {
+ final QueuePartition[] partitions = new
QueuePartition[]{firstPartition, secondPartition};
+
+ final QueuePartition partition =
partitioner.getPartition(flowFileRecord, partitions, localPartition);
+
+ assertEquals(firstPartition, partition);
+ }
+
+ @Test
+ void testGetPartitionThreePartitionsAttributeDefined() {
+ final QueuePartition[] partitions = new
QueuePartition[]{firstPartition, secondPartition, thirdPartition};
+
+ // Set First Attribute for partitioning
+
when(flowFileRecord.getAttribute(eq(PARTITIONING_ATTRIBUTE))).thenReturn(FIRST_ATTRIBUTE);
+
+ final QueuePartition firstSelected =
partitioner.getPartition(flowFileRecord, partitions, localPartition);
+ assertEquals(firstPartition, firstSelected);
+
+ // Set Second Attribute for partitioning
+
when(flowFileRecord.getAttribute(eq(PARTITIONING_ATTRIBUTE))).thenReturn(SECOND_ATTRIBUTE);
+
+ final QueuePartition secondSelected =
partitioner.getPartition(flowFileRecord, partitions, localPartition);
+ assertEquals(secondPartition, secondSelected);
+
+ final QueuePartition thirdSelected =
partitioner.getPartition(flowFileRecord, partitions, localPartition);
+ assertEquals(secondPartition, thirdSelected);
+
+ // Reset to First Attribute for partitioning
+
when(flowFileRecord.getAttribute(eq(PARTITIONING_ATTRIBUTE))).thenReturn(FIRST_ATTRIBUTE);
+
+ final QueuePartition fourthSelected =
partitioner.getPartition(flowFileRecord, partitions, localPartition);
+ assertEquals(firstPartition, fourthSelected);
+ }
+}