This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 773160958209a8a173b4f741517c4bda63a28e82 Author: Tamas Palfy <[email protected]> AuthorDate: Wed Oct 30 18:48:23 2019 +0100 NIFI-6787 - Before: When checking if a load balanced connection queue is full, we compare the totalSize.get() and getMaxQueueSize(). After: Go over all partitions and see if all of them are full. Wrapping RoundRobinPartitioner in a (new) AvailableSeekingPartitioner which selects a new partition if the currently selected one is full. --- .../clustered/SocketLoadBalancedFlowFileQueue.java | 14 +- .../partition/AvailableSeekingPartitioner.java | 62 +++++++ .../TestRoundRobinFlowFileQueueBalancing.java | 203 +++++++++++++++++++++ 3 files changed, 278 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java index ae231f7..23a3788 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java @@ -37,6 +37,7 @@ import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics; import org.apache.nifi.controller.queue.StandardQueueDiagnostics; import org.apache.nifi.controller.queue.SwappablePriorityQueue; import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry; +import org.apache.nifi.controller.queue.clustered.partition.AvailableSeekingPartitioner; import org.apache.nifi.controller.queue.clustered.partition.CorrelationAttributePartitioner; import org.apache.nifi.controller.queue.clustered.partition.FirstNodePartitioner; import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner; @@ -205,7 +206,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple partitioner = new CorrelationAttributePartitioner(partitioningAttribute); break; case ROUND_ROBIN: - partitioner = new RoundRobinPartitioner(); + partitioner = new AvailableSeekingPartitioner(new RoundRobinPartitioner(), this::isFull); break; case SINGLE_NODE: partitioner = new FirstNodePartitioner(); @@ -509,6 +510,17 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple } @Override + public boolean isFull() { + for (QueuePartition queuePartition : queuePartitions) { + if (!isFull(queuePartition.size())) { + return false; + } + } + + return true; + } + + @Override public boolean isActiveQueueEmpty() { return localPartition.isActiveQueueEmpty(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/AvailableSeekingPartitioner.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/AvailableSeekingPartitioner.java new file mode 100644 index 0000000..2ee81b7 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/AvailableSeekingPartitioner.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue.clustered.partition; + +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.controller.repository.FlowFileRecord; + +import java.util.function.Function; + +public class AvailableSeekingPartitioner implements FlowFilePartitioner { + private final FlowFilePartitioner partitionerDelegate; + private final Function<QueueSize, Boolean> fullCheck; + + public AvailableSeekingPartitioner(FlowFilePartitioner partitionerDelegate, Function<QueueSize, Boolean> fullCheck) { + this.partitionerDelegate = partitionerDelegate; + this.fullCheck = fullCheck; + } + + @Override + public QueuePartition getPartition(FlowFileRecord flowFile, QueuePartition[] partitions, QueuePartition localPartition) { + for (int attemptCounter = 0; attemptCounter < partitions.length; attemptCounter++) { + QueuePartition selectedPartition = partitionerDelegate.getPartition(flowFile, partitions, localPartition); + + if (!fullCheck.apply(selectedPartition.size())) { + return selectedPartition; + } + } + + // As we don't want to return null here, fall back to original logic if all partitions are full. + return partitionerDelegate.getPartition(flowFile, partitions, localPartition); + } + + @Override + public boolean isRebalanceOnClusterResize() { + return partitionerDelegate.isRebalanceOnClusterResize(); + } + + @Override + public boolean isRebalanceOnFailure() { + return partitionerDelegate.isRebalanceOnFailure(); + } + + @Override + public boolean isPartitionStatic() { + return partitionerDelegate.isPartitionStatic(); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestRoundRobinFlowFileQueueBalancing.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestRoundRobinFlowFileQueueBalancing.java new file mode 100644 index 0000000..7037fa7 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestRoundRobinFlowFileQueueBalancing.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue.clustered; + +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.MockFlowFileRecord; +import org.apache.nifi.controller.MockSwapManager; +import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.controller.queue.LoadBalanceStrategy; +import org.apache.nifi.controller.queue.NopConnectionEventListener; +import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry; +import org.apache.nifi.controller.repository.ContentRepository; +import org.apache.nifi.controller.repository.FlowFileRepository; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.junit.Before; +import org.junit.Test; +import org.mockito.stubbing.Answer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.IntStream; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestRoundRobinFlowFileQueueBalancing { + + private Connection connection; + private FlowFileRepository flowFileRepo; + private ContentRepository contentRepo; + private ProvenanceEventRepository provRepo; + private ResourceClaimManager claimManager; + private ClusterCoordinator clusterCoordinator; + private MockSwapManager swapManager; + private EventReporter eventReporter; + private SocketLoadBalancedFlowFileQueue queue; + + private List<NodeIdentifier> nodeIds; + private int nodePort = 4096; + + private NodeIdentifier localNodeIdentifier; + private NodeIdentifier remoteNodeIdentifier1; + private NodeIdentifier remoteNodeIdentifier2; + + private int backPressureObjectThreshold = 10; + + @Before + public void setup() { + MockFlowFileRecord.resetIdGenerator(); + connection = mock(Connection.class); + when(connection.getIdentifier()).thenReturn("unit-test"); + + flowFileRepo = mock(FlowFileRepository.class); + contentRepo = mock(ContentRepository.class); + provRepo = mock(ProvenanceEventRepository.class); + claimManager = new StandardResourceClaimManager(); + clusterCoordinator = mock(ClusterCoordinator.class); + swapManager = new MockSwapManager(); + eventReporter = EventReporter.NO_OP; + + localNodeIdentifier = createNodeIdentifier("00000000-0000-0000-0000-000000000000"); + remoteNodeIdentifier1 = createNodeIdentifier("11111111-1111-1111-1111-111111111111"); + remoteNodeIdentifier2 = createNodeIdentifier("22222222-2222-2222-2222-222222222222"); + + nodeIds = new ArrayList<>(); + nodeIds.add(localNodeIdentifier); + nodeIds.add(remoteNodeIdentifier1); + nodeIds.add(remoteNodeIdentifier2); + + doAnswer((Answer<Set<NodeIdentifier>>) invocation -> new HashSet<>(nodeIds)).when(clusterCoordinator).getNodeIdentifiers(); + when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(localNodeIdentifier); + + final ProcessScheduler scheduler = mock(ProcessScheduler.class); + + final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class); + queue = new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), scheduler, flowFileRepo, provRepo, + contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter); + + + queue.setLoadBalanceStrategy(LoadBalanceStrategy.ROUND_ROBIN, null); + queue.setBackPressureObjectThreshold(backPressureObjectThreshold); + } + + private NodeIdentifier createNodeIdentifier(final String uuid) { + return new NodeIdentifier(uuid, "localhost", nodePort++, "localhost", nodePort++, + "localhost", nodePort++, "localhost", nodePort++, nodePort++, true, Collections.emptySet()); + } + + @Test + public void testIsFullShouldReturnFalseWhenLocalIsFullRemotesAreNot() { + // GIVEN + boolean expected = false; + int[] expectedPartitionSizes = {10, 0, 0}; + + // WHEN + IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L))); + + // THEN + testIsFull(expected, expectedPartitionSizes); + } + + @Test + public void testIsFullShouldReturnFalseWhenLocalAndOneRemoteIsFullOtherRemoteIsNot() { + // GIVEN + boolean expected = false; + int[] expectedPartitionSizes = {10, 10, 0}; + + // WHEN + IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L))); + IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(1).put(new MockFlowFileRecord(0L))); + + // THEN + testIsFull(expected, expectedPartitionSizes); + } + + @Test + public void testIsFullShouldReturnTrueWhenAllPartitionsAreFull() { + // GIVEN + boolean expected = true; + int[] expectedPartitionSizes = {10, 10, 10}; + + // WHEN + IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L))); + IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(1).put(new MockFlowFileRecord(0L))); + IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(2).put(new MockFlowFileRecord(0L))); + + // THEN + testIsFull(expected, expectedPartitionSizes); + } + + @Test + public void testBalancingWhenAllPartitionsAreEmpty() { + // GIVEN + int[] expectedPartitionSizes = {3, 3, 3}; + + // WHEN + IntStream.rangeClosed(1, 9).forEach(__ -> queue.put(new MockFlowFileRecord(0L))); + + // THEN + assertPartitionSizes(expectedPartitionSizes); + } + + @Test + public void testBalancingWhenLocalPartitionIsFull() { + // GIVEN + int[] expectedPartitionSizes = {10, 2, 2}; + + IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L))); + + // WHEN + IntStream.rangeClosed(1, 4).forEach(__ -> queue.put(new MockFlowFileRecord(0L))); + + // THEN + assertPartitionSizes(expectedPartitionSizes); + } + + private void testIsFull(boolean expected, int[] expectedPartitionSizes) { + // GIVEN + + // WHEN + boolean actual = queue.isFull(); + + // THEN + assertEquals(expected, actual); + assertPartitionSizes(expectedPartitionSizes); + } + + private void assertPartitionSizes(final int[] expectedSizes) { + final int[] partitionSizes = new int[queue.getPartitionCount()]; + + for (int i = 0; i < partitionSizes.length; i++) { + partitionSizes[i] = queue.getPartition(i).size().getObjectCount(); + } + + assertArrayEquals(expectedSizes, partitionSizes); + } +}
