amrishlal commented on a change in pull request #6596:
URL: https://github.com/apache/incubator-pinot/pull/6596#discussion_r582395572



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
##########
@@ -60,67 +65,126 @@ public InstanceRequestHandler(QueryScheduler 
queryScheduler, ServerMetrics serve
     _serverMetrics = serverMetrics;
   }
 
+  /**
+   * Always return a response even when query execution throws exception; 
otherwise, broker
+   * will keep waiting until timeout.
+   */
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
-    long queryArrivalTimeMs = System.currentTimeMillis();
-    _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
-    int requestSize = msg.readableBytes();
-    
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED,
 requestSize);
-    byte[] requestBytes = new byte[requestSize];
-    msg.readBytes(requestBytes);
-
-    InstanceRequest instanceRequest = new InstanceRequest();
+    long queryArrivalTimeMs = 0;
+    InstanceRequest instanceRequest = null;
+    byte[] requestBytes = null;
+
     try {
+      // put all inside try block to catch all exceptions.
+      int requestSize = msg.readableBytes();
+
+      instanceRequest = new InstanceRequest();
+      ServerQueryRequest queryRequest;
+      requestBytes = new byte[requestSize];
+
+      queryArrivalTimeMs = System.currentTimeMillis();
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
+      
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED,
 requestSize);
+
+      // parse instance request into ServerQueryRequest.
+      msg.readBytes(requestBytes);
       _deserializer.deserialize(instanceRequest, requestBytes);
+      queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, 
queryArrivalTimeMs);
+      
queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION,
 queryArrivalTimeMs)
+          .stopAndRecord();
+
+      // Submit query for execution and register callback for execution 
results.
+      Futures.addCallback(_queryScheduler.submit(queryRequest),
+          createCallback(ctx, queryArrivalTimeMs, instanceRequest, 
queryRequest), MoreExecutors.directExecutor());
     } catch (Exception e) {
-      LOGGER
-          .error("Caught exception while deserializing the instance request: 
{}", BytesUtils.toHexString(requestBytes),
-              e);
-      
_serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS,
 1);
-      return;
-    }
+      if (e instanceof TException) {
+        // deserialization exception

Review comment:
       Yes, `_deserializer.deserialize(instanceRequest, requestBytes)` is the 
only method that throws `TException`. Also, any exceptions thrown within:
   ```
         Futures.addCallback(_queryScheduler.submit(queryRequest),
             createCallback(ctx, queryArrivalTimeMs, instanceRequest, 
queryRequest), MoreExecutors.directExecutor());
   ```
   
   will get handled by the callback's `onFailure` method.
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to