amrishlal commented on a change in pull request #6596:
URL: https://github.com/apache/incubator-pinot/pull/6596#discussion_r580737043
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
##########
@@ -60,67 +64,107 @@ public InstanceRequestHandler(QueryScheduler
queryScheduler, ServerMetrics serve
_serverMetrics = serverMetrics;
}
+ /**
+ * Always return a response even when query execution throws exception;
otherwise, broker
+ * will keep waiting until timeout.
+ */
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
- long queryArrivalTimeMs = System.currentTimeMillis();
- _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
- int requestSize = msg.readableBytes();
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED,
requestSize);
- byte[] requestBytes = new byte[requestSize];
- msg.readBytes(requestBytes);
+ final long queryArrivalTimeMs = System.currentTimeMillis();
+ final int requestSize = msg.readableBytes();
InstanceRequest instanceRequest = new InstanceRequest();
+ ServerQueryRequest queryRequest;
+ byte[] requestBytes = new byte[requestSize];
+
try {
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED,
requestSize);
+
+ // parse instance request into a query result.
+ msg.readBytes(requestBytes);
_deserializer.deserialize(instanceRequest, requestBytes);
+ queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics,
queryArrivalTimeMs);
+
queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION,
queryArrivalTimeMs)
+ .stopAndRecord();
} catch (Exception e) {
- LOGGER
- .error("Caught exception while deserializing the instance request:
{}", BytesUtils.toHexString(requestBytes),
- e);
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS,
1);
+ LOGGER.error("Exception while deserializing the instance request: {}",
BytesUtils.toHexString(requestBytes), e);
+ sendResponse(ctx, instanceRequest.getRequestId(), queryArrivalTimeMs,
new DataTableImplV2(), e);
return;
}
- ServerQueryRequest queryRequest = new ServerQueryRequest(instanceRequest,
_serverMetrics, queryArrivalTimeMs);
-
queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION,
queryArrivalTimeMs)
- .stopAndRecord();
-
- // NOTE: executor must be provided as addCallback(future, callback) is
removed from newer guava version
+ // Submit query for execution and register callback for execution results.
Futures.addCallback(_queryScheduler.submit(queryRequest), new
FutureCallback<byte[]>() {
@Override
public void onSuccess(@Nullable byte[] responseBytes) {
- // NOTE: response bytes can be null if data table serialization throws
exception
if (responseBytes != null) {
- long sendResponseStartTimeMs = System.currentTimeMillis();
- int queryProcessingTimeMs = (int) (sendResponseStartTimeMs -
queryArrivalTimeMs);
-
ctx.writeAndFlush(Unpooled.wrappedBuffer(responseBytes)).addListener(f -> {
- long sendResponseEndTimeMs = System.currentTimeMillis();
- int sendResponseLatencyMs = (int) (sendResponseEndTimeMs -
sendResponseStartTimeMs);
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT,
1);
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT,
responseBytes.length);
-
_serverMetrics.addTimedValue(ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY,
sendResponseLatencyMs,
- TimeUnit.MILLISECONDS);
-
- int totalQueryTimeMs = (int) (sendResponseEndTimeMs -
queryArrivalTimeMs);
- if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
- LOGGER.info(
- "Slow query: request handler processing time: {}, send
response latency: {}, total time to handle request: {}",
- queryProcessingTimeMs, sendResponseLatencyMs,
totalQueryTimeMs);
- }
- });
+ // responseBytes contains either query results or exception.
+ sendResponse(ctx, queryArrivalTimeMs, responseBytes);
+ } else {
+ // Send exception response.
+ sendResponse(ctx, queryRequest.getRequestId(), queryArrivalTimeMs,
new DataTableImplV2(),
+ new Exception("Null query response."));
}
}
@Override
public void onFailure(Throwable t) {
- LOGGER.error("Caught exception while processing instance request", t);
- _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS,
1);
+ // Send exception response.
+ LOGGER.error("Exception while processing instance request", t);
+ sendResponse(ctx, instanceRequest.getRequestId(), queryArrivalTimeMs,
new DataTableImplV2(), new Exception(t));
}
}, MoreExecutors.directExecutor());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- LOGGER.error("Caught exception while fetching instance request", cause);
- _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_FETCH_EXCEPTIONS,
1);
+ // Since we do not know the requestId of the original request here, there
is no way for Broker to know which query
+ // request this response belongs to. Hence, Broker will continue to wait
for the original request until time out.
+ // To prevent broker from waiting unnecessarily, try to catch and handle all
exceptions in channelRead0 method so that
+ // this function is never called.
+ LOGGER.error("Unhandled Exception in " + getClass().getCanonicalName(),
cause);
+ sendResponse(ctx, 0, System.currentTimeMillis(), new DataTableImplV2(),
new Exception(cause));
+ }
+
+ /**
+ * Send an exception back to broker as response to the query request.
+ */
+ private void sendResponse(ChannelHandlerContext ctx, long requestId, long
queryArrivalTimeMs, DataTable dataTable,
Review comment:
The `onSuccess` method in the callback receives byte data, so to send that
byte data back to the broker we need a `sendResponse` function that takes byte
data as input. There are also cases where we detect an error condition
ourselves and need to create our own byte representation of that error. This
is where the first `sendResponse` method is called: it converts the DataTable
into byte data and then calls the second, more generic `sendResponse` method.
I can rename the first `sendResponse` method to `sendErrorResponse` if that
makes things clearer.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]