This is an automated email from the ASF dual-hosted git repository. jt2594838 pushed a commit to branch check_consensus_before_answering_region_request in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1f5671e3cf72346b053050fc1558fc2d689803c2 Author: Tian Jiang <[email protected]> AuthorDate: Thu Apr 23 19:04:39 2026 +0800 Add ut --- .../impl/DataNodeInternalRPCServiceImpl.java | 14 ++- .../java/org/apache/iotdb/db/service/DataNode.java | 3 +- .../db/protocol/thrift/impl/ConsensusWaitTest.java | 131 +++++++++++++++++++++ 3 files changed, 143 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index bd420fab06a..5075212bf41 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -442,21 +442,25 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface this.dataNodeContext = dataNodeContext; } + private long consensusWaitTimeoutSeconds = 30; + private TSStatus waitForConsensusStarted() { if (dataNodeContext.isAllConsensusStarted()) { return null; } try { Await.await() - .atMost(30, TimeUnit.SECONDS) + .atMost(consensusWaitTimeoutSeconds, TimeUnit.SECONDS) .pollInterval(100, TimeUnit.MILLISECONDS) .until(dataNodeContext::isAllConsensusStarted); return null; } catch (AwaitTimeoutException e) { - LOGGER.warn("Consensus has not been started after 30 seconds, rejecting region request"); + LOGGER.warn( + "Consensus has not been started after {} seconds, rejecting region request", + consensusWaitTimeoutSeconds); return RpcUtils.getStatus( TSStatusCode.CONSENSUS_NOT_INITIALIZED, - "Consensus has not been started after 30 seconds"); + "Consensus has not been started after " + consensusWaitTimeoutSeconds + " seconds"); } } @@ -3505,4 +3509,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface return result; } + + public void setConsensusWaitTimeoutSeconds(long consensusWaitTimeoutSeconds) { + this.consensusWaitTimeoutSeconds = consensusWaitTimeoutSeconds; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index 568f2c266e2..00f113cc70f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -936,8 +936,7 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { protected void registerInternalRPCService() throws StartupException { // Start InternalRPCService to indicate that the current DataNode can accept cluster scheduling - DataNodeInternalRPCService instance = DataNodeInternalRPCService.getInstance(); - registerManager.register(); + registerManager.register(DataNodeInternalRPCService.getInstance()); } // make it easier for users to extend ClientRPCServiceImpl to export more rpc services diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/thrift/impl/ConsensusWaitTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/thrift/impl/ConsensusWaitTest.java new file mode 100644 index 00000000000..65fd2b47858 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/thrift/impl/ConsensusWaitTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.protocol.thrift.impl; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.service.DataNode.DataNodeContext; +import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; +import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq; +import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq; +import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Collections; + +import static org.mockito.Mockito.when; + +public class ConsensusWaitTest { + + @BeforeClass + public static void setUp() { + IoTDBDescriptor.getInstance().getConfig().setDataNodeId(0); + } + + private DataNodeInternalRPCServiceImpl createServiceWithConsensusState(boolean started) { + DataNodeContext context = Mockito.mock(DataNodeContext.class); + when(context.isAllConsensusStarted()).thenReturn(started); + DataNodeInternalRPCServiceImpl service = new DataNodeInternalRPCServiceImpl(context); + service.setConsensusWaitTimeoutSeconds(1); + return service; + } + + private TCreateSchemaRegionReq createSchemaRegionReq() { + TCreateSchemaRegionReq req = new TCreateSchemaRegionReq(); + req.setStorageGroup("root.test"); + TRegionReplicaSet replicaSet = new TRegionReplicaSet(); + replicaSet.setRegionId(new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 0)); + TDataNodeLocation location = new TDataNodeLocation(); + location.setDataNodeId(0); + location.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667)); + location.setInternalEndPoint(new TEndPoint("0.0.0.0", 10730)); + location.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 10740)); + location.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10760)); + location.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10750)); + replicaSet.setDataNodeLocations(Collections.singletonList(location)); + req.setRegionReplicaSet(replicaSet); + return req; + } + + private TCreateDataRegionReq createDataRegionReq() { + TCreateDataRegionReq req = new TCreateDataRegionReq(); + req.setStorageGroup("root.test"); + TRegionReplicaSet replicaSet = new TRegionReplicaSet(); + replicaSet.setRegionId(new TConsensusGroupId(TConsensusGroupType.DataRegion, 0)); + TDataNodeLocation location = new TDataNodeLocation(); + location.setDataNodeId(0); + location.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667)); + location.setInternalEndPoint(new TEndPoint("0.0.0.0", 10730)); + location.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 10740)); + location.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10760)); + location.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10750)); + replicaSet.setDataNodeLocations(Collections.singletonList(location)); + req.setRegionReplicaSet(replicaSet); + return req; + } + + @Test + public void testCreateSchemaRegionRejectsWhenConsensusNotStarted() { + DataNodeInternalRPCServiceImpl service = createServiceWithConsensusState(false); + TSStatus status = service.createSchemaRegion(createSchemaRegionReq()); + Assert.assertEquals(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode(), status.getCode()); + } + + @Test + public void testCreateDataRegionRejectsWhenConsensusNotStarted() { + DataNodeInternalRPCServiceImpl service = createServiceWithConsensusState(false); + TSStatus status = service.createDataRegion(createDataRegionReq()); + Assert.assertEquals(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode(), status.getCode()); + } + + @Test + public void testDeleteRegionRejectsWhenConsensusNotStarted() { + DataNodeInternalRPCServiceImpl service = createServiceWithConsensusState(false); + TConsensusGroupId groupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, 0); + TSStatus status = service.deleteRegion(groupId); + Assert.assertEquals(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode(), status.getCode()); + } + + @Test + public void testChangeRegionLeaderRejectsWhenConsensusNotStarted() { + DataNodeInternalRPCServiceImpl service = createServiceWithConsensusState(false); + TRegionLeaderChangeReq req = new TRegionLeaderChangeReq(); + req.setRegionId(new TConsensusGroupId(TConsensusGroupType.DataRegion, 0)); + TDataNodeLocation newLeader = new TDataNodeLocation(); + newLeader.setDataNodeId(0); + newLeader.setInternalEndPoint(new TEndPoint("0.0.0.0", 10730)); + newLeader.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10760)); + newLeader.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10750)); + req.setNewLeaderNode(newLeader); + TRegionLeaderChangeResp resp = service.changeRegionLeader(req); + Assert.assertEquals( + TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode(), resp.getStatus().getCode()); + } +}
