This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 773160958209a8a173b4f741517c4bda63a28e82
Author: Tamas Palfy <[email protected]>
AuthorDate: Wed Oct 30 18:48:23 2019 +0100

    NIFI-6787 - Before: when checking whether a load-balanced connection queue
    is full, we compared totalSize.get() against getMaxQueueSize().
    After: go over all partitions and treat the queue as full only if all of
    them are full.
    The RoundRobinPartitioner is now wrapped in a (new) AvailableSeekingPartitioner,
    which selects a new partition if the currently selected one is full.
---
 .../clustered/SocketLoadBalancedFlowFileQueue.java |  14 +-
 .../partition/AvailableSeekingPartitioner.java     |  62 +++++++
 .../TestRoundRobinFlowFileQueueBalancing.java      | 203 +++++++++++++++++++++
 3 files changed, 278 insertions(+), 1 deletion(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index ae231f7..23a3788 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -37,6 +37,7 @@ import 
org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
 import org.apache.nifi.controller.queue.StandardQueueDiagnostics;
 import org.apache.nifi.controller.queue.SwappablePriorityQueue;
 import 
org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
+import 
org.apache.nifi.controller.queue.clustered.partition.AvailableSeekingPartitioner;
 import 
org.apache.nifi.controller.queue.clustered.partition.CorrelationAttributePartitioner;
 import 
org.apache.nifi.controller.queue.clustered.partition.FirstNodePartitioner;
 import 
org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner;
@@ -205,7 +206,7 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
                 partitioner = new 
CorrelationAttributePartitioner(partitioningAttribute);
                 break;
             case ROUND_ROBIN:
-                partitioner = new RoundRobinPartitioner();
+                partitioner = new AvailableSeekingPartitioner(new 
RoundRobinPartitioner(), this::isFull);
                 break;
             case SINGLE_NODE:
                 partitioner = new FirstNodePartitioner();
@@ -509,6 +510,17 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
     }
 
     @Override
+    public boolean isFull() {
+        // The queue as a whole is reported full only when every partition is full;
+        // a single partition with spare capacity is enough to answer "not full".
+        boolean everyPartitionFull = true;
+        for (final QueuePartition partition : queuePartitions) {
+            final boolean partitionFull = isFull(partition.size());
+            if (!partitionFull) {
+                everyPartitionFull = false;
+                break;
+            }
+        }
+
+        return everyPartitionFull;
+    }
+
+    @Override
     public boolean isActiveQueueEmpty() {
         // Answered from the local partition alone; no other partition is consulted here.
         return localPartition.isActiveQueueEmpty();
     }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/AvailableSeekingPartitioner.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/AvailableSeekingPartitioner.java
new file mode 100644
index 0000000..2ee81b7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/AvailableSeekingPartitioner.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.partition;
+
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+import java.util.function.Function;
+
+/**
+ * A {@link FlowFilePartitioner} that wraps another partitioner and keeps asking it for a partition
+ * until it selects one that the supplied check does not report as full.
+ *
+ * <p>The number of attempts is bounded by the number of partitions. If every attempt yields a full
+ * partition, the wrapped partitioner is asked one more time and its choice is returned as-is, so
+ * that {@code getPartition} itself never returns {@code null} because of the full check.</p>
+ *
+ * <p>All other {@link FlowFilePartitioner} properties are answered by the wrapped partitioner.</p>
+ */
+public class AvailableSeekingPartitioner implements FlowFilePartitioner {
+    private final FlowFilePartitioner partitionerDelegate;
+    private final Function<QueueSize, Boolean> fullCheck;
+
+    /**
+     * @param partitionerDelegate the partitioner that performs the actual selection; must not be null
+     * @param fullCheck returns {@code true} when a partition with the given size is to be treated as full; must not be null
+     * @throws NullPointerException if either argument is null
+     */
+    public AvailableSeekingPartitioner(FlowFilePartitioner partitionerDelegate, Function<QueueSize, Boolean> fullCheck) {
+        // Fail fast: without these checks a null argument would only surface later, inside getPartition().
+        this.partitionerDelegate = java.util.Objects.requireNonNull(partitionerDelegate, "partitionerDelegate");
+        this.fullCheck = java.util.Objects.requireNonNull(fullCheck, "fullCheck");
+    }
+
+    /**
+     * Asks the wrapped partitioner for a partition, retrying while the selected partition is full.
+     *
+     * @param flowFile the FlowFile to place; passed through to the wrapped partitioner
+     * @param partitions the candidate partitions; its length bounds the number of attempts
+     * @param localPartition the local partition; passed through to the wrapped partitioner
+     * @return the first selected partition that is not full, or the wrapped partitioner's next choice
+     *         when all attempts produced a full partition
+     */
+    @Override
+    public QueuePartition getPartition(FlowFileRecord flowFile, QueuePartition[] partitions, QueuePartition localPartition) {
+        // Make as many attempts as there are partitions. Whether each attempt yields a different
+        // partition depends on the wrapped partitioner (presumably it does for round robin).
+        for (int attemptCounter = 0; attemptCounter < partitions.length; attemptCounter++) {
+            final QueuePartition selectedPartition = partitionerDelegate.getPartition(flowFile, partitions, localPartition);
+
+            if (!fullCheck.apply(selectedPartition.size())) {
+                return selectedPartition;
+            }
+        }
+
+        // As we don't want to return null here, fall back to original logic if all partitions are full.
+        // Note that this invokes the wrapped partitioner once more, in addition to the attempts above.
+        return partitionerDelegate.getPartition(flowFile, partitions, localPartition);
+    }
+
+    @Override
+    public boolean isRebalanceOnClusterResize() {
+        return partitionerDelegate.isRebalanceOnClusterResize();
+    }
+
+    @Override
+    public boolean isRebalanceOnFailure() {
+        return partitionerDelegate.isRebalanceOnFailure();
+    }
+
+    @Override
+    public boolean isPartitionStatic() {
+        return partitionerDelegate.isPartitionStatic();
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestRoundRobinFlowFileQueueBalancing.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestRoundRobinFlowFileQueueBalancing.java
new file mode 100644
index 0000000..7037fa7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestRoundRobinFlowFileQueueBalancing.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.MockFlowFileRecord;
+import org.apache.nifi.controller.MockSwapManager;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.queue.NopConnectionEventListener;
+import 
org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Exercises {@link SocketLoadBalancedFlowFileQueue} configured with the ROUND_ROBIN load balance
+ * strategy in a mocked cluster of three nodes (the local node plus two remote nodes), using an
+ * object-count back pressure threshold of 10.
+ */
+public class TestRoundRobinFlowFileQueueBalancing {
+
+    private Connection connection;
+    private FlowFileRepository flowFileRepo;
+    private ContentRepository contentRepo;
+    private ProvenanceEventRepository provRepo;
+    private ResourceClaimManager claimManager;
+    private ClusterCoordinator clusterCoordinator;
+    private MockSwapManager swapManager;
+    private EventReporter eventReporter;
+    private SocketLoadBalancedFlowFileQueue queue;
+
+    private List<NodeIdentifier> nodeIds;
+    private int nodePort = 4096;
+
+    private NodeIdentifier localNodeIdentifier;
+    private NodeIdentifier remoteNodeIdentifier1;
+    private NodeIdentifier remoteNodeIdentifier2;
+
+    private int backPressureObjectThreshold = 10;
+
+    @Before
+    public void setup() {
+        MockFlowFileRecord.resetIdGenerator();
+        connection = mock(Connection.class);
+        when(connection.getIdentifier()).thenReturn("unit-test");
+
+        flowFileRepo = mock(FlowFileRepository.class);
+        contentRepo = mock(ContentRepository.class);
+        provRepo = mock(ProvenanceEventRepository.class);
+        claimManager = new StandardResourceClaimManager();
+        clusterCoordinator = mock(ClusterCoordinator.class);
+        swapManager = new MockSwapManager();
+        eventReporter = EventReporter.NO_OP;
+
+        // Identifiers are created in this order because each one consumes the next five port numbers.
+        localNodeIdentifier = createNodeIdentifier("00000000-0000-0000-0000-000000000000");
+        remoteNodeIdentifier1 = createNodeIdentifier("11111111-1111-1111-1111-111111111111");
+        remoteNodeIdentifier2 = createNodeIdentifier("22222222-2222-2222-2222-222222222222");
+
+        nodeIds = new ArrayList<>();
+        Collections.addAll(nodeIds, localNodeIdentifier, remoteNodeIdentifier1, remoteNodeIdentifier2);
+
+        // Hand out a fresh copy of the node list on every call to the coordinator.
+        final Answer<Set<NodeIdentifier>> currentNodeIds = invocation -> new HashSet<>(nodeIds);
+        doAnswer(currentNodeIds).when(clusterCoordinator).getNodeIdentifiers();
+        when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(localNodeIdentifier);
+
+        final ProcessScheduler scheduler = mock(ProcessScheduler.class);
+        final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class);
+
+        queue = new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), scheduler, flowFileRepo, provRepo,
+                contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
+
+        queue.setLoadBalanceStrategy(LoadBalanceStrategy.ROUND_ROBIN, null);
+        queue.setBackPressureObjectThreshold(backPressureObjectThreshold);
+    }
+
+    /**
+     * Builds a node identifier on "localhost" that uses the next five consecutive port numbers.
+     */
+    private NodeIdentifier createNodeIdentifier(final String uuid) {
+        final int firstPort = nodePort;
+        nodePort += 5;
+
+        return new NodeIdentifier(uuid, "localhost", firstPort, "localhost", firstPort + 1,
+                "localhost", firstPort + 2, "localhost", firstPort + 3, firstPort + 4, true, Collections.emptySet());
+    }
+
+    @Test
+    public void testIsFullShouldReturnFalseWhenLocalIsFullRemotesAreNot() {
+        // GIVEN
+        final int[] expectedPartitionSizes = {10, 0, 0};
+
+        // WHEN
+        fillPartition(0);
+
+        // THEN
+        assertIsFull(false, expectedPartitionSizes);
+    }
+
+    @Test
+    public void testIsFullShouldReturnFalseWhenLocalAndOneRemoteIsFullOtherRemoteIsNot() {
+        // GIVEN
+        final int[] expectedPartitionSizes = {10, 10, 0};
+
+        // WHEN
+        fillPartition(0);
+        fillPartition(1);
+
+        // THEN
+        assertIsFull(false, expectedPartitionSizes);
+    }
+
+    @Test
+    public void testIsFullShouldReturnTrueWhenAllPartitionsAreFull() {
+        // GIVEN
+        final int[] expectedPartitionSizes = {10, 10, 10};
+
+        // WHEN
+        fillPartition(0);
+        fillPartition(1);
+        fillPartition(2);
+
+        // THEN
+        assertIsFull(true, expectedPartitionSizes);
+    }
+
+    @Test
+    public void testBalancingWhenAllPartitionsAreEmpty() {
+        // GIVEN
+        final int[] expectedPartitionSizes = {3, 3, 3};
+
+        // WHEN
+        enqueue(9);
+
+        // THEN
+        assertPartitionSizes(expectedPartitionSizes);
+    }
+
+    @Test
+    public void testBalancingWhenLocalPartitionIsFull() {
+        // GIVEN
+        final int[] expectedPartitionSizes = {10, 2, 2};
+        fillPartition(0);
+
+        // WHEN
+        enqueue(4);
+
+        // THEN
+        assertPartitionSizes(expectedPartitionSizes);
+    }
+
+    /**
+     * Puts FlowFiles directly into the given partition, bypassing the partitioner, until the
+     * partition holds as many objects as the back pressure threshold.
+     */
+    private void fillPartition(final int partitionIndex) {
+        IntStream.range(0, backPressureObjectThreshold).forEach(__ -> queue.getPartition(partitionIndex).put(new MockFlowFileRecord(0L)));
+    }
+
+    /**
+     * Puts the given number of FlowFiles on the queue itself, leaving the partition choice to the queue.
+     */
+    private void enqueue(final int flowFileCount) {
+        IntStream.range(0, flowFileCount).forEach(__ -> queue.put(new MockFlowFileRecord(0L)));
+    }
+
+    /**
+     * Checks the queue's isFull() answer first and the per-partition object counts second.
+     */
+    private void assertIsFull(final boolean expected, final int[] expectedPartitionSizes) {
+        final boolean actual = queue.isFull();
+
+        assertEquals(expected, actual);
+        assertPartitionSizes(expectedPartitionSizes);
+    }
+
+    /**
+     * Compares the object count of every partition, in partition index order, with the expected counts.
+     */
+    private void assertPartitionSizes(final int[] expectedSizes) {
+        final int[] partitionSizes = IntStream.range(0, queue.getPartitionCount())
+                .map(index -> queue.getPartition(index).size().getObjectCount())
+                .toArray();
+
+        assertArrayEquals(expectedSizes, partitionSizes);
+    }
+}

Reply via email to