This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/kill
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0a13d70c743395ab4a8143226d54ddde3b649c68
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Mar 25 17:15:02 2026 +0800

    Fix bug where kill query doesn't take effect
---
 .../protocol/thrift/impl/ClientRPCServiceImpl.java | 25 +++++---
 .../db/queryengine/common/MPPQueryContext.java     | 11 ++--
 .../iotdb/db/queryengine/plan/Coordinator.java     |  3 +-
 .../plan/execution/IQueryExecution.java            | 10 +++-
 .../queryengine/plan/execution/QueryExecution.java | 69 ++++++++++++----------
 .../plan/execution/config/ConfigExecution.java     | 22 +++++--
 .../execution/operator/MergeSortOperatorTest.java  |  6 +-
 7 files changed, 86 insertions(+), 60 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index b5edafec160..d9913bf42c4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -205,6 +205,7 @@ import static 
org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
 import static 
org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
 import static 
org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize;
+import static org.apache.iotdb.rpc.TSStatusCode.QUERY_WAS_KILLED;
 
 public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
@@ -241,6 +242,9 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
   private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
       TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
 
+  private static final String NO_QUERY_EXECUTION_ERR_MSG =
+      "Query is not found, it may be killed by others, timeout or some other 
runtime errors, you can see more details in server log.";
+
   @FunctionalInterface
   public interface SelectResult {
     boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, 
int fetchSize)
@@ -1147,15 +1151,18 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         finished = true;
         return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
       }
-      TSFetchResultsResp resp = 
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
 
       queryExecution = COORDINATOR.getQueryExecution(req.queryId);
 
       if (queryExecution == null) {
-        resp.setHasResultSet(false);
-        resp.setMoreData(false);
-        return resp;
+        TSStatus noQueryExecutionStatus = new 
TSStatus(QUERY_WAS_KILLED.getStatusCode());
+        noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG);
+        return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus);
       }
+
+      TSFetchResultsResp resp = 
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
+
+      queryExecution.updateCurrentRpcStartTime(startTime);
       statementType = queryExecution.getStatementType();
 
       try (SetThreadName queryName = new 
SetThreadName(queryExecution.getQueryId())) {
@@ -1686,16 +1693,16 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
       }
 
-      TSFetchResultsResp resp = 
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
-
       queryExecution = COORDINATOR.getQueryExecution(req.queryId);
       if (queryExecution == null) {
-        resp.setHasResultSet(false);
-        resp.setMoreData(true);
-        return resp;
+        TSStatus noQueryExecutionStatus = new 
TSStatus(QUERY_WAS_KILLED.getStatusCode());
+        noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG);
+        return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus);
       }
+      queryExecution.updateCurrentRpcStartTime(startTime);
       statementType = queryExecution.getStatementType();
 
+      TSFetchResultsResp resp = 
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
       try (SetThreadName queryName = new 
SetThreadName(queryExecution.getQueryId())) {
         Pair<TSQueryDataSet, Boolean> pair =
             convertTsBlockByFetchSize(queryExecution, req.fetchSize);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index fc457ab9ce4..d99ab5edd81 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.queryengine.common;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
@@ -95,6 +96,7 @@ public class MPPQueryContext {
 
   private boolean userQuery = false;
 
+  @TestOnly
   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
     this.endPointBlackList = ConcurrentHashMap.newKeySet();
@@ -102,19 +104,14 @@ public class MPPQueryContext {
         new NotThreadSafeMemoryReservationManager(queryId, 
this.getClass().getName());
   }
 
-  // TODO too many callers just pass a null SessionInfo which should be 
forbidden
+  @TestOnly
   public MPPQueryContext(
       String sql,
       QueryId queryId,
       SessionInfo session,
       TEndPoint localDataBlockEndpoint,
       TEndPoint localInternalEndpoint) {
-    this(queryId);
-    this.sql = sql;
-    this.session = session;
-    this.localDataBlockEndpoint = localDataBlockEndpoint;
-    this.localInternalEndpoint = localInternalEndpoint;
-    this.initResultNodeContext();
+    this(sql, queryId, -1, session, localDataBlockEndpoint, 
localInternalEndpoint);
   }
 
   public MPPQueryContext(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index ed76b837e80..d9e9e4477b9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -267,12 +267,11 @@ public class Coordinator {
 
   public void cleanupQueryExecution(
       Long queryId, org.apache.thrift.TBase<?, ?> nativeApiRequest, Throwable 
t) {
-    IQueryExecution queryExecution = getQueryExecution(queryId);
+    IQueryExecution queryExecution = queryExecutionMap.remove(queryId);
     if (queryExecution != null) {
       try (SetThreadName threadName = new 
SetThreadName(queryExecution.getQueryId())) {
         LOGGER.debug("[CleanUpQuery]]");
         queryExecution.stopAndCleanup(t);
-        queryExecutionMap.remove(queryId);
         if (queryExecution.isQuery() && queryExecution.isUserQuery()) {
           long costTime = queryExecution.getTotalExecutionTime();
           // print slow query
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
index 5cb2d4b449c..6726a37d59f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
@@ -33,8 +33,6 @@ public interface IQueryExecution {
 
   void stop(Throwable t);
 
-  void stopAndCleanup();
-
   void stopAndCleanup(Throwable t);
 
   void cancel();
@@ -61,6 +59,14 @@ public interface IQueryExecution {
 
   void recordExecutionTime(long executionTime);
 
+  /**
+   * update current rpc start time, which is used to calculate rpc execution 
time and update total
+   * execution time
+   *
+   * @param startTime start time of current rpc, time unit is ns
+   */
+  void updateCurrentRpcStartTime(long startTime);
+
   /**
    * @return cost time in ns
    */
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 0ae40345268..6572a39b5ef 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -37,6 +37,7 @@ import 
org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle;
 import org.apache.iotdb.db.queryengine.execution.exchange.source.SourceHandle;
 import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import 
org.apache.iotdb.db.queryengine.plan.execution.memory.MemorySourceHandle;
@@ -104,9 +105,14 @@ public class QueryExecution implements IQueryExecution {
 
   private final AtomicBoolean stopped;
 
-  // cost time in ns
+  // cost time in ns of finished rpc
   private long totalExecutionTime = 0;
 
+  // -1 if previous rpc is finished and next client req hasn't come yet, unit 
is ns
+  // it will be updated in fetchResult rpc
+  // protected by synchronized(this)
+  private long startTimeOfCurrentRpc = System.nanoTime();
+
   private static final QueryExecutionMetricSet QUERY_EXECUTION_METRIC_SET =
       QueryExecutionMetricSet.getInstance();
   private static final QueryPlanCostMetricSet QUERY_PLAN_COST_METRIC_SET =
@@ -129,16 +135,16 @@ public class QueryExecution implements IQueryExecution {
             if (!state.isDone()) {
               return;
             }
-            // TODO: (xingtanzjr) If the query is in abnormal state, the 
releaseResource() should be
-            // invoked
+            Throwable cause = null;
             if (state == QueryState.FAILED
                 || state == QueryState.ABORTED
                 || state == QueryState.CANCELED) {
               LOGGER.debug("[ReleaseQueryResource] state is: {}", state);
-              Throwable cause = stateMachine.getFailureException();
+              cause = stateMachine.getFailureException();
               releaseResource(cause);
             }
-            this.stop(null);
+            this.stop(cause);
+            this.cleanUpCoordinatorContextMapIfNeeded(cause);
           }
         });
     this.stopped = new AtomicBoolean(false);
@@ -324,12 +330,6 @@ public class QueryExecution implements IQueryExecution {
     }
   }
 
-  // Stop the query and clean up all the resources this query occupied
-  public void stopAndCleanup() {
-    stop(null);
-    releaseResource();
-  }
-
   @Override
   public void cancel() {
     stateMachine.transitionToCanceled(
@@ -338,27 +338,12 @@ public class QueryExecution implements IQueryExecution {
             .setMessage(KilledByOthersException.MESSAGE));
   }
 
-  /** Release the resources that current QueryExecution hold. */
-  private void releaseResource() {
-    // close ResultHandle to unblock client's getResult request
-    // Actually, we should not close the ResultHandle when the QueryExecution 
is Finished.
-    // There are only two scenarios where the ResultHandle should be closed:
-    //   1. The client fetch all the result and the ResultHandle is finished.
-    //   2. The client's connection is closed that all owned QueryExecution 
should be cleaned up
-    // If the QueryExecution's state is abnormal, we should also abort the 
resultHandle without
-    // waiting it to be finished.
-    if (resultHandle != null) {
-      resultHandle.close();
-      cleanUpResultHandle();
-    }
-  }
-
   private void cleanUpResultHandle() {
     // Result handle belongs to special fragment instance, so we need to 
deregister it alone
     // We don't need to deal with MemorySourceHandle because it doesn't 
register to memory pool
     // We don't need to deal with LocalSourceHandle because the 
SharedTsBlockQueue uses the upstream
     // FragmentInstanceId to register
-    if (resultHandleCleanUp.compareAndSet(false, true) && resultHandle 
instanceof SourceHandle) {
+    if (resultHandle instanceof SourceHandle) {
       TFragmentInstanceId fragmentInstanceId = 
resultHandle.getLocalFragmentInstanceId();
       MPPDataExchangeService.getInstance()
           .getMPPDataExchangeManager()
@@ -385,13 +370,26 @@ public class QueryExecution implements IQueryExecution {
     //   2. The client's connection is closed that all owned QueryExecution 
should be cleaned up
     // If the QueryExecution's state is abnormal, we should also abort the 
resultHandle without
     // waiting it to be finished.
-    if (resultHandle != null) {
+    if (resultHandle != null && resultHandleCleanUp.compareAndSet(false, 
true)) {
       if (t != null) {
         resultHandle.abort(t);
       } else {
         resultHandle.close();
       }
       cleanUpResultHandle();
+      resultHandle = null;
+    }
+  }
+
+  /**
+   * Clean up Coordinator.queryExecutionMap by calling 
Coordinator.cleanupQueryExecution if the
+   * current rpc is finished. We need to make sure the cleanup logic is only 
called when client
+   * connection is not active, otherwise the finally code logic in 
ClientRPCServiceImpl will handle
+   * that
+   */
+  private synchronized void cleanUpCoordinatorContextMapIfNeeded(Throwable t) {
+    if (startTimeOfCurrentRpc == -1) {
+      
Coordinator.getInstance().cleanupQueryExecution(context.getLocalQueryId(), 
null, t);
     }
   }
 
@@ -648,13 +646,22 @@ public class QueryExecution implements IQueryExecution {
   }
 
   @Override
-  public void recordExecutionTime(long executionTime) {
+  public synchronized void recordExecutionTime(long executionTime) {
     totalExecutionTime += executionTime;
+    // recordExecutionTime is called after current rpc finished, so we need to 
set
+    // startTimeOfCurrentRpc to -1
+    this.startTimeOfCurrentRpc = -1;
+  }
+
+  @Override
+  public synchronized void updateCurrentRpcStartTime(long startTime) {
+    this.startTimeOfCurrentRpc = startTime;
   }
 
   @Override
-  public long getTotalExecutionTime() {
-    return totalExecutionTime;
+  public synchronized long getTotalExecutionTime() {
+    return totalExecutionTime
+        + (startTimeOfCurrentRpc == -1 ? 0 : System.nanoTime() - 
startTimeOfCurrentRpc);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
index a9d96961834..00478d0ab88 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
@@ -72,6 +72,12 @@ public class ConfigExecution implements IQueryExecution {
   private final StatementType statementType;
   private long totalExecutionTime;
 
+  // -1 if previous rpc is finished and next client req hasn't come yet, unit 
is ns
+  // it will be updated in fetchResult rpc
+  // currently, ConfigExecution will return result in just one call, so this 
field is not used. But
+  // we will keep it for future use when ConfigExecution may return result in 
multiple calls
+  private long startTimeOfCurrentRpc = System.nanoTime();
+
   public ConfigExecution(
       MPPQueryContext context,
       StatementType statementType,
@@ -144,11 +150,6 @@ public class ConfigExecution implements IQueryExecution {
     // do nothing
   }
 
-  @Override
-  public void stopAndCleanup() {
-    // do nothing
-  }
-
   @Override
   public void stopAndCleanup(Throwable t) {
     // do nothing
@@ -244,11 +245,20 @@ public class ConfigExecution implements IQueryExecution {
   @Override
   public void recordExecutionTime(long executionTime) {
     totalExecutionTime += executionTime;
+    // recordExecutionTime is called after current rpc finished, so we need to 
set
+    // startTimeOfCurrentRpc to -1
+    this.startTimeOfCurrentRpc = -1;
+  }
+
+  @Override
+  public void updateCurrentRpcStartTime(long startTime) {
+    this.startTimeOfCurrentRpc = startTime;
   }
 
   @Override
   public long getTotalExecutionTime() {
-    return totalExecutionTime;
+    return totalExecutionTime
+        + (startTimeOfCurrentRpc == -1 ? 0 : System.nanoTime() - 
startTimeOfCurrentRpc);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
index 604d3eb3442..cf6f5a1ea75 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
@@ -1798,6 +1798,9 @@ public class MergeSortOperatorTest {
     @Override
     public void recordExecutionTime(long executionTime) {}
 
+    @Override
+    public void updateCurrentRpcStartTime(long startTime) {}
+
     @Override
     public long getTotalExecutionTime() {
       return 0;
@@ -1819,9 +1822,6 @@ public class MergeSortOperatorTest {
     @Override
     public void stop(Throwable t) {}
 
-    @Override
-    public void stopAndCleanup() {}
-
     @Override
     public void stopAndCleanup(Throwable t) {}
 

Reply via email to