Jackie-Jiang commented on code in PR #9171:
URL: https://github.com/apache/pinot/pull/9171#discussion_r944896070
##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java:
##########
@@ -141,6 +152,49 @@ public void processSqlQueryPost(String query, @Suspended
AsyncResponse asyncResp
}
}
+ @DELETE
+ @Path("query/{queryId}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Cancel a query as identified by the queryId", notes =
"No effect if no query exists for the "
+ + "given queryId on the requested broker. Query may continue to run for
a short while after calling cancel as "
+ + "it's done in a non-blocking manner. The cancel method can be called
multiple times.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500,
message = "Internal server error"),
+ @ApiResponse(code = 404, message = "Query not found on the requested
broker")
+ })
+ public String cancelQuery(
+ @ApiParam(value = "QueryId as assigned by the broker", required = true)
@PathParam("queryId") long queryId,
+ @ApiParam(value = "Timeout for servers to respond the cancel request")
@QueryParam("timeoutMs")
+ @DefaultValue("3000") int timeoutMs,
+ @ApiParam(value = "Return server responses for troubleshooting")
@QueryParam("detailed") @DefaultValue("false")
Review Comment:
(minor) Consider naming this parameter `verbose`
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -167,6 +182,60 @@ private String getDefaultBrokerId() {
}
}
+ @Override
+ public Map<Long, String> getRunningQueries() {
+ return
_queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e
-> e.getValue()._query));
+ }
+
+ @VisibleForTesting
+ Set<ServerInstance> getRunningServers(long queryId) {
+ if (_queriesById.isEmpty()) {
+ return Collections.emptySet();
+ }
+ return _queriesById.get(queryId)._servers;
Review Comment:
Can this throw NPE?
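It looks like it can: the `isEmpty()` guard only covers an empty map, so
looking up an unknown `queryId` in a non-empty map returns `null` and the
`._servers` dereference fails. A minimal sketch of a null-safe lookup, where
`QueryServers` and `RunningQueries` are hypothetical stand-ins for the PR's
classes and servers are plain strings rather than `ServerInstance`:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the PR's QueryRoutingTable; only the field used here.
class QueryServers {
  final Set<String> _servers = ConcurrentHashMap.newKeySet();
}

class RunningQueries {
  private final Map<Long, QueryServers> _queriesById = new ConcurrentHashMap<>();

  void register(long queryId, String... servers) {
    QueryServers entry = new QueryServers();
    Collections.addAll(entry._servers, servers);
    _queriesById.put(queryId, entry);
  }

  Set<String> getRunningServers(long queryId) {
    // A single get() plus a null check covers both the empty map and the unknown id.
    QueryServers entry = _queriesById.get(queryId);
    return entry != null ? entry._servers : Collections.emptySet();
  }
}
```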
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -190,7 +259,17 @@ public BrokerResponseNative handleRequest(JsonNode
request, @Nullable RequesterI
if (sql == null) {
throw new BadQueryRequestException("Failed to find 'sql' in the request:
" + request);
}
- return handleRequest(requestId, sql.asText(), request, requesterIdentity,
requestContext);
+ if (!_enableQueryCancellation) {
+ return handleRequest(requestId, sql.asText(), request,
requesterIdentity, requestContext);
+ }
+ _queriesById.put(requestId, new QueryRoutingTable(sql.asText()));
Review Comment:
(minor) We might want to pass the `QueryRoutingTable` into `handleRequest()`
to avoid an extra lookup. It also avoids an NPE if the request is somehow
removed early (probably not possible right now).
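Roughly what I have in mind, as a sketch only: `TrackedQuery` and
`QueryTracker` are hypothetical names, the handler is modeled as a plain
`Function`, and the removal in `finally` is my assumption about how the entry
gets cleaned up.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for the PR's QueryRoutingTable.
class TrackedQuery {
  final String _query;

  TrackedQuery(String query) {
    _query = query;
  }
}

class QueryTracker {
  private final Map<Long, TrackedQuery> _queriesById = new ConcurrentHashMap<>();

  <R> R runTracked(long requestId, String sql, Function<TrackedQuery, R> handler) {
    TrackedQuery tracked = new TrackedQuery(sql);
    _queriesById.put(requestId, tracked);
    try {
      // The handler receives the entry directly, so it never looks it up by id
      // and cannot observe a missing entry.
      return handler.apply(tracked);
    } finally {
      _queriesById.remove(requestId);
    }
  }

  int runningCount() {
    return _queriesById.size();
  }
}
```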
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -125,6 +134,8 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
private final int _defaultHllLog2m;
private final boolean _enableQueryLimitOverride;
private final boolean _enableDistinctCountBitmapOverride;
+ private final Map<Long, QueryRoutingTable> _queriesById = new
ConcurrentHashMap<>();
Review Comment:
(minor) Set it to `null` when query cancellation is disabled
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java:
##########
@@ -36,4 +39,19 @@ public interface BrokerRequestHandler {
BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentity
requesterIdentity,
RequestContext requestContext)
throws Exception;
+
+ Map<Long, String> getRunningQueries();
+
+ /**
+ * Cancel a query as identified by the queryId. This method is non-blocking
so the query may still run for a while
+ * after calling this method. This cancel method can be called multiple
times.
+ * @param queryId the unique Id assigned to the query by the broker
+ * @param timeoutMs timeout to wait for servers to respond the cancel
requests
+ * @param executor to send cancel requests to servers in parallel
+ * @param connMgr to provide the http connections
+ * @param serverResponses to collect cancel responses from all servers if a
map is provided
+ * @return true if there is a running query for the given queryId.
+ */
+ boolean cancelQuery(long queryId, int timeoutMs, Executor executor,
HttpConnectionManager connMgr,
Review Comment:
Should it throw `TimeoutException` (and maybe `InterruptedException`) when
a server doesn't respond within the timeout?
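One possible shape, sketched with plain `java.util.concurrent` types:
`CancelWaiter` and `awaitAll` are hypothetical names, and treating `timeoutMs`
as one overall deadline (rather than a per-server timeout) is an assumption on
my side.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CancelWaiter {
  // Waits for `expected` responses, sharing a single deadline across all of them.
  static <T> List<T> awaitAll(CompletionService<T> completionService, int expected, long timeoutMs)
      throws InterruptedException, ExecutionException, TimeoutException {
    long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    List<T> responses = new ArrayList<>(expected);
    for (int i = 0; i < expected; i++) {
      long remainingNanos = Math.max(deadlineNanos - System.nanoTime(), 0L);
      // poll() returns null when nothing completes in time, unlike take(), which blocks.
      Future<T> future = completionService.poll(remainingNanos, TimeUnit.NANOSECONDS);
      if (future == null) {
        throw new TimeoutException(
            "Got " + i + " of " + expected + " cancel responses within " + timeoutMs + "ms");
      }
      responses.add(future.get());
    }
    return responses;
  }
}
```

The caller can then decide whether a timeout maps to an error response or to
a partial result.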
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -167,6 +182,60 @@ private String getDefaultBrokerId() {
}
}
+ @Override
+ public Map<Long, String> getRunningQueries() {
+ return
_queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e
-> e.getValue()._query));
Review Comment:
Check and throw an exception (with a proper exception message) when query
cancellation is not enabled. Same for the other APIs that require query
cancellation to be enabled. We don't want to return an empty map because that
can confuse the user.
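A minimal sketch of the check, assuming the map is left `null` when the
feature is off. `CancellableQueryRegistry` is a hypothetical name, and
`IllegalStateException` is only one candidate for the exception type.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class CancellableQueryRegistry {
  // Left null when cancellation is disabled, so nothing is tracked in that mode.
  private final Map<Long, String> _queriesById;

  CancellableQueryRegistry(boolean enableQueryCancellation) {
    _queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null;
  }

  void track(long requestId, String sql) {
    if (_queriesById != null) {
      _queriesById.put(requestId, sql);
    }
  }

  Map<Long, String> getRunningQueries() {
    if (_queriesById == null) {
      // Fail loudly instead of returning an empty map that looks like "no running queries".
      throw new IllegalStateException("Query cancellation is not enabled on this broker");
    }
    return new HashMap<>(_queriesById);
  }
}
```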
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -167,6 +182,60 @@ private String getDefaultBrokerId() {
}
}
+ @Override
+ public Map<Long, String> getRunningQueries() {
+ return
_queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e
-> e.getValue()._query));
+ }
+
+ @VisibleForTesting
+ Set<ServerInstance> getRunningServers(long queryId) {
+ if (_queriesById.isEmpty()) {
+ return Collections.emptySet();
+ }
+ return _queriesById.get(queryId)._servers;
+ }
+
+ @Override
+ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor,
HttpConnectionManager connMgr,
+ Map<String, Integer> serverResponses) {
+ QueryRoutingTable routingTable = _queriesById.get(queryId);
+ if (routingTable == null) {
+ return false;
+ }
+ String globalId = getGlobalQueryId(queryId);
+ List<String> serverUrls = new ArrayList<>();
+ for (ServerInstance server : routingTable._servers) {
+ serverUrls.add(String.format("%s/query/%s", server.getAdminEndpoint(),
globalId));
+ }
+ if (serverUrls.isEmpty()) {
+ LOGGER.debug("No servers running the query: {} right now", globalId);
+ return true;
+ }
+ LOGGER.debug("Cancelling the query: {} via server urls: {}", globalId,
serverUrls);
+ CompletionService<DeleteMethod> completionService =
Review Comment:
There is a corner case that is not handled:
The broker has added the servers to the `QueryRoutingTable` but has not routed
the query yet, so the cancellation might reach the servers before the query
does. The broker cannot distinguish this scenario from the query having
already completed. I haven't thought through whether we need to handle it, but
it is worth noting.
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -167,6 +182,60 @@ private String getDefaultBrokerId() {
}
}
+ @Override
+ public Map<Long, String> getRunningQueries() {
+ return
_queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e
-> e.getValue()._query));
+ }
+
+ @VisibleForTesting
+ Set<ServerInstance> getRunningServers(long queryId) {
+ if (_queriesById.isEmpty()) {
+ return Collections.emptySet();
+ }
+ return _queriesById.get(queryId)._servers;
+ }
+
+ @Override
+ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor,
HttpConnectionManager connMgr,
+ Map<String, Integer> serverResponses) {
+ QueryRoutingTable routingTable = _queriesById.get(queryId);
+ if (routingTable == null) {
+ return false;
+ }
+ String globalId = getGlobalQueryId(queryId);
+ List<String> serverUrls = new ArrayList<>();
+ for (ServerInstance server : routingTable._servers) {
+ serverUrls.add(String.format("%s/query/%s", server.getAdminEndpoint(),
globalId));
+ }
+ if (serverUrls.isEmpty()) {
+ LOGGER.debug("No servers running the query: {} right now", globalId);
+ return true;
Review Comment:
This usually means the routing table has not been calculated yet. We should
put a flag in the `QueryRoutingTable` (consider renaming it because it is not
really a routing table) to indicate whether the query is cancelled. We can
check that flag before sending the query out.
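A sketch of the flag idea, with hypothetical names (`QueryState`,
`Dispatcher`) and the send step modeled as a `Runnable`. Note that this only
narrows the window: a cancel that lands between the check and the send still
needs the server-side cancel to catch it.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical replacement name for the per-query entry kept in _queriesById.
class QueryState {
  private final AtomicBoolean _cancelled = new AtomicBoolean();

  void cancel() {
    _cancelled.set(true);
  }

  boolean isCancelled() {
    return _cancelled.get();
  }
}

class Dispatcher {
  // Returns true if the query was sent, false if it was cancelled before dispatch.
  static boolean dispatchIfNotCancelled(QueryState state, Runnable sendToServers) {
    if (state.isCancelled()) {
      return false;
    }
    sendToServers.run();
    return true;
  }
}
```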
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -167,6 +182,60 @@ private String getDefaultBrokerId() {
}
}
+ @Override
+ public Map<Long, String> getRunningQueries() {
+ return
_queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e
-> e.getValue()._query));
+ }
+
+ @VisibleForTesting
+ Set<ServerInstance> getRunningServers(long queryId) {
+ if (_queriesById.isEmpty()) {
+ return Collections.emptySet();
+ }
+ return _queriesById.get(queryId)._servers;
+ }
+
+ @Override
+ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor,
HttpConnectionManager connMgr,
+ Map<String, Integer> serverResponses) {
+ QueryRoutingTable routingTable = _queriesById.get(queryId);
+ if (routingTable == null) {
+ return false;
+ }
+ String globalId = getGlobalQueryId(queryId);
+ List<String> serverUrls = new ArrayList<>();
+ for (ServerInstance server : routingTable._servers) {
+ serverUrls.add(String.format("%s/query/%s", server.getAdminEndpoint(),
globalId));
+ }
+ if (serverUrls.isEmpty()) {
+ LOGGER.debug("No servers running the query: {} right now", globalId);
+ return true;
+ }
+ LOGGER.debug("Cancelling the query: {} via server urls: {}", globalId,
serverUrls);
+ CompletionService<DeleteMethod> completionService =
+ new MultiHttpRequest(executor, connMgr).execute(serverUrls, null,
timeoutMs, "DELETE", DeleteMethod::new);
+ for (int i = 0; i < serverUrls.size(); i++) {
+ DeleteMethod deleteMethod = null;
+ try {
+ // Wait for all requests to respond before returning to be sure that
the servers have handled the cancel
+ // requests. The completion order is different from serverUrls, thus
use uri in the response.
+ deleteMethod = completionService.take().get();
+ URI uri = deleteMethod.getURI();
+ LOGGER.debug("Got response: {} to cancel query: {} via uri: {}",
deleteMethod.getStatusCode(), globalId, uri);
+ if (serverResponses != null) {
+ serverResponses.put(uri.getHost(), deleteMethod.getStatusCode());
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to cancel query: {}", globalId, e);
Review Comment:
This needs to be reflected back to the user. Logging an error is not enough.
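For example, failures could be collected per server next to the status codes
and returned to the caller. This is a sketch under assumptions:
`CancelOutcome` and `CancelCollector` are hypothetical names, and each server
call is modeled as a `Callable<Integer>` returning the HTTP status instead of
a `DeleteMethod`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;

class CancelOutcome {
  final Map<String, Integer> _statusByServer = new LinkedHashMap<>();
  final Map<String, String> _errorByServer = new LinkedHashMap<>();

  boolean hasFailures() {
    return !_errorByServer.isEmpty();
  }
}

class CancelCollector {
  static CancelOutcome collect(Map<String, Callable<Integer>> cancelCalls) {
    CancelOutcome outcome = new CancelOutcome();
    for (Map.Entry<String, Callable<Integer>> call : cancelCalls.entrySet()) {
      try {
        outcome._statusByServer.put(call.getKey(), call.getValue().call());
      } catch (Exception e) {
        // Keep the failure so the REST layer can report it, rather than only logging it.
        outcome._errorByServer.put(call.getKey(), String.valueOf(e.getMessage()));
      }
    }
    return outcome;
  }
}
```

The REST endpoint could then return a non-200 response, or include the error
map in the body, whenever `hasFailures()` is true.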
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]