xiangfu0 commented on code in PR #18649:
URL: https://github.com/apache/pinot/pull/18649#discussion_r3365497895
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -338,6 +340,65 @@ protected boolean handleCancel(long queryId, int
timeoutMs, Executor executor, H
return true;
}
+ @Override
+ @Nullable
+ public QueryProgressStats getQueryProgressStats(long queryId, int timeoutMs,
Executor executor,
+ HttpClientConnectionManager connMgr)
+ throws Exception {
+ Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation
is not enabled on broker");
+ QueryServers queryServers = _serversById.get(queryId);
+ if (queryServers == null) {
+ return null;
+ }
+
+ // TODO: Use different global query id for OFFLINE and REALTIME table
after releasing 0.12.0. See QueryIdUtils for
+ // details.
+ String globalQueryId = getGlobalQueryId(queryId);
+ List<String> serverUrls = new ArrayList<>(queryServers._servers.size());
+ for (ServerInstance serverInstance : queryServers._servers) {
+ serverUrls.add(String.format("%s/query/%s/progress",
serverInstance.getAdminEndpoint(), globalQueryId));
+ }
+ if (serverUrls.isEmpty()) {
+ return null;
+ }
+
+ CompletionService<MultiHttpRequestResponse> completionService =
+ new MultiHttpRequest(executor, connMgr).executeGet(serverUrls, null,
timeoutMs);
+ List<QueryProgressStats> serverProgressStats = new
ArrayList<>(serverUrls.size());
+ List<String> errMsgs = new ArrayList<>(serverUrls.size());
+ for (int i = 0; i < serverUrls.size(); i++) {
+ MultiHttpRequestResponse httpRequestResponse = null;
+ URI uri = null;
+ try {
+ httpRequestResponse = completionService.take().get();
+ uri = httpRequestResponse.getURI();
+ int status = httpRequestResponse.getResponse().getCode();
+ if (status == 200) {
+ String responseString =
EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+ serverProgressStats.add(JsonUtils.stringToObject(responseString,
QueryProgressStats.class));
+ } else if (status != 404) {
+ String responseString =
EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+ throw new Exception(
+ String.format("Unexpected status=%d and response='%s' from
uri='%s'", status, responseString, uri));
+ }
+ } catch (Exception e) {
+ LOGGER.debug("Failed to get progress for query id: {} from uri: {}",
queryId, uri, e);
+ errMsgs.add(e.getMessage());
+ } finally {
+ if (httpRequestResponse != null) {
+ httpRequestResponse.close();
+ }
+ }
+ }
+ if (!serverProgressStats.isEmpty()) {
+ return QueryProgressStats.aggregate(serverProgressStats);
Review Comment:
Addressed in the latest push. The single-stage engine (SSE) still returns the
compact one-line progress shape, but the broker no longer aggregates over only
the servers that responded with HTTP 200. If any targeted SSE server is missing
from the responses while at least one server reports progress, the broker adds
an unknown placeholder before aggregating, so total work becomes unknown instead
of shrinking the denominator or falsely reporting 100%. I also added a
regression test for the partial-server response case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]