This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/kill in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0a13d70c743395ab4a8143226d54ddde3b649c68 Author: JackieTien97 <[email protected]> AuthorDate: Wed Mar 25 17:15:02 2026 +0800 Fix kill query doesn't take effect bug --- .../protocol/thrift/impl/ClientRPCServiceImpl.java | 25 +++++--- .../db/queryengine/common/MPPQueryContext.java | 11 ++-- .../iotdb/db/queryengine/plan/Coordinator.java | 3 +- .../plan/execution/IQueryExecution.java | 10 +++- .../queryengine/plan/execution/QueryExecution.java | 69 ++++++++++++---------- .../plan/execution/config/ConfigExecution.java | 22 +++++-- .../execution/operator/MergeSortOperatorTest.java | 6 +- 7 files changed, 86 insertions(+), 60 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index b5edafec160..d9913bf42c4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -205,6 +205,7 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize; +import static org.apache.iotdb.rpc.TSStatusCode.QUERY_WAS_KILLED; public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @@ -241,6 +242,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + private static final String NO_QUERY_EXECUTION_ERR_MSG = + "Query is not found, it may be killed by others, timeout or some other runtime errors, you can see more 
details in server log."; + @FunctionalInterface public interface SelectResult { boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int fetchSize) @@ -1147,15 +1151,18 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { finished = true; return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus()); } - TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); queryExecution = COORDINATOR.getQueryExecution(req.queryId); if (queryExecution == null) { - resp.setHasResultSet(false); - resp.setMoreData(false); - return resp; + TSStatus noQueryExecutionStatus = new TSStatus(QUERY_WAS_KILLED.getStatusCode()); + noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG); + return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus); } + + TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); + + queryExecution.updateCurrentRpcStartTime(startTime); statementType = queryExecution.getStatementType(); try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) { @@ -1686,16 +1693,16 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus()); } - TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); - queryExecution = COORDINATOR.getQueryExecution(req.queryId); if (queryExecution == null) { - resp.setHasResultSet(false); - resp.setMoreData(true); - return resp; + TSStatus noQueryExecutionStatus = new TSStatus(QUERY_WAS_KILLED.getStatusCode()); + noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG); + return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus); } + queryExecution.updateCurrentRpcStartTime(startTime); statementType = queryExecution.getStatementType(); + TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); try (SetThreadName queryName = new 
SetThreadName(queryExecution.getQueryId())) { Pair<TSQueryDataSet, Boolean> pair = convertTsBlockByFetchSize(queryExecution, req.fetchSize); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index fc457ab9ce4..d99ab5edd81 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.queryengine.common; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; @@ -95,6 +96,7 @@ public class MPPQueryContext { private boolean userQuery = false; + @TestOnly public MPPQueryContext(QueryId queryId) { this.queryId = queryId; this.endPointBlackList = ConcurrentHashMap.newKeySet(); @@ -102,19 +104,14 @@ public class MPPQueryContext { new NotThreadSafeMemoryReservationManager(queryId, this.getClass().getName()); } - // TODO too many callers just pass a null SessionInfo which should be forbidden + @TestOnly public MPPQueryContext( String sql, QueryId queryId, SessionInfo session, TEndPoint localDataBlockEndpoint, TEndPoint localInternalEndpoint) { - this(queryId); - this.sql = sql; - this.session = session; - this.localDataBlockEndpoint = localDataBlockEndpoint; - this.localInternalEndpoint = localInternalEndpoint; - this.initResultNodeContext(); + this(sql, queryId, -1, session, localDataBlockEndpoint, localInternalEndpoint); } public MPPQueryContext( diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index ed76b837e80..d9e9e4477b9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -267,12 +267,11 @@ public class Coordinator { public void cleanupQueryExecution( Long queryId, org.apache.thrift.TBase<?, ?> nativeApiRequest, Throwable t) { - IQueryExecution queryExecution = getQueryExecution(queryId); + IQueryExecution queryExecution = queryExecutionMap.remove(queryId); if (queryExecution != null) { try (SetThreadName threadName = new SetThreadName(queryExecution.getQueryId())) { LOGGER.debug("[CleanUpQuery]]"); queryExecution.stopAndCleanup(t); - queryExecutionMap.remove(queryId); if (queryExecution.isQuery() && queryExecution.isUserQuery()) { long costTime = queryExecution.getTotalExecutionTime(); // print slow query diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java index 5cb2d4b449c..6726a37d59f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java @@ -33,8 +33,6 @@ public interface IQueryExecution { void stop(Throwable t); - void stopAndCleanup(); - void stopAndCleanup(Throwable t); void cancel(); @@ -61,6 +59,14 @@ public interface IQueryExecution { void recordExecutionTime(long executionTime); + /** + * update current rpc start time, which is used to calculate rpc execution time and update total + * execution time + * + * @param startTime start time of current rpc, time unit is ns + */ + void 
updateCurrentRpcStartTime(long startTime); + /** * @return cost time in ns */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 0ae40345268..6572a39b5ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle; import org.apache.iotdb.db.queryengine.execution.exchange.source.SourceHandle; import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet; +import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.execution.memory.MemorySourceHandle; @@ -104,9 +105,14 @@ public class QueryExecution implements IQueryExecution { private final AtomicBoolean stopped; - // cost time in ns + // cost time in ns of finished rpc private long totalExecutionTime = 0; + // -1 if previous rpc is finished and next client req hasn't come yet, unit is ns + // it will be updated in fetchResult rpc + // protected by synchronized(this) + private long startTimeOfCurrentRpc = System.nanoTime(); + private static final QueryExecutionMetricSet QUERY_EXECUTION_METRIC_SET = QueryExecutionMetricSet.getInstance(); private static final QueryPlanCostMetricSet QUERY_PLAN_COST_METRIC_SET = @@ -129,16 +135,16 @@ public class QueryExecution implements IQueryExecution { if (!state.isDone()) { return; } - // TODO: (xingtanzjr) If the query is in abnormal state, the releaseResource() should be - // invoked + Throwable cause = null; if 
(state == QueryState.FAILED || state == QueryState.ABORTED || state == QueryState.CANCELED) { LOGGER.debug("[ReleaseQueryResource] state is: {}", state); - Throwable cause = stateMachine.getFailureException(); + cause = stateMachine.getFailureException(); releaseResource(cause); } - this.stop(null); + this.stop(cause); + this.cleanUpCoordinatorContextMapIfNeeded(cause); } }); this.stopped = new AtomicBoolean(false); @@ -324,12 +330,6 @@ public class QueryExecution implements IQueryExecution { } } - // Stop the query and clean up all the resources this query occupied - public void stopAndCleanup() { - stop(null); - releaseResource(); - } - @Override public void cancel() { stateMachine.transitionToCanceled( @@ -338,27 +338,12 @@ public class QueryExecution implements IQueryExecution { .setMessage(KilledByOthersException.MESSAGE)); } - /** Release the resources that current QueryExecution hold. */ - private void releaseResource() { - // close ResultHandle to unblock client's getResult request - // Actually, we should not close the ResultHandle when the QueryExecution is Finished. - // There are only two scenarios where the ResultHandle should be closed: - // 1. The client fetch all the result and the ResultHandle is finished. - // 2. The client's connection is closed that all owned QueryExecution should be cleaned up - // If the QueryExecution's state is abnormal, we should also abort the resultHandle without - // waiting it to be finished. 
- if (resultHandle != null) { - resultHandle.close(); - cleanUpResultHandle(); - } - } - private void cleanUpResultHandle() { // Result handle belongs to special fragment instance, so we need to deregister it alone // We don't need to deal with MemorySourceHandle because it doesn't register to memory pool // We don't need to deal with LocalSourceHandle because the SharedTsBlockQueue uses the upstream // FragmentInstanceId to register - if (resultHandleCleanUp.compareAndSet(false, true) && resultHandle instanceof SourceHandle) { + if (resultHandle instanceof SourceHandle) { TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId(); MPPDataExchangeService.getInstance() .getMPPDataExchangeManager() @@ -385,13 +370,26 @@ public class QueryExecution implements IQueryExecution { // 2. The client's connection is closed that all owned QueryExecution should be cleaned up // If the QueryExecution's state is abnormal, we should also abort the resultHandle without // waiting it to be finished. - if (resultHandle != null) { + if (resultHandle != null && resultHandleCleanUp.compareAndSet(false, true)) { if (t != null) { resultHandle.abort(t); } else { resultHandle.close(); } cleanUpResultHandle(); + resultHandle = null; + } + } + + /** + * clear up Coordinator.queryExecutionMap by calling Coordinator.cleanupQueryExecution if the + * current rpc is finished. 
We need to make sure the cleanup logic is only called when client + * connection is not active, otherwise the finally code logic in ClientRPCServiceImpl will handle + * that + */ + private synchronized void cleanUpCoordinatorContextMapIfNeeded(Throwable t) { + if (startTimeOfCurrentRpc == -1) { + Coordinator.getInstance().cleanupQueryExecution(context.getLocalQueryId(), null, t); } } @@ -648,13 +646,22 @@ public class QueryExecution implements IQueryExecution { } @Override - public void recordExecutionTime(long executionTime) { + public synchronized void recordExecutionTime(long executionTime) { totalExecutionTime += executionTime; + // recordExecutionTime is called after current rpc finished, so we need to set + // startTimeOfCurrentRpc to -1 + this.startTimeOfCurrentRpc = -1; + } + + @Override + public synchronized void updateCurrentRpcStartTime(long startTime) { + this.startTimeOfCurrentRpc = startTime; } @Override - public long getTotalExecutionTime() { - return totalExecutionTime; + public synchronized long getTotalExecutionTime() { + return totalExecutionTime + + (startTimeOfCurrentRpc == -1 ? 
0 : System.nanoTime() - startTimeOfCurrentRpc); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java index a9d96961834..00478d0ab88 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java @@ -72,6 +72,12 @@ public class ConfigExecution implements IQueryExecution { private final StatementType statementType; private long totalExecutionTime; + // -1 if previous rpc is finished and next client req hasn't come yet, unit is ns + // it will be updated in fetchResult rpc + // currently, ConfigExecution will return result in just one call, so this field is not used. But + // we will keep it for future use when ConfigExecution may return result in multiple calls + private long startTimeOfCurrentRpc = System.nanoTime(); + public ConfigExecution( MPPQueryContext context, StatementType statementType, @@ -144,11 +150,6 @@ public class ConfigExecution implements IQueryExecution { // do nothing } - @Override - public void stopAndCleanup() { - // do nothing - } - @Override public void stopAndCleanup(Throwable t) { // do nothing @@ -244,11 +245,20 @@ public class ConfigExecution implements IQueryExecution { @Override public void recordExecutionTime(long executionTime) { totalExecutionTime += executionTime; + // recordExecutionTime is called after current rpc finished, so we need to set + // startTimeOfCurrentRpc to -1 + this.startTimeOfCurrentRpc = -1; + } + + @Override + public void updateCurrentRpcStartTime(long startTime) { + this.startTimeOfCurrentRpc = startTime; } @Override public long getTotalExecutionTime() { - return totalExecutionTime; + return totalExecutionTime + + (startTimeOfCurrentRpc == -1 ? 
0 : System.nanoTime() - startTimeOfCurrentRpc); } @Override diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java index 604d3eb3442..cf6f5a1ea75 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java @@ -1798,6 +1798,9 @@ public class MergeSortOperatorTest { @Override public void recordExecutionTime(long executionTime) {} + @Override + public void updateCurrentRpcStartTime(long startTime) {} + @Override public long getTotalExecutionTime() { return 0; @@ -1819,9 +1822,6 @@ public class MergeSortOperatorTest { @Override public void stop(Throwable t) {} - @Override - public void stopAndCleanup() {} - @Override public void stopAndCleanup(Throwable t) {}
