This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 4903f3c7e75 [To dev/1.3] Fix kill query doesn't take effect bug 
(#17358)
4903f3c7e75 is described below

commit 4903f3c7e75db904a8ca2a4b094da52cc88a569c
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Mar 26 20:51:11 2026 +0800

    [To dev/1.3] Fix kill query doesn't take effect bug (#17358)
---
 .../protocol/thrift/impl/ClientRPCServiceImpl.java | 27 +++---
 .../db/queryengine/common/MPPQueryContext.java     | 17 ++--
 .../queryengine/execution/QueryStateMachine.java   |  4 +-
 .../fragment/FragmentInstanceManager.java          |  2 +
 .../iotdb/db/queryengine/plan/Coordinator.java     | 35 +++++++-
 .../analyze/schema/ClusterSchemaFetchExecutor.java | 41 +++++----
 .../plan/execution/IQueryExecution.java            | 25 +++++-
 .../queryengine/plan/execution/QueryExecution.java | 96 ++++++++++++++--------
 .../plan/execution/config/ConfigExecution.java     | 27 ++++--
 .../execution/operator/MergeSortOperatorTest.java  | 11 ++-
 10 files changed, 201 insertions(+), 84 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index b5edafec160..8a498fce2e3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -205,6 +205,7 @@ import static 
org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
 import static 
org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
 import static 
org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize;
+import static org.apache.iotdb.rpc.TSStatusCode.QUERY_WAS_KILLED;
 
 public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
@@ -241,6 +242,9 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
   private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
       TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
 
+  private static final String NO_QUERY_EXECUTION_ERR_MSG =
+      "Query is not found, it may be killed by others, timeout or some other 
runtime errors, you can see more details in server log.";
+
   @FunctionalInterface
   public interface SelectResult {
     boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, 
int fetchSize)
@@ -1147,15 +1151,18 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         finished = true;
         return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
       }
-      TSFetchResultsResp resp = 
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
 
       queryExecution = COORDINATOR.getQueryExecution(req.queryId);
 
       if (queryExecution == null) {
-        resp.setHasResultSet(false);
-        resp.setMoreData(false);
-        return resp;
+        TSStatus noQueryExecutionStatus = new 
TSStatus(QUERY_WAS_KILLED.getStatusCode());
+        noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG);
+        return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus);
       }
+
+      TSFetchResultsResp resp = 
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
+
+      queryExecution.updateCurrentRpcStartTime(startTime);
       statementType = queryExecution.getStatementType();
 
       try (SetThreadName queryName = new 
SetThreadName(queryExecution.getQueryId())) {
@@ -1686,16 +1693,16 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
       }
 
-      TSFetchResultsResp resp = 
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
-
       queryExecution = COORDINATOR.getQueryExecution(req.queryId);
       if (queryExecution == null) {
-        resp.setHasResultSet(false);
-        resp.setMoreData(true);
-        return resp;
+        TSStatus noQueryExecutionStatus = new 
TSStatus(QUERY_WAS_KILLED.getStatusCode());
+        noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG);
+        return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus);
       }
+      queryExecution.updateCurrentRpcStartTime(startTime);
       statementType = queryExecution.getStatementType();
 
+      TSFetchResultsResp resp = 
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
       try (SetThreadName queryName = new 
SetThreadName(queryExecution.getQueryId())) {
         Pair<TSQueryDataSet, Boolean> pair =
             convertTsBlockByFetchSize(queryExecution, req.fetchSize);
@@ -1705,7 +1712,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         resp.setHasResultSet(hasResultSet);
         resp.setQueryDataSet(result);
         resp.setIsAlign(true);
-        resp.setMoreData(finished);
+        resp.setMoreData(!finished);
         return resp;
       }
     } catch (Exception e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index fdd50c447f7..7479e832a90 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.queryengine.common;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
@@ -51,7 +52,11 @@ public class MPPQueryContext {
   private long localQueryId;
   private SessionInfo session;
   private QueryType queryType = QueryType.READ;
+
+  /** the max executing time of query in ms. Unit: millisecond */
   private long timeOut;
+
+  // time unit is ms
   private long startTime;
 
   private TEndPoint localDataBlockEndpoint;
@@ -95,6 +100,7 @@ public class MPPQueryContext {
 
   private boolean userQuery = false;
 
+  @TestOnly
   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
     this.endPointBlackList = ConcurrentHashMap.newKeySet();
@@ -102,19 +108,14 @@ public class MPPQueryContext {
         new NotThreadSafeMemoryReservationManager(queryId, 
this.getClass().getName());
   }
 
-  // TODO too many callers just pass a null SessionInfo which should be 
forbidden
+  @TestOnly
   public MPPQueryContext(
       String sql,
       QueryId queryId,
       SessionInfo session,
       TEndPoint localDataBlockEndpoint,
       TEndPoint localInternalEndpoint) {
-    this(queryId);
-    this.sql = sql;
-    this.session = session;
-    this.localDataBlockEndpoint = localDataBlockEndpoint;
-    this.localInternalEndpoint = localInternalEndpoint;
-    this.initResultNodeContext();
+    this(sql, queryId, -1, session, localDataBlockEndpoint, 
localInternalEndpoint);
   }
 
   public MPPQueryContext(
@@ -182,10 +183,12 @@ public class MPPQueryContext {
     return queryType;
   }
 
+  /** the max executing time of query in ms. Unit: millisecond */
   public long getTimeOut() {
     return timeOut;
   }
 
+  /** the max executing time of query in ms. Unit: millisecond */
   public void setTimeOut(long timeOut) {
     this.timeOut = timeOut;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
index 146359bc215..19d0fb38749 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
@@ -107,10 +107,10 @@ public class QueryStateMachine {
     transitionToDoneState(CANCELED);
   }
 
-  public void transitionToCanceled(Throwable throwable, TSStatus 
failureStatus) {
+  public boolean transitionToCanceled(Throwable throwable, TSStatus 
failureStatus) {
     this.failureStatus.compareAndSet(null, failureStatus);
     this.failureException.compareAndSet(null, throwable);
-    transitionToDoneState(CANCELED);
+    return transitionToDoneState(CANCELED);
   }
 
   public void transitionToAborted() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index 9a2657101e7..e8d0fd82432 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -37,6 +37,7 @@ import 
org.apache.iotdb.db.queryengine.execution.schedule.DriverScheduler;
 import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
 import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
 import org.apache.iotdb.db.queryengine.plan.planner.PipelineDriverFactory;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
@@ -427,6 +428,7 @@ public class FragmentInstanceManager {
                             + "ms, and now is in flushing state"));
           }
         });
+    Coordinator.getInstance().cleanUpStaleQueries();
   }
 
   public ExecutorService getIntoOperationExecutor() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index ed76b837e80..8afdfce7d5c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
 import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.QueryId;
@@ -241,7 +242,6 @@ public class Coordinator {
     return queryExecutionMap.size();
   }
 
-  // TODO: (xingtanzjr) need to redo once we have a concrete policy for the 
threadPool management
   private ExecutorService getQueryExecutor() {
     int coordinatorReadExecutorSize = CONFIG.getCoordinatorReadExecutorSize();
     return IoTDBThreadPoolFactory.newFixedThreadPool(
@@ -254,7 +254,6 @@ public class Coordinator {
         coordinatorWriteExecutorSize, 
ThreadName.MPP_COORDINATOR_WRITE_EXECUTOR.getName());
   }
 
-  // TODO: (xingtanzjr) need to redo once we have a concrete policy for the 
threadPool management
   private ScheduledExecutorService getScheduledExecutor() {
     return IoTDBThreadPoolFactory.newScheduledThreadPool(
         COORDINATOR_SCHEDULED_EXECUTOR_SIZE,
@@ -267,12 +266,11 @@ public class Coordinator {
 
   public void cleanupQueryExecution(
       Long queryId, org.apache.thrift.TBase<?, ?> nativeApiRequest, Throwable 
t) {
-    IQueryExecution queryExecution = getQueryExecution(queryId);
+    IQueryExecution queryExecution = queryExecutionMap.remove(queryId);
     if (queryExecution != null) {
       try (SetThreadName threadName = new 
SetThreadName(queryExecution.getQueryId())) {
         LOGGER.debug("[CleanUpQuery]]");
         queryExecution.stopAndCleanup(t);
-        queryExecutionMap.remove(queryId);
         if (queryExecution.isQuery() && queryExecution.isUserQuery()) {
           long costTime = queryExecution.getTotalExecutionTime();
           // print slow query
@@ -300,6 +298,35 @@ public class Coordinator {
     }
   }
 
+  /**
+   * Reclaim queries that are not {@code isActive()} and have exceeded their timeout by more than
+   * one minute (queries with a non-positive timeout are never reclaimed). Such queries were most
+   * likely abandoned by clients that never cleaned them up.
+   */
+  public void cleanUpStaleQueries() {
+    long currentTime = System.currentTimeMillis();
+    queryExecutionMap.forEach(
+        (queryId, queryExecution) -> {
+          if (queryExecution.isActive()) {
+            return;
+          }
+          long timeout = queryExecution.getTimeout();
+          long queryStartTime = queryExecution.getStartExecutionTime();
+          long executeTime = currentTime - queryStartTime;
+          if (timeout > 0 && executeTime - 60_000L > timeout) {
+            LOGGER.warn(
+                "Cleaning up stale query with id {}, which has been running 
for {} ms, timeout duration is: {}ms",
+                queryId,
+                executeTime,
+                timeout);
+            cleanupQueryExecution(
+                queryId,
+                null,
+                new QueryTimeoutRuntimeException(queryStartTime, currentTime, 
timeout));
+          }
+        });
+  }
+
   public void cleanupQueryExecution(Long queryId) {
     cleanupQueryExecution(queryId, null, null);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index bcea21a666e..7a30a6fa50f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.DeviceSchemaFetchStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.SeriesSchemaFetchStatement;
@@ -250,29 +251,37 @@ class ClusterSchemaFetchExecutor {
             String.format("Fetch Schema failed, because %s", 
executionResult.status.getMessage()),
             executionResult.status.getCode());
       }
+      IQueryExecution queryExecution = coordinator.getQueryExecution(queryId);
       try (SetThreadName threadName = new 
SetThreadName(executionResult.queryId.getId())) {
         ClusterSchemaTree result = new ClusterSchemaTree();
         ClusterSchemaTree.SchemaNodeBatchDeserializer deserializer =
             new ClusterSchemaTree.SchemaNodeBatchDeserializer();
         Set<String> databaseSet = new HashSet<>();
-        while (coordinator.getQueryExecution(queryId).hasNextResult()) {
-          // The query will be transited to FINISHED when invoking 
getBatchResult() at the last time
-          // So we don't need to clean up it manually
-          Optional<TsBlock> tsBlock;
-          try {
-            tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
-          } catch (IoTDBException e) {
-            t = e;
-            throw new RuntimeException("Fetch Schema failed. ", e);
-          }
-          if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
-            break;
-          }
-          Column column = tsBlock.get().getColumn(0);
-          for (int i = 0; i < column.getPositionCount(); i++) {
-            parseFetchedData(column.getBinary(i), result, deserializer, 
databaseSet, context);
+        if (queryExecution != null) {
+          while (queryExecution.hasNextResult()) {
+            // The query will be transited to FINISHED when invoking 
getBatchResult() at the last
+            // time
+            // So we don't need to clean up it manually
+            Optional<TsBlock> tsBlock;
+            try {
+              tsBlock = queryExecution.getBatchResult();
+            } catch (IoTDBException e) {
+              t = e;
+              throw new RuntimeException("Fetch Schema failed. ", e);
+            }
+            if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
+              break;
+            }
+            Column column = tsBlock.get().getColumn(0);
+            for (int i = 0; i < column.getPositionCount(); i++) {
+              parseFetchedData(column.getBinary(i), result, deserializer, 
databaseSet, context);
+            }
           }
+        } else {
+          throw new RuntimeException(
+              String.format("Fetch Schema failed, because queryExecution is 
null for %s", queryId));
         }
+
         result.setDatabases(databaseSet);
         return result;
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
index 61824aa4f02..09f25a781e1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
@@ -33,8 +33,6 @@ public interface IQueryExecution {
 
   void stop(Throwable t);
 
-  void stopAndCleanup();
-
   void stopAndCleanup(Throwable t);
 
   void cancel();
@@ -57,10 +55,32 @@ public interface IQueryExecution {
 
   String getQueryId();
 
+  // time unit is ms
   long getStartExecutionTime();
 
+  /**
+   * @param executionTime time unit should be ns
+   */
   void recordExecutionTime(long executionTime);
 
+  /**
+   * update current rpc start time, which is used to calculate rpc execution 
time and update total
+   * execution time
+   *
+   * @param startTime start time of current rpc, time unit is ns
+   */
+  void updateCurrentRpcStartTime(long startTime);
+
+  /**
+   * Check if there is an active RPC for this query. If {@code 
startTimeOfCurrentRpc == -1}, it
+   * means there is no active RPC, otherwise there is an active RPC. An active 
RPC means that the
+   * client is still fetching results and the QueryExecution should not be 
cleaned up until the RPC
+   * finishes. On the other hand, if there is no active RPC, it means that the 
client has finished
+   * fetching results or has not started fetching results yet, and the 
QueryExecution can be safely
+   * cleaned up.
+   */
+  boolean isActive();
+
   /**
    * @return cost time in ns
    */
@@ -69,6 +89,7 @@ public interface IQueryExecution {
   /** return ip for a thrift-based client, client-id for MQTT/REST client */
   String getClientHostname();
 
+  /** the max executing time of query in ms. Unit: millisecond */
   long getTimeout();
 
   Optional<String> getExecuteSQL();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 43f46133053..8de6165525e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -37,6 +37,7 @@ import 
org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle;
 import org.apache.iotdb.db.queryengine.execution.exchange.source.SourceHandle;
 import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import 
org.apache.iotdb.db.queryengine.plan.execution.memory.MemorySourceHandle;
@@ -104,9 +105,14 @@ public class QueryExecution implements IQueryExecution {
 
   private final AtomicBoolean stopped;
 
-  // cost time in ns
+  // cost time in ns of finished rpc
   private long totalExecutionTime = 0;
 
+  // -1 if previous rpc is finished and next client req hasn't come yet, unit 
is ns
+  // it will be updated in fetchResult rpc
+  // protected by synchronized(this)
+  private volatile long startTimeOfCurrentRpc = System.nanoTime();
+
   private static final QueryExecutionMetricSet QUERY_EXECUTION_METRIC_SET =
       QueryExecutionMetricSet.getInstance();
   private static final QueryPlanCostMetricSet QUERY_PLAN_COST_METRIC_SET =
@@ -129,16 +135,17 @@ public class QueryExecution implements IQueryExecution {
             if (!state.isDone()) {
               return;
             }
-            // TODO: (xingtanzjr) If the query is in abnormal state, the 
releaseResource() should be
-            // invoked
+            Throwable cause = null;
             if (state == QueryState.FAILED
                 || state == QueryState.ABORTED
                 || state == QueryState.CANCELED) {
               LOGGER.debug("[ReleaseQueryResource] state is: {}", state);
-              Throwable cause = stateMachine.getFailureException();
+              cause = stateMachine.getFailureException();
               releaseResource(cause);
+
+              this.cleanUpCoordinatorContextMapIfNeeded(cause);
             }
-            this.stop(null);
+            this.stop(cause);
           }
         });
     this.stopped = new AtomicBoolean(false);
@@ -324,32 +331,20 @@ public class QueryExecution implements IQueryExecution {
     }
   }
 
-  // Stop the query and clean up all the resources this query occupied
-  public void stopAndCleanup() {
-    stop(null);
-    releaseResource();
-  }
-
   @Override
   public void cancel() {
-    stateMachine.transitionToCanceled(
-        new KilledByOthersException(),
-        new TSStatus(TSStatusCode.QUERY_WAS_KILLED.getStatusCode())
-            .setMessage(KilledByOthersException.MESSAGE));
-  }
-
-  /** Release the resources that current QueryExecution hold. */
-  private void releaseResource() {
-    // close ResultHandle to unblock client's getResult request
-    // Actually, we should not close the ResultHandle when the QueryExecution 
is Finished.
-    // There are only two scenarios where the ResultHandle should be closed:
-    //   1. The client fetch all the result and the ResultHandle is finished.
-    //   2. The client's connection is closed that all owned QueryExecution 
should be cleaned up
-    // If the QueryExecution's state is abnormal, we should also abort the 
resultHandle without
-    // waiting it to be finished.
-    if (resultHandle != null) {
-      resultHandle.close();
-      cleanUpResultHandle();
+    Throwable cause = new KilledByOthersException();
+    boolean cancelled =
+        stateMachine.transitionToCanceled(
+            cause,
+            new TSStatus(TSStatusCode.QUERY_WAS_KILLED.getStatusCode())
+                .setMessage(KilledByOthersException.MESSAGE));
+    if (!cancelled) {
+      // cancel failed, means this query has already in a done state, we can 
do nothing to change
+      // the state but clean up the resource if needed
+      // we don't need to do cleanUpCoordinatorContextMapIfNeeded if cancel 
succeed, because it will
+      // be called in callback logic in QueryStateMachine of this 
QueryExecution
+      this.cleanUpCoordinatorContextMapIfNeeded(cause);
     }
   }
 
@@ -358,7 +353,7 @@ public class QueryExecution implements IQueryExecution {
     // We don't need to deal with MemorySourceHandle because it doesn't 
register to memory pool
     // We don't need to deal with LocalSourceHandle because the 
SharedTsBlockQueue uses the upstream
     // FragmentInstanceId to register
-    if (resultHandleCleanUp.compareAndSet(false, true) && resultHandle 
instanceof SourceHandle) {
+    if (resultHandle instanceof SourceHandle) {
       TFragmentInstanceId fragmentInstanceId = 
resultHandle.getLocalFragmentInstanceId();
       MPPDataExchangeService.getInstance()
           .getMPPDataExchangeManager()
@@ -385,7 +380,7 @@ public class QueryExecution implements IQueryExecution {
     //   2. The client's connection is closed that all owned QueryExecution 
should be cleaned up
     // If the QueryExecution's state is abnormal, we should also abort the 
resultHandle without
     // waiting it to be finished.
-    if (resultHandle != null) {
+    if (resultHandle != null && resultHandleCleanUp.compareAndSet(false, 
true)) {
       if (t != null) {
         resultHandle.abort(t);
       } else {
@@ -395,6 +390,30 @@ public class QueryExecution implements IQueryExecution {
     }
   }
 
+  /**
+   * Clear up Coordinator.queryExecutionMap by calling 
Coordinator.cleanupQueryExecution if there is
+   * no RPC in progress for this query (that is, the current RPC has finished 
and {@code
+   * startTimeOfCurrentRpc == -1}). In cases where an RPC is still active, the 
finally block in
+   * ClientRPCServiceImpl is responsible for performing the cleanup.
+   */
+  private synchronized void cleanUpCoordinatorContextMapIfNeeded(Throwable t) {
+    if (!isActive()) {
+      
Coordinator.getInstance().cleanupQueryExecution(context.getLocalQueryId(), 
null, t);
+    }
+  }
+
+  /**
+   * Check if there is an active RPC for this query. If {@code 
startTimeOfCurrentRpc == -1}, it
+   * means there is no active RPC, otherwise there is an active RPC. An active 
RPC means that the
+   * client is still fetching results and the QueryExecution should not be 
cleaned up until the RPC
+   * finishes. On the other hand, if there is no active RPC, it means that the 
client has finished
+   * fetching results or has not started fetching results yet, and the 
QueryExecution can be safely
+   * cleaned up.
+   */
+  public synchronized boolean isActive() {
+    return startTimeOfCurrentRpc != -1;
+  }
+
   /**
    * This method will be called by the request thread from client connection. 
This method will block
    * until one of these conditions occurs: 1. There is a batch of result 2. 
There is no more result
@@ -648,13 +667,22 @@ public class QueryExecution implements IQueryExecution {
   }
 
   @Override
-  public void recordExecutionTime(long executionTime) {
+  public synchronized void recordExecutionTime(long executionTime) {
     totalExecutionTime += executionTime;
+    // recordExecutionTime is called after current rpc finished, so we need to 
set
+    // startTimeOfCurrentRpc to -1
+    this.startTimeOfCurrentRpc = -1;
+  }
+
+  @Override
+  public synchronized void updateCurrentRpcStartTime(long startTime) {
+    this.startTimeOfCurrentRpc = startTime;
   }
 
   @Override
-  public long getTotalExecutionTime() {
-    return totalExecutionTime;
+  public synchronized long getTotalExecutionTime() {
+    return totalExecutionTime
+        + (startTimeOfCurrentRpc == -1 ? 0 : System.nanoTime() - 
startTimeOfCurrentRpc);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
index a3e16a991ec..e3cf2408f06 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
@@ -72,6 +72,12 @@ public class ConfigExecution implements IQueryExecution {
   private final StatementType statementType;
   private long totalExecutionTime;
 
+  // -1 if previous rpc is finished and next client req hasn't come yet, unit 
is ns
+  // it will be updated in fetchResult rpc
+  // currently, ConfigExecution returns its result in just one call, so this field is of little use
+  // today, but we keep it for future use when ConfigExecution may return results in multiple calls
+  private volatile long startTimeOfCurrentRpc = System.nanoTime();
+
   public ConfigExecution(
       MPPQueryContext context,
       StatementType statementType,
@@ -144,11 +150,6 @@ public class ConfigExecution implements IQueryExecution {
     // do nothing
   }
 
-  @Override
-  public void stopAndCleanup() {
-    // do nothing
-  }
-
   @Override
   public void stopAndCleanup(Throwable t) {
     // do nothing
@@ -244,11 +245,25 @@ public class ConfigExecution implements IQueryExecution {
   @Override
   public void recordExecutionTime(long executionTime) {
     totalExecutionTime += executionTime;
+    // recordExecutionTime is called after current rpc finished, so we need to 
set
+    // startTimeOfCurrentRpc to -1
+    this.startTimeOfCurrentRpc = -1;
+  }
+
+  @Override
+  public void updateCurrentRpcStartTime(long startTime) {
+    this.startTimeOfCurrentRpc = startTime;
+  }
+
+  @Override
+  public boolean isActive() {
+    return startTimeOfCurrentRpc != -1;
   }
 
   @Override
   public long getTotalExecutionTime() {
-    return totalExecutionTime;
+    return totalExecutionTime
+        + (startTimeOfCurrentRpc == -1 ? 0 : System.nanoTime() - 
startTimeOfCurrentRpc);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
index 7f3b5cd8d2d..76ec370ad98 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
@@ -1798,6 +1798,14 @@ public class MergeSortOperatorTest {
     @Override
     public void recordExecutionTime(long executionTime) {}
 
+    @Override
+    public void updateCurrentRpcStartTime(long startTime) {}
+
+    @Override
+    public boolean isActive() {
+      return true;
+    }
+
     @Override
     public long getTotalExecutionTime() {
       return 0;
@@ -1819,9 +1827,6 @@ public class MergeSortOperatorTest {
     @Override
     public void stop(Throwable t) {}
 
-    @Override
-    public void stopAndCleanup() {}
-
     @Override
     public void stopAndCleanup(Throwable t) {}
 

Reply via email to