xiangfu0 commented on code in PR #18649:
URL: https://github.com/apache/pinot/pull/18649#discussion_r3362541455
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -338,6 +340,65 @@ protected boolean handleCancel(long queryId, int
timeoutMs, Executor executor, H
return true;
}
+ @Override
+ @Nullable
+ public QueryProgressStats getQueryProgressStats(long queryId, int timeoutMs,
Executor executor,
+ HttpClientConnectionManager connMgr)
+ throws Exception {
+ Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation
is not enabled on broker");
+ QueryServers queryServers = _serversById.get(queryId);
+ if (queryServers == null) {
+ return null;
+ }
+
+ // TODO: Use different global query id for OFFLINE and REALTIME table
after releasing 0.12.0. See QueryIdUtils for
+ // details.
+ String globalQueryId = getGlobalQueryId(queryId);
+ List<String> serverUrls = new ArrayList<>(queryServers._servers.size());
+ for (ServerInstance serverInstance : queryServers._servers) {
+ serverUrls.add(String.format("%s/query/%s/progress",
serverInstance.getAdminEndpoint(), globalQueryId));
+ }
+ if (serverUrls.isEmpty()) {
+ return null;
+ }
+
+ CompletionService<MultiHttpRequestResponse> completionService =
+ new MultiHttpRequest(executor, connMgr).executeGet(serverUrls, null,
timeoutMs);
+ List<QueryProgressStats> serverProgressStats = new
ArrayList<>(serverUrls.size());
+ List<String> errMsgs = new ArrayList<>(serverUrls.size());
+ for (int i = 0; i < serverUrls.size(); i++) {
+ MultiHttpRequestResponse httpRequestResponse = null;
+ URI uri = null;
+ try {
+ httpRequestResponse = completionService.take().get();
+ uri = httpRequestResponse.getURI();
+ int status = httpRequestResponse.getResponse().getCode();
+ if (status == 200) {
+ String responseString =
EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+ serverProgressStats.add(JsonUtils.stringToObject(responseString,
QueryProgressStats.class));
+ } else if (status != 404) {
+ String responseString =
EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+ throw new Exception(
+ String.format("Unexpected status=%d and response='%s' from
uri='%s'", status, responseString, uri));
+ }
+ } catch (Exception e) {
+ LOGGER.debug("Failed to get progress for query id: {} from uri: {}",
queryId, uri, e);
+ errMsgs.add(e.getMessage());
+ } finally {
+ if (httpRequestResponse != null) {
+ httpRequestResponse.close();
+ }
+ }
+ }
+ if (!serverProgressStats.isEmpty()) {
+ return QueryProgressStats.aggregate(serverProgressStats);
Review Comment:
This still returns a partial aggregate over only the single-stage engine (SSE)
servers that replied with 200. If one targeted server times out or is
temporarily unreachable, its unfinished work disappears from the denominator,
and the broker can report inflated progress, including 100%, even though the
query is still blocked on that server. The multi-stage engine (MSE) path now
avoids that by treating missing servers as unknown progress; SSE needs the same
treatment here (or retained last-known totals) instead of returning a partial
aggregate.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]