xiangfu0 commented on code in PR #18649:
URL: https://github.com/apache/pinot/pull/18649#discussion_r3362541455


##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -338,6 +340,65 @@ protected boolean handleCancel(long queryId, int timeoutMs, Executor executor, H
     return true;
   }
 
+  @Override
+  @Nullable
+  public QueryProgressStats getQueryProgressStats(long queryId, int timeoutMs, Executor executor,
+      HttpClientConnectionManager connMgr)
+      throws Exception {
+    Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
+    QueryServers queryServers = _serversById.get(queryId);
+    if (queryServers == null) {
+      return null;
+    }
+
+    // TODO: Use different global query id for OFFLINE and REALTIME table after releasing 0.12.0. See QueryIdUtils for
+    //       details.
+    String globalQueryId = getGlobalQueryId(queryId);
+    List<String> serverUrls = new ArrayList<>(queryServers._servers.size());
+    for (ServerInstance serverInstance : queryServers._servers) {
+      serverUrls.add(String.format("%s/query/%s/progress", serverInstance.getAdminEndpoint(), globalQueryId));
+    }
+    if (serverUrls.isEmpty()) {
+      return null;
+    }
+
+    CompletionService<MultiHttpRequestResponse> completionService =
+        new MultiHttpRequest(executor, connMgr).executeGet(serverUrls, null, timeoutMs);
+    List<QueryProgressStats> serverProgressStats = new ArrayList<>(serverUrls.size());
+    List<String> errMsgs = new ArrayList<>(serverUrls.size());
+    for (int i = 0; i < serverUrls.size(); i++) {
+      MultiHttpRequestResponse httpRequestResponse = null;
+      URI uri = null;
+      try {
+        httpRequestResponse = completionService.take().get();
+        uri = httpRequestResponse.getURI();
+        int status = httpRequestResponse.getResponse().getCode();
+        if (status == 200) {
+          String responseString = EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+          serverProgressStats.add(JsonUtils.stringToObject(responseString, QueryProgressStats.class));
+        } else if (status != 404) {
+          String responseString = EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+          throw new Exception(
+              String.format("Unexpected status=%d and response='%s' from uri='%s'", status, responseString, uri));
+        }
+      } catch (Exception e) {
+        LOGGER.debug("Failed to get progress for query id: {} from uri: {}", queryId, uri, e);
+        errMsgs.add(e.getMessage());
+      } finally {
+        if (httpRequestResponse != null) {
+          httpRequestResponse.close();
+        }
+      }
+    }
+    if (!serverProgressStats.isEmpty()) {
+      return QueryProgressStats.aggregate(serverProgressStats);

Review Comment:
   This still returns a partial aggregate over only the single-stage engine
(SSE) servers that replied with 200. If one targeted server times out or is
temporarily unreachable, its unfinished work drops out of the denominator, so
the broker can report inflated progress, even 100%, while the query is still
blocked on that server. The multi-stage engine (MSE) path now avoids this by
treating missing servers as unknown progress. SSE needs the same treatment here
(or a fallback to retained last-known totals) instead of returning a partial
aggregate.
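To illustrate the failure mode and the two remedies above, here is a minimal sketch assuming a simplified per-server snapshot of processed and total docs. `ProgressAggregationSketch`, `ServerProgress`, `aggregate`, `partialAggregate`, and the doc-count fields are all hypothetical and are not Pinot's actual `QueryProgressStats` API. The point is only that the aggregate is computed over every targeted server, or reported as unknown.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class ProgressAggregationSketch {
  // Hypothetical per-server snapshot; the real QueryProgressStats fields may differ.
  record ServerProgress(long processedDocs, long totalDocs) { }

  // Returns empty (unknown progress) unless every targeted server is accounted for,
  // either by a fresh 200 response or by a last-known snapshot retained from an earlier poll.
  static Optional<Double> aggregate(Set<String> targetedServers,
      Map<String, ServerProgress> freshResponses, Map<String, ServerProgress> lastKnown) {
    long processed = 0;
    long total = 0;
    for (String server : targetedServers) {
      ServerProgress p = freshResponses.getOrDefault(server, lastKnown.get(server));
      if (p == null) {
        // No reply and no retained totals: the denominator would be incomplete.
        return Optional.empty();
      }
      processed += p.processedDocs();
      total += p.totalDocs();
    }
    return total == 0 ? Optional.empty() : Optional.of((double) processed / total);
  }

  // Mirrors the behavior under review: only servers that replied contribute to the ratio.
  static double partialAggregate(Map<String, ServerProgress> freshResponses) {
    long processed = 0;
    long total = 0;
    for (ServerProgress p : freshResponses.values()) {
      processed += p.processedDocs();
      total += p.totalDocs();
    }
    return total == 0 ? 0.0 : (double) processed / total;
  }

  public static void main(String[] args) {
    Set<String> servers = Set.of("server1", "server2");
    // server2 timed out, so only server1's (already finished) work is visible.
    Map<String, ServerProgress> fresh = Map.of("server1", new ServerProgress(50, 50));
    System.out.println(partialAggregate(fresh));               // 1.0, i.e. inflated to 100%
    System.out.println(aggregate(servers, fresh, Map.of()));   // Optional.empty (unknown)
    Map<String, ServerProgress> lastKnown = Map.of("server2", new ServerProgress(10, 100));
    System.out.println(aggregate(servers, fresh, lastKnown));  // Optional[0.4]
  }
}
```

With server2 still at 10 of 100 docs, the true progress is (50 + 10) / (50 + 100) = 40%, while the partial aggregate reports 100%.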



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
