xiangfu0 commented on code in PR #18649:
URL: https://github.com/apache/pinot/pull/18649#discussion_r3365497895


##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -338,6 +340,65 @@ protected boolean handleCancel(long queryId, int timeoutMs, Executor executor, H
     return true;
   }
 
+  @Override
+  @Nullable
+  public QueryProgressStats getQueryProgressStats(long queryId, int timeoutMs, Executor executor,
+      HttpClientConnectionManager connMgr)
+      throws Exception {
+    Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
+    QueryServers queryServers = _serversById.get(queryId);
+    if (queryServers == null) {
+      return null;
+    }
+
+    // TODO: Use different global query id for OFFLINE and REALTIME table after releasing 0.12.0. See QueryIdUtils for
+    //       details.
+    String globalQueryId = getGlobalQueryId(queryId);
+    List<String> serverUrls = new ArrayList<>(queryServers._servers.size());
+    for (ServerInstance serverInstance : queryServers._servers) {
+      serverUrls.add(String.format("%s/query/%s/progress", serverInstance.getAdminEndpoint(), globalQueryId));
+    }
+    if (serverUrls.isEmpty()) {
+      return null;
+    }
+
+    CompletionService<MultiHttpRequestResponse> completionService =
+        new MultiHttpRequest(executor, connMgr).executeGet(serverUrls, null, timeoutMs);
+    List<QueryProgressStats> serverProgressStats = new ArrayList<>(serverUrls.size());
+    List<String> errMsgs = new ArrayList<>(serverUrls.size());
+    for (int i = 0; i < serverUrls.size(); i++) {
+      MultiHttpRequestResponse httpRequestResponse = null;
+      URI uri = null;
+      try {
+        httpRequestResponse = completionService.take().get();
+        uri = httpRequestResponse.getURI();
+        int status = httpRequestResponse.getResponse().getCode();
+        if (status == 200) {
+          String responseString = EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+          serverProgressStats.add(JsonUtils.stringToObject(responseString, QueryProgressStats.class));
+        } else if (status != 404) {
+          String responseString = EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+          throw new Exception(
+              String.format("Unexpected status=%d and response='%s' from uri='%s'", status, responseString, uri));
+        }
+      } catch (Exception e) {
+        LOGGER.debug("Failed to get progress for query id: {} from uri: {}", queryId, uri, e);
+        errMsgs.add(e.getMessage());
+      } finally {
+        if (httpRequestResponse != null) {
+          httpRequestResponse.close();
+        }
+      }
+    }
+    if (!serverProgressStats.isEmpty()) {
+      return QueryProgressStats.aggregate(serverProgressStats);

Review Comment:
   Addressed in the latest push. SSE still returns the compact one-line 
progress shape, but the broker no longer aggregates over only the 200 
responses. If any targeted SSE server is missing while at least one server 
reports progress, the broker adds an unknown placeholder before aggregating, so 
total work becomes unknown instead of shrinking the denominator or falsely 
reporting 100%. Added a regression test for the partial-server response case.
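
   For illustration only, here is a minimal sketch of the placeholder idea. It
   is not the code in the PR: `PartialProgressSketch`, `Progress`, `UNKNOWN`,
   and `aggregateTargeted` are hypothetical names, and the real
   `QueryProgressStats` fields may look different. The assumption is that each
   server reports completed and total work, and that an unknown total on any
   input makes the aggregated total unknown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartialProgressSketch {
  /** Hypothetical sentinel meaning "total work is not known". */
  static final long UNKNOWN = -1L;

  /** Hypothetical stand-in for a per-server report; not Pinot's QueryProgressStats. */
  static final class Progress {
    final long done;
    final long total;

    Progress(long done, long total) {
      this.done = done;
      this.total = total;
    }

    static Progress unknown() {
      return new Progress(0L, UNKNOWN);
    }
  }

  /** Sums completed work; the total stays known only if every input total is known. */
  static Progress aggregate(List<Progress> parts) {
    long done = 0L;
    long total = 0L;
    boolean totalKnown = true;
    for (Progress p : parts) {
      done += p.done;
      if (p.total == UNKNOWN) {
        totalKnown = false;
      } else {
        total += p.total;
      }
    }
    return new Progress(done, totalKnown ? total : UNKNOWN);
  }

  /** Adds one unknown placeholder when fewer servers reported than were targeted. */
  static Progress aggregateTargeted(List<Progress> reported, int targetedServers) {
    if (reported.isEmpty()) {
      return null; // nothing to aggregate in this sketch
    }
    List<Progress> parts = new ArrayList<>(reported);
    if (parts.size() < targetedServers) {
      parts.add(Progress.unknown());
    }
    return aggregate(parts);
  }

  public static void main(String[] args) {
    // Three servers targeted, only two answered with progress.
    List<Progress> reported = Arrays.asList(new Progress(40, 100), new Progress(10, 50));
    Progress p = aggregateTargeted(reported, 3);
    System.out.println("done=" + p.done + " total=" + (p.total == UNKNOWN ? "unknown" : String.valueOf(p.total)));
  }
}
```

   With two of three servers reporting, the sketch keeps the completed count
   (50) but marks the total as unknown rather than reporting 50 out of 150,
   which would understate the remaining work on the silent server.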



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

