This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ee5456d83cb Revert "Use stats cache on error instead of the chained
mechanism (#15992)" (#17305)
ee5456d83cb is described below
commit ee5456d83cbeb437581fd57b1fc4f593811c5b8a
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue Dec 23 11:21:54 2025 +0100
Revert "Use stats cache on error instead of the chained mechanism (#15992)"
(#17305)
* Revert QueryDispatcher changes for "Use stats cache on error instead of
the chained mechanism (#15992)"
This reverts commit 3158efa3
* Fix test
---
.../query/service/dispatch/QueryDispatcher.java | 27 ++++++++--------------
.../service/dispatch/QueryDispatcherTest.java | 5 ++--
2 files changed, 12 insertions(+), 20 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 0d183d78a36..f7a54591abe 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -163,24 +163,22 @@ public class QueryDispatcher {
throws Exception {
long requestId = context.getRequestId();
Set<QueryServerInstance> servers = new HashSet<>();
- boolean cancelled = false;
try {
submit(requestId, dispatchableSubPlan, timeoutMs, servers, queryOptions);
QueryResult result = runReducer(dispatchableSubPlan, queryOptions,
_mailboxService);
if (result.getProcessingException() != null) {
- LOGGER.warn("Query result includes processing exceptions. Trying to
cancel the other opchains");
- MultiStageQueryStats statsFromCancel = cancelWithStats(requestId,
servers);
- cancelled = true;
- return result.withStats(statsFromCancel);
+ cancel(requestId);
}
return result;
} catch (Exception ex) {
- QueryResult queryResult = tryRecover(context.getRequestId(), servers,
ex);
- cancelled = true;
- return queryResult;
+ return tryRecover(context.getRequestId(), servers, ex);
+ } catch (Throwable e) {
+ // TODO: Consider always cancel when it returns (early terminate)
+ cancel(requestId);
+ throw e;
} finally {
- if (!cancelled) {
- cancel(requestId, servers);
+ if (isQueryCancellationEnabled()) {
+ _serversByQuery.remove(requestId);
}
}
}
@@ -204,6 +202,7 @@ public class QueryDispatcher {
errorCode = ((QueryException) ex).getErrorCode();
} else {
// in case of unknown exceptions, the exception will be rethrown, so we
don't need stats
+ cancel(requestId, servers);
throw ex;
}
// in case of known exceptions (timeout or query exception), we need can
build here the erroneous QueryResult
@@ -765,14 +764,6 @@ public class QueryDispatcher {
}
}
- public QueryResult withStats(MultiStageQueryStats newQueryStats) {
- if (_processingException != null) {
- return new QueryResult(_processingException, newQueryStats,
_brokerReduceTimeMs);
- } else {
- return new QueryResult(_resultTable, newQueryStats,
_brokerReduceTimeMs);
- }
- }
-
@Nullable
public ResultTable getResultTable() {
return _resultTable;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index dbdd0b1b752..8ef49b01760 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -73,7 +73,8 @@ public class QueryDispatcherTest extends QueryTestSet {
_queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(1,
portList.get(0), portList.get(1),
QueryEnvironmentTestBase.TABLE_SCHEMAS,
QueryEnvironmentTestBase.SERVER1_SEGMENTS,
QueryEnvironmentTestBase.SERVER2_SEGMENTS, null);
- _queryDispatcher = new QueryDispatcher(Mockito.mock(MailboxService.class),
Mockito.mock(FailureDetector.class));
+ _queryDispatcher =
+ new QueryDispatcher(Mockito.mock(MailboxService.class),
Mockito.mock(FailureDetector.class), null, true);
}
@AfterClass
@@ -140,7 +141,7 @@ public class QueryDispatcherTest extends QueryTestSet {
}
@Test
- public void testQueryDispatcherCancelWhenQueryReducerThrowsError()
+ public void testQueryDispatcherCancelWhenQueryReducerReturnsError()
throws Exception {
String sql = "SELECT * FROM a";
long requestId = REQUEST_ID_GEN.getAndIncrement();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]