This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 89b0d01b2c Add query request ID to log lines for slow / large queries 
(#14627)
89b0d01b2c is described below

commit 89b0d01b2c8db7bb71780d178be762edf392bd0a
Author: Yash Mayya <[email protected]>
AuthorDate: Fri Dec 13 12:09:30 2024 +0700

    Add query request ID to log lines for slow / large queries (#14627)
---
 .../pinot/core/transport/InstanceRequestHandler.java   | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
index 4940823b74..8f63fd4f69 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -183,7 +183,7 @@ public class InstanceRequestHandler extends 
SimpleChannelInboundHandler<ByteBuf>
 
   private FutureCallback<byte[]> createCallback(ChannelHandlerContext ctx, 
String tableNameWithType,
       long queryArrivalTimeMs, InstanceRequest instanceRequest, 
ServerQueryRequest queryRequest) {
-    return new FutureCallback<byte[]>() {
+    return new FutureCallback<>() {
       @Override
       public void onSuccess(@Nullable byte[] responseBytes) {
         if (_queryFuturesById != null) {
@@ -195,7 +195,8 @@ public class InstanceRequestHandler extends 
SimpleChannelInboundHandler<ByteBuf>
         }
         if (responseBytes != null) {
           // responseBytes contains either query results or exception.
-          sendResponse(ctx, queryRequest.getTableNameWithType(), 
queryArrivalTimeMs, responseBytes);
+          sendResponse(ctx, queryRequest.getRequestId(), 
queryRequest.getTableNameWithType(), queryArrivalTimeMs,
+              responseBytes);
         } else {
           // Send exception response.
           sendErrorResponse(ctx, queryRequest.getRequestId(), 
tableNameWithType, queryArrivalTimeMs,
@@ -290,7 +291,7 @@ public class InstanceRequestHandler extends 
SimpleChannelInboundHandler<ByteBuf>
             "Query execution error on: " + _instanceName + " " + e));
       }
       byte[] serializedDataTable = dataTable.toBytes();
-      sendResponse(ctx, tableNameWithType, queryArrivalTimeMs, 
serializedDataTable);
+      sendResponse(ctx, requestId, tableNameWithType, queryArrivalTimeMs, 
serializedDataTable);
     } catch (Exception exception) {
       LOGGER.error("Exception while sending query processing error to 
Broker.", exception);
     } finally {
@@ -305,8 +306,8 @@ public class InstanceRequestHandler extends 
SimpleChannelInboundHandler<ByteBuf>
   /**
    * Send a response (either query results or exception) back to broker as 
response to the query request.
    */
-  private void sendResponse(ChannelHandlerContext ctx, String 
tableNameWithType, long queryArrivalTimeMs,
-      byte[] serializedDataTable) {
+  private void sendResponse(ChannelHandlerContext ctx, long requestId, String 
tableNameWithType,
+      long queryArrivalTimeMs, byte[] serializedDataTable) {
     long sendResponseStartTimeMs = System.currentTimeMillis();
     int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - 
queryArrivalTimeMs);
     
ctx.writeAndFlush(Unpooled.wrappedBuffer(serializedDataTable)).addListener(f -> 
{
@@ -319,11 +320,12 @@ public class InstanceRequestHandler extends 
SimpleChannelInboundHandler<ByteBuf>
 
       int totalQueryTimeMs = (int) (sendResponseEndTimeMs - 
queryArrivalTimeMs);
       if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
-        LOGGER.info("Slow query: request handler processing time: {}, send 
response latency: {}, total time to handle "
-            + "request: {}", queryProcessingTimeMs, sendResponseLatencyMs, 
totalQueryTimeMs);
+        LOGGER.info(
+            "Slow query ({}): request handler processing time: {}, send 
response latency: {}, total time to handle "
+                + "request: {}", requestId, queryProcessingTimeMs, 
sendResponseLatencyMs, totalQueryTimeMs);
       }
       if (serializedDataTable.length > LARGE_RESPONSE_SIZE_THRESHOLD_BYTES) {
-        LOGGER.warn("Large query: response size in bytes: {}, table name {}",
+        LOGGER.warn("Large query ({}): response size in bytes: {}, table name 
{}", requestId,
             serializedDataTable.length, tableNameWithType);
         ServerMetrics.get().addMeteredTableValue(tableNameWithType, 
ServerMeter.LARGE_QUERY_RESPONSES_SENT, 1);
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to