Jackie-Jiang commented on code in PR #11173:
URL: https://github.com/apache/pinot/pull/11173#discussion_r1274123322


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java:
##########
@@ -97,23 +100,45 @@ public void submit(Worker.QueryRequest request, 
StreamObserver<Worker.QueryRespo
     Map<String, String> requestMetadataMap;
     requestMetadataMap = request.getMetadataMap();
     long requestId = 
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
+    long timeoutMs = 
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
+    long deadlineMs = System.currentTimeMillis() + timeoutMs;
     try {
       distributedStagePlans = 
QueryPlanSerDeUtils.deserializeStagePlan(request);
     } catch (Exception e) {
       LOGGER.error("Caught exception while deserializing the request: {}", 
requestId, e);
       responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad 
request").withCause(e).asException());
       return;
     }
-    // TODO: allow thrown exception to return back to broker in asynchronous 
manner.
+    BlockingQueue<String> stageSubmissionCallbacks = new 
LinkedBlockingQueue<>();

Review Comment:
   We should track all the `Future`s and cancel them when query runs into 
problem. With `Future`, we can easily get whether it is done or throws 
exception. No need to use a separate blocking queue



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to