xiangfu0 commented on code in PR #18649:
URL: https://github.com/apache/pinot/pull/18649#discussion_r3348371464
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -597,6 +598,54 @@ private static String toHostnamePortKey(String hostname, int port) {
     return String.format("%s_%d", hostname, port);
   }
+  @Nullable
+  public QueryProgressStats getQueryProgressStats(long requestId, int timeoutMs) {
+    if (!isQueryCancellationEnabled()) {
+      return null;
+    }
+    Set<QueryServerInstance> servers = _serversByQuery.get(requestId);
+    if (servers == null || servers.isEmpty()) {
+      return null;
+    }
+
+    Deadline deadline = Deadline.after(timeoutMs, TimeUnit.MILLISECONDS);
+    SendRequest<Long, Worker.QueryProgressResponse> sendRequest = DispatchClient::progress;
+    BlockingQueue<AsyncResponse<Worker.QueryProgressResponse>> dispatchCallbacks =
+        dispatch(sendRequest, new HashSet<>(servers), deadline, serverInstance -> requestId);
+    List<QueryProgressStats> progressStatsList = new ArrayList<>(servers.size());
+    int numResponses = 0;
+    while (!deadline.isExpired() && numResponses < servers.size()) {
+      try {
+        AsyncResponse<Worker.QueryProgressResponse> response =
+            dispatchCallbacks.poll(Math.max(1, deadline.timeRemaining(TimeUnit.MILLISECONDS)), TimeUnit.MILLISECONDS);
+        if (response == null) {
+          LOGGER.debug("No progress response from server for query: {}", requestId);
+          continue;
+        }
+        numResponses++;
+        if (response.getThrowable() != null) {
+          LOGGER.debug("Failed to get progress for query: {} from server: {}", requestId, response.getServerInstance(),
+              response.getThrowable());
+          continue;
+        }
+        Worker.QueryProgressResponse queryProgressResponse = response.getResponse();
+        if (queryProgressResponse != null && queryProgressResponse.getFound()) {
+          progressStatsList.add(new QueryProgressStats(queryProgressResponse.getProcessedWorkUnits(),
+              queryProgressResponse.getTotalWorkUnits(), queryProgressResponse.getProcessedSegments(),
+              queryProgressResponse.getTotalSegmentsToProcess(), queryProgressResponse.getEstimated()));
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.debug("Interrupted while getting progress for query: {}", requestId, e);
+        break;
+      }
+    }
+    if (deadline.isExpired()) {
+      LOGGER.debug("Timed out waiting for progress response for query: {}", requestId);
+    }
+    return progressStatsList.isEmpty() ? null : QueryProgressStats.aggregate(progressStatsList);
Review Comment:
This returns an aggregate over only the servers that responded before the
deadline. If one server is still running or transiently unreachable, its
unfinished work disappears from the denominator and the broker can report 100%
even though the query is still blocked on that server. Please avoid returning a
partial aggregate here: either fail/mark progress unknown when any targeted
server is missing, or retain last-known totals so missing servers do not shrink
the denominator.
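
For illustration, here is a rough, standalone sketch of the two alternatives. It is not the PR's code: `Progress`, `aggregateStrict`, and `aggregateWithLastKnown` are hypothetical names, servers are keyed by plain strings instead of `QueryServerInstance`, and only two counters are modeled rather than the full `QueryProgressStats`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class ProgressAggregationSketch {
  /** Hypothetical per-server snapshot; a simplified stand-in for QueryProgressStats. */
  public static final class Progress {
    public final long processed;
    public final long total;

    public Progress(long processed, long total) {
      this.processed = processed;
      this.total = total;
    }
  }

  /**
   * Option 1: progress is "unknown" (empty) unless every targeted server answered.
   * A missing server can therefore never shrink the denominator.
   */
  public static Optional<Progress> aggregateStrict(Set<String> targeted, Map<String, Progress> responses) {
    long processed = 0;
    long total = 0;
    for (String server : targeted) {
      Progress p = responses.get(server);
      if (p == null) {
        return Optional.empty();
      }
      processed += p.processed;
      total += p.total;
    }
    return Optional.of(new Progress(processed, total));
  }

  /**
   * Option 2: a missing server contributes its last-known snapshot, so its
   * outstanding work stays in the totals. Fresh responses update lastKnown.
   * Still returns empty if a server has never reported at all.
   */
  public static Optional<Progress> aggregateWithLastKnown(Set<String> targeted, Map<String, Progress> responses,
      Map<String, Progress> lastKnown) {
    long processed = 0;
    long total = 0;
    for (String server : targeted) {
      Progress p = responses.get(server);
      if (p != null) {
        lastKnown.put(server, p);
      } else {
        p = lastKnown.get(server);
      }
      if (p == null) {
        return Optional.empty();
      }
      processed += p.processed;
      total += p.total;
    }
    return Optional.of(new Progress(processed, total));
  }
}
```

With two targeted servers where only one replies with 10/10, the strict variant reports unknown instead of 100%. The last-known variant reports 12/40 if the silent server last reported 2/30. Where the last-known map would live, and when its entries get evicted, is left open in this sketch.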
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]