This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.11.x in repository https://gitbox.apache.org/repos/asf/nifi.git
commit a78cecd95f1a690614c17f38aa003f3d78e468cb Author: Joe Witt <[email protected]> AuthorDate: Wed Jan 29 15:26:17 2020 -0500 Revert "NIFI-6787 - Before: When checking if a load balanced connection queue is full, we compare the totalSize.get() and getMaxQueueSize()." This reverts commit 773160958209a8a173b4f741517c4bda63a28e82. --- .../clustered/SocketLoadBalancedFlowFileQueue.java | 14 +- .../partition/AvailableSeekingPartitioner.java | 62 ------- .../TestRoundRobinFlowFileQueueBalancing.java | 203 --------------------- 3 files changed, 1 insertion(+), 278 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java index 837a3f6..e69daad 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java @@ -37,7 +37,6 @@ import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics; import org.apache.nifi.controller.queue.StandardQueueDiagnostics; import org.apache.nifi.controller.queue.SwappablePriorityQueue; import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry; -import org.apache.nifi.controller.queue.clustered.partition.AvailableSeekingPartitioner; import org.apache.nifi.controller.queue.clustered.partition.CorrelationAttributePartitioner; import org.apache.nifi.controller.queue.clustered.partition.FirstNodePartitioner; import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner; @@ -212,7 +211,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple partitioner = new CorrelationAttributePartitioner(partitioningAttribute); break; case ROUND_ROBIN: - partitioner = new AvailableSeekingPartitioner(new RoundRobinPartitioner(), this::isFull); + partitioner = new RoundRobinPartitioner(); break; case SINGLE_NODE: partitioner = new FirstNodePartitioner(); @@ -516,17 +515,6 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple } @Override - public boolean isFull() { - for (QueuePartition queuePartition : queuePartitions) { - if (!isFull(queuePartition.size())) { - return false; - } - } - - return true; - } - - @Override public boolean isActiveQueueEmpty() { return localPartition.isActiveQueueEmpty(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/AvailableSeekingPartitioner.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/AvailableSeekingPartitioner.java deleted file mode 100644 index 2ee81b7..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/AvailableSeekingPartitioner.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.controller.queue.clustered.partition; - -import org.apache.nifi.controller.queue.QueueSize; -import org.apache.nifi.controller.repository.FlowFileRecord; - -import java.util.function.Function; - -public class AvailableSeekingPartitioner implements FlowFilePartitioner { - private final FlowFilePartitioner partitionerDelegate; - private final Function<QueueSize, Boolean> fullCheck; - - public AvailableSeekingPartitioner(FlowFilePartitioner partitionerDelegate, Function<QueueSize, Boolean> fullCheck) { - this.partitionerDelegate = partitionerDelegate; - this.fullCheck = fullCheck; - } - - @Override - public QueuePartition getPartition(FlowFileRecord flowFile, QueuePartition[] partitions, QueuePartition localPartition) { - for (int attemptCounter = 0; attemptCounter < partitions.length; attemptCounter++) { - QueuePartition selectedPartition = partitionerDelegate.getPartition(flowFile, partitions, localPartition); - - if (!fullCheck.apply(selectedPartition.size())) { - return selectedPartition; - } - } - - // As we don't want to return null here, fall back to original logic if all partitions are full. - return partitionerDelegate.getPartition(flowFile, partitions, localPartition); - } - - @Override - public boolean isRebalanceOnClusterResize() { - return partitionerDelegate.isRebalanceOnClusterResize(); - } - - @Override - public boolean isRebalanceOnFailure() { - return partitionerDelegate.isRebalanceOnFailure(); - } - - @Override - public boolean isPartitionStatic() { - return partitionerDelegate.isPartitionStatic(); - } -} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestRoundRobinFlowFileQueueBalancing.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestRoundRobinFlowFileQueueBalancing.java deleted file mode 100644 index 7037fa7..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestRoundRobinFlowFileQueueBalancing.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.controller.queue.clustered; - -import org.apache.nifi.cluster.coordination.ClusterCoordinator; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.connectable.Connection; -import org.apache.nifi.controller.MockFlowFileRecord; -import org.apache.nifi.controller.MockSwapManager; -import org.apache.nifi.controller.ProcessScheduler; -import org.apache.nifi.controller.queue.LoadBalanceStrategy; -import org.apache.nifi.controller.queue.NopConnectionEventListener; -import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry; -import org.apache.nifi.controller.repository.ContentRepository; -import org.apache.nifi.controller.repository.FlowFileRepository; -import org.apache.nifi.controller.repository.claim.ResourceClaimManager; -import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; -import org.apache.nifi.events.EventReporter; -import org.apache.nifi.provenance.ProvenanceEventRepository; -import org.junit.Before; -import org.junit.Test; -import org.mockito.stubbing.Answer; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.stream.IntStream; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class TestRoundRobinFlowFileQueueBalancing { - - private Connection connection; - private FlowFileRepository flowFileRepo; - private ContentRepository contentRepo; - private ProvenanceEventRepository provRepo; - private ResourceClaimManager claimManager; - private ClusterCoordinator clusterCoordinator; - private MockSwapManager swapManager; - private EventReporter eventReporter; - private SocketLoadBalancedFlowFileQueue queue; - - private List<NodeIdentifier> nodeIds; - private int nodePort = 4096; - - private NodeIdentifier localNodeIdentifier; - private NodeIdentifier remoteNodeIdentifier1; - private NodeIdentifier remoteNodeIdentifier2; - - private int backPressureObjectThreshold = 10; - - @Before - public void setup() { - MockFlowFileRecord.resetIdGenerator(); - connection = mock(Connection.class); - when(connection.getIdentifier()).thenReturn("unit-test"); - - flowFileRepo = mock(FlowFileRepository.class); - contentRepo = mock(ContentRepository.class); - provRepo = mock(ProvenanceEventRepository.class); - claimManager = new StandardResourceClaimManager(); - clusterCoordinator = mock(ClusterCoordinator.class); - swapManager = new MockSwapManager(); - eventReporter = EventReporter.NO_OP; - - localNodeIdentifier = createNodeIdentifier("00000000-0000-0000-0000-000000000000"); - remoteNodeIdentifier1 = createNodeIdentifier("11111111-1111-1111-1111-111111111111"); - remoteNodeIdentifier2 = createNodeIdentifier("22222222-2222-2222-2222-222222222222"); - - nodeIds = new ArrayList<>(); - nodeIds.add(localNodeIdentifier); - nodeIds.add(remoteNodeIdentifier1); - nodeIds.add(remoteNodeIdentifier2); - - doAnswer((Answer<Set<NodeIdentifier>>) invocation -> new HashSet<>(nodeIds)).when(clusterCoordinator).getNodeIdentifiers(); - when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(localNodeIdentifier); - - final ProcessScheduler scheduler = mock(ProcessScheduler.class); - - final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class); - queue = new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), scheduler, flowFileRepo, provRepo, - contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter); - - - queue.setLoadBalanceStrategy(LoadBalanceStrategy.ROUND_ROBIN, null); - queue.setBackPressureObjectThreshold(backPressureObjectThreshold); - } - - private NodeIdentifier createNodeIdentifier(final String uuid) { - return new NodeIdentifier(uuid, "localhost", nodePort++, "localhost", nodePort++, - "localhost", nodePort++, "localhost", nodePort++, nodePort++, true, Collections.emptySet()); - } - - @Test - public void testIsFullShouldReturnFalseWhenLocalIsFullRemotesAreNot() { - // GIVEN - boolean expected = false; - int[] expectedPartitionSizes = {10, 0, 0}; - - // WHEN - IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L))); - - // THEN - testIsFull(expected, expectedPartitionSizes); - } - - @Test - public void testIsFullShouldReturnFalseWhenLocalAndOneRemoteIsFullOtherRemoteIsNot() { - // GIVEN - boolean expected = false; - int[] expectedPartitionSizes = {10, 10, 0}; - - // WHEN - IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L))); - IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(1).put(new MockFlowFileRecord(0L))); - - // THEN - testIsFull(expected, expectedPartitionSizes); - } - - @Test - public void testIsFullShouldReturnTrueWhenAllPartitionsAreFull() { - // GIVEN - boolean expected = true; - int[] expectedPartitionSizes = {10, 10, 10}; - - // WHEN - IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L))); - IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(1).put(new MockFlowFileRecord(0L))); - IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(2).put(new MockFlowFileRecord(0L))); - - // THEN - testIsFull(expected, expectedPartitionSizes); - } - - @Test - public void testBalancingWhenAllPartitionsAreEmpty() { - // GIVEN - int[] expectedPartitionSizes = {3, 3, 3}; - - // WHEN - IntStream.rangeClosed(1, 9).forEach(__ -> queue.put(new MockFlowFileRecord(0L))); - - // THEN - assertPartitionSizes(expectedPartitionSizes); - } - - @Test - public void testBalancingWhenLocalPartitionIsFull() { - // GIVEN - int[] expectedPartitionSizes = {10, 2, 2}; - - IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L))); - - // WHEN - IntStream.rangeClosed(1, 4).forEach(__ -> queue.put(new MockFlowFileRecord(0L))); - - // THEN - assertPartitionSizes(expectedPartitionSizes); - } - - private void testIsFull(boolean expected, int[] expectedPartitionSizes) { - // GIVEN - - // WHEN - boolean actual = queue.isFull(); - - // THEN - assertEquals(expected, actual); - assertPartitionSizes(expectedPartitionSizes); - } - - private void assertPartitionSizes(final int[] expectedSizes) { - final int[] partitionSizes = new int[queue.getPartitionCount()]; - - for (int i = 0; i < partitionSizes.length; i++) { - partitionSizes[i] = queue.getPartition(i).size().getObjectCount(); - } - - assertArrayEquals(expectedSizes, partitionSizes); - } -}
