Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2947#discussion_r220975941
--- Diff:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java
---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.client.async.nio;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.reporting.Severity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NioAsyncLoadBalanceClientTask implements Runnable {
+ private static final Logger logger = LoggerFactory.getLogger(NioAsyncLoadBalanceClientTask.class);
+ private static final String EVENT_CATEGORY = "Load-Balanced Connection";
+
+ private final NioAsyncLoadBalanceClientRegistry clientRegistry;
+ private final ClusterCoordinator clusterCoordinator;
+ private final EventReporter eventReporter;
+ private volatile boolean running = true;
+
+ public NioAsyncLoadBalanceClientTask(final NioAsyncLoadBalanceClientRegistry clientRegistry, final ClusterCoordinator clusterCoordinator, final EventReporter eventReporter) {
+ this.clientRegistry = clientRegistry;
+ this.clusterCoordinator = clusterCoordinator;
+ this.eventReporter = eventReporter;
+ }
+
+ @Override
+ public void run() {
+ while (running) {
+ try {
+ boolean success = false;
+ for (final NioAsyncLoadBalanceClient client : clientRegistry.getAllClients()) {
+ if (!client.isRunning()) {
+ logger.trace("Client {} is not running so will not communicate with it", client);
+ continue;
+ }
+
+ if (client.isPenalized()) {
+ logger.trace("Client {} is penalized so will not communicate with it", client);
+ continue;
+ }
+
+ final NodeIdentifier clientNodeId = client.getNodeIdentifier();
+ final NodeConnectionStatus connectionStatus = clusterCoordinator.getConnectionStatus(clientNodeId);
+ final NodeConnectionState connectionState = connectionStatus.getState();
+ if (connectionState != NodeConnectionState.CONNECTED) {
--- End diff --
That's a good catch.
---