amrishlal commented on a change in pull request #6596:
URL: https://github.com/apache/incubator-pinot/pull/6596#discussion_r582365294



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
##########
@@ -60,67 +64,120 @@ public InstanceRequestHandler(QueryScheduler queryScheduler, ServerMetrics serve
     _serverMetrics = serverMetrics;
   }
 
+  /**
+   * Always return a response even when query execution throws exception; otherwise, broker
+   * will keep waiting until timeout.
+   */
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
-    long queryArrivalTimeMs = System.currentTimeMillis();
-    _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
-    int requestSize = msg.readableBytes();
-    
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED,
 requestSize);
-    byte[] requestBytes = new byte[requestSize];
-    msg.readBytes(requestBytes);
-
-    InstanceRequest instanceRequest = new InstanceRequest();
+    long queryArrivalTimeMs = 0;
+    InstanceRequest instanceRequest = null;
+    byte[] requestBytes = null;
+
     try {
+      // put all inside try block to catch all exceptions.
+      final int requestSize = msg.readableBytes();
+
+      instanceRequest = new InstanceRequest();
+      ServerQueryRequest queryRequest;
+      requestBytes = new byte[requestSize];
+
+      queryArrivalTimeMs = System.currentTimeMillis();
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
+      
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED,
 requestSize);
+
+      // parse instance request into a query result.
+      msg.readBytes(requestBytes);
       _deserializer.deserialize(instanceRequest, requestBytes);
+      queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, 
queryArrivalTimeMs);
+      
queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION,
 queryArrivalTimeMs)
+          .stopAndRecord();
+
+      // Submit query for execution and register callback for execution 
results.
+      Futures.addCallback(_queryScheduler.submit(queryRequest),
+          createCallback(ctx, queryArrivalTimeMs, instanceRequest, 
queryRequest), MoreExecutors.directExecutor());
     } catch (Exception e) {
-      LOGGER
-          .error("Caught exception while deserializing the instance request: 
{}", BytesUtils.toHexString(requestBytes),
-              e);
-      
_serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS,
 1);
-      return;
+      String hexString = requestBytes != null ? 
BytesUtils.toHexString(requestBytes) : "";
+      long reqestId = instanceRequest != null ? instanceRequest.getRequestId() 
: 0;
+      LOGGER.error("Exception while deserializing the instance request: {}", 
hexString, e);
+      sendErrorResponse(ctx, reqestId, queryArrivalTimeMs, new 
DataTableImplV2(), e);
     }
+  }
 
-    ServerQueryRequest queryRequest = new ServerQueryRequest(instanceRequest, 
_serverMetrics, queryArrivalTimeMs);
-    
queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION,
 queryArrivalTimeMs)
-        .stopAndRecord();
-
-    // NOTE: executor must be provided as addCallback(future, callback) is 
removed from newer guava version
-    Futures.addCallback(_queryScheduler.submit(queryRequest), new 
FutureCallback<byte[]>() {
+  private FutureCallback<byte[]> createCallback(ChannelHandlerContext ctx, 
long queryArrivalTimeMs,
+      InstanceRequest instanceRequest, ServerQueryRequest queryRequest) {
+    return new FutureCallback<byte[]>() {
       @Override
       public void onSuccess(@Nullable byte[] responseBytes) {
-        // NOTE: response bytes can be null if data table serialization throws 
exception
         if (responseBytes != null) {
-          long sendResponseStartTimeMs = System.currentTimeMillis();
-          int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - 
queryArrivalTimeMs);
-          
ctx.writeAndFlush(Unpooled.wrappedBuffer(responseBytes)).addListener(f -> {
-            long sendResponseEndTimeMs = System.currentTimeMillis();
-            int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - 
sendResponseStartTimeMs);
-            
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT,
 1);
-            
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, 
responseBytes.length);
-            
_serverMetrics.addTimedValue(ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY,
 sendResponseLatencyMs,
-                TimeUnit.MILLISECONDS);
-
-            int totalQueryTimeMs = (int) (sendResponseEndTimeMs - 
queryArrivalTimeMs);
-            if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
-              LOGGER.info(
-                  "Slow query: request handler processing time: {}, send 
response latency: {}, total time to handle request: {}",
-                  queryProcessingTimeMs, sendResponseLatencyMs, 
totalQueryTimeMs);
-            }
-          });
+          // responseBytes contains either query results or exception.
+          sendResponse(ctx, queryArrivalTimeMs, responseBytes);
+        } else {
+          // Send exception response.
+          sendErrorResponse(ctx, queryRequest.getRequestId(), 
queryArrivalTimeMs, new DataTableImplV2(),
+              new Exception("Null query response."));
         }
       }
 
       @Override
       public void onFailure(Throwable t) {
-        LOGGER.error("Caught exception while processing instance request", t);
-        _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 
1);
+        // Send exception response.
+        LOGGER.error("Exception while processing instance request", t);
+        sendErrorResponse(ctx, instanceRequest.getRequestId(), 
queryArrivalTimeMs, new DataTableImplV2(), new Exception(t));
       }
-    }, MoreExecutors.directExecutor());
+    };
   }
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-    LOGGER.error("Caught exception while fetching instance request", cause);
-    _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_FETCH_EXCEPTIONS, 
1);
+    // All exceptions should be caught and handled in channelRead0 method. 
This is a fallback method that
+    // will only be called if for some remote reason we are unable to handle 
exceptions in channelRead0.
+    String message = "Unhandled Exception in " + getClass().getCanonicalName();
+    LOGGER.error(message, cause);
+    sendErrorResponse(ctx, 0, System.currentTimeMillis(), new 
DataTableImplV2(), new Exception(message, cause));
+  }
+
+  /**
+   * Send an exception back to broker as response to the query request.
+   */
+  private void sendErrorResponse(ChannelHandlerContext ctx, long requestId, 
long queryArrivalTimeMs, DataTable dataTable,
+      Exception e) {
+    try {
+      Map<String, String> dataTableMetadata = dataTable.getMetadata();
+      dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY, 
Long.toString(requestId));
+
+      
dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to