siddharthteotia commented on a change in pull request #6596:
URL: https://github.com/apache/incubator-pinot/pull/6596#discussion_r582226453
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
##########
@@ -60,67 +64,120 @@ public InstanceRequestHandler(QueryScheduler
queryScheduler, ServerMetrics serve
_serverMetrics = serverMetrics;
}
+ /**
+ * Always return a response even when query execution throws exception;
otherwise, broker
+ * will keep waiting until timeout.
+ */
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
- long queryArrivalTimeMs = System.currentTimeMillis();
- _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
- int requestSize = msg.readableBytes();
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED,
requestSize);
- byte[] requestBytes = new byte[requestSize];
- msg.readBytes(requestBytes);
-
- InstanceRequest instanceRequest = new InstanceRequest();
+ long queryArrivalTimeMs = 0;
+ InstanceRequest instanceRequest = null;
+ byte[] requestBytes = null;
+
try {
+ // put all inside try block to catch all exceptions.
+ final int requestSize = msg.readableBytes();
+
Review comment:
I also used `final` for local variables in my first few PRs, so I'm glad to
see someone else doing the same. However, the Pinot coding style doesn't follow
this convention, so you may want to remove it for now.
That said, this is something we can discuss independently of this PR. I think
it makes sense to use `final` for constant local variables as well, not just
instance variables.
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
##########
@@ -60,67 +64,120 @@ public InstanceRequestHandler(QueryScheduler
queryScheduler, ServerMetrics serve
_serverMetrics = serverMetrics;
}
+ /**
+ * Always return a response even when query execution throws exception;
otherwise, broker
+ * will keep waiting until timeout.
+ */
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
- long queryArrivalTimeMs = System.currentTimeMillis();
- _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
- int requestSize = msg.readableBytes();
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED,
requestSize);
- byte[] requestBytes = new byte[requestSize];
- msg.readBytes(requestBytes);
-
- InstanceRequest instanceRequest = new InstanceRequest();
+ long queryArrivalTimeMs = 0;
+ InstanceRequest instanceRequest = null;
+ byte[] requestBytes = null;
+
try {
+ // put all inside try block to catch all exceptions.
+ final int requestSize = msg.readableBytes();
+
+ instanceRequest = new InstanceRequest();
+ ServerQueryRequest queryRequest;
+ requestBytes = new byte[requestSize];
+
+ queryArrivalTimeMs = System.currentTimeMillis();
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED,
requestSize);
+
+ // parse instance request into a query result.
+ msg.readBytes(requestBytes);
Review comment:
Typo? Should be `parse instance request into a query request`
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
##########
@@ -60,67 +64,120 @@ public InstanceRequestHandler(QueryScheduler
queryScheduler, ServerMetrics serve
_serverMetrics = serverMetrics;
}
+ /**
+ * Always return a response even when query execution throws exception;
otherwise, broker
+ * will keep waiting until timeout.
+ */
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
- long queryArrivalTimeMs = System.currentTimeMillis();
- _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
- int requestSize = msg.readableBytes();
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED,
requestSize);
- byte[] requestBytes = new byte[requestSize];
- msg.readBytes(requestBytes);
-
- InstanceRequest instanceRequest = new InstanceRequest();
+ long queryArrivalTimeMs = 0;
+ InstanceRequest instanceRequest = null;
+ byte[] requestBytes = null;
+
try {
+ // put all inside try block to catch all exceptions.
+ final int requestSize = msg.readableBytes();
+
+ instanceRequest = new InstanceRequest();
+ ServerQueryRequest queryRequest;
+ requestBytes = new byte[requestSize];
+
+ queryArrivalTimeMs = System.currentTimeMillis();
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED,
requestSize);
+
+ // parse instance request into a query result.
+ msg.readBytes(requestBytes);
_deserializer.deserialize(instanceRequest, requestBytes);
+ queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics,
queryArrivalTimeMs);
+
queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION,
queryArrivalTimeMs)
+ .stopAndRecord();
+
+ // Submit query for execution and register callback for execution
results.
+ Futures.addCallback(_queryScheduler.submit(queryRequest),
+ createCallback(ctx, queryArrivalTimeMs, instanceRequest,
queryRequest), MoreExecutors.directExecutor());
} catch (Exception e) {
- LOGGER
- .error("Caught exception while deserializing the instance request:
{}", BytesUtils.toHexString(requestBytes),
- e);
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS,
1);
- return;
+ String hexString = requestBytes != null ?
BytesUtils.toHexString(requestBytes) : "";
+ long reqestId = instanceRequest != null ? instanceRequest.getRequestId()
: 0;
Review comment:
Isn't this null check late? I think we should check this right after lines 90
and 91, respectively, to avoid an NPE.
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
##########
@@ -60,67 +64,120 @@ public InstanceRequestHandler(QueryScheduler
queryScheduler, ServerMetrics serve
_serverMetrics = serverMetrics;
}
+ /**
+ * Always return a response even when query execution throws exception;
otherwise, broker
+ * will keep waiting until timeout.
+ */
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
- long queryArrivalTimeMs = System.currentTimeMillis();
- _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
- int requestSize = msg.readableBytes();
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED,
requestSize);
- byte[] requestBytes = new byte[requestSize];
- msg.readBytes(requestBytes);
-
- InstanceRequest instanceRequest = new InstanceRequest();
+ long queryArrivalTimeMs = 0;
+ InstanceRequest instanceRequest = null;
+ byte[] requestBytes = null;
+
try {
+ // put all inside try block to catch all exceptions.
+ final int requestSize = msg.readableBytes();
+
+ instanceRequest = new InstanceRequest();
+ ServerQueryRequest queryRequest;
+ requestBytes = new byte[requestSize];
+
+ queryArrivalTimeMs = System.currentTimeMillis();
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED,
requestSize);
+
+ // parse instance request into a query result.
+ msg.readBytes(requestBytes);
_deserializer.deserialize(instanceRequest, requestBytes);
+ queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics,
queryArrivalTimeMs);
+
queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION,
queryArrivalTimeMs)
+ .stopAndRecord();
+
+ // Submit query for execution and register callback for execution
results.
+ Futures.addCallback(_queryScheduler.submit(queryRequest),
+ createCallback(ctx, queryArrivalTimeMs, instanceRequest,
queryRequest), MoreExecutors.directExecutor());
} catch (Exception e) {
- LOGGER
- .error("Caught exception while deserializing the instance request:
{}", BytesUtils.toHexString(requestBytes),
- e);
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS,
1);
- return;
+ String hexString = requestBytes != null ?
BytesUtils.toHexString(requestBytes) : "";
+ long reqestId = instanceRequest != null ? instanceRequest.getRequestId()
: 0;
+ LOGGER.error("Exception while deserializing the instance request: {}",
hexString, e);
+ sendErrorResponse(ctx, reqestId, queryArrivalTimeMs, new
DataTableImplV2(), e);
Review comment:
It looks like we no longer emit the REQUEST_DESERIALIZATION_EXCEPTIONS metric
when we catch an exception during deserialization.
I see that sendErrorResponse does the following:
`_serverMetrics.addMeteredGlobalValue(ServerMeter.QUERY_EXECUTION_EXCEPTIONS,
1);`
I think we should continue to emit the request deserialization exception
metric, because I believe the above metric will be emitted even if the
exception didn't happen during request deserialization.
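A minimal sketch of one way to keep both metrics, in case it helps. Everything
here is a simplified, hypothetical stand-in rather than the actual Pinot code:
`Metrics` replaces ServerMetrics, the meter names are plain strings, and the
`deserialized` flag and the `handle` method are my own assumptions about how the
catch block could tell a deserialization failure apart from a later one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, self-contained stand-in; none of these types are the real Pinot classes.
final class DeserializationMetricSketch {
  // Meter names mirror the ServerMeter constants mentioned in this review.
  static final String REQUEST_DESERIALIZATION_EXCEPTIONS = "REQUEST_DESERIALIZATION_EXCEPTIONS";
  static final String QUERY_EXECUTION_EXCEPTIONS = "QUERY_EXECUTION_EXCEPTIONS";

  // Simplified stand-in for ServerMetrics: a map from meter name to count.
  static final class Metrics {
    private final Map<String, Long> _counts = new HashMap<>();

    void addMeteredGlobalValue(String meter, long value) {
      _counts.merge(meter, value, Long::sum);
    }

    long count(String meter) {
      return _counts.getOrDefault(meter, 0L);
    }
  }

  // Stand-in for sendErrorResponse, which emits QUERY_EXECUTION_EXCEPTIONS for any failure.
  static void sendErrorResponse(Metrics metrics) {
    metrics.addMeteredGlobalValue(QUERY_EXECUTION_EXCEPTIONS, 1);
  }

  // Stand-in for channelRead0. The two boolean parameters simulate where the failure occurs.
  static void handle(Metrics metrics, boolean failInDeserialize, boolean failInSubmit) {
    // Assumption: a flag records whether deserialization finished before the exception.
    boolean deserialized = false;
    try {
      if (failInDeserialize) {
        throw new IllegalStateException("simulated deserialization failure");
      }
      deserialized = true;
      if (failInSubmit) {
        throw new IllegalStateException("simulated failure while submitting the query");
      }
    } catch (Exception e) {
      // Emit the deserialization metric only when the failure happened before the flag was set.
      if (!deserialized) {
        metrics.addMeteredGlobalValue(REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
      }
      // The error response is still sent in every failure case.
      sendErrorResponse(metrics);
    }
  }

  public static void main(String[] args) {
    Metrics metrics = new Metrics();
    handle(metrics, true, false);
    handle(metrics, false, true);
    System.out.println("deserialization exceptions: " + metrics.count(REQUEST_DESERIALIZATION_EXCEPTIONS));
    System.out.println("query execution exceptions: " + metrics.count(QUERY_EXECUTION_EXCEPTIONS));
  }
}
```

With something along these lines, the broker still gets a response for every
failure, and the deserialization metric keeps its original meaning.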
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
##########
@@ -60,67 +64,120 @@ public InstanceRequestHandler(QueryScheduler
queryScheduler, ServerMetrics serve
_serverMetrics = serverMetrics;
}
+ /**
+ * Always return a response even when query execution throws exception;
otherwise, broker
+ * will keep waiting until timeout.
+ */
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
- long queryArrivalTimeMs = System.currentTimeMillis();
- _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
- int requestSize = msg.readableBytes();
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED,
requestSize);
- byte[] requestBytes = new byte[requestSize];
- msg.readBytes(requestBytes);
-
- InstanceRequest instanceRequest = new InstanceRequest();
+ long queryArrivalTimeMs = 0;
+ InstanceRequest instanceRequest = null;
+ byte[] requestBytes = null;
+
try {
+ // put all inside try block to catch all exceptions.
+ final int requestSize = msg.readableBytes();
+
+ instanceRequest = new InstanceRequest();
+ ServerQueryRequest queryRequest;
+ requestBytes = new byte[requestSize];
+
+ queryArrivalTimeMs = System.currentTimeMillis();
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED,
requestSize);
+
+ // parse instance request into a query result.
+ msg.readBytes(requestBytes);
_deserializer.deserialize(instanceRequest, requestBytes);
+ queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics,
queryArrivalTimeMs);
+
queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION,
queryArrivalTimeMs)
+ .stopAndRecord();
+
+ // Submit query for execution and register callback for execution
results.
+ Futures.addCallback(_queryScheduler.submit(queryRequest),
+ createCallback(ctx, queryArrivalTimeMs, instanceRequest,
queryRequest), MoreExecutors.directExecutor());
} catch (Exception e) {
- LOGGER
- .error("Caught exception while deserializing the instance request:
{}", BytesUtils.toHexString(requestBytes),
- e);
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS,
1);
- return;
+ String hexString = requestBytes != null ?
BytesUtils.toHexString(requestBytes) : "";
+ long reqestId = instanceRequest != null ? instanceRequest.getRequestId()
: 0;
+ LOGGER.error("Exception while deserializing the instance request: {}",
hexString, e);
+ sendErrorResponse(ctx, reqestId, queryArrivalTimeMs, new
DataTableImplV2(), e);
}
+ }
- ServerQueryRequest queryRequest = new ServerQueryRequest(instanceRequest,
_serverMetrics, queryArrivalTimeMs);
-
queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION,
queryArrivalTimeMs)
- .stopAndRecord();
-
- // NOTE: executor must be provided as addCallback(future, callback) is
removed from newer guava version
- Futures.addCallback(_queryScheduler.submit(queryRequest), new
FutureCallback<byte[]>() {
+ private FutureCallback<byte[]> createCallback(ChannelHandlerContext ctx,
long queryArrivalTimeMs,
+ InstanceRequest instanceRequest, ServerQueryRequest queryRequest) {
+ return new FutureCallback<byte[]>() {
@Override
public void onSuccess(@Nullable byte[] responseBytes) {
- // NOTE: response bytes can be null if data table serialization throws
exception
if (responseBytes != null) {
- long sendResponseStartTimeMs = System.currentTimeMillis();
- int queryProcessingTimeMs = (int) (sendResponseStartTimeMs -
queryArrivalTimeMs);
-
ctx.writeAndFlush(Unpooled.wrappedBuffer(responseBytes)).addListener(f -> {
- long sendResponseEndTimeMs = System.currentTimeMillis();
- int sendResponseLatencyMs = (int) (sendResponseEndTimeMs -
sendResponseStartTimeMs);
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT,
1);
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT,
responseBytes.length);
-
_serverMetrics.addTimedValue(ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY,
sendResponseLatencyMs,
- TimeUnit.MILLISECONDS);
-
- int totalQueryTimeMs = (int) (sendResponseEndTimeMs -
queryArrivalTimeMs);
- if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
- LOGGER.info(
- "Slow query: request handler processing time: {}, send
response latency: {}, total time to handle request: {}",
- queryProcessingTimeMs, sendResponseLatencyMs,
totalQueryTimeMs);
- }
- });
+ // responseBytes contains either query results or exception.
+ sendResponse(ctx, queryArrivalTimeMs, responseBytes);
+ } else {
+ // Send exception response.
+ sendErrorResponse(ctx, queryRequest.getRequestId(),
queryArrivalTimeMs, new DataTableImplV2(),
+ new Exception("Null query response."));
}
}
@Override
public void onFailure(Throwable t) {
- LOGGER.error("Caught exception while processing instance request", t);
- _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS,
1);
+ // Send exception response.
Review comment:
Can you please also update the PR description to call out the metrics
that will no longer be emitted after this PR? It looks like UNCAUGHT_EXCEPTIONS
and REQUEST_FETCH_EXCEPTION are the ones.