This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 57d7e4be566 Fix QueryServer _allChannels cleanup on channel close (#17854)
57d7e4be566 is described below
commit 57d7e4be56648430eb2363f234aeac96e1f52a21
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Tue Mar 10 17:25:44 2026 -0700
Fix QueryServer _allChannels cleanup on channel close (#17854)
_allChannels accumulates closed channel references indefinitely because
channels are added in initChannel() but never removed on close. Add a
closeFuture listener to remove the channel when it closes, preventing
the memory leak.
---
.../apache/pinot/core/transport/QueryServer.java | 6 +++++
.../pinot/core/transport/QueryServerTest.java | 27 ++++++++++++++++++++++
2 files changed, 33 insertions(+)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
index 765c474918f..2f2ca7e2b7d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
@@ -143,6 +143,7 @@ public class QueryServer {
@Override
protected void initChannel(SocketChannel ch) {
_allChannels.put(ch, true);
+ ch.closeFuture().addListener(f -> _allChannels.remove(ch));
ch.pipeline()
.addLast(ChannelHandlerFactory.getDirectOOMHandler(null,
null, null, _allChannels, _channel));
@@ -180,4 +181,9 @@ public class QueryServer {
ServerSocketChannel getChannel() {
return _channel;
}
+
+ @VisibleForTesting
+ int getConnectedChannelCount() {
+ return _allChannels.size();
+ }
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryServerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryServerTest.java
index bef26090673..df72863cd0c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryServerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryServerTest.java
@@ -28,6 +28,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
+import org.apache.pinot.util.TestUtils;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -65,6 +66,32 @@ public class QueryServerTest {
assertFalse(connectionOk(serverAddress));
}
+ @Test
+ public void testAllChannelsCleanupOnClose()
+ throws Exception {
+ PinotMetricUtils.init(new PinotConfiguration());
+ PinotMetricsRegistry registry = PinotMetricUtils.getPinotMetricsRegistry();
+ ServerMetrics.register(new ServerMetrics(registry));
+ QueryServer server = new QueryServer(0, new NettyConfig(), null, mock(ChannelHandler.class));
+ server.start();
+
+ InetSocketAddress serverAddress = server.getChannel().localAddress();
+ Socket socket = new Socket(serverAddress.getHostName(), serverAddress.getPort());
+
+ try {
+ TestUtils.waitForCondition(aVoid -> server.getConnectedChannelCount() > 0, 5_000L,
+ "Channel was not registered in _allChannels");
+
+ socket.close();
+
+ TestUtils.waitForCondition(aVoid -> server.getConnectedChannelCount() == 0, 5_000L,
+ "Channel was not removed from _allChannels after close");
+ } finally {
+ IOUtils.closeQuietly(socket);
+ server.shutDown();
+ }
+ }
+
private static boolean connectionOk(InetSocketAddress address) {
Socket s = null;
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]