This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d90ef06  NIFI-9017: Update Load Balanced Connection logic so that if a node connects to the cluster with a different load balancing hostname/port, it starts sending to the new endpoint instead of failing to send to the old endpoint (#5287)
d90ef06 is described below

commit d90ef0675290f8d7cdb07bf33be3a1e01576145a
Author: markap14 <[email protected]>
AuthorDate: Fri Aug 27 21:37:06 2021 -0400

    NIFI-9017: Update Load Balanced Connection logic so that if a node connects to the cluster with a different load balancing hostname/port, it starts sending to the new endpoint instead of failing to send to the old endpoint (#5287)
    
    Self-merging based on +1 feedback from multiple active community members who have reviewed & tested code
---
 .../src/main/asciidoc/administration-guide.adoc    |  4 +-
 .../node/TestNodeClusterCoordinator.java           | 44 +++++++++++++
 .../clustered/SocketLoadBalancedFlowFileQueue.java | 42 +++++++++---
 .../tests/system/loadbalance/LoadBalanceIT.java    | 77 ++++++++++++++++++++++
 4 files changed, 158 insertions(+), 9 deletions(-)

diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc 
b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 1bb54a5..46f25ae 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -3882,7 +3882,9 @@ by the `nifi.cluster.flow.election.max.candidates` 
property, the cluster will no
 |`nifi.cluster.flow.election.max.candidates`|Specifies the number of Nodes 
required in the cluster to cause early election of Flows. This allows the Nodes 
in the cluster to avoid having to wait a
 long time before starting processing if we reach at least this number of nodes 
in the cluster.
 |`nifi.cluster.load.balance.port`|Specifies the port to listen on for incoming 
connections for load balancing data across the cluster. The default value is 
`6342`.
-|`nifi.cluster.load.balance.host`|Specifies the hostname to listen on for 
incoming connections for load balancing data across the cluster. If not 
specified, will default to the value used by the `nifi.cluster.node.address` 
property.
+|`nifi.cluster.load.balance.host`|Specifies the hostname to listen on for 
incoming connections for load balancing data across the cluster. If not 
specified, will default to the value used by the
+`nifi.cluster.node.address` property. The value set here does not have to be a 
hostname/IP address that is addressable outside of the cluster. However, all 
nodes within the cluster must be able to
+connect to the node using this hostname/IP address.
 |`nifi.cluster.load.balance.connections.per.node`|The maximum number of 
connections to create between this node and each other node in the cluster. For 
example, if there are 5 nodes in the cluster and this value is set to 4, there 
will be up to 20 socket connections established for load-balancing purposes (5 
x 4 = 20). The default value is `1`.
 |`nifi.cluster.load.balance.max.thread.count`|The maximum number of threads to 
use for transferring data from this node to other nodes in the cluster. While a 
given thread can only write to a single socket at a time, a single thread is 
capable of servicing multiple connections simultaneously because a given 
connection may not be available for reading/writing at any given time. The 
default value is `8`—i.e., up to 8 threads will be responsible for transferring 
data to other nodes, regardl [...]
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index 0d7b183..511620c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -453,6 +453,50 @@ public class TestNodeClusterCoordinator {
         assertEquals(conflictingId.getSocketPort(), 
conflictingNodeId.getSocketPort());
     }
 
+    @Test
+    public void 
testAddNodeIdentifierWithSameAddressDifferentLoadBalanceEndpoint() {
+        // Add Node 1 to the cluster
+        final NodeIdentifier id1 = new NodeIdentifier("1234", "localhost", 
8000, "localhost", 9000, "localhost", 10000, 11000, false);
+
+        final ConnectionRequest connectionRequest = new ConnectionRequest(id1, 
new StandardDataFlow(new byte[0], new byte[0], new byte[0], new HashSet<>()));
+        final ConnectionRequestMessage crm = new ConnectionRequestMessage();
+        crm.setConnectionRequest(connectionRequest);
+
+        final ProtocolMessage response = coordinator.handle(crm, 
Collections.emptySet());
+        assertNotNull(response);
+        assertTrue(response instanceof ConnectionResponseMessage);
+        final ConnectionResponseMessage responseMessage = 
(ConnectionResponseMessage) response;
+        final NodeIdentifier resolvedNodeId = 
responseMessage.getConnectionResponse().getNodeIdentifier();
+        assertEquals(id1, resolvedNodeId);
+
+        // Add in a conflicting ID
+        final NodeIdentifier conflictingId = new NodeIdentifier("1234", 
"localhost", 8001, "localhost", 9000, "loadbalance-2", 4848, "localhost", 
10000, 11000, false, null);
+        final ConnectionRequest conRequest2 = new 
ConnectionRequest(conflictingId, new StandardDataFlow(new byte[0], new byte[0], 
new byte[0], new HashSet<>()));
+        final ConnectionRequestMessage crm2 = new ConnectionRequestMessage();
+        crm2.setConnectionRequest(conRequest2);
+
+        final ProtocolMessage conflictingResponse = coordinator.handle(crm2, 
Collections.emptySet());
+        assertNotNull(conflictingResponse);
+        assertTrue(conflictingResponse instanceof ConnectionResponseMessage);
+        final ConnectionResponseMessage conflictingResponseMessage = 
(ConnectionResponseMessage) conflictingResponse;
+        final NodeIdentifier conflictingNodeId = 
conflictingResponseMessage.getConnectionResponse().getNodeIdentifier();
+        assertEquals(id1.getId(), conflictingNodeId.getId());
+        assertEquals(conflictingId.getApiAddress(), 
conflictingNodeId.getApiAddress());
+        assertEquals(conflictingId.getApiPort(), 
conflictingNodeId.getApiPort());
+        assertEquals(conflictingId.getSiteToSiteAddress(), 
conflictingNodeId.getSiteToSiteAddress());
+        assertEquals(conflictingId.getSiteToSitePort(), 
conflictingNodeId.getSiteToSitePort());
+        assertEquals(conflictingId.getSocketAddress(), 
conflictingNodeId.getSocketAddress());
+        assertEquals(conflictingId.getSocketPort(), 
conflictingNodeId.getSocketPort());
+
+        // Ensure that the values were updated
+        final Set<NodeIdentifier> registeredNodeIds = 
coordinator.getNodeIdentifiers();
+        assertEquals(1, registeredNodeIds.size());
+
+        final NodeIdentifier registeredId = 
registeredNodeIds.iterator().next();
+        assertEquals("loadbalance-2", registeredId.getLoadBalanceAddress());
+        assertEquals(4848, registeredId.getLoadBalancePort());
+    }
+
     private NodeIdentifier createNodeId(final int index) {
         return new NodeIdentifier(String.valueOf(index), "localhost", 8000 + 
index, "localhost", 9000 + index, "localhost", 10000 + index, 11000 + index, 
false);
     }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 0c6c1d8..1a1e187 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -85,6 +85,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -95,6 +96,9 @@ import java.util.stream.Collectors;
 public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue 
implements LoadBalancedFlowFileQueue {
     private static final Logger logger = 
LoggerFactory.getLogger(SocketLoadBalancedFlowFileQueue.class);
     private static final int NODE_SWAP_THRESHOLD = 1000;
+    private static final Comparator<NodeIdentifier> 
loadBalanceEndpointComparator =
+        Comparator.comparing(NodeIdentifier::getLoadBalanceAddress)
+            .thenComparing(NodeIdentifier::getLoadBalancePort);
 
     private final List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
     private final ConnectionEventListener eventListener;
@@ -139,7 +143,11 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
         rebalancingPartition = new StandardRebalancingPartition(swapManager, 
swapThreshold, eventReporter, this, this::drop);
 
         // Create a RemoteQueuePartition for each node
-        nodeIdentifiers = clusterCoordinator == null ? Collections.emptySet() 
: clusterCoordinator.getNodeIdentifiers();
+        nodeIdentifiers = clusterCoordinator == null ? Collections.emptySet() 
: new TreeSet<>(loadBalanceEndpointComparator);
+
+        if (clusterCoordinator != null) {
+            nodeIdentifiers.addAll(clusterCoordinator.getNodeIdentifiers());
+        }
 
         final List<NodeIdentifier> sortedNodeIdentifiers = new 
ArrayList<>(nodeIdentifiers);
         
sortedNodeIdentifiers.sort(Comparator.comparing(NodeIdentifier::getApiAddress));
@@ -656,16 +664,19 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
             }
 
             // Determine which Node Identifiers, if any, were removed.
-            final Set<NodeIdentifier> removedNodeIds = new 
HashSet<>(this.nodeIdentifiers);
+            final Set<NodeIdentifier> removedNodeIds = new 
TreeSet<>(loadBalanceEndpointComparator);
+            removedNodeIds.addAll(this.nodeIdentifiers);
             removedNodeIds.removeAll(updatedNodeIdentifiers);
             logger.debug("{} The following Node Identifiers were removed from 
the cluster: {}", this, removedNodeIds);
 
+            final Function<NodeIdentifier, String> mapKeyTransform = nodeId -> 
nodeId.getLoadBalanceAddress() + ":" + nodeId.getLoadBalancePort();
+
             // Build up a Map of Node ID to Queue Partition so that we can 
easily pull over the existing
             // QueuePartition objects instead of having to create new ones.
-            final Map<NodeIdentifier, QueuePartition> partitionMap = new 
HashMap<>();
+            final Map<String, QueuePartition> partitionMap = new HashMap<>();
             for (final QueuePartition partition : this.queuePartitions) {
                 final Optional<NodeIdentifier> nodeIdOption = 
partition.getNodeIdentifier();
-                nodeIdOption.ifPresent(nodeIdentifier -> 
partitionMap.put(nodeIdentifier, partition));
+                nodeIdOption.ifPresent(nodeIdentifier -> 
partitionMap.put(mapKeyTransform.apply(nodeIdentifier), partition));
             }
 
             // Re-define 'queuePartitions' array
@@ -683,13 +694,15 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
             boolean localPartitionIncluded = false;
             for (int i = 0; i < sortedNodeIdentifiers.size(); i++) {
                 final NodeIdentifier nodeId = sortedNodeIdentifiers.get(i);
+                final String nodeIdMapKey = mapKeyTransform.apply(nodeId);
+
                 if 
(nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) {
                     updatedQueuePartitions[i] = localPartition;
                     localPartitionIncluded = true;
 
                     // If we have RemoteQueuePartition with this Node ID with 
data, that data must be migrated to the local partition.
                     // This can happen if we didn't previously know our Node 
UUID.
-                    final QueuePartition existingPartition = 
partitionMap.get(nodeId);
+                    final QueuePartition existingPartition = 
partitionMap.get(nodeIdMapKey);
                     if (existingPartition != null && existingPartition != 
localPartition) {
                         final FlowFileQueueContents partitionContents = 
existingPartition.packageForRebalance(localPartition.getSwapPartitionName());
                         logger.debug("Transferred data from {} to {}", 
existingPartition, localPartition);
@@ -699,7 +712,7 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
                     continue;
                 }
 
-                final QueuePartition existingPartition = 
partitionMap.get(nodeId);
+                final QueuePartition existingPartition = 
partitionMap.get(nodeIdMapKey);
                 updatedQueuePartitions[i] = existingPartition == null ? 
createRemotePartition(nodeId) : existingPartition;
             }
 
@@ -721,7 +734,9 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
                 // Not all partitions need to be rebalanced, so just ensure 
that we rebalance any FlowFiles that are destined
                 // for a node that is no longer in the cluster.
                 for (final NodeIdentifier removedNodeId : removedNodeIds) {
-                    final QueuePartition removedPartition = 
partitionMap.get(removedNodeId);
+                    final String removedNodeMapKey = 
mapKeyTransform.apply(removedNodeId);
+
+                    final QueuePartition removedPartition = 
partitionMap.get(removedNodeMapKey);
                     if (removedPartition == null) {
                         continue;
                     }
@@ -733,7 +748,9 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
 
             // Unregister any client for which the node was removed from the 
cluster
             for (final NodeIdentifier removedNodeId : removedNodeIds) {
-                final QueuePartition removedPartition = 
partitionMap.get(removedNodeId);
+                final String removedNodeMapKey = 
mapKeyTransform.apply(removedNodeId);
+
+                final QueuePartition removedPartition = 
partitionMap.get(removedNodeMapKey);
                 if (removedPartition instanceof RemoteQueuePartition) {
                     ((RemoteQueuePartition) removedPartition).onRemoved();
                 }
@@ -1089,7 +1106,16 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
         public void onNodeAdded(final NodeIdentifier nodeId) {
             partitionWriteLock.lock();
             try {
+                if (nodeIdentifiers.contains(nodeId)) {
+                    logger.debug("Node Identifier {} added to cluster but 
already known in set: {}", nodeId, nodeIdentifiers);
+                    return;
+                }
+
                 final Set<NodeIdentifier> updatedNodeIds = new 
HashSet<>(nodeIdentifiers);
+
+                // If there is any Node Identifier already that has the same 
identifier as the new one, remove it. This allows us to ensure that we
+                // have the correct Node Identifier in terms of Load Balancing 
host/port, even if the newly connected node changed its load balancing host/port
+                updatedNodeIds.removeIf(id -> 
id.getId().equals(nodeId.getId()));
                 updatedNodeIds.add(nodeId);
 
                 logger.debug("Node Identifier {} added to cluster. Node ID's 
changing from {} to {}", nodeId, nodeIdentifiers, updatedNodeIds);
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
index a0e93aa..04716b9 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
@@ -19,6 +19,7 @@ package org.apache.nifi.tests.system.loadbalance;
 import org.apache.nifi.controller.queue.LoadBalanceCompression;
 import org.apache.nifi.controller.queue.LoadBalanceStrategy;
 import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.tests.system.NiFiInstance;
 import org.apache.nifi.tests.system.NiFiInstanceFactory;
 import org.apache.nifi.tests.system.NiFiSystemIT;
 import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
@@ -335,4 +336,80 @@ public class LoadBalanceIT extends NiFiSystemIT {
         return stats.getMin() == stats.getMax();
     }
 
+
+    @Test
+    public void testRoundRobinWithRestartAndPortChange() throws 
NiFiClientException, IOException, InterruptedException {
+        ProcessorEntity generate = 
getClientUtil().createProcessor("GenerateFlowFile");
+        final ProcessorEntity count = 
getClientUtil().createProcessor("CountEvents");
+
+        final ConnectionEntity connection = 
getClientUtil().createConnection(generate, count, "success");
+        getClientUtil().setAutoTerminatedRelationships(count, "success");
+
+        // Configure Processor to generate 20 FlowFiles, each 1 MB and run on 
Primary Node.
+        final Map<String, String> generateProperties = new HashMap<>();
+        generateProperties.put("File Size", "1 MB");
+        generateProperties.put("Batch Size", "20");
+        getClientUtil().updateProcessorProperties(generate, 
generateProperties);
+        getClientUtil().updateProcessorExecutionNode(generate, 
ExecutionNode.PRIMARY);
+
+        // Round Robin between nodes. This should result in 10 FlowFiles on 
each node.
+        getClientUtil().updateConnectionLoadBalancing(connection, 
LoadBalanceStrategy.ROUND_ROBIN, LoadBalanceCompression.DO_NOT_COMPRESS, null);
+
+        // Generate the data.
+        getNifiClient().getProcessorClient().startProcessor(generate);
+
+        // Wait until all 20 FlowFiles are queued up.
+        waitFor(() -> {
+            final ConnectionStatusEntity statusEntity = 
getConnectionStatus(connection.getId());
+            return 
statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() 
== 20;
+        });
+
+        // Wait until load balancing is complete
+        waitFor(() -> isConnectionDoneLoadBalancing(connection.getId()));
+
+        // Ensure that the FlowFiles are evenly distributed between the nodes.
+        final ConnectionStatusEntity statusEntity = 
getConnectionStatus(connection.getId());
+        assertTrue(isEvenlyDistributed(statusEntity));
+
+        assertEquals(20, getQueueSize(connection.getId()));
+        assertEquals(20 * 1024 * 1024, getQueueBytes(connection.getId()));
+
+        getNifiClient().getProcessorClient().stopProcessor(generate);
+
+        // Empty the queue because on restart, Node 2 will rebalance all of 
its data using the Load-Balance strategy, and we don't want
+        // the data to start out lopsided.
+        getClientUtil().emptyQueue(connection.getId());
+
+        final NiFiInstance instance2 = 
this.getNiFiInstance().getNodeInstance(2);
+        instance2.stop();
+
+        final Map<String, String> updatedLoadBalanceProperties = new 
HashMap<>();
+        updatedLoadBalanceProperties.put("nifi.cluster.load.balance.host", 
"127.0.0.1");
+        updatedLoadBalanceProperties.put("nifi.cluster.load.balance.port", 
"7676");
+        instance2.setProperties(updatedLoadBalanceProperties);
+
+        instance2.start(true);
+        waitForAllNodesConnected();
+
+        // Generate the data again
+        generate = 
getNifiClient().getProcessorClient().getProcessor(generate.getId());
+        getNifiClient().getProcessorClient().startProcessor(generate);
+
+        // Wait until all 20 FlowFiles are queued up
+        waitFor(() -> {
+            final ConnectionStatusEntity secondRoundStatusEntity = 
getConnectionStatus(connection.getId());
+            return 
secondRoundStatusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued()
 == 20;
+        });
+
+        // Wait until load balancing is complete
+        waitFor(() -> isConnectionDoneLoadBalancing(connection.getId()));
+
+        // Ensure that the FlowFiles are evenly distributed between the nodes.
+        final ConnectionStatusEntity afterSecondDataGenerationStatusEntity = 
getConnectionStatus(connection.getId());
+        assertTrue(isEvenlyDistributed(afterSecondDataGenerationStatusEntity));
+
+        assertEquals(20, getQueueSize(connection.getId()));
+        assertEquals(20 * 1024 * 1024, getQueueBytes(connection.getId()));
+    }
+
 }

Reply via email to