This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ee5456d83cb Revert "Use stats cache on error instead of the chained mechanism (#15992)" (#17305)
ee5456d83cb is described below

commit ee5456d83cbeb437581fd57b1fc4f593811c5b8a
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue Dec 23 11:21:54 2025 +0100

    Revert "Use stats cache on error instead of the chained mechanism (#15992)" (#17305)
    
    * Revert QueryDispatcher changes for "Use stats cache on error instead of the chained mechanism (#15992)"
    
    This reverts commit 3158efa3
    
    * Fix test
---
 .../query/service/dispatch/QueryDispatcher.java    | 27 ++++++++--------------
 .../service/dispatch/QueryDispatcherTest.java      |  5 ++--
 2 files changed, 12 insertions(+), 20 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 0d183d78a36..f7a54591abe 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -163,24 +163,22 @@ public class QueryDispatcher {
       throws Exception {
     long requestId = context.getRequestId();
     Set<QueryServerInstance> servers = new HashSet<>();
-    boolean cancelled = false;
     try {
       submit(requestId, dispatchableSubPlan, timeoutMs, servers, queryOptions);
       QueryResult result = runReducer(dispatchableSubPlan, queryOptions, 
_mailboxService);
       if (result.getProcessingException() != null) {
-        LOGGER.warn("Query result includes processing exceptions. Trying to 
cancel the other opchains");
-        MultiStageQueryStats statsFromCancel = cancelWithStats(requestId, 
servers);
-        cancelled = true;
-        return result.withStats(statsFromCancel);
+        cancel(requestId);
       }
       return result;
     } catch (Exception ex) {
-      QueryResult queryResult = tryRecover(context.getRequestId(), servers, 
ex);
-      cancelled = true;
-      return queryResult;
+      return tryRecover(context.getRequestId(), servers, ex);
+    } catch (Throwable e) {
+      // TODO: Consider always cancel when it returns (early terminate)
+      cancel(requestId);
+      throw e;
     } finally {
-      if (!cancelled) {
-        cancel(requestId, servers);
+      if (isQueryCancellationEnabled()) {
+        _serversByQuery.remove(requestId);
       }
     }
   }
@@ -204,6 +202,7 @@ public class QueryDispatcher {
       errorCode = ((QueryException) ex).getErrorCode();
     } else {
       // in case of unknown exceptions, the exception will be rethrown, so we 
don't need stats
+      cancel(requestId, servers);
       throw ex;
     }
     // in case of known exceptions (timeout or query exception), we need can 
build here the erroneous QueryResult
@@ -765,14 +764,6 @@ public class QueryDispatcher {
       }
     }
 
-    public QueryResult withStats(MultiStageQueryStats newQueryStats) {
-      if (_processingException != null) {
-        return new QueryResult(_processingException, newQueryStats, 
_brokerReduceTimeMs);
-      } else {
-        return new QueryResult(_resultTable, newQueryStats, 
_brokerReduceTimeMs);
-      }
-    }
-
     @Nullable
     public ResultTable getResultTable() {
       return _resultTable;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index dbdd0b1b752..8ef49b01760 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -73,7 +73,8 @@ public class QueryDispatcherTest extends QueryTestSet {
     _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(1, 
portList.get(0), portList.get(1),
         QueryEnvironmentTestBase.TABLE_SCHEMAS, 
QueryEnvironmentTestBase.SERVER1_SEGMENTS,
         QueryEnvironmentTestBase.SERVER2_SEGMENTS, null);
-    _queryDispatcher = new QueryDispatcher(Mockito.mock(MailboxService.class), 
Mockito.mock(FailureDetector.class));
+    _queryDispatcher =
+        new QueryDispatcher(Mockito.mock(MailboxService.class), 
Mockito.mock(FailureDetector.class), null, true);
   }
 
   @AfterClass
@@ -140,7 +141,7 @@ public class QueryDispatcherTest extends QueryTestSet {
   }
 
   @Test
-  public void testQueryDispatcherCancelWhenQueryReducerThrowsError()
+  public void testQueryDispatcherCancelWhenQueryReducerReturnsError()
       throws Exception {
     String sql = "SELECT * FROM a";
     long requestId = REQUEST_ID_GEN.getAndIncrement();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to