This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/kill-master
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f5906d86686a0e515c955d57af554064acf90fec
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Mar 26 20:51:11 2026 +0800

    [To dev/1.3] Fix kill query doesn't take effect bug (#17358)
---
 .../protocol/thrift/impl/ClientRPCServiceImpl.java |  27 ++++--
 .../db/queryengine/common/MPPQueryContext.java     |  14 +--
 .../queryengine/execution/QueryStateMachine.java   |   4 +-
 .../fragment/FragmentInstanceManager.java          |   2 +
 .../iotdb/db/queryengine/plan/Coordinator.java     |  34 ++++++-
 .../analyze/schema/ClusterSchemaFetchExecutor.java |  43 +++++----
 .../plan/execution/IQueryExecution.java            |  25 ++++-
 .../queryengine/plan/execution/QueryExecution.java | 101 ++++++++++++++-------
 .../plan/execution/config/ConfigExecution.java     |  27 ++++--
 .../operator/MergeTreeSortOperatorTest.java        |  11 ++-
 10 files changed, 206 insertions(+), 82 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index d3f6b7d91dd..ed754c418c6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -245,6 +245,7 @@ import static 
org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
 import static 
org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize;
 import static org.apache.iotdb.rpc.RpcUtils.TIME_PRECISION;
+import static org.apache.iotdb.rpc.TSStatusCode.QUERY_WAS_KILLED;
 
 public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
@@ -286,6 +287,9 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
   private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
       TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
 
+  private static final String NO_QUERY_EXECUTION_ERR_MSG =
+      "Query is not found, it may be killed by others, timeout or some other 
runtime errors, you can see more details in server log.";
+
   @FunctionalInterface
   public interface SelectResult {
 
@@ -1526,15 +1530,18 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         finished = true;
         return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
       }
-      TSFetchResultsResp resp = 
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
 
       queryExecution = COORDINATOR.getQueryExecution(req.queryId);
 
       if (queryExecution == null) {
-        resp.setHasResultSet(false);
-        resp.setMoreData(false);
-        return resp;
+        TSStatus noQueryExecutionStatus = new 
TSStatus(QUERY_WAS_KILLED.getStatusCode());
+        noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG);
+        return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus);
       }
+
+      TSFetchResultsResp resp = 
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
+
+      queryExecution.updateCurrentRpcStartTime(startTime);
       statementType = queryExecution.getStatementType();
 
       try (SetThreadName queryName = new 
SetThreadName(queryExecution.getQueryId())) {
@@ -2272,16 +2279,16 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
       }
 
-      TSFetchResultsResp resp = 
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
-
       queryExecution = COORDINATOR.getQueryExecution(req.queryId);
       if (queryExecution == null) {
-        resp.setHasResultSet(false);
-        resp.setMoreData(true);
-        return resp;
+        TSStatus noQueryExecutionStatus = new 
TSStatus(QUERY_WAS_KILLED.getStatusCode());
+        noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG);
+        return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus);
       }
+      queryExecution.updateCurrentRpcStartTime(startTime);
       statementType = queryExecution.getStatementType();
 
+      TSFetchResultsResp resp = 
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
       try (SetThreadName queryName = new 
SetThreadName(queryExecution.getQueryId())) {
         Pair<TSQueryDataSet, Boolean> pair =
             convertTsBlockByFetchSize(queryExecution, req.fetchSize);
@@ -2291,7 +2298,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         resp.setHasResultSet(hasResultSet);
         resp.setQueryDataSet(result);
         resp.setIsAlign(true);
-        resp.setMoreData(finished);
+        resp.setMoreData(!finished);
         return resp;
       }
     } catch (Exception e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 0294a14af25..88bd1998f68 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -76,7 +76,11 @@ public class MPPQueryContext implements IAuditEntity {
   private long localQueryId;
   private SessionInfo session;
   private QueryType queryType = QueryType.READ;
+
+  /** the max executing time of query in ms. Unit: millisecond */
   private long timeOut;
+
+  // time unit is ms
   private long startTime;
 
   private TEndPoint localDataBlockEndpoint;
@@ -147,6 +151,7 @@ public class MPPQueryContext implements IAuditEntity {
   // Tables in the subquery
   private final Map<NodeRef<Query>, List<Identifier>> subQueryTables = new 
HashMap<>();
 
+  @TestOnly
   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
     this.endPointBlackList = ConcurrentHashMap.newKeySet();
@@ -161,12 +166,7 @@ public class MPPQueryContext implements IAuditEntity {
       SessionInfo session,
       TEndPoint localDataBlockEndpoint,
       TEndPoint localInternalEndpoint) {
-    this(queryId);
-    this.sql = sql;
-    this.session = session;
-    this.localDataBlockEndpoint = localDataBlockEndpoint;
-    this.localInternalEndpoint = localInternalEndpoint;
-    this.initResultNodeContext();
+    this(sql, queryId, -1, session, localDataBlockEndpoint, 
localInternalEndpoint);
   }
 
   public MPPQueryContext(
@@ -244,10 +244,12 @@ public class MPPQueryContext implements IAuditEntity {
     return queryType;
   }
 
+  /** the max executing time of query in ms. Unit: millisecond */
   public long getTimeOut() {
     return timeOut;
   }
 
+  /** the max executing time of query in ms. Unit: millisecond */
   public void setTimeOut(long timeOut) {
     this.timeOut = timeOut;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
index 146359bc215..19d0fb38749 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
@@ -107,10 +107,10 @@ public class QueryStateMachine {
     transitionToDoneState(CANCELED);
   }
 
-  public void transitionToCanceled(Throwable throwable, TSStatus 
failureStatus) {
+  public boolean transitionToCanceled(Throwable throwable, TSStatus 
failureStatus) {
     this.failureStatus.compareAndSet(null, failureStatus);
     this.failureException.compareAndSet(null, throwable);
-    transitionToDoneState(CANCELED);
+    return transitionToDoneState(CANCELED);
   }
 
   public void transitionToAborted() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index 3f6812943d6..1898cbfe53c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -38,6 +38,7 @@ import 
org.apache.iotdb.db.queryengine.execution.schedule.DriverScheduler;
 import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
 import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
 import org.apache.iotdb.db.queryengine.plan.planner.PipelineDriverFactory;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
@@ -435,6 +436,7 @@ public class FragmentInstanceManager {
                             + "ms, and now is in flushing state"));
           }
         });
+    Coordinator.getInstance().cleanUpStaleQueries();
   }
 
   public ExecutorService getIntoOperationExecutor() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index b0d28f03c02..9342b3a4674 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.commons.memory.MemoryBlockType;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.protocol.session.IClientSession;
 import org.apache.iotdb.db.protocol.session.PreparedStatementInfo;
@@ -805,7 +806,7 @@ public class Coordinator {
   }
 
   public void cleanupQueryExecution(Long queryId, Supplier<String> 
contentSupplier, Throwable t) {
-    IQueryExecution queryExecution = getQueryExecution(queryId);
+    IQueryExecution queryExecution = queryExecutionMap.remove(queryId);
     if (queryExecution != null) {
       cleanupQueryExecutionInternal(queryId, queryExecution, contentSupplier, 
t);
     }
@@ -813,7 +814,7 @@ public class Coordinator {
 
   public void cleanupQueryExecution(
       Long queryId, org.apache.thrift.TBase<?, ?> nativeApiRequest, Throwable 
t) {
-    IQueryExecution queryExecution = getQueryExecution(queryId);
+    IQueryExecution queryExecution = queryExecutionMap.remove(queryId);
     if (queryExecution != null) {
       Supplier<String> contentSupplier =
           new ContentOfQuerySupplier(nativeApiRequest, queryExecution);
@@ -898,6 +899,35 @@ public class Coordinator {
     }
   }
 
+  /**
+   * We need to reclaim resources from queries that have exceeded their 
timeout by more than one
+   * minute. This indicates that the associated clients have failed to perform 
proper resource
+   * cleanup.
+   */
+  public void cleanUpStaleQueries() {
+    long currentTime = System.currentTimeMillis();
+    queryExecutionMap.forEach(
+        (queryId, queryExecution) -> {
+          if (queryExecution.isActive()) {
+            return;
+          }
+          long timeout = queryExecution.getTimeout();
+          long queryStartTime = queryExecution.getStartExecutionTime();
+          long executeTime = currentTime - queryStartTime;
+          if (timeout > 0 && executeTime - 60_000L > timeout) {
+            LOGGER.warn(
+                "Cleaning up stale query with id {}, which has been running 
for {} ms, timeout duration is: {}ms",
+                queryId,
+                executeTime,
+                timeout);
+            cleanupQueryExecution(
+                queryId,
+                (org.apache.thrift.TBase<?, ?>) null,
+                new QueryTimeoutRuntimeException(queryStartTime, currentTime, 
timeout));
+          }
+        });
+  }
+
   public void cleanupQueryExecution(Long queryId) {
     cleanupQueryExecution(queryId, (org.apache.thrift.TBase<?, ?>) null, null);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index aad9f50aca0..d575f6420eb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -36,6 +36,7 @@ import 
org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.DeviceSchemaFetchStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.SeriesSchemaFetchStatement;
@@ -265,30 +266,38 @@ class ClusterSchemaFetchExecutor {
             String.format("Fetch Schema failed, because %s", 
executionResult.status.getMessage()),
             executionResult.status.getCode());
       }
+      IQueryExecution queryExecution = coordinator.getQueryExecution(queryId);
       try (SetThreadName ignored = new 
SetThreadName(executionResult.queryId.getId())) {
         ClusterSchemaTree result = new ClusterSchemaTree();
         ClusterSchemaTree.SchemaNodeBatchDeserializer deserializer =
             new ClusterSchemaTree.SchemaNodeBatchDeserializer();
         Set<String> databaseSet = new HashSet<>();
-        while (coordinator.getQueryExecution(queryId).hasNextResult()) {
-          // The query will be transited to FINISHED when invoking 
getBatchResult() at the last time
-          // So we don't need to clean up it manually
-          Optional<TsBlock> tsBlock;
-          try {
-            tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
-          } catch (IoTDBException e) {
-            t = e;
-            throw new QuerySchemaFetchFailedException(
-                String.format("Fetch Schema failed: %s", e.getMessage()), 
e.getErrorCode());
-          }
-          if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
-            break;
-          }
-          Column column = tsBlock.get().getColumn(0);
-          for (int i = 0; i < column.getPositionCount(); i++) {
-            parseFetchedData(column.getBinary(i), result, deserializer, 
databaseSet, context);
+        if (queryExecution != null) {
+          while (queryExecution.hasNextResult()) {
+            // The query will be transited to FINISHED when invoking 
getBatchResult() at the last
+            // time
+            // So we don't need to clean up it manually
+            Optional<TsBlock> tsBlock;
+            try {
+              tsBlock = queryExecution.getBatchResult();
+            } catch (IoTDBException e) {
+              t = e;
+              throw new QuerySchemaFetchFailedException(
+                  String.format("Fetch Schema failed: %s", e.getMessage()), 
e.getErrorCode());
+            }
+            if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
+              break;
+            }
+            Column column = tsBlock.get().getColumn(0);
+            for (int i = 0; i < column.getPositionCount(); i++) {
+              parseFetchedData(column.getBinary(i), result, deserializer, 
databaseSet, context);
+            }
           }
+        } else {
+          throw new RuntimeException(
+              String.format("Fetch Schema failed, because queryExecution is 
null for %s", queryId));
         }
+
         result.setDatabases(databaseSet);
         return result;
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
index f754bc4b10e..9b5a183d98a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
@@ -35,8 +35,6 @@ public interface IQueryExecution {
 
   void stop(Throwable t);
 
-  void stopAndCleanup();
-
   void stopAndCleanup(Throwable t);
 
   void cancel();
@@ -61,15 +59,38 @@ public interface IQueryExecution {
 
   String getQueryId();
 
+  // time unit is ms
   long getStartExecutionTime();
 
+  /**
+   * @param executionTime time unit should be ns
+   */
   void recordExecutionTime(long executionTime);
 
+  /**
+   * update current rpc start time, which is used to calculate rpc execution 
time and update total
+   * execution time
+   *
+   * @param startTime start time of current rpc, time unit is ns
+   */
+  void updateCurrentRpcStartTime(long startTime);
+
+  /**
+   * Check if there is an active RPC for this query. If {@code 
startTimeOfCurrentRpc == -1}, it
+   * means there is no active RPC, otherwise there is an active RPC. An active 
RPC means that the
+   * client is still fetching results and the QueryExecution should not be 
cleaned up until the RPC
+   * finishes. On the other hand, if there is no active RPC, it means that the 
client has finished
+   * fetching results or has not started fetching results yet, and the 
QueryExecution can be safely
+   * cleaned up.
+   */
+  boolean isActive();
+
   /**
    * @return cost time in ns
    */
   long getTotalExecutionTime();
 
+  /** the max executing time of query in ms. Unit: millisecond */
   long getTimeout();
 
   Optional<String> getExecuteSQL();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 6e940e9816e..7500fbb9c34 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -39,6 +39,7 @@ import 
org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle;
 import org.apache.iotdb.db.queryengine.execution.exchange.source.SourceHandle;
 import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import 
org.apache.iotdb.db.queryengine.plan.execution.memory.MemorySourceHandle;
@@ -108,9 +109,14 @@ public class QueryExecution implements IQueryExecution {
 
   private final AtomicBoolean stopped;
 
-  // cost time in ns
+  // cost time in ns of finished rpc
   private long totalExecutionTime = 0;
 
+  // -1 if previous rpc is finished and next client req hasn't come yet, unit 
is ns
+  // it will be updated in fetchResult rpc
+  // protected by synchronized(this)
+  private volatile long startTimeOfCurrentRpc = System.nanoTime();
+
   private static final QueryExecutionMetricSet QUERY_EXECUTION_METRIC_SET =
       QueryExecutionMetricSet.getInstance();
   private static final QueryPlanCostMetricSet QUERY_PLAN_COST_METRIC_SET =
@@ -133,14 +139,19 @@ public class QueryExecution implements IQueryExecution {
             if (!state.isDone()) {
               return;
             }
+            Throwable cause = null;
             if (state == QueryState.FAILED
                 || state == QueryState.ABORTED
                 || state == QueryState.CANCELED) {
-              LOGGER.debug("[ReleaseQueryResource] state is: {}", state);
-              Throwable cause = stateMachine.getFailureException();
+              if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("[ReleaseQueryResource] state is: {}", state);
+              }
+              cause = stateMachine.getFailureException();
               releaseResource(cause);
+
+              this.cleanUpCoordinatorContextMapIfNeeded(cause);
             }
-            this.stop(null);
+            this.stop(cause);
           }
         });
     this.stopped = new AtomicBoolean(false);
@@ -321,33 +332,20 @@ public class QueryExecution implements IQueryExecution {
     }
   }
 
-  // Stop the query and clean up all the resources this query occupied
-  @Override
-  public void stopAndCleanup() {
-    stop(null);
-    releaseResource();
-  }
-
   @Override
   public void cancel() {
-    stateMachine.transitionToCanceled(
-        new KilledByOthersException(),
-        new TSStatus(TSStatusCode.QUERY_WAS_KILLED.getStatusCode())
-            .setMessage(KilledByOthersException.MESSAGE));
-  }
-
-  /** Release the resources that current QueryExecution hold. */
-  private void releaseResource() {
-    // close ResultHandle to unblock client's getResult request
-    // Actually, we should not close the ResultHandle when the QueryExecution 
is Finished.
-    // There are only two scenarios where the ResultHandle should be closed:
-    //   1. The client fetch all the result and the ResultHandle is finished.
-    //   2. The client's connection is closed that all owned QueryExecution 
should be cleaned up
-    // If the QueryExecution's state is abnormal, we should also abort the 
resultHandle without
-    // waiting it to be finished.
-    if (resultHandle != null) {
-      resultHandle.close();
-      cleanUpResultHandle();
+    Throwable cause = new KilledByOthersException();
+    boolean cancelled =
+        stateMachine.transitionToCanceled(
+            cause,
+            new TSStatus(TSStatusCode.QUERY_WAS_KILLED.getStatusCode())
+                .setMessage(KilledByOthersException.MESSAGE));
+    if (!cancelled) {
+      // cancel failed, means this query has already in a done state, we can 
do nothing to change
+      // the state but clean up the resource if needed
+      // we don't need to do cleanUpCoordinatorContextMapIfNeeded if cancel 
succeed, because it will
+      // be called in callback logic in QueryStateMachine of this 
QueryExecution
+      this.cleanUpCoordinatorContextMapIfNeeded(cause);
     }
   }
 
@@ -356,7 +354,7 @@ public class QueryExecution implements IQueryExecution {
     // We don't need to deal with MemorySourceHandle because it doesn't 
register to memory pool
     // We don't need to deal with LocalSourceHandle because the 
SharedTsBlockQueue uses the upstream
     // FragmentInstanceId to register
-    if (resultHandleCleanUp.compareAndSet(false, true) && resultHandle 
instanceof SourceHandle) {
+    if (resultHandle instanceof SourceHandle) {
       TFragmentInstanceId fragmentInstanceId = 
resultHandle.getLocalFragmentInstanceId();
       MPPDataExchangeService.getInstance()
           .getMPPDataExchangeManager()
@@ -384,7 +382,7 @@ public class QueryExecution implements IQueryExecution {
     //   2. The client's connection is closed that all owned QueryExecution 
should be cleaned up
     // If the QueryExecution's state is abnormal, we should also abort the 
resultHandle without
     // waiting it to be finished.
-    if (resultHandle != null) {
+    if (resultHandle != null && resultHandleCleanUp.compareAndSet(false, 
true)) {
       if (t != null) {
         resultHandle.abort(t);
       } else {
@@ -394,6 +392,32 @@ public class QueryExecution implements IQueryExecution {
     }
   }
 
+  /**
+   * Clear up Coordinator.queryExecutionMap by calling Coordinator.cleanupQueryExecution, but only
+   * if no RPC is in progress for this query (that is, the current RPC has finished, {@code
+   * startTimeOfCurrentRpc == -1} and therefore {@code isActive()} is false). In cases where an RPC
+   * is still active, the finally block in ClientRPCServiceImpl is responsible for the cleanup.
+   */
+  private synchronized void cleanUpCoordinatorContextMapIfNeeded(Throwable t) {
+    if (!isActive()) {
+      Coordinator.getInstance()
+          .cleanupQueryExecution(
+              context.getLocalQueryId(), (org.apache.thrift.TBase<?, ?>) null, t);
+    }
+  }
+
+  /**
+   * Check if there is an active RPC for this query. If {@code startTimeOfCurrentRpc == -1}, it
+   * means there is no active RPC, otherwise there is an active RPC. An active RPC means that the
+   * client is still fetching results and the QueryExecution should not be cleaned up until the RPC
+   * finishes. On the other hand, if there is no active RPC, it means that the client has finished
+   * fetching results or has not started fetching results yet, and the QueryExecution can be safely
+   * cleaned up.
+   */
+  public synchronized boolean isActive() {
+    return startTimeOfCurrentRpc != -1;
+  }
+
   /**
    * This method will be called by the request thread from client connection. 
This method will block
    * until one of these conditions occurs: 1. There is a batch of result 2. 
There is no more result
@@ -671,13 +695,22 @@ public class QueryExecution implements IQueryExecution {
   }
 
   @Override
-  public void recordExecutionTime(long executionTime) {
+  public synchronized void recordExecutionTime(long executionTime) {
     totalExecutionTime += executionTime;
+    // recordExecutionTime is called after current rpc finished, so we need to 
set
+    // startTimeOfCurrentRpc to -1
+    this.startTimeOfCurrentRpc = -1;
+  }
+
+  @Override
+  public synchronized void updateCurrentRpcStartTime(long startTime) {
+    this.startTimeOfCurrentRpc = startTime;
   }
 
   @Override
-  public long getTotalExecutionTime() {
-    return totalExecutionTime;
+  public synchronized long getTotalExecutionTime() {
+    return totalExecutionTime
+        + (startTimeOfCurrentRpc == -1 ? 0 : System.nanoTime() - 
startTimeOfCurrentRpc);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
index f431bb2e51f..2b70a667be9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
@@ -113,6 +113,12 @@ public class ConfigExecution implements IQueryExecution {
   private final StatementType statementType;
   private long totalExecutionTime;
 
+  // -1 if previous rpc is finished and next client req hasn't come yet, unit is ns
+  // it will be updated in fetchResult rpc
+  // currently, ConfigExecution will return result in just one call, so this field is not used. But
+  // we will keep it for future use when ConfigExecution may return result in multiple calls
+  private volatile long startTimeOfCurrentRpc = System.nanoTime();
+
   public ConfigExecution(
       MPPQueryContext context,
       StatementType statementType,
@@ -222,11 +228,6 @@ public class ConfigExecution implements IQueryExecution {
     // do nothing
   }
 
-  @Override
-  public void stopAndCleanup() {
-    // do nothing
-  }
-
   @Override
   public void stopAndCleanup(Throwable t) {
     // do nothing
@@ -327,11 +328,25 @@ public class ConfigExecution implements IQueryExecution {
   @Override
   public void recordExecutionTime(long executionTime) {
     totalExecutionTime += executionTime;
+    // recordExecutionTime is called after current rpc finished, so we need to 
set
+    // startTimeOfCurrentRpc to -1
+    this.startTimeOfCurrentRpc = -1;
+  }
+
+  @Override
+  public void updateCurrentRpcStartTime(long startTime) {
+    this.startTimeOfCurrentRpc = startTime;
+  }
+
+  @Override
+  public boolean isActive() {
+    return startTimeOfCurrentRpc != -1; // -1 means no RPC is in progress for this query
   }
 
   @Override
   public long getTotalExecutionTime() {
-    return totalExecutionTime;
+    return totalExecutionTime
+        + (startTimeOfCurrentRpc == -1 ? 0 : System.nanoTime() - 
startTimeOfCurrentRpc);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
index 1739944be7c..2d0c1bf543c 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
@@ -1826,6 +1826,14 @@ public class MergeTreeSortOperatorTest {
     @Override
     public void recordExecutionTime(long executionTime) {}
 
+    @Override
+    public void updateCurrentRpcStartTime(long startTime) {}
+
+    @Override
+    public boolean isActive() {
+      return true;
+    }
+
     @Override
     public long getTotalExecutionTime() {
       return 0;
@@ -1857,9 +1865,6 @@ public class MergeTreeSortOperatorTest {
     @Override
     public void stop(Throwable t) {}
 
-    @Override
-    public void stopAndCleanup() {}
-
     @Override
     public void stopAndCleanup(Throwable t) {}
 

Reply via email to