Copilot commented on code in PR #18649:
URL: https://github.com/apache/pinot/pull/18649#discussion_r3338744152
##########
pinot-controller/src/main/resources/app/pages/Query.tsx:
##########
@@ -371,15 +404,29 @@ const QueryPage = () => {
})
}
- const results = await PinotMethodUtils.getQueryResults(params);
- setResultError(results.exceptions || []);
- setResultData(results.result || { columns: [], records: [] });
- setQueryStats(results.queryStats || { columns: QUERY_STATS_COLUMNS, records: [] });
- setOutputResult(JSON.stringify(results.data, null, 2) || '');
- setStageStats(results?.data?.stageStats || {});
- setWarnings(extractWarnings(results));
- setQueryLoader(false);
- queryExecuted.current = false;
+ const progressTimer = window.setInterval(async () => {
+ try {
+ const response = await getClientQueryProgress(clientQueryId, QUERY_PROGRESS_POLL_INTERVAL_MS);
+ setQueryProgress(response.data);
+ } catch (error) {
+ // The query might not be registered yet, or may already have completed.
+ }
+ }, QUERY_PROGRESS_POLL_INTERVAL_MS);
Review Comment:
The progress poller uses `setInterval(async () => { await ... })`, which can
overlap requests if a poll takes longer than the interval (e.g., due to
`timeoutMs=1000` or network slowness). This can create a buildup of concurrent
progress requests and unnecessary load.
Consider adding an in-flight guard (or switching to a self-scheduling
`setTimeout` loop) so only one progress request is outstanding at a time.
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -482,6 +488,28 @@ public void cancel(Worker.CancelRequest request,
StreamObserver<Worker.CancelRes
responseObserver.onCompleted();
}
+ @Override
+ public void progress(Worker.QueryProgressRequest request,
+ StreamObserver<Worker.QueryProgressResponse> responseObserver) {
+ long requestId = request.getRequestId();
+ try {
+ QueryProgressStats progressStats =
_queryRunner.getQueryProgressStats(requestId);
+ Worker.QueryProgressResponse.Builder builder =
Worker.QueryProgressResponse.newBuilder();
+ if (progressStats != null) {
+ builder.setFound(true)
+ .setProcessedWorkUnits(progressStats.getProcessedWorkUnits())
+ .setTotalWorkUnits(progressStats.getTotalWorkUnits())
+ .setProcessedSegments(progressStats.getProcessedSegments())
+ .setTotalSegmentsToProcess(progressStats.getTotalSegmentsToProcess())
+ .setEstimated(progressStats.isEstimated());
+ }
+ responseObserver.onNext(builder.build());
+ } catch (Throwable t) {
+ LOGGER.error("Caught exception while getting progress for request: {}", requestId, t);
+ }
+ responseObserver.onCompleted();
+ }
Review Comment:
`progress()` may complete the RPC without sending a response if an exception
is thrown. For unary gRPC calls, the server should either send exactly one
response via `onNext()` or terminate with `onError()`. Completing without
`onNext()` can surface to clients as an INTERNAL error ("completed without a
response") and break progress polling under transient failures.
Consider always returning a response (e.g., `found=false`) even when
progress lookup fails, and keep logging the exception for debugging.
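
A minimal sketch of the "always send one response" shape suggested above. It is self-contained and does not depend on gRPC: `Observer` is a stand-in for `io.grpc.stub.StreamObserver`, and `ProgressResponse`, `RecordingObserver`, and the `lookup` parameter are hypothetical simplifications, not the PR's or Pinot's actual types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

final class ProgressRpcSketch {
  /** Stand-in for io.grpc.stub.StreamObserver (assumption: same three callbacks). */
  interface Observer<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
  }

  /** Hypothetical, trimmed-down response carrying only two fields. */
  static final class ProgressResponse {
    final boolean found;
    final long processedSegments;

    ProgressResponse(boolean found, long processedSegments) {
      this.found = found;
      this.processedSegments = processedSegments;
    }
  }

  /** Records callbacks in order so the call sequence can be inspected. */
  static final class RecordingObserver implements Observer<ProgressResponse> {
    final List<String> events = new ArrayList<>();

    @Override public void onNext(ProgressResponse r) { events.add("onNext(found=" + r.found + ")"); }
    @Override public void onError(Throwable t) { events.add("onError"); }
    @Override public void onCompleted() { events.add("onCompleted"); }
  }

  /**
   * Sends exactly one response before completing, even if the lookup throws.
   * `lookup` returns the processed-segment count, or null for an unknown request.
   */
  static void progress(long requestId, LongFunction<Long> lookup, Observer<ProgressResponse> observer) {
    ProgressResponse response = new ProgressResponse(false, 0L); // default: not found
    try {
      Long processed = lookup.apply(requestId);
      if (processed != null) {
        response = new ProgressResponse(true, processed);
      }
    } catch (Exception e) {
      // Keep the failure visible for debugging, but still answer the caller.
      System.err.println("Caught exception while getting progress for request: " + requestId + ": " + e);
    }
    observer.onNext(response);
    observer.onCompleted();
  }

  public static void main(String[] args) {
    RecordingObserver observer = new RecordingObserver();
    progress(1L, id -> { throw new IllegalStateException("lookup failed"); }, observer);
    System.out.println(observer.events);
  }
}
```

With a failing lookup, the observer still sees one `onNext` carrying `found=false` followed by `onCompleted`, so a polling client would read the failure as "no progress available" rather than an RPC error. Whether `found=false` or `onError()` is the better signal is a design choice for the PR authors.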
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -338,6 +340,61 @@ protected boolean handleCancel(long queryId, int
timeoutMs, Executor executor, H
return true;
}
+ @Override
+ @Nullable
+ public QueryProgressStats getQueryProgressStats(long queryId, int timeoutMs,
Executor executor,
+ HttpClientConnectionManager connMgr)
+ throws Exception {
+ Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
+ QueryServers queryServers = _serversById.get(queryId);
+ if (queryServers == null) {
+ return null;
+ }
+
+ // TODO: Use different global query id for OFFLINE and REALTIME table after releasing 0.12.0. See QueryIdUtils for
+ // details.
+ String globalQueryId = getGlobalQueryId(queryId);
+ List<String> serverUrls = new ArrayList<>(queryServers._servers.size());
+ for (ServerInstance serverInstance : queryServers._servers) {
+ serverUrls.add(String.format("%s/query/%s/progress",
serverInstance.getAdminEndpoint(), globalQueryId));
+ }
+ if (serverUrls.isEmpty()) {
+ return null;
+ }
+
+ CompletionService<MultiHttpRequestResponse> completionService =
+ new MultiHttpRequest(executor, connMgr).executeGet(serverUrls, null,
timeoutMs);
+ List<QueryProgressStats> serverProgressStats = new
ArrayList<>(serverUrls.size());
+ List<String> errMsgs = new ArrayList<>(serverUrls.size());
+ for (int i = 0; i < serverUrls.size(); i++) {
+ MultiHttpRequestResponse httpRequestResponse = null;
+ try {
+ httpRequestResponse = completionService.take().get();
+ URI uri = httpRequestResponse.getURI();
+ int status = httpRequestResponse.getResponse().getCode();
+ if (status == 200) {
+ String responseString =
EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+ serverProgressStats.add(JsonUtils.stringToObject(responseString,
QueryProgressStats.class));
+ } else if (status != 404) {
+ String responseString =
EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+ throw new Exception(
+ String.format("Unexpected status=%d and response='%s' from uri='%s'", status, responseString, uri));
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Failed to get progress for query: {}", queryServers._query, e);
+ errMsgs.add(e.getMessage());
+ } finally {
+ if (httpRequestResponse != null) {
+ httpRequestResponse.close();
+ }
+ }
+ }
+ if (errMsgs.size() > 0) {
+ throw new Exception("Unexpected responses from servers: " + StringUtils.join(errMsgs, ","));
+ }
+ return serverProgressStats.isEmpty() ? null :
QueryProgressStats.aggregate(serverProgressStats);
Review Comment:
`getQueryProgressStats()` currently throws if *any* server progress request
fails (`errMsgs.size() > 0`), even if other servers responded successfully. For
progress reporting (as opposed to cancellation), this makes the feature
brittle: a single slow/unreachable server can turn the whole request into a 500
instead of returning partial/aggregated progress.
Consider returning aggregated progress when at least one server response was
successful, and only throwing when there were no successful responses and at
least one hard error.
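
The decision rule described above can be sketched as follows. This is an illustration under assumptions, not the PR's code: `Stats` is a hypothetical two-field stand-in for `QueryProgressStats`, the summation stands in for `QueryProgressStats.aggregate(...)`, and an unchecked exception replaces the checked `Exception` to keep the example short.

```java
import java.util.List;

final class PartialProgressSketch {
  /** Hypothetical stand-in for QueryProgressStats with only two counters. */
  static final class Stats {
    final long processedSegments;
    final long totalSegments;

    Stats(long processedSegments, long totalSegments) {
      this.processedSegments = processedSegments;
      this.totalSegments = totalSegments;
    }
  }

  /**
   * Returns aggregated progress if at least one server answered successfully,
   * throws only when nothing succeeded and at least one server failed,
   * and returns null when every server reported "not found".
   */
  static Stats combine(List<Stats> successes, List<String> errMsgs) {
    if (!successes.isEmpty()) {
      long processed = 0L;
      long total = 0L;
      for (Stats s : successes) {
        processed += s.processedSegments;
        total += s.totalSegments;
      }
      return new Stats(processed, total); // partial view; failed servers are simply missing
    }
    if (!errMsgs.isEmpty()) {
      throw new IllegalStateException("Unexpected responses from servers: " + String.join(",", errMsgs));
    }
    return null;
  }

  public static void main(String[] args) {
    Stats partial = combine(List.of(new Stats(3, 10), new Stats(2, 5)), List.of("server-3 timed out"));
    System.out.println(partial.processedSegments + "/" + partial.totalSegments);
  }
}
```

One trade-off to keep in mind: when some servers fail, the aggregate undercounts both processed and total work, so the caller may still want the error messages logged even though the request succeeds.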
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -338,6 +340,61 @@ protected boolean handleCancel(long queryId, int
timeoutMs, Executor executor, H
return true;
}
+ @Override
+ @Nullable
+ public QueryProgressStats getQueryProgressStats(long queryId, int timeoutMs,
Executor executor,
+ HttpClientConnectionManager connMgr)
+ throws Exception {
+ Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
+ QueryServers queryServers = _serversById.get(queryId);
+ if (queryServers == null) {
+ return null;
+ }
+
+ // TODO: Use different global query id for OFFLINE and REALTIME table after releasing 0.12.0. See QueryIdUtils for
+ // details.
+ String globalQueryId = getGlobalQueryId(queryId);
+ List<String> serverUrls = new ArrayList<>(queryServers._servers.size());
+ for (ServerInstance serverInstance : queryServers._servers) {
+ serverUrls.add(String.format("%s/query/%s/progress",
serverInstance.getAdminEndpoint(), globalQueryId));
+ }
+ if (serverUrls.isEmpty()) {
+ return null;
+ }
+
+ CompletionService<MultiHttpRequestResponse> completionService =
+ new MultiHttpRequest(executor, connMgr).executeGet(serverUrls, null,
timeoutMs);
+ List<QueryProgressStats> serverProgressStats = new
ArrayList<>(serverUrls.size());
+ List<String> errMsgs = new ArrayList<>(serverUrls.size());
+ for (int i = 0; i < serverUrls.size(); i++) {
+ MultiHttpRequestResponse httpRequestResponse = null;
+ try {
+ httpRequestResponse = completionService.take().get();
+ URI uri = httpRequestResponse.getURI();
+ int status = httpRequestResponse.getResponse().getCode();
+ if (status == 200) {
+ String responseString =
EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+ serverProgressStats.add(JsonUtils.stringToObject(responseString,
QueryProgressStats.class));
+ } else if (status != 404) {
+ String responseString =
EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+ throw new Exception(
+ String.format("Unexpected status=%d and response='%s' from uri='%s'", status, responseString, uri));
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Failed to get progress for query: {}", queryServers._query, e);
+ errMsgs.add(e.getMessage());
+ } finally {
Review Comment:
In the progress polling path, logging the full plaintext query at WARN
(`queryServers._query`) can leak sensitive query contents and may generate
high-volume logs if some servers frequently time out or are unavailable during
polling.
Prefer logging the numeric `queryId` (and optionally the server URI) at
DEBUG, similar to other high-frequency telemetry, to avoid exposing query text
and reduce log noise.
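
A small sketch of that logging shape, assuming `java.util.logging` as a dependency-free stand-in for the SLF4J `LOGGER` used in the PR (`Level.FINE` plays the role of DEBUG here). The helper name `logProgressFailure` and the example server URI are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class ProgressLogSketch {
  /**
   * Logs a failed progress poll at FINE using only the numeric query id and
   * the server URI. The query text is never passed in, so it cannot leak.
   */
  static void logProgressFailure(Logger logger, long queryId, String serverUri, Exception e) {
    // The Supplier defers string building until FINE is actually enabled.
    logger.log(Level.FINE, e, () -> "Failed to get progress for queryId=" + queryId + " from uri=" + serverUri);
  }

  public static void main(String[] args) {
    Logger logger = Logger.getLogger(ProgressLogSketch.class.getName());
    // With default JUL settings FINE is suppressed, which is the intended quiet behaviour.
    logProgressFailure(logger, 42L, "http://server-1:8097", new RuntimeException("timed out"));
  }
}
```

In the PR itself the equivalent would presumably be a `LOGGER.debug(...)` call that receives `queryId` instead of `queryServers._query`.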
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java:
##########
@@ -371,6 +407,51 @@ private Map<String, Map<String, String>>
getRunningQueries(Map<String, InstanceI
return queriesByBroker;
}
+ @Nullable
+ private QueryProgressStats getClientQueryProgress(Map<String, InstanceInfo>
brokers, String clientQueryId,
+ int timeoutMs, Map<String, String> requestHeaders)
+ throws Exception {
+ String protocol = _controllerConf.getControllerBrokerProtocol();
+ int portOverride = _controllerConf.getControllerBrokerPortOverride();
+ List<String> brokerUrls = new ArrayList<>();
+ for (InstanceInfo broker : brokers.values()) {
+ int port = portOverride > 0 ? portOverride : broker.getPort();
+ brokerUrls.add(new
URIBuilder().setScheme(protocol).setHost(broker.getHost()).setPort(port)
+ .setPathSegments("query", clientQueryId,
"progress").addParameter("client", "true")
+ .addParameter("timeoutMs",
Integer.toString(timeoutMs)).build().toString());
+ }
+ LOGGER.debug("Getting query progress via broker urls: {}", brokerUrls);
+ CompletionService<MultiHttpRequestResponse> completionService =
+ new MultiHttpRequest(_executor, _httpConnMgr).executeGet(brokerUrls,
requestHeaders, timeoutMs);
+ List<String> errMsgs = new ArrayList<>(brokerUrls.size());
+ for (int i = 0; i < brokerUrls.size(); i++) {
+ MultiHttpRequestResponse httpRequestResponse = null;
+ try {
+ httpRequestResponse = completionService.take().get();
+ URI uri = httpRequestResponse.getURI();
+ int status = httpRequestResponse.getResponse().getCode();
+ String responseString =
EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+ if (status == 200) {
+ return JsonUtils.stringToObject(responseString,
QueryProgressStats.class);
+ } else if (status != 404) {
+ throw new Exception(
+ String.format("Unexpected status=%d and response='%s' from uri='%s'", status, responseString, uri));
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Failed to get progress for client query: {}", clientQueryId, e);
+ errMsgs.add(e.getMessage());
+ } finally {
+ if (httpRequestResponse != null) {
+ httpRequestResponse.close();
+ }
+ }
+ }
+ if (errMsgs.size() > 0) {
+ throw new Exception("Unexpected responses from brokers: " + StringUtils.join(errMsgs, ","));
+ }
+ return null;
Review Comment:
The controller's `getClientQueryProgress(...)` throws if *any* broker
request errors (`errMsgs.size() > 0`), even when other brokers may have
returned 404 (not tracking the query). This conflicts with the endpoint's
contract in the `@ApiOperation` notes: it should return progress if any broker
finds it, otherwise behave like "not found" rather than failing due to a single
broker error.
Consider only failing when *all* brokers returned an error (or when there
were zero 404/empty responses), and otherwise returning `null` so the caller
can surface 404.
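
A rough sketch of that resolution rule. `BrokerReply` and `resolve` are hypothetical names; the real method consumes responses from a `CompletionService` and also has to count request exceptions (for example timeouts) as errors, which this sketch models as non-200/404 status codes. An unchecked exception is used instead of the checked `Exception` for brevity.

```java
import java.util.ArrayList;
import java.util.List;

final class BrokerFanoutSketch {
  /** Hypothetical, already-received broker reply: HTTP status plus body. */
  static final class BrokerReply {
    final int status;
    final String body;

    BrokerReply(int status, String body) {
      this.status = status;
      this.body = body;
    }
  }

  /**
   * Returns the first 200 body; returns null ("not found") when no broker had
   * the query but at least one answered 404; throws only when every reply was
   * an error.
   */
  static String resolve(List<BrokerReply> replies) {
    List<String> errMsgs = new ArrayList<>();
    int notFound = 0;
    for (BrokerReply reply : replies) {
      if (reply.status == 200) {
        return reply.body; // one broker tracking the query is enough
      }
      if (reply.status == 404) {
        notFound++;
      } else {
        errMsgs.add("Unexpected status=" + reply.status);
      }
    }
    if (notFound == 0 && !errMsgs.isEmpty()) {
      throw new IllegalStateException("Unexpected responses from brokers: " + String.join(",", errMsgs));
    }
    return null; // caller can surface this as 404
  }

  public static void main(String[] args) {
    List<BrokerReply> mixed = List.of(new BrokerReply(503, "unavailable"), new BrokerReply(404, ""));
    System.out.println(resolve(mixed));
  }
}
```

The mixed case (one 503, one 404) resolves to `null` rather than an exception, which matches the "behave like not found" contract the comment refers to.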