This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 886081d320d NIFI-15735 Fix stale load balance clients when node
reconnects with changed port. (#11038)
886081d320d is described below
commit 886081d320df359a049265592188deb7b3832605
Author: Matt Gilman <[email protected]>
AuthorDate: Wed Mar 25 05:56:12 2026 -0400
NIFI-15735 Fix stale load balance clients when node reconnects with changed
port. (#11038)
---
.../clustered/SocketLoadBalancedFlowFileQueue.java | 14 +-
.../nio/NioAsyncLoadBalanceClientRegistry.java | 19 ++
.../TestSocketLoadBalancedFlowFileQueue.java | 76 ++++++++
.../nio/TestNioAsyncLoadBalanceClientRegistry.java | 209 +++++++++++++++++++++
4 files changed, 316 insertions(+), 2 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 51b68806e37..996e20bc767 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -1114,8 +1114,18 @@ public class SocketLoadBalancedFlowFileQueue extends
AbstractFlowFileQueue imple
partitionWriteLock.lock();
try {
if (nodeIdentifiers.contains(nodeId)) {
- logger.debug("Node Identifier {} added to cluster but
already known in set: {}", nodeId, nodeIdentifiers);
- return;
+ final NodeIdentifier existingId = nodeIdentifiers.stream()
+ .filter(id ->
id.getId().equals(nodeId.getId())).findFirst().orElse(null);
+ final boolean lbAddressChanged = existingId != null
+ && (existingId.getLoadBalancePort() !=
nodeId.getLoadBalancePort()
+ ||
!Objects.equals(existingId.getLoadBalanceAddress(),
nodeId.getLoadBalanceAddress()));
+ if (!lbAddressChanged) {
+ logger.debug("Node Identifier {} added to cluster but
already known in set with same LB address", nodeId);
+ return;
+ }
+ logger.info("Node Identifier {} added to cluster with
updated LB address. Previous: {}:{}, New: {}:{}",
+ nodeId, existingId.getLoadBalanceAddress(),
existingId.getLoadBalancePort(),
+ nodeId.getLoadBalanceAddress(),
nodeId.getLoadBalancePort());
}
final Set<NodeIdentifier> updatedNodeIds = new
HashSet<>(nodeIdentifiers);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java
index 215ca7d81ff..f3fbfe4c17a 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.BooleanSupplier;
@@ -56,6 +57,17 @@ public class NioAsyncLoadBalanceClientRegistry implements
AsyncLoadBalanceClient
final Supplier<LoadBalanceCompression>
compressionSupplier, final BooleanSupplier honorBackpressureSupplier) {
Set<AsyncLoadBalanceClient> clients = clientMap.get(nodeId);
+ if (clients != null && clients.isEmpty()) {
+ logger.debug("Existing client set for {} is empty; will recreate",
nodeId);
+ clientMap.remove(nodeId);
+ clients = null;
+ } else if (clients != null && isLoadBalanceAddressChanged(clients,
nodeId)) {
+ logger.info("Load balance address for {} changed; replacing
existing clients", nodeId);
+ clients.forEach(AsyncLoadBalanceClient::stop);
+ allClients.removeAll(clients);
+ clientMap.remove(nodeId);
+ clients = null;
+ }
if (clients == null) {
clients = registerClients(nodeId);
}
@@ -89,6 +101,13 @@ public class NioAsyncLoadBalanceClientRegistry implements
AsyncLoadBalanceClient
logger.debug("Un-registered Connection with ID {} so that it will no
longer send data to Node {}; {} clients were removed", connectionId, nodeId,
toRemove.size());
}
+    /**
+     * Determines whether the clients already registered for a node were created for a different load-balance
+     * endpoint (address or port) than the one carried by the given {@link NodeIdentifier}, as happens when a node
+     * reconnects with a changed load-balance port or address.
+     *
+     * @param clients the clients currently registered for the node; an empty set is reported as unchanged
+     * @param nodeId the most recent identifier received for the node
+     * @return {@code true} if any existing client targets a different load-balance address or port than {@code nodeId}
+     */
+    private boolean isLoadBalanceAddressChanged(final Set<AsyncLoadBalanceClient> clients, final NodeIdentifier nodeId) {
+        // Inspect every client instead of blindly taking the first element, so an empty set cannot throw NoSuchElementException
+        return clients.stream()
+                .map(AsyncLoadBalanceClient::getNodeIdentifier)
+                .anyMatch(existingNodeId -> existingNodeId.getLoadBalancePort() != nodeId.getLoadBalancePort()
+                        || !Objects.equals(existingNodeId.getLoadBalanceAddress(), nodeId.getLoadBalanceAddress()));
+    }
+
private Set<AsyncLoadBalanceClient> registerClients(final NodeIdentifier
nodeId) {
final Set<AsyncLoadBalanceClient> clients = new HashSet<>();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
index 3736856ef71..d1ce05e0104 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
@@ -443,6 +443,82 @@ public class TestSocketLoadBalancedFlowFileQueue {
}
}
+    @Test
+    public void testOnNodeAddedWithChangedLoadBalancePortUpdatesPartition() {
+        // Node 1 reconnects advertising a freshly allocated load-balance port; every other attribute is unchanged
+        final NodeIdentifier nodeBefore = nodeIds.get(1);
+        final String nodeUuid = nodeBefore.getId();
+        final int portBefore = nodeBefore.getLoadBalancePort();
+
+        final NodeIdentifier partitionIdBefore = findPartitionNodeIdByUuid(nodeUuid);
+        assertNotNull(partitionIdBefore);
+        assertEquals(portBefore, partitionIdBefore.getLoadBalancePort());
+
+        final int reassignedPort = nodePort++;
+        final NodeIdentifier nodeAfter = new NodeIdentifier(nodeUuid,
+                nodeBefore.getApiAddress(), nodeBefore.getApiPort(),
+                nodeBefore.getSocketAddress(), nodeBefore.getSocketPort(),
+                nodeBefore.getLoadBalanceAddress(), reassignedPort,
+                nodeBefore.getSiteToSiteAddress(), nodeBefore.getSiteToSitePort(),
+                nodeBefore.getSiteToSiteHttpApiPort(), nodeBefore.isSiteToSiteSecure(),
+                Collections.emptySet());
+        clusterTopologyEventListener.onNodeStateChange(nodeAfter, NodeConnectionState.CONNECTED);
+
+        // The partition for the same node UUID must now carry the new load-balance port
+        final NodeIdentifier partitionIdAfter = findPartitionNodeIdByUuid(nodeUuid);
+        assertNotNull(partitionIdAfter);
+        assertEquals(reassignedPort, partitionIdAfter.getLoadBalancePort());
+    }
+
+    @Test
+    public void testOnNodeAddedWithSameLoadBalancePortDoesNotRecreatePartition() {
+        final NodeIdentifier unchangedNodeId = nodeIds.get(1);
+        final String nodeUuid = unchangedNodeId.getId();
+
+        final QueuePartition partitionBefore = findPartitionByUuid(nodeUuid);
+        assertNotNull(partitionBefore);
+
+        // Re-announcing an identical identifier must leave the very same partition instance in place
+        clusterTopologyEventListener.onNodeStateChange(unchangedNodeId, NodeConnectionState.CONNECTED);
+
+        assertSame(partitionBefore, findPartitionByUuid(nodeUuid));
+    }
+
+    @Test
+    public void testOnNodeAddedWithChangedLoadBalanceAddressUpdatesPartition() {
+        final NodeIdentifier originalNodeId = nodeIds.get(1);
+        final String updatedLbAddress = "remotehost";
+
+        // Confirm the starting state first, mirroring the port-change test, so the final assertion reflects a transition
+        final NodeIdentifier initialPartitionNodeId = findPartitionNodeIdByUuid(originalNodeId.getId());
+        assertNotNull(initialPartitionNodeId);
+        assertEquals(originalNodeId.getLoadBalanceAddress(), initialPartitionNodeId.getLoadBalanceAddress());
+
+        // Same node UUID and LB port, but a different load-balance address
+        final NodeIdentifier updatedNodeId = new NodeIdentifier(
+                originalNodeId.getId(),
+                originalNodeId.getApiAddress(), originalNodeId.getApiPort(),
+                originalNodeId.getSocketAddress(), originalNodeId.getSocketPort(),
+                updatedLbAddress, originalNodeId.getLoadBalancePort(),
+                originalNodeId.getSiteToSiteAddress(), originalNodeId.getSiteToSitePort(),
+                originalNodeId.getSiteToSiteHttpApiPort(), originalNodeId.isSiteToSiteSecure(),
+                Collections.emptySet()
+        );
+
+        clusterTopologyEventListener.onNodeStateChange(updatedNodeId, NodeConnectionState.CONNECTED);
+
+        final NodeIdentifier partitionNodeId = findPartitionNodeIdByUuid(originalNodeId.getId());
+        assertNotNull(partitionNodeId);
+        assertEquals(updatedLbAddress, partitionNodeId.getLoadBalanceAddress());
+    }
+
+    /**
+     * Returns the node identifier of the partition whose node has the given UUID, or {@code null} if there is none.
+     */
+    private NodeIdentifier findPartitionNodeIdByUuid(final String uuid) {
+        final QueuePartition match = findPartitionByUuid(uuid);
+        if (match == null) {
+            return null;
+        }
+        return match.getNodeIdentifier().orElse(null);
+    }
+
+ private QueuePartition findPartitionByUuid(final String uuid) {
+ for (int i = 0; i < queue.getPartitionCount(); i++) {
+ final QueuePartition partition = queue.getPartition(i);
+ final NodeIdentifier nodeId =
partition.getNodeIdentifier().orElse(null);
+ if (nodeId != null && nodeId.getId().equals(uuid)) {
+ return partition;
+ }
+ }
+ return null;
+ }
+
@Test
public void testOffloadAndReconnectKeepsQueueInCorrectOrder() {
// Simulate FirstNodePartitioner, which always selects the first node
in the partition queue
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestNioAsyncLoadBalanceClientRegistry.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestNioAsyncLoadBalanceClientRegistry.java
new file mode 100644
index 00000000000..67918bb71a1
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestNioAsyncLoadBalanceClientRegistry.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.client.async.nio;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.queue.LoadBalanceCompression;
+import
org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient;
+import
org.apache.nifi.controller.queue.clustered.client.async.TransactionCompleteCallback;
+import
org.apache.nifi.controller.queue.clustered.client.async.TransactionFailureCallback;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BooleanSupplier;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestNioAsyncLoadBalanceClientRegistry {
+
+    private static final String NODE_UUID = "node-1-uuid";
+    private static final String API_ADDRESS = "localhost";
+    private static final int API_PORT = 5672;
+    private static final String SOCKET_ADDRESS = "localhost";
+    private static final int SOCKET_PORT = 48101;
+
+    private static final BooleanSupplier EMPTY_SUPPLIER = () -> true;
+    private static final Supplier<FlowFileRecord> FLOW_FILE_SUPPLIER = () -> null;
+    private static final TransactionFailureCallback FAILURE_CALLBACK = new TransactionFailureCallback() {
+        @Override
+        public void onTransactionFailed(final List<FlowFileRecord> flowFiles, final Exception cause, final TransactionPhase transactionPhase) {
+        }
+
+        @Override
+        public boolean isRebalanceOnFailure() {
+            return false;
+        }
+    };
+    private static final TransactionCompleteCallback SUCCESS_CALLBACK = (flowFiles, nodeId) -> { };
+    private static final Supplier<LoadBalanceCompression> COMPRESSION_SUPPLIER = () -> LoadBalanceCompression.DO_NOT_COMPRESS;
+    private static final BooleanSupplier BACKPRESSURE_SUPPLIER = () -> true;
+
+    // Every client the stubbed factory hands out, in creation order
+    private List<StubAsyncLoadBalanceClient> createdClients;
+    private NioAsyncLoadBalanceClientRegistry registry;
+
+    @BeforeEach
+    void setUp() {
+        createdClients = new ArrayList<>();
+
+        final NioAsyncLoadBalanceClientFactory factory = new NioAsyncLoadBalanceClientFactory(null, 30000, null, null, null, null) {
+            @Override
+            public NioAsyncLoadBalanceClient createClient(final NodeIdentifier nodeIdentifier) {
+                final StubAsyncLoadBalanceClient client = new StubAsyncLoadBalanceClient(nodeIdentifier);
+                createdClients.add(client);
+                return client;
+            }
+        };
+
+        // One client per node keeps the created/registered client bookkeeping easy to assert on
+        registry = new NioAsyncLoadBalanceClientRegistry(factory, 1);
+    }
+
+    @Test
+    void testRegisterCreatesNewClientForNewNode() {
+        registerConnection("conn-1", createNodeId("localhost", 6342));
+
+        assertEquals(1, createdClients.size());
+        assertEquals(6342, createdClients.getFirst().getNodeIdentifier().getLoadBalancePort());
+        assertEquals(1, registry.getAllClients().size());
+    }
+
+    @Test
+    void testRegisterReusesExistingClientForSamePort() {
+        final NodeIdentifier nodeId = createNodeId("localhost", 6342);
+        registerConnection("conn-1", nodeId);
+        registerConnection("conn-2", nodeId);
+
+        assertEquals(1, createdClients.size());
+
+        // The second connection must share the first client rather than adding another one to the registry
+        final Set<AsyncLoadBalanceClient> allClients = registry.getAllClients();
+        assertEquals(1, allClients.size());
+        assertTrue(allClients.contains(createdClients.getFirst()), "Existing client should remain registered");
+    }
+
+    @Test
+    void testRegisterReplacesClientOnPortChange() {
+        registerConnection("conn-1", createNodeId("localhost", 6342));
+
+        assertEquals(1, createdClients.size());
+        final StubAsyncLoadBalanceClient originalClient = createdClients.getFirst();
+        assertEquals(6342, originalClient.getNodeIdentifier().getLoadBalancePort());
+
+        registerConnection("conn-1", createNodeId("localhost", 7676));
+
+        assertEquals(2, createdClients.size());
+        final StubAsyncLoadBalanceClient newClient = createdClients.get(1);
+        assertEquals(7676, newClient.getNodeIdentifier().getLoadBalancePort());
+        assertTrue(originalClient.stopped, "Original client should have been stopped");
+
+        final Set<AsyncLoadBalanceClient> allClients = registry.getAllClients();
+        assertEquals(1, allClients.size());
+        assertNotEquals(originalClient, allClients.iterator().next());
+        assertTrue(allClients.contains(newClient), "Replacement client should be registered");
+    }
+
+    @Test
+    void testRegisterReplacesClientOnAddressChange() {
+        registerConnection("conn-1", createNodeId("localhost", 6342));
+        registerConnection("conn-1", createNodeId("127.0.0.1", 6342));
+
+        assertEquals(2, createdClients.size());
+        final StubAsyncLoadBalanceClient originalClient = createdClients.getFirst();
+        final StubAsyncLoadBalanceClient newClient = createdClients.get(1);
+        assertEquals("127.0.0.1", newClient.getNodeIdentifier().getLoadBalanceAddress());
+        assertTrue(originalClient.stopped);
+
+        // The stale client must no longer be tracked by the registry
+        final Set<AsyncLoadBalanceClient> allClients = registry.getAllClients();
+        assertEquals(1, allClients.size());
+        assertNotEquals(originalClient, allClients.iterator().next());
+        assertTrue(allClients.contains(newClient), "Replacement client should be registered");
+    }
+
+    @Test
+    void testStartedRegistryStartsReplacementClients() {
+        registry.start();
+
+        registerConnection("conn-1", createNodeId("localhost", 6342));
+        registerConnection("conn-1", createNodeId("localhost", 7676));
+
+        final StubAsyncLoadBalanceClient originalClient = createdClients.getFirst();
+        final StubAsyncLoadBalanceClient newClient = createdClients.get(1);
+        assertTrue(originalClient.started, "Original client should be started when registry is running");
+        assertTrue(originalClient.stopped, "Original client should be stopped once replaced");
+        assertTrue(newClient.started, "Replacement client should be started when registry is running");
+    }
+
+    /**
+     * Registers the given connection for the given node using the shared no-op suppliers and callbacks.
+     */
+    private void registerConnection(final String connectionId, final NodeIdentifier nodeId) {
+        registry.register(connectionId, nodeId, EMPTY_SUPPLIER, FLOW_FILE_SUPPLIER,
+                FAILURE_CALLBACK, SUCCESS_CALLBACK, COMPRESSION_SUPPLIER, BACKPRESSURE_SUPPLIER);
+    }
+
+    /**
+     * Creates an identifier for the single test node that differs only in its load-balance address and port.
+     */
+    private NodeIdentifier createNodeId(final String lbAddress, final int lbPort) {
+        return new NodeIdentifier(NODE_UUID, API_ADDRESS, API_PORT, SOCKET_ADDRESS, SOCKET_PORT,
+                lbAddress, lbPort, null, null, null, false);
+    }
+
+    /**
+     * Client stub that performs no network I/O and only records whether start/stop were invoked.
+     */
+    private static class StubAsyncLoadBalanceClient extends NioAsyncLoadBalanceClient {
+        boolean started = false;
+        boolean stopped = false;
+
+        StubAsyncLoadBalanceClient(final NodeIdentifier nodeIdentifier) {
+            super(nodeIdentifier, null, 30000, null, null, null, null);
+        }
+
+        @Override
+        public void start() {
+            started = true;
+        }
+
+        @Override
+        public void stop() {
+            stopped = true;
+        }
+
+        @Override
+        public void register(final String connectionId, final BooleanSupplier emptySupplier,
+                             final Supplier<FlowFileRecord> flowFileSupplier,
+                             final TransactionFailureCallback failureCallback,
+                             final TransactionCompleteCallback successCallback,
+                             final Supplier<LoadBalanceCompression> compressionSupplier,
+                             final BooleanSupplier honorBackpressureSupplier) {
+        }
+
+        @Override
+        public void unregister(final String connectionId) {
+        }
+
+        @Override
+        public int getRegisteredConnectionCount() {
+            return 0;
+        }
+
+        @Override
+        public boolean isRunning() {
+            return started && !stopped;
+        }
+
+        @Override
+        public boolean isPenalized() {
+            return false;
+        }
+    }
+}