This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 89b0d01b2c Add query request ID to log lines for slow / large queries (#14627)
89b0d01b2c is described below
commit 89b0d01b2c8db7bb71780d178be762edf392bd0a
Author: Yash Mayya <[email protected]>
AuthorDate: Fri Dec 13 12:09:30 2024 +0700
Add query request ID to log lines for slow / large queries (#14627)
---
.../pinot/core/transport/InstanceRequestHandler.java | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
index 4940823b74..8f63fd4f69 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -183,7 +183,7 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
private FutureCallback<byte[]> createCallback(ChannelHandlerContext ctx,
String tableNameWithType,
long queryArrivalTimeMs, InstanceRequest instanceRequest,
ServerQueryRequest queryRequest) {
- return new FutureCallback<byte[]>() {
+ return new FutureCallback<>() {
@Override
public void onSuccess(@Nullable byte[] responseBytes) {
if (_queryFuturesById != null) {
@@ -195,7 +195,8 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
}
if (responseBytes != null) {
// responseBytes contains either query results or exception.
- sendResponse(ctx, queryRequest.getTableNameWithType(),
queryArrivalTimeMs, responseBytes);
+ sendResponse(ctx, queryRequest.getRequestId(),
queryRequest.getTableNameWithType(), queryArrivalTimeMs,
+ responseBytes);
} else {
// Send exception response.
sendErrorResponse(ctx, queryRequest.getRequestId(),
tableNameWithType, queryArrivalTimeMs,
@@ -290,7 +291,7 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
"Query execution error on: " + _instanceName + " " + e));
}
byte[] serializedDataTable = dataTable.toBytes();
- sendResponse(ctx, tableNameWithType, queryArrivalTimeMs,
serializedDataTable);
+ sendResponse(ctx, requestId, tableNameWithType, queryArrivalTimeMs,
serializedDataTable);
} catch (Exception exception) {
LOGGER.error("Exception while sending query processing error to
Broker.", exception);
} finally {
@@ -305,8 +306,8 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
/**
* Send a response (either query results or exception) back to broker as
response to the query request.
*/
- private void sendResponse(ChannelHandlerContext ctx, String
tableNameWithType, long queryArrivalTimeMs,
- byte[] serializedDataTable) {
+ private void sendResponse(ChannelHandlerContext ctx, long requestId, String
tableNameWithType,
+ long queryArrivalTimeMs, byte[] serializedDataTable) {
long sendResponseStartTimeMs = System.currentTimeMillis();
int queryProcessingTimeMs = (int) (sendResponseStartTimeMs -
queryArrivalTimeMs);
ctx.writeAndFlush(Unpooled.wrappedBuffer(serializedDataTable)).addListener(f ->
{
@@ -319,11 +320,12 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
int totalQueryTimeMs = (int) (sendResponseEndTimeMs -
queryArrivalTimeMs);
if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
- LOGGER.info("Slow query: request handler processing time: {}, send
response latency: {}, total time to handle "
- + "request: {}", queryProcessingTimeMs, sendResponseLatencyMs,
totalQueryTimeMs);
+ LOGGER.info(
+ "Slow query ({}): request handler processing time: {}, send
response latency: {}, total time to handle "
+ + "request: {}", requestId, queryProcessingTimeMs,
sendResponseLatencyMs, totalQueryTimeMs);
}
if (serializedDataTable.length > LARGE_RESPONSE_SIZE_THRESHOLD_BYTES) {
- LOGGER.warn("Large query: response size in bytes: {}, table name {}",
+ LOGGER.warn("Large query ({}): response size in bytes: {}, table name
{}", requestId,
serializedDataTable.length, tableNameWithType);
ServerMetrics.get().addMeteredTableValue(tableNameWithType,
ServerMeter.LARGE_QUERY_RESPONSES_SENT, 1);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]