This is an automated email from the ASF dual-hosted git repository. JackieTien97 pushed a commit to branch rc/2.0.10 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7128b9fc4c63856629831f26a03aad1d02272aa4 Author: Jackie Tien <[email protected]> AuthorDate: Mon Jun 1 10:39:38 2026 +0800 Fix repeated RPC dispatch reusing a released FragmentInstanceContext (NPE) (#17794) --- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 + .../execution/executor/RegionReadExecutor.java | 23 ++++++- .../fragment/FragmentInstanceManager.java | 79 ++++++++++++++++------ .../scheduler/FragmentInstanceDispatcherImpl.java | 15 ++++ .../apache/iotdb/db/utils/ErrorHandlingUtils.java | 2 +- .../execution/executor/RegionReadExecutorTest.java | 46 +++++++++++++ 6 files changed, 141 insertions(+), 25 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index e5014681fa7..035c648132f 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -146,6 +146,7 @@ public enum TSStatusCode { QUERY_TIMEOUT(720), PLAN_FAILED_NETWORK_PARTITION(721), CANNOT_FETCH_FI_STATE(722), + REPEATED_RPC_CALL(723), // OBJECT OBJECT_NOT_EXISTS(740), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java index b393919cf77..07c1574e2aa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.executor; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils; import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.commons.utils.TestOnly; @@ -127,6 +128,13 @@ public class RegionReadExecutor { || t instanceof InterruptedException) { resp.setReadNeedRetry(true); resp.setStatus(new TSStatus(TSStatusCode.RATIS_READ_UNAVAILABLE.getStatusCode())); + } else if (t instanceof IoTDBRuntimeException) { + // Carry the original status code (e.g. REPEATED_RPC_CALL) back to the dispatcher so it is + // not downgraded to EXECUTE_STATEMENT_ERROR; needRetryHelper decides retryability. + TSStatus status = new TSStatus(((IoTDBRuntimeException) t).getErrorCode()); + status.setMessage(t.getMessage()); + resp.setStatus(status); + resp.setReadNeedRetry(StatusUtils.needRetryHelper(status)); } return resp; } @@ -156,8 +164,19 @@ public class RegionReadExecutor { } } catch (Throwable t) { LOGGER.warn(DataNodeQueryMessages.EXECUTE_FRAGMENTINSTANCE_IN_QUERYEXECUTOR_FAILED, t); - return RegionExecutionResult.create( - false, String.format(ERROR_MSG_FORMAT, t.getMessage()), null); + RegionExecutionResult resp = + RegionExecutionResult.create( + false, String.format(ERROR_MSG_FORMAT, t.getMessage()), null); + Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(t); + if (rootCause instanceof IoTDBRuntimeException) { + // Carry the original status code (e.g. REPEATED_RPC_CALL) back to the dispatcher so it is + // not downgraded to EXECUTE_STATEMENT_ERROR; needRetryHelper decides retryability. + TSStatus status = new TSStatus(((IoTDBRuntimeException) rootCause).getErrorCode()); + status.setMessage(rootCause.getMessage()); + resp.setStatus(status); + resp.setReadNeedRetry(StatusUtils.needRetryHelper(status)); + } + return resp; } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index 37e5c6c0858..e5f3cbc7e0d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -147,23 +147,30 @@ public class FragmentInstanceManager { FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); - int dataNodeFINum = instance.getDataNodeFINum(); - DataNodeQueryContext dataNodeQueryContext = - getOrCreateDataNodeQueryContext(instanceId.getQueryId(), dataNodeFINum); - + boolean[] contextCreated = new boolean[] {false}; + DataNodeQueryContext[] dataNodeQueryContexts = new DataNodeQueryContext[1]; FragmentInstanceContext context = instanceContext.computeIfAbsent( instanceId, - fragmentInstanceId -> - createFragmentInstanceContext( - fragmentInstanceId, - stateMachine, - instance.getSessionInfo(), - dataRegion, - instance.getGlobalTimePredicate(), - dataNodeQueryContextMap, - instance.isDebug(), - instance.isVerbose())); + fragmentInstanceId -> { + contextCreated[0] = true; + // Only ensure the DataNodeQueryContext when we actually create the + // FragmentInstanceContext, so the repeated-dispatch path (which rejects + // without creating a context) does not leak a context entry. + dataNodeQueryContexts[0] = + getOrCreateDataNodeQueryContext( + instanceId.getQueryId(), instance.getDataNodeFINum()); + return createFragmentInstanceContext( + fragmentInstanceId, + stateMachine, + instance.getSessionInfo(), + dataRegion, + instance.getGlobalTimePredicate(), + dataNodeQueryContextMap, + instance.isDebug(), + instance.isVerbose()); + }); + rejectIfRepeatedDispatch(contextCreated[0], instanceId); context.setHighestPriority(instance.isHighestPriority()); try { @@ -172,7 +179,7 @@ public class FragmentInstanceManager { instance.getFragment().getPlanNodeTree(), instance.getFragment().getTypeProvider(), context, - dataNodeQueryContext); + dataNodeQueryContexts[0]); List<IDriver> drivers = new ArrayList<>(); driverFactories.forEach(factory -> drivers.add(factory.createDriver())); @@ -246,6 +253,30 @@ public class FragmentInstanceManager { } } + /** + * If {@code instanceContext.computeIfAbsent} returned an existing {@link FragmentInstanceContext} + * for this {@code instanceId} (i.e. {@code contextCreated} is false), the same FragmentInstance + * has been dispatched before (e.g. an RPC retry in {@code + * FragmentInstanceDispatcherImpl#dispatchRemote}). The previous execution may have already + * released its resources (dataRegion == null), so reusing this cached context would run a fresh + * driver against a released context and trigger an NPE. Reject the duplicated dispatch with + * REPEATED_RPC_CALL instead of reusing it. + * + * <p>This must be called before the planning try block on purpose, so it propagates up + * (RegionReadExecutor carries the status code) without touching the first execution's cached + * resources. + */ + private static void rejectIfRepeatedDispatch( + boolean contextCreated, FragmentInstanceId instanceId) { + if (!contextCreated) { + throw new IoTDBRuntimeException( + String.format( + "Repeated RPC call detected for FragmentInstance %s, reject the duplicated dispatch.", + instanceId.getFullId()), + TSStatusCode.REPEATED_RPC_CALL.getStatusCode()); + } + } + private void clearFIRelatedResources(FragmentInstanceId instanceId) { // close and remove all the handles of the fragment instance exchangeManager.forceDeregisterFragmentInstance(instanceId.toThrift()); @@ -270,16 +301,20 @@ public class FragmentInstanceManager { FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + boolean[] contextCreated = new boolean[] {false}; FragmentInstanceContext context = instanceContext.computeIfAbsent( instanceId, - fragmentInstanceId -> - createFragmentInstanceContext( - fragmentInstanceId, - stateMachine, - instance.getSessionInfo(), - instance.isDebug(), - instance.isVerbose())); + fragmentInstanceId -> { + contextCreated[0] = true; + return createFragmentInstanceContext( + fragmentInstanceId, + stateMachine, + instance.getSessionInfo(), + instance.isDebug(), + instance.isVerbose()); + }); + rejectIfRepeatedDispatch(contextCreated[0], instanceId); context.setHighestPriority(instance.isHighestPriority()); try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index aa5b8350f87..4b3165a9ed5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -37,6 +37,7 @@ import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; import org.apache.iotdb.consensus.exception.RatisReadUnavailableException; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException; +import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException; import org.apache.iotdb.db.i18n.DataNodeQueryMessages; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.execution.executor.RegionExecutionResult; @@ -589,6 +590,20 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { "can't execute request on node {}, error msg is {}, and we try to reconnect this node.", endPoint, ExceptionUtils.getRootCause(e).toString()); + // If the query has already timed out, do not retry. Re-dispatching the same FragmentInstance + // may cause it to be executed twice on the remote node (see the REPEATED_RPC_CALL handling in + // FragmentInstanceManager), so we fail fast with a timeout status instead. + long currentTime = System.currentTimeMillis(); + if (currentTime - queryContext.getStartTime() >= queryContext.getTimeOut()) { + throw new FragmentInstanceDispatchException( + RpcUtils.getStatus( + TSStatusCode.QUERY_TIMEOUT, + String.format( + QueryTimeoutRuntimeException.QUERY_TIMEOUT_EXCEPTION_MESSAGE, + queryContext.getStartTime(), + queryContext.getStartTime() + queryContext.getTimeOut(), + currentTime))); + } // we just retry once to clear stale connection for a restart node. try { dispatchRemoteHelper(instance, endPoint); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java index 533fa8ab640..103549a9cc7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java @@ -160,7 +160,7 @@ public class ErrorHandlingUtils { Throwable t = e instanceof ExecutionException ? e.getCause() : e; if (t instanceof QueryTimeoutRuntimeException) { - return RpcUtils.getStatus(TSStatusCode.INTERNAL_REQUEST_TIME_OUT, rootCause.getMessage()); + return RpcUtils.getStatus(TSStatusCode.QUERY_TIMEOUT, rootCause.getMessage()); } else if (t instanceof ParseCancellationException) { return RpcUtils.getStatus( TSStatusCode.SQL_PARSE_ERROR, INFO_PARSING_SQL_ERROR + rootCause.getMessage()); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java index 016fe90b460..b0b5fb805a3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.executor; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.consensus.SchemaRegionId; +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.consensus.IConsensus; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; @@ -30,6 +31,7 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion; +import org.apache.iotdb.rpc.TSStatusCode; import org.junit.Test; import org.mockito.Mockito; @@ -161,6 +163,50 @@ public class RegionReadExecutorTest { assertEquals(String.format(ERROR_MSG_FORMAT, "schema-exception"), res.getMessage()); } + @Test + public void testRepeatedRpcCall() throws ConsensusException { + // A repeated RPC dispatch of the same FragmentInstance makes FragmentInstanceManager throw + // IoTDBRuntimeException(REPEATED_RPC_CALL). RegionReadExecutor must carry that status code back + // (instead of dropping it, which would downgrade it to EXECUTE_STATEMENT_ERROR) and mark the + // result as non-retryable. + ConsensusGroupId dataRegionGroupId = new DataRegionId(1); + FragmentInstanceId fragmentInstanceId = + new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 0), "0"); + FragmentInstance fragmentInstance = Mockito.mock(FragmentInstance.class); + Mockito.when(fragmentInstance.getId()).thenReturn(fragmentInstanceId); + + IConsensus dataRegionConsensus = Mockito.mock(IConsensus.class); + IConsensus schemaRegionConsensus = Mockito.mock(IConsensus.class); + FragmentInstanceManager fragmentInstanceManager = Mockito.mock(FragmentInstanceManager.class); + + RegionReadExecutor executor = + new RegionReadExecutor(dataRegionConsensus, schemaRegionConsensus, fragmentInstanceManager); + + // consensus read path (covers both data and schema region queries) + Mockito.when(dataRegionConsensus.read(dataRegionGroupId, fragmentInstance)) + .thenThrow( + new IoTDBRuntimeException("repeated", TSStatusCode.REPEATED_RPC_CALL.getStatusCode())); + + RegionExecutionResult res = executor.execute(dataRegionGroupId, fragmentInstance); + + assertFalse(res.isAccepted()); + assertEquals(TSStatusCode.REPEATED_RPC_CALL.getStatusCode(), res.getStatus().getCode()); + assertFalse(res.isReadNeedRetry()); + + // VirtualDataRegion path (FI executed directly through FragmentInstanceManager) + Mockito.when( + fragmentInstanceManager.execDataQueryFragmentInstance( + fragmentInstance, VirtualDataRegion.getInstance())) + .thenThrow( + new IoTDBRuntimeException("repeated", TSStatusCode.REPEATED_RPC_CALL.getStatusCode())); + + res = executor.execute(fragmentInstance); + + assertFalse(res.isAccepted()); + assertEquals(TSStatusCode.REPEATED_RPC_CALL.getStatusCode(), res.getStatus().getCode()); + assertFalse(res.isReadNeedRetry()); + } + @Test public void testExceptionHappened() throws ConsensusException {
