This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 58daa997dc9 Fix server outbound write failure creating zombie channels (#17845)
58daa997dc9 is described below
commit 58daa997dc9b1e6ed05b9843644a5dc6ff8a249a
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Tue Mar 10 00:34:50 2026 -0700
Fix server outbound write failure creating zombie channels (#17845)
---
.../apache/pinot/common/metrics/ServerMeter.java | 1 +
.../core/transport/InstanceRequestHandler.java | 39 ++++++++-------
.../core/transport/InstanceRequestHandlerTest.java | 56 ++++++++++++++++++++++
3 files changed, 80 insertions(+), 16 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 03d77408b74..800cd0c0054 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -123,6 +123,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),
NETTY_CONNECTION_RESPONSES_SENT("nettyConnection", true),
NETTY_CONNECTION_BYTES_SENT("nettyConnection", true),
+ NETTY_CONNECTION_SEND_RESPONSE_FAILURES("nettyConnection", true),
// GRPC related metrics
GRPC_QUERIES("grpcQueries", true),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
index f0e3d728809..98fa80349b7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -302,23 +302,30 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
long sendResponseStartTimeMs = System.currentTimeMillis();
int queryProcessingTimeMs = (int) (sendResponseStartTimeMs -
queryArrivalTimeMs);
ctx.writeAndFlush(Unpooled.wrappedBuffer(serializedDataTable)).addListener(f ->
{
- long sendResponseEndTimeMs = System.currentTimeMillis();
- int sendResponseLatencyMs = (int) (sendResponseEndTimeMs -
sendResponseStartTimeMs);
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT,
1);
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT,
serializedDataTable.length);
- _serverMetrics.addTimedTableValue(tableNameWithType,
ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY,
- sendResponseLatencyMs, TimeUnit.MILLISECONDS);
+ if (f.isSuccess()) {
+ long sendResponseEndTimeMs = System.currentTimeMillis();
+ int sendResponseLatencyMs = (int) (sendResponseEndTimeMs -
sendResponseStartTimeMs);
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT,
1);
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT,
serializedDataTable.length);
+ _serverMetrics.addTimedTableValue(tableNameWithType,
ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY,
+ sendResponseLatencyMs, TimeUnit.MILLISECONDS);
- int totalQueryTimeMs = (int) (sendResponseEndTimeMs -
queryArrivalTimeMs);
- if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
- LOGGER.info(
- "Slow query ({}): request handler processing time: {}, send
response latency: {}, total time to handle "
- + "request: {}", requestId, queryProcessingTimeMs,
sendResponseLatencyMs, totalQueryTimeMs);
- }
- if (serializedDataTable.length > LARGE_RESPONSE_SIZE_THRESHOLD_BYTES) {
- LOGGER.warn("Large query ({}): response size in bytes: {}, table name
{}", requestId,
- serializedDataTable.length, tableNameWithType);
- ServerMetrics.get().addMeteredTableValue(tableNameWithType,
ServerMeter.LARGE_QUERY_RESPONSES_SENT, 1);
+ int totalQueryTimeMs = (int) (sendResponseEndTimeMs -
queryArrivalTimeMs);
+ if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
+ LOGGER.info(
+ "Slow query ({}): request handler processing time: {}, send
response latency: {}, total time to handle "
+ + "request: {}", requestId, queryProcessingTimeMs,
sendResponseLatencyMs, totalQueryTimeMs);
+ }
+ if (serializedDataTable.length > LARGE_RESPONSE_SIZE_THRESHOLD_BYTES) {
+ LOGGER.warn("Large query ({}): response size in bytes: {}, table
name {}", requestId,
+ serializedDataTable.length, tableNameWithType);
+ _serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.LARGE_QUERY_RESPONSES_SENT, 1);
+ }
+ } else {
+ Throwable cause = f.cause();
+ LOGGER.error("Failed to send response for request: {} table: {}",
requestId, tableNameWithType, cause);
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_SEND_RESPONSE_FAILURES,
1);
+ ctx.close();
}
});
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/InstanceRequestHandlerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/InstanceRequestHandlerTest.java
index c2eb2136dea..a9298e3ba95 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/InstanceRequestHandlerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/InstanceRequestHandlerTest.java
@@ -22,10 +22,13 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.LongAccumulator;
+import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
@@ -34,12 +37,17 @@ import org.apache.pinot.core.query.scheduler.resources.UnboundedResourceManager;
import org.apache.pinot.server.access.AccessControl;
import org.apache.pinot.spi.accounting.ThreadAccountantUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.query.QueryExecutionContext;
import org.apache.pinot.util.TestUtils;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -48,6 +56,13 @@ import static org.testng.Assert.assertTrue;
public class InstanceRequestHandlerTest {
+ @BeforeClass
+ public void setUp() {
+ PinotMetricUtils.init(new PinotConfiguration());
+ PinotMetricsRegistry registry = PinotMetricUtils.getPinotMetricsRegistry();
+ ServerMetrics.register(new ServerMetrics(registry));
+ }
+
@Test
public void testCancelQuery()
throws InterruptedException {
@@ -88,6 +103,47 @@ public class InstanceRequestHandlerTest {
assertFalse(handler.cancelQuery("unknown"));
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testWriteFailureClosesChannel()
+ throws Exception {
+ PinotConfiguration config = new PinotConfiguration();
+ CountDownLatch queryFinishLatch = new CountDownLatch(1);
+ QueryScheduler queryScheduler = createQueryScheduler(config,
queryFinishLatch);
+ InstanceRequestHandler handler =
+ new InstanceRequestHandler("server01", config, queryScheduler,
mock(AccessControl.class),
+ ThreadAccountantUtils.getNoOpAccountant());
+
+ ServerQueryRequest query = mock(ServerQueryRequest.class);
+ when(query.getQueryId()).thenReturn("test-query");
+ when(query.getTableNameWithType()).thenReturn("testTable_OFFLINE");
+ when(query.getRequestId()).thenReturn(1L);
+ QueryExecutionContext executionContext = mock(QueryExecutionContext.class);
+ when(executionContext.getCid()).thenReturn("test-query");
+ when(query.toExecutionContext(any())).thenReturn(executionContext);
+
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ ChannelFuture writeFuture = mock(ChannelFuture.class);
+ when(ctx.writeAndFlush(any())).thenReturn(writeFuture);
+
+ ArgumentCaptor<GenericFutureListener> listenerCaptor =
ArgumentCaptor.forClass(GenericFutureListener.class);
+
when(writeFuture.addListener(listenerCaptor.capture())).thenReturn(writeFuture);
+
+ handler.submitQuery(query, ctx, System.currentTimeMillis());
+ queryFinishLatch.countDown();
+
+ TestUtils.waitForCondition((aVoid) ->
!listenerCaptor.getAllValues().isEmpty(), 10_000L,
+ "Timed out waiting for write listener to be registered");
+
+ Future<Void> failedFuture = mock(Future.class);
+ when(failedFuture.isSuccess()).thenReturn(false);
+ when(failedFuture.cause()).thenReturn(new OutOfMemoryError("Direct buffer
memory"));
+
+ listenerCaptor.getValue().operationComplete(failedFuture);
+
+ verify(ctx).close();
+ }
+
private QueryScheduler createQueryScheduler(PinotConfiguration config,
CountDownLatch queryFinishLatch) {
ResourceManager resourceManager = new UnboundedResourceManager(config);
return new QueryScheduler(config, "serverId", mock(QueryExecutor.class),
ThreadAccountantUtils.getNoOpAccountant(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]