NIFI-5585: Adjustments to the Connection Load Balancing to ensure that node offloading works smoothly
Signed-off-by: Jeff Storck <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a1a4c997 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a1a4c997 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a1a4c997 Branch: refs/heads/master Commit: a1a4c997634aa7edabda42407a0a7627d33e73fd Parents: 01e2098 Author: Mark Payne <[email protected]> Authored: Mon Oct 8 09:53:14 2018 -0400 Committer: Mark Payne <[email protected]> Committed: Thu Oct 11 09:23:01 2018 -0400 ---------------------------------------------------------------------- .../client/async/AsyncLoadBalanceClient.java | 2 ++ .../async/nio/NioAsyncLoadBalanceClient.java | 4 ++++ .../nio/NioAsyncLoadBalanceClientRegistry.java | 20 ++++++++++++++++---- .../nio/NioAsyncLoadBalanceClientTask.java | 8 ++------ 4 files changed, 24 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a1a4c997/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClient.java index 1bb4053..8673a8b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClient.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/AsyncLoadBalanceClient.java @@ -39,6 +39,8 @@ public interface AsyncLoadBalanceClient { void unregister(String connectionId); + int getRegisteredConnectionCount(); + boolean isRunning(); boolean isPenalized(); http://git-wip-us.apache.org/repos/asf/nifi/blob/a1a4c997/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java index 066b597..753c1f4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java @@ -119,6 +119,10 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient { registeredPartitions.remove(connectionId); } + public synchronized int getRegisteredConnectionCount() { + return registeredPartitions.size(); + } + private synchronized Map<String, RegisteredPartition> getRegisteredPartitions() { return new HashMap<>(registeredPartitions); } http://git-wip-us.apache.org/repos/asf/nifi/blob/a1a4c997/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java index 514a58c..3322035 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientRegistry.java @@ -67,15 +67,27 @@ public class NioAsyncLoadBalanceClientRegistry implements AsyncLoadBalanceClient @Override public synchronized void unregister(final String connectionId, final NodeIdentifier nodeId) { - final Set<AsyncLoadBalanceClient> clients = clientMap.remove(nodeId); + final Set<AsyncLoadBalanceClient> clients = clientMap.get(nodeId); if (clients == null) { return; } - clients.forEach(client -> client.unregister(connectionId)); + final Set<AsyncLoadBalanceClient> toRemove = new HashSet<>(); + for (final AsyncLoadBalanceClient client : clients) { + client.unregister(connectionId); + if (client.getRegisteredConnectionCount() == 0) { + toRemove.add(client); + } + } + + clients.removeAll(toRemove); + allClients.removeAll(toRemove); + + if (clients.isEmpty()) { + clientMap.remove(nodeId); + } - allClients.removeAll(clients); - logger.debug("Un-registered Connection with ID {} so that it will no longer send data to Node {}", connectionId, nodeId); + logger.debug("Un-registered Connection with ID {} so that it will no longer send data to Node {}; {} clients were removed", connectionId, nodeId, toRemove.size()); } private Set<AsyncLoadBalanceClient> registerClients(final NodeIdentifier nodeId) { http://git-wip-us.apache.org/repos/asf/nifi/blob/a1a4c997/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java index 35ea5f9..5c8073a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java @@ -66,13 +66,9 @@ public class NioAsyncLoadBalanceClientTask implements Runnable { } final NodeConnectionState connectionState = connectionStatus.getState(); - if (connectionState == NodeConnectionState.DISCONNECTED || connectionState == NodeConnectionState.DISCONNECTING) { - client.nodeDisconnected(); - continue; - } - if (connectionState != NodeConnectionState.CONNECTED) { - logger.debug("Client {} is for node that is not currently connected (state = {}) so will not communicate with node", client, connectionState); + logger.debug("Notifying Client {} that node is not connected because current state is {}", client, connectionState); + client.nodeDisconnected(); continue; }
