This is an automated email from the ASF dual-hosted git repository.
rohit pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudstack.git
The following commit(s) were added to refs/heads/main by this push:
new 99863c2fa5a Imporve socketChannel closing in NioConnection (#10895)
99863c2fa5a is described below
commit 99863c2fa5a57d8ddf1b4b40c647b159d96dc74c
Author: Harikrishna <[email protected]>
AuthorDate: Fri May 23 13:13:04 2025 +0530
Imporve socketChannel closing in NioConnection (#10895)
---
.../java/com/cloud/utils/nio/NioConnection.java | 64 +++++++++++++++-------
.../main/java/com/cloud/utils/nio/NioServer.java | 8 +--
2 files changed, 48 insertions(+), 24 deletions(-)
diff --git a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java
b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java
index 7e8fe32af76..e2ecbb3210e 100644
--- a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java
+++ b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java
@@ -34,11 +34,11 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -80,7 +80,7 @@ public abstract class NioConnection implements
Callable<Boolean> {
protected ExecutorService _executor;
protected ExecutorService _sslHandshakeExecutor;
protected CAService caService;
- protected Set<SocketChannel> socketChannels = new HashSet<>();
+ protected Set<SocketChannel> socketChannels =
ConcurrentHashMap.newKeySet();
protected Integer sslHandshakeTimeout = null;
private final int factoryMaxNewConnectionsCount;
protected boolean blockNewConnections;
@@ -219,7 +219,7 @@ public abstract class NioConnection implements
Callable<Boolean> {
return true;
}
- abstract void init() throws IOException;
+ protected abstract void init() throws IOException;
abstract void registerLink(InetSocketAddress saddr, Link link);
@@ -489,16 +489,47 @@ public abstract class NioConnection implements
Callable<Boolean> {
}
protected void closeConnection(final SelectionKey key) {
- if (key != null) {
- final SocketChannel channel = (SocketChannel)key.channel();
- key.cancel();
+ if (key == null) {
+ return;
+ }
+
+ SocketChannel channel = null;
+ try {
+ // 1. Check type and handle potential CancelledKeyException
+ if (key.isValid() && key.channel() instanceof SocketChannel) {
+ channel = (SocketChannel) key.channel();
+ }
+ } catch (CancelledKeyException e) {
+ logger.trace("Key already cancelled when trying to get channel in
closeConnection.");
+ }
+
+ // 2. Cancel the key (safe to call even if already cancelled)
+ key.cancel();
+
+ if (channel == null) {
+ logger.trace("Channel was null, invalid, or not a SocketChannel
for key: " + key);
+ return;
+ }
+
+ // 3. Try to close the channel if we obtained it
+ if (channel != null) {
+ closeChannel(channel);
+ } else {
+ logger.trace("Channel was null, invalid, or not a SocketChannel
for key: " + key);
+ }
+ }
+
+ private void closeChannel(SocketChannel channel) {
+ if (channel != null && channel.isOpen()) {
try {
- if (channel != null) {
- logger.debug("Closing socket {}", channel.socket());
- channel.close();
- }
- } catch (final IOException ignore) {
- logger.info("[ignored] channel");
+ logger.debug("Closing socket " + channel.socket());
+ channel.close();
+ } catch (IOException ignore) {
+ logger.warn(String.format("[ignored] Exception closing
channel: %s, due to %s", channel, ignore.getMessage()));
+ } catch (Exception e) {
+ logger.warn(String.format("Unexpected exception in closing
channel %s", channel), e);
+ } finally {
+ socketChannels.remove(channel);
}
}
}
@@ -530,14 +561,7 @@ public abstract class NioConnection implements
Callable<Boolean> {
/* Release the resource used by the instance */
public void cleanUp() throws IOException {
for (SocketChannel channel : socketChannels) {
- if (channel != null && channel.isOpen()) {
- try {
- logger.info("Closing connection: {}",
channel.getRemoteAddress());
- channel.close();
- } catch (IOException e) {
- logger.warn("Unable to close connection due to {}",
e.getMessage());
- }
- }
+ closeChannel(channel);
}
if (_selector != null) {
_selector.close();
diff --git a/utils/src/main/java/com/cloud/utils/nio/NioServer.java
b/utils/src/main/java/com/cloud/utils/nio/NioServer.java
index fd5af516bad..c4f44afecce 100644
--- a/utils/src/main/java/com/cloud/utils/nio/NioServer.java
+++ b/utils/src/main/java/com/cloud/utils/nio/NioServer.java
@@ -25,7 +25,7 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
-import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.cloudstack.framework.ca.CAService;
@@ -34,15 +34,15 @@ public class NioServer extends NioConnection {
protected InetSocketAddress localAddress;
private ServerSocketChannel serverSocket;
- protected WeakHashMap<InetSocketAddress, Link> links;
+ protected ConcurrentHashMap<InetSocketAddress, Link> links;
public NioServer(final String name, final int port, final int workers,
final HandlerFactory factory,
final CAService caService, final Integer sslHandShakeTimeout) {
- super(name, port, workers,factory);
+ super(name, port, workers, factory);
setCAService(caService);
setSslHandshakeTimeout(sslHandShakeTimeout);
localAddress = null;
- links = new WeakHashMap<>(1024);
+ links = new ConcurrentHashMap<>(1024);
}
public int getPort() {