This is an automated email from the ASF dual-hosted git repository.
mosermw pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 98bda73062 NIFI-12532: This closes #8179. Ensure that when
CommunicateAction completes (exceptionally or otherwise) that it gets removed
from the list of all CommunicationActions
98bda73062 is described below
commit 98bda7306204fb173a8fad97e86d21bc72a4268f
Author: Mark Payne <[email protected]>
AuthorDate: Thu Dec 21 10:49:15 2023 -0500
NIFI-12532: This closes #8179. Ensure that when CommunicateAction completes
(exceptionally or otherwise) that it gets removed from the list of all
CommunicationActions
Signed-off-by: Joseph Witt <[email protected]>
(cherry picked from commit c670161cb09855b28e444a4d32d37427b31e8422 by Mike
Moser <[email protected]>)
---
.../server/ConnectionLoadBalanceServer.java | 50 +++++++++++++---------
1 file changed, 30 insertions(+), 20 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
index c1782ab08a..906e5aa71c 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
@@ -17,6 +17,17 @@
package org.apache.nifi.controller.queue.clustered.server;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.io.socket.SocketUtils;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.security.util.TlsConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLServerSocket;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
@@ -31,17 +42,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.net.ssl.SSLServerSocket;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.io.socket.SocketUtils;
-import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.security.util.TlsConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class ConnectionLoadBalanceServer {
private static final Logger logger =
LoggerFactory.getLogger(ConnectionLoadBalanceServer.class);
@@ -104,10 +104,12 @@ public class ConnectionLoadBalanceServer {
acceptConnection.stop();
}
- final Iterator<CommunicateAction> itr =
communicationActions.iterator();
- while (itr.hasNext()) {
- itr.next().stop();
- itr.remove();
+ synchronized (communicationActions) { // Must synchronize on
Synchronized List when using iterator
+ final Iterator<CommunicateAction> itr =
communicationActions.iterator();
+ while (itr.hasNext()) {
+ itr.next().stop();
+ itr.remove();
+ }
}
}
@@ -135,8 +137,7 @@ public class ConnectionLoadBalanceServer {
private volatile boolean stopped = false;
- // This should be final but it is not to allow override during
testing; no production code modifies the value
- private static int EXCEPTION_THRESHOLD_MILLIS = 10_000;
+ private static final int EXCEPTION_THRESHOLD_MILLIS = 10_000;
private volatile long tlsErrorLastSeen = -1;
public CommunicateAction(final LoadBalanceProtocol
loadBalanceProtocol, final Socket socket, final EventReporter eventReporter)
throws IOException {
@@ -187,6 +188,8 @@ public class ConnectionLoadBalanceServer {
logger.error("Failed to communicate over Channel {}",
channelDescription, e);
eventReporter.reportEvent(Severity.ERROR, "Load
Balanced Connection", "Failed to receive FlowFiles for Load Balancing due to "
+ e);
}
+
+ return;
}
}
}
@@ -265,11 +268,18 @@ public class ConnectionLoadBalanceServer {
socket.setSoTimeout(connectionTimeoutMillis);
final CommunicateAction communicateAction = new
CommunicateAction(loadBalanceProtocol, socket, eventReporter);
- final Thread commsThread = new Thread(communicateAction);
+ communicationActions.add(communicateAction);
+
+ final Thread commsThread = new Thread(() -> {
+ try {
+ communicateAction.run();
+ } finally {
+ communicationActions.remove(communicateAction);
+ }
+ });
+
commsThread.setName("Load-Balance Server Thread-" +
threadCounter.getAndIncrement());
commsThread.start();
-
- communicationActions.add(communicateAction);
} catch (final Exception e) {
logger.error("{} Failed to accept connection from other
node in cluster", ConnectionLoadBalanceServer.this, e);
}