npawar commented on code in PR #9171:
URL: https://github.com/apache/pinot/pull/9171#discussion_r941884289


##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java:
##########
@@ -103,7 +115,68 @@ public QueryScheduler(PinotConfiguration config, QueryExecutor queryExecutor, Re
    * @return Listenable future for query result representing serialized response. It is possible that the
    *    future may return immediately or be scheduled for execution at a later time.
    */
-  public abstract ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest);
+  public ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest) {
+    ListenableFuture<byte[]> future = submitInternal(queryRequest);
+    if (_enableQueryCancellation) {
+      String queryId = queryRequest.getQueryId();
+      // Track the running query for cancellation.
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Keep track of running query: {}", queryId);
+      }
+      _queryFuturesById.put(queryId, future);
+      // And remove the track when the query ends.
+      Futures.addCallback(future, new FutureCallback<byte[]>() {
+        @Override
+        public void onSuccess(@Nullable byte[] ignored) {
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Remove track of running query: {} on success", queryId);
+          }
+          _queryFuturesById.remove(queryId);
+        }
+
+        @Override
+        public void onFailure(Throwable ignored) {
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Remove track of running query: {} on failure", queryId);
+          }
+          _queryFuturesById.remove(queryId);
+        }
+      }, MoreExecutors.directExecutor());
+    }
+    return future;
+  }
+
+  protected abstract ListenableFuture<byte[]> submitInternal(ServerQueryRequest queryRequest);
+
+  /**
+   * Cancel a query as identified by the queryId. This method is non-blocking and the query may still run for a while
+   * after calling this method. This method can be called multiple times.
+   *
+   * @param queryId a unique Id to find the query
+   * @return true if a running query exists for the given queryId.
+   */
+  public boolean cancelQuery(String queryId) {
+    // Keep the future as it'll be cleaned up by the thread executing the query.
+    Future<byte[]> future = _queryFuturesById.get(queryId);
+    if (future == null) {
+      return false;
+    }
+    boolean done = future.isDone();
+    if (!done) {
+      future.cancel(true);

Review Comment:
   For a long-running query, this cancel may not have much effect. The
isInterrupted check only happens in the base class when we move to a new
operator, so if we're already past that check and in the middle of a
long-running operation, won't we still have to wait for it to finish?
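
To illustrate the concern, here is a minimal standalone sketch (not Pinot code). `CancelSketch`, `process`, `runAndCancel` and the `checkEvery` parameter are all hypothetical names made up for this example. It assumes the cancel is delivered as a thread interrupt via `Future.cancel(true)`, and shows that the interrupt only stops work that polls the interrupt flag:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

class CancelSketch {
  // Hypothetical stand-in for a long-running operator. It polls the interrupt
  // flag once every `checkEvery` docs; checkEvery == 0 means it never polls.
  static long process(long numDocs, int checkEvery) {
    long processed = 0;
    for (long i = 0; i < numDocs; i++) {
      if (checkEvery > 0 && i % checkEvery == 0 && Thread.currentThread().isInterrupted()) {
        break; // cooperative exit: cancel(true) only sets the flag, the work must look at it
      }
      processed++;
    }
    return processed;
  }

  // Starts the work on another thread, cancels it once it is running, and
  // returns how many docs were processed before the worker thread stopped.
  static long runAndCancel(long numDocs, int checkEvery) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch finished = new CountDownLatch(1);
    AtomicLong processed = new AtomicLong();
    Future<?> future = executor.submit(() -> {
      started.countDown();
      try {
        processed.set(process(numDocs, checkEvery));
      } finally {
        finished.countDown();
      }
    });
    try {
      started.await();      // make sure the task is already executing
      future.cancel(true);  // non-blocking: interrupts the worker and returns
      finished.await();     // wait until the worker actually stops
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      executor.shutdownNow();
    }
    return processed.get();
  }

  public static void main(String[] args) {
    // Polls the flag: stops early even though the loop is effectively unbounded.
    System.out.println("with checks:    " + runAndCancel(Long.MAX_VALUE, 1024));
    // Never polls: runs to completion even though the future was cancelled.
    System.out.println("without checks: " + runAndCancel(5_000_000L, 0));
  }
}
```

If the operators behave like the second case, the future is reported as cancelled right away while the worker thread stays busy until the operation finishes on its own.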



##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java:
##########
@@ -141,6 +152,49 @@ public void processSqlQueryPost(String query, @Suspended AsyncResponse asyncResp
     }
   }
 
+  @DELETE
+  @Path("query/{queryId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Cancel a query as identified by the queryId", notes = "No effect if no query exists for the "
+      + "given queryId on the requested broker. Query may continue to run for a short while after calling cancel as "
+      + "it's done in a non-blocking manner. The cancel method can be called multiple times.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"),
+      @ApiResponse(code = 404, message = "Query not found on the requested broker")
+  })
+  public String cancelQuery(

Review Comment:
   Just wondering, why did you remove the brokerId_ prefix from the id here?
Once we make this a controller API, we'll have to add the brokerId back to keep
the ids unique, and the user's request will have to include it as well.
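
For reference, a minimal sketch of what a broker-prefixed id could look like. `QueryIds` and its methods are hypothetical helpers, not existing Pinot classes, and the sketch assumes the `<brokerId>_<requestId>` format with a numeric requestId. It splits on the last underscore on the assumption that the brokerId itself may contain underscores:

```java
final class QueryIds {
  private QueryIds() {
  }

  // Builds "<brokerId>_<requestId>" so the id stays unique across brokers.
  static String compose(String brokerId, long requestId) {
    return brokerId + "_" + requestId;
  }

  // Splits on the LAST underscore, since the brokerId may contain underscores too.
  static String brokerIdOf(String queryId) {
    return queryId.substring(0, separatorIndex(queryId));
  }

  static long requestIdOf(String queryId) {
    return Long.parseLong(queryId.substring(separatorIndex(queryId) + 1));
  }

  private static int separatorIndex(String queryId) {
    int idx = queryId.lastIndexOf('_');
    if (idx <= 0 || idx == queryId.length() - 1) {
      throw new IllegalArgumentException("Expected <brokerId>_<requestId> but got: " + queryId);
    }
    return idx;
  }
}
```

With something like this, a controller-level API could route `DELETE query/{queryId}` to the right broker using `brokerIdOf(queryId)` without the user having to pass the broker separately.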



##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java:
##########
@@ -141,6 +152,50 @@ public void processSqlQueryPost(String query, @Suspended AsyncResponse asyncResp
     }
   }
 
+  @DELETE
+  @Path("query/{queryId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Cancel a query as identified by the queryId", notes = "No effect if no query exists for the "
+      + "given queryId on the requested broker. Query may continue to run for a short while after calling cancel as "
+      + "it's done in a non-blocking manner. The cancel method can be called multiple times.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"),
+      @ApiResponse(code = 404, message = "Query not found on the requested broker")
+  })
+  public String cancelQuery(
+      @ApiParam(value = "QueryId as in the format of <brokerId>_<requestId>", required = true) @PathParam("queryId")
+          String queryId,
+      @ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs")
+      @DefaultValue("3000") int timeoutMs,
+      @ApiParam(value = "Return server responses for troubleshooting") @QueryParam("detailed") @DefaultValue("false")
+          boolean detailed) {
+    Map<String, Integer> serverResponses = null;
+    if (detailed) {
+      serverResponses = new HashMap<>();
+    }
+    if (!_requestHandler.cancelQuery(queryId, timeoutMs, _executor, _httpConnMgr, serverResponses)) {
+      throw new WebApplicationException(
+          Response.status(Response.Status.NOT_FOUND).entity("Query: " + queryId + " not found on the broker").build());
+    }
+    String resp = "Cancelled query: " + queryId;
+    if (detailed) {
+      resp += " with responses from servers: " + serverResponses;
+    }
+    return resp;
+  }
+
+  @GET

Review Comment:
   nit: suggest keeping the prefix of the two APIs the same (GET /queries and
DELETE /queries/{queryId}, or GET /query and DELETE /query/{queryId})



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

