Copilot commented on code in PR #17861:
URL: https://github.com/apache/pinot/pull/17861#discussion_r2921174225


##########
pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java:
##########
@@ -242,10 +253,18 @@ void sendRequestWithoutLocking(String rawTableName, AsyncQueryResponse asyncQuer
         ServerRoutingInstance serverRoutingInstance, byte[] requestBytes) {
       long startTimeMs = System.currentTimeMillis();
       _channel.writeAndFlush(Unpooled.wrappedBuffer(requestBytes)).addListener(f -> {
-        int requestSentLatencyMs = (int) (System.currentTimeMillis() - startTimeMs);
-        _brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.NETTY_CONNECTION_SEND_REQUEST_LATENCY,
-            requestSentLatencyMs, TimeUnit.MILLISECONDS);
-        asyncQueryResponse.markRequestSent(serverRoutingInstance, requestSentLatencyMs);
+        if (f.isSuccess()) {
+          int requestSentLatencyMs = (int) (System.currentTimeMillis() - startTimeMs);
+          _brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.NETTY_CONNECTION_SEND_REQUEST_LATENCY,
+              requestSentLatencyMs, TimeUnit.MILLISECONDS);
+          asyncQueryResponse.markRequestSent(serverRoutingInstance, requestSentLatencyMs);
+        } else {
+          LOGGER.error("Write failure to server: {} for table: {}", serverRoutingInstance, rawTableName, f.cause());
+          _brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_SEND_REQUEST_FAILURES, 1);
+          asyncQueryResponse.markServerDown(serverRoutingInstance,
+              new RuntimeException("Failed to send request to server: " + serverRoutingInstance, f.cause()));
+          _channel.close();
+        }

Review Comment:
   In the write failure listener, calling `_channel.close()` uses the mutable 
`ServerChannel._channel` field, which may have been reconnected/updated by 
another request by the time the listener runs. That can accidentally close a 
newly-established healthy channel instead of the channel associated with the 
failed write. Capture the current channel in a local variable before 
`writeAndFlush()` (or use the channel from the future) and close that specific 
channel on failure.
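
   To illustrate the race, here is a minimal, stdlib-only sketch. It does not 
use Netty: `FakeChannel`, `CaptureChannelSketch`, `reconnect`, 
`failPendingWrite`, and `simulate` are hypothetical stand-ins invented for this 
example, and `CompletableFuture` plays the role of the write future. It only 
shows the difference between reading the mutable field inside the callback and 
closing a channel reference captured before the write.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

class CaptureChannelSketch {
  /** Hypothetical stand-in for a Netty channel; it only tracks open/closed state. */
  static final class FakeChannel {
    private final CompletableFuture<Void> _pendingWrite = new CompletableFuture<>();
    private volatile boolean _open = true;

    // Returns a future that stays incomplete until failPendingWrite() is called.
    CompletableFuture<Void> writeAndFlush(byte[] requestBytes) {
      return _pendingWrite;
    }

    void failPendingWrite(Throwable cause) {
      _pendingWrite.completeExceptionally(cause);
    }

    void close() {
      _open = false;
    }

    boolean isOpen() {
      return _open;
    }
  }

  // Mutable field, analogous to ServerChannel._channel in the diff above.
  private volatile FakeChannel _channel;

  CaptureChannelSketch(FakeChannel initial) {
    _channel = initial;
  }

  // Simulates another request replacing the channel after a reconnect.
  void reconnect(FakeChannel replacement) {
    _channel = replacement;
  }

  // Problematic shape: the callback reads the field when it runs, not when the write started.
  void sendClosingField(byte[] requestBytes) {
    _channel.writeAndFlush(requestBytes).whenComplete((ignored, cause) -> {
      if (cause != null) {
        _channel.close();
      }
    });
  }

  // Suggested shape: capture the channel first and close that same instance on failure.
  void sendClosingCaptured(byte[] requestBytes) {
    FakeChannel channel = _channel;
    channel.writeAndFlush(requestBytes).whenComplete((ignored, cause) -> {
      if (cause != null) {
        channel.close();
      }
    });
  }

  // Starts a write, swaps the channel, then fails the original write.
  static String simulate(boolean captureLocal) {
    FakeChannel old = new FakeChannel();
    FakeChannel fresh = new FakeChannel();
    CaptureChannelSketch server = new CaptureChannelSketch(old);
    if (captureLocal) {
      server.sendClosingCaptured(new byte[0]);
    } else {
      server.sendClosingField(new byte[0]);
    }
    server.reconnect(fresh);
    old.failPendingWrite(new IOException("simulated write failure"));
    return "old=" + (old.isOpen() ? "open" : "closed") + ",fresh=" + (fresh.isOpen() ? "open" : "closed");
  }

  public static void main(String[] args) {
    System.out.println(simulate(false)); // old=open,fresh=closed
    System.out.println(simulate(true));  // old=closed,fresh=open
  }
}
```

   In the real code the equivalent change would presumably be a local 
`Channel channel = _channel;` taken before `writeAndFlush()`. Netty's 
`ChannelFuture` also exposes `channel()`, which should work as well if the 
listener is typed so that the future is a `ChannelFuture`.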



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

