xiangfu0 commented on code in PR #18649:
URL: https://github.com/apache/pinot/pull/18649#discussion_r3341082507
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -338,6 +340,65 @@ protected boolean handleCancel(long queryId, int timeoutMs, Executor executor, H
     return true;
   }
+  @Override
+  @Nullable
+  public QueryProgressStats getQueryProgressStats(long queryId, int timeoutMs, Executor executor,
+      HttpClientConnectionManager connMgr)
+      throws Exception {
+    Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
+    QueryServers queryServers = _serversById.get(queryId);
+    if (queryServers == null) {
+      return null;
+    }
+
+    // TODO: Use different global query id for OFFLINE and REALTIME table after releasing 0.12.0. See QueryIdUtils for
+    // details.
+    String globalQueryId = getGlobalQueryId(queryId);
+    List<String> serverUrls = new ArrayList<>(queryServers._servers.size());
+    for (ServerInstance serverInstance : queryServers._servers) {
+      serverUrls.add(String.format("%s/query/%s/progress", serverInstance.getAdminEndpoint(), globalQueryId));
+    }
+    if (serverUrls.isEmpty()) {
+      return null;
+    }
+
+    CompletionService<MultiHttpRequestResponse> completionService =
+        new MultiHttpRequest(executor, connMgr).executeGet(serverUrls, null, timeoutMs);
+    List<QueryProgressStats> serverProgressStats = new ArrayList<>(serverUrls.size());
+    List<String> errMsgs = new ArrayList<>(serverUrls.size());
+    for (int i = 0; i < serverUrls.size(); i++) {
+      MultiHttpRequestResponse httpRequestResponse = null;
+      URI uri = null;
+      try {
+        httpRequestResponse = completionService.take().get();
+        uri = httpRequestResponse.getURI();
+        int status = httpRequestResponse.getResponse().getCode();
+        if (status == 200) {
+          String responseString = EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+          serverProgressStats.add(JsonUtils.stringToObject(responseString, QueryProgressStats.class));
+        } else if (status != 404) {
Review Comment:
Ignoring 404 here makes the aggregated progress go backwards once the faster
SSE servers finish. `InstanceRequestHandler` removes the server-side
`QueryExecutionContext` on success, so a completed server immediately stops
returning 200 even though it has already contributed all of its processed/total
work. Aggregating only the remaining 200 responses shrinks both the numerator
and the denominator, so a nearly finished query can suddenly report much
smaller processed and total counts. This needs to either retain
finished-server progress or treat 404 as completion for that server, rather
than dropping it from the aggregate.
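
One possible shape for the "retain finished-server progress" option, as a minimal sketch rather than a proposed patch. Everything here is hypothetical: `StickyProgressAggregator`, `Progress`, and `onResponse` are illustration-only names, not Pinot classes, and the sketch assumes the broker can keep a small per-query map across polls, which the stateless method in this diff does not do today.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-query helper; not part of Pinot. */
final class StickyProgressAggregator {

  /** Hypothetical stand-in for one server's progress (processed and total units of work). */
  public static final class Progress {
    public final long processed;
    public final long total;

    public Progress(long processed, long total) {
      this.processed = processed;
      this.total = total;
    }
  }

  // Last progress known for each server, keyed by server URL.
  private final Map<String, Progress> lastSeen = new HashMap<>();

  /** Records one server response from a single poll. */
  public void onResponse(String serverUrl, int status, Progress body) {
    if (status == 200 && body != null) {
      lastSeen.put(serverUrl, body);
    } else if (status == 404) {
      // Assumption: a 404 after an earlier 200 means the server finished and
      // dropped its context, so count all of its known work as processed.
      Progress prev = lastSeen.get(serverUrl);
      if (prev != null) {
        lastSeen.put(serverUrl, new Progress(prev.total, prev.total));
      }
      // A server that was never seen with a 200 has an unknown total, so this
      // sketch cannot account for it and leaves it out.
    }
    // Other status codes are left to the caller's error handling.
  }

  /** Sums the retained per-server progress. */
  public Progress aggregate() {
    long processed = 0;
    long total = 0;
    for (Progress p : lastSeen.values()) {
      processed += p.processed;
      total += p.total;
    }
    return new Progress(processed, total);
  }
}
```

With two servers at 90/100 and 10/100, the aggregate is 100/200. If the first server then finishes and returns 404, dropping it would report 10/100, whereas the sketch reports 110/200. It does not cover a server that finishes before the first poll; that case would need the server to keep reporting its final stats or the broker to know per-server totals up front.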
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]