This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 886081d320d NIFI-15735 Fix stale load balance clients when node reconnects with changed port. (#11038)
886081d320d is described below

commit 886081d320df359a049265592188deb7b3832605
Author: Matt Gilman <[email protected]>
AuthorDate: Wed Mar 25 05:56:12 2026 -0400

    NIFI-15735 Fix stale load balance clients when node reconnects with changed port. (#11038)
---
 .../clustered/SocketLoadBalancedFlowFileQueue.java |  14 +-
 .../nio/NioAsyncLoadBalanceClientRegistry.java     |  19 ++
 .../TestSocketLoadBalancedFlowFileQueue.java       |  76 ++++++++
 .../nio/TestNioAsyncLoadBalanceClientRegistry.java | 209 +++++++++++++++++++++
 4 files changed, 316 insertions(+), 2 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 51b68806e37..996e20bc767 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -1114,8 +1114,18 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
             partitionWriteLock.lock();
             try {
                 if (nodeIdentifiers.contains(nodeId)) {
-                    logger.debug("Node Identifier {} added to cluster but 
already known in set: {}", nodeId, nodeIdentifiers);
-                    return;
+                    final NodeIdentifier existingId = nodeIdentifiers.stream()
+                        .filter(id -> 
id.getId().equals(nodeId.getId())).findFirst().orElse(null);
+                    final boolean lbAddressChanged = existingId != null
+                        && (existingId.getLoadBalancePort() != 
nodeId.getLoadBalancePort()
+                            || 
!Objects.equals(existingId.getLoadBalanceAddress(), 
nodeId.getLoadBalanceAddress()));
+                    if (!lbAddressChanged) {
+                        logger.debug("Node Identifier {} added to cluster but 
already known in set with same LB address", nodeId);
+                        return;
+                    }
+                    logger.info("Node Identifier {} added to cluster with 
updated LB address. Previous: {}:{}, New: {}:{}",
+                        nodeId, existingId.getLoadBalanceAddress(), 
existingId.getLoadBalancePort(),
+                        nodeId.getLoadBalanceAddress(), 
nodeId.getLoadBalancePort());
                 }
 
                 final Set<NodeIdentifier> updatedNodeIds = new 
HashSet<>(nodeIdentifiers);
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java
index 215ca7d81ff..f3fbfe4c17a 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.function.BooleanSupplier;
@@ -56,6 +57,17 @@ public class NioAsyncLoadBalanceClientRegistry implements 
AsyncLoadBalanceClient
                                       final Supplier<LoadBalanceCompression> 
compressionSupplier, final BooleanSupplier honorBackpressureSupplier) {
 
         Set<AsyncLoadBalanceClient> clients = clientMap.get(nodeId);
+        if (clients != null && clients.isEmpty()) {
+            logger.debug("Existing client set for {} is empty; will recreate", 
nodeId);
+            clientMap.remove(nodeId);
+            clients = null;
+        } else if (clients != null && isLoadBalanceAddressChanged(clients, 
nodeId)) {
+            logger.info("Load balance address for {} changed; replacing 
existing clients", nodeId);
+            clients.forEach(AsyncLoadBalanceClient::stop);
+            allClients.removeAll(clients);
+            clientMap.remove(nodeId);
+            clients = null;
+        }
         if (clients == null) {
             clients = registerClients(nodeId);
         }
@@ -89,6 +101,13 @@ public class NioAsyncLoadBalanceClientRegistry implements 
AsyncLoadBalanceClient
         logger.debug("Un-registered Connection with ID {} so that it will no 
longer send data to Node {}; {} clients were removed", connectionId, nodeId, 
toRemove.size());
     }
 
+    private boolean isLoadBalanceAddressChanged(final 
Set<AsyncLoadBalanceClient> clients, final NodeIdentifier nodeId) {
+        final AsyncLoadBalanceClient existingClient = 
clients.iterator().next();
+        final NodeIdentifier existingNodeId = 
existingClient.getNodeIdentifier();
+        return existingNodeId.getLoadBalancePort() != 
nodeId.getLoadBalancePort()
+                || !Objects.equals(existingNodeId.getLoadBalanceAddress(), 
nodeId.getLoadBalanceAddress());
+    }
+
     private Set<AsyncLoadBalanceClient> registerClients(final NodeIdentifier 
nodeId) {
         final Set<AsyncLoadBalanceClient> clients = new HashSet<>();
 
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
index 3736856ef71..d1ce05e0104 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
@@ -443,6 +443,82 @@ public class TestSocketLoadBalancedFlowFileQueue {
         }
     }
 
+    @Test
+    public void testOnNodeAddedWithChangedLoadBalancePortUpdatesPartition() {
+        final NodeIdentifier originalNodeId = nodeIds.get(1);
+        final int originalLbPort = originalNodeId.getLoadBalancePort();
+
+        NodeIdentifier partitionNodeId = 
findPartitionNodeIdByUuid(originalNodeId.getId());
+        assertNotNull(partitionNodeId);
+        assertEquals(originalLbPort, partitionNodeId.getLoadBalancePort());
+
+        final int newLbPort = nodePort++;
+        final NodeIdentifier updatedNodeId = new NodeIdentifier(
+            originalNodeId.getId(),
+            originalNodeId.getApiAddress(), originalNodeId.getApiPort(),
+            originalNodeId.getSocketAddress(), originalNodeId.getSocketPort(),
+            originalNodeId.getLoadBalanceAddress(), newLbPort,
+            originalNodeId.getSiteToSiteAddress(), 
originalNodeId.getSiteToSitePort(),
+            originalNodeId.getSiteToSiteHttpApiPort(), 
originalNodeId.isSiteToSiteSecure(),
+            Collections.emptySet()
+        );
+
+        clusterTopologyEventListener.onNodeStateChange(updatedNodeId, 
NodeConnectionState.CONNECTED);
+
+        partitionNodeId = findPartitionNodeIdByUuid(originalNodeId.getId());
+        assertNotNull(partitionNodeId);
+        assertEquals(newLbPort, partitionNodeId.getLoadBalancePort());
+    }
+
+    @Test
+    public void 
testOnNodeAddedWithSameLoadBalancePortDoesNotRecreatePartition() {
+        final NodeIdentifier originalNodeId = nodeIds.get(1);
+        final QueuePartition originalPartition = 
findPartitionByUuid(originalNodeId.getId());
+        assertNotNull(originalPartition);
+
+        clusterTopologyEventListener.onNodeStateChange(originalNodeId, 
NodeConnectionState.CONNECTED);
+
+        final QueuePartition partitionAfter = 
findPartitionByUuid(originalNodeId.getId());
+        assertSame(originalPartition, partitionAfter);
+    }
+
+    @Test
+    public void testOnNodeAddedWithChangedLoadBalanceAddressUpdatesPartition() 
{
+        final NodeIdentifier originalNodeId = nodeIds.get(1);
+
+        final NodeIdentifier updatedNodeId = new NodeIdentifier(
+            originalNodeId.getId(),
+            originalNodeId.getApiAddress(), originalNodeId.getApiPort(),
+            originalNodeId.getSocketAddress(), originalNodeId.getSocketPort(),
+            "remotehost", originalNodeId.getLoadBalancePort(),
+            originalNodeId.getSiteToSiteAddress(), 
originalNodeId.getSiteToSitePort(),
+            originalNodeId.getSiteToSiteHttpApiPort(), 
originalNodeId.isSiteToSiteSecure(),
+            Collections.emptySet()
+        );
+
+        clusterTopologyEventListener.onNodeStateChange(updatedNodeId, 
NodeConnectionState.CONNECTED);
+
+        final NodeIdentifier partitionNodeId = 
findPartitionNodeIdByUuid(originalNodeId.getId());
+        assertNotNull(partitionNodeId);
+        assertEquals("remotehost", partitionNodeId.getLoadBalanceAddress());
+    }
+
+    private NodeIdentifier findPartitionNodeIdByUuid(final String uuid) {
+        final QueuePartition partition = findPartitionByUuid(uuid);
+        return partition == null ? null : 
partition.getNodeIdentifier().orElse(null);
+    }
+
+    private QueuePartition findPartitionByUuid(final String uuid) {
+        for (int i = 0; i < queue.getPartitionCount(); i++) {
+            final QueuePartition partition = queue.getPartition(i);
+            final NodeIdentifier nodeId = 
partition.getNodeIdentifier().orElse(null);
+            if (nodeId != null && nodeId.getId().equals(uuid)) {
+                return partition;
+            }
+        }
+        return null;
+    }
+
     @Test
     public void testOffloadAndReconnectKeepsQueueInCorrectOrder() {
         // Simulate FirstNodePartitioner, which always selects the first node 
in the partition queue
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestNioAsyncLoadBalanceClientRegistry.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestNioAsyncLoadBalanceClientRegistry.java
new file mode 100644
index 00000000000..67918bb71a1
--- /dev/null
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestNioAsyncLoadBalanceClientRegistry.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.client.async.nio;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.queue.LoadBalanceCompression;
+import 
org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient;
+import 
org.apache.nifi.controller.queue.clustered.client.async.TransactionCompleteCallback;
+import 
org.apache.nifi.controller.queue.clustered.client.async.TransactionFailureCallback;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BooleanSupplier;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestNioAsyncLoadBalanceClientRegistry {
+
+    private static final String NODE_UUID = "node-1-uuid";
+    private static final String API_ADDRESS = "localhost";
+    private static final int API_PORT = 5672;
+    private static final String SOCKET_ADDRESS = "localhost";
+    private static final int SOCKET_PORT = 48101;
+
+    private static final BooleanSupplier EMPTY_SUPPLIER = () -> true;
+    private static final Supplier<FlowFileRecord> FLOW_FILE_SUPPLIER = () -> 
null;
+    private static final TransactionFailureCallback FAILURE_CALLBACK = new 
TransactionFailureCallback() {
+        @Override
+        public void onTransactionFailed(final List<FlowFileRecord> flowFiles, 
final Exception cause, final TransactionPhase transactionPhase) {
+        }
+
+        @Override
+        public boolean isRebalanceOnFailure() {
+            return false;
+        }
+    };
+    private static final TransactionCompleteCallback SUCCESS_CALLBACK = 
(flowFiles, nodeId) -> { };
+    private static final Supplier<LoadBalanceCompression> COMPRESSION_SUPPLIER 
= () -> LoadBalanceCompression.DO_NOT_COMPRESS;
+    private static final BooleanSupplier BACKPRESSURE_SUPPLIER = () -> true;
+
+    private List<StubAsyncLoadBalanceClient> createdClients;
+    private NioAsyncLoadBalanceClientRegistry registry;
+
+    @BeforeEach
+    void setUp() {
+        createdClients = new ArrayList<>();
+
+        final NioAsyncLoadBalanceClientFactory factory = new 
NioAsyncLoadBalanceClientFactory(null, 30000, null, null, null, null) {
+            @Override
+            public NioAsyncLoadBalanceClient createClient(final NodeIdentifier 
nodeIdentifier) {
+                final StubAsyncLoadBalanceClient client = new 
StubAsyncLoadBalanceClient(nodeIdentifier);
+                createdClients.add(client);
+                return client;
+            }
+        };
+
+        registry = new NioAsyncLoadBalanceClientRegistry(factory, 1);
+    }
+
+    @Test
+    void testRegisterCreatesNewClientForNewNode() {
+        final NodeIdentifier nodeId = createNodeId("localhost", 6342);
+        registry.register("conn-1", nodeId, EMPTY_SUPPLIER, FLOW_FILE_SUPPLIER,
+                FAILURE_CALLBACK, SUCCESS_CALLBACK, COMPRESSION_SUPPLIER, 
BACKPRESSURE_SUPPLIER);
+
+        assertEquals(1, createdClients.size());
+        assertEquals(6342, 
createdClients.getFirst().getNodeIdentifier().getLoadBalancePort());
+    }
+
+    @Test
+    void testRegisterReusesExistingClientForSamePort() {
+        final NodeIdentifier nodeId = createNodeId("localhost", 6342);
+        registry.register("conn-1", nodeId, EMPTY_SUPPLIER, FLOW_FILE_SUPPLIER,
+                FAILURE_CALLBACK, SUCCESS_CALLBACK, COMPRESSION_SUPPLIER, 
BACKPRESSURE_SUPPLIER);
+        registry.register("conn-2", nodeId, EMPTY_SUPPLIER, FLOW_FILE_SUPPLIER,
+                FAILURE_CALLBACK, SUCCESS_CALLBACK, COMPRESSION_SUPPLIER, 
BACKPRESSURE_SUPPLIER);
+
+        assertEquals(1, createdClients.size());
+    }
+
+    @Test
+    void testRegisterReplacesClientOnPortChange() {
+        final NodeIdentifier originalNodeId = createNodeId("localhost", 6342);
+        registry.register("conn-1", originalNodeId, EMPTY_SUPPLIER, 
FLOW_FILE_SUPPLIER,
+                FAILURE_CALLBACK, SUCCESS_CALLBACK, COMPRESSION_SUPPLIER, 
BACKPRESSURE_SUPPLIER);
+
+        assertEquals(1, createdClients.size());
+        final StubAsyncLoadBalanceClient originalClient = 
createdClients.getFirst();
+        assertEquals(6342, 
originalClient.getNodeIdentifier().getLoadBalancePort());
+
+        final NodeIdentifier updatedNodeId = createNodeId("localhost", 7676);
+        registry.register("conn-1", updatedNodeId, EMPTY_SUPPLIER, 
FLOW_FILE_SUPPLIER,
+                FAILURE_CALLBACK, SUCCESS_CALLBACK, COMPRESSION_SUPPLIER, 
BACKPRESSURE_SUPPLIER);
+
+        assertEquals(2, createdClients.size());
+        final StubAsyncLoadBalanceClient newClient = createdClients.get(1);
+        assertEquals(7676, newClient.getNodeIdentifier().getLoadBalancePort());
+        assertTrue(originalClient.stopped, "Original client should have been 
stopped");
+
+        final Set<AsyncLoadBalanceClient> allClients = 
registry.getAllClients();
+        assertEquals(1, allClients.size());
+        assertNotEquals(originalClient, allClients.iterator().next());
+    }
+
+    @Test
+    void testRegisterReplacesClientOnAddressChange() {
+        final NodeIdentifier originalNodeId = createNodeId("localhost", 6342);
+        registry.register("conn-1", originalNodeId, EMPTY_SUPPLIER, 
FLOW_FILE_SUPPLIER,
+                FAILURE_CALLBACK, SUCCESS_CALLBACK, COMPRESSION_SUPPLIER, 
BACKPRESSURE_SUPPLIER);
+
+        final NodeIdentifier updatedNodeId = createNodeId("127.0.0.1", 6342);
+        registry.register("conn-1", updatedNodeId, EMPTY_SUPPLIER, 
FLOW_FILE_SUPPLIER,
+                FAILURE_CALLBACK, SUCCESS_CALLBACK, COMPRESSION_SUPPLIER, 
BACKPRESSURE_SUPPLIER);
+
+        assertEquals(2, createdClients.size());
+        assertEquals("127.0.0.1", 
createdClients.get(1).getNodeIdentifier().getLoadBalanceAddress());
+        assertTrue(createdClients.getFirst().stopped);
+    }
+
+    @Test
+    void testStartedRegistryStartsReplacementClients() {
+        registry.start();
+
+        final NodeIdentifier originalNodeId = createNodeId("localhost", 6342);
+        registry.register("conn-1", originalNodeId, EMPTY_SUPPLIER, 
FLOW_FILE_SUPPLIER,
+                FAILURE_CALLBACK, SUCCESS_CALLBACK, COMPRESSION_SUPPLIER, 
BACKPRESSURE_SUPPLIER);
+
+        final NodeIdentifier updatedNodeId = createNodeId("localhost", 7676);
+        registry.register("conn-1", updatedNodeId, EMPTY_SUPPLIER, 
FLOW_FILE_SUPPLIER,
+                FAILURE_CALLBACK, SUCCESS_CALLBACK, COMPRESSION_SUPPLIER, 
BACKPRESSURE_SUPPLIER);
+
+        final StubAsyncLoadBalanceClient newClient = createdClients.get(1);
+        assertTrue(newClient.started, "Replacement client should be started 
when registry is running");
+    }
+
+    private NodeIdentifier createNodeId(final String lbAddress, final int 
lbPort) {
+        return new NodeIdentifier(NODE_UUID, API_ADDRESS, API_PORT, 
SOCKET_ADDRESS, SOCKET_PORT,
+                lbAddress, lbPort, null, null, null, false);
+    }
+
+    private static class StubAsyncLoadBalanceClient extends 
NioAsyncLoadBalanceClient {
+        boolean started = false;
+        boolean stopped = false;
+
+        StubAsyncLoadBalanceClient(final NodeIdentifier nodeIdentifier) {
+            super(nodeIdentifier, null, 30000, null, null, null, null);
+        }
+
+        @Override
+        public void start() {
+            started = true;
+        }
+
+        @Override
+        public void stop() {
+            stopped = true;
+        }
+
+        @Override
+        public void register(final String connectionId, final BooleanSupplier 
emptySupplier,
+                             final Supplier<FlowFileRecord> flowFileSupplier,
+                             final TransactionFailureCallback failureCallback,
+                             final TransactionCompleteCallback successCallback,
+                             final Supplier<LoadBalanceCompression> 
compressionSupplier,
+                             final BooleanSupplier honorBackpressureSupplier) {
+        }
+
+        @Override
+        public void unregister(final String connectionId) {
+        }
+
+        @Override
+        public int getRegisteredConnectionCount() {
+            return 0;
+        }
+
+        @Override
+        public boolean isRunning() {
+            return started && !stopped;
+        }
+
+        @Override
+        public boolean isPenalized() {
+            return false;
+        }
+    }
+}

Reply via email to