This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch Fix-ConfigNode-restart-bug in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fdbfc5402a864cdcfcf0765691da5a6bbb2e5f1f Author: YongzaoDan <[email protected]> AuthorDate: Thu Aug 17 14:55:59 2023 +0800 Finish --- .../it/cluster/IoTDBClusterRestartIT.java | 105 ++++++++++++++++++++- .../statemachine/ConfigRegionStateMachine.java | 5 + .../manager/consensus/ConsensusManager.java | 9 +- 3 files changed, 116 insertions(+), 3 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java index 80c0da1c387..1bcfe5ac0a1 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java @@ -19,11 +19,21 @@ package org.apache.iotdb.confignode.it.cluster; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; +import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq; +import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp; import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp; +import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.env.cluster.EnvUtils; @@ -34,17 +44,21 @@ import org.apache.iotdb.it.env.cluster.env.AbstractEnv; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; @RunWith(IoTDBTestRunner.class) @@ -57,6 +71,9 @@ public class IoTDBClusterRestartIT { private static final int testDataNodeNum = 2; private static final int testReplicationFactor = 2; + private static final String database = "root.db"; + private static final long testTimePartitionInterval = 604800000; + @Before public void setUp() throws Exception { EnvFactory.getEnv() @@ -67,7 +84,8 @@ public class IoTDBClusterRestartIT { .setDataRegionConsensusProtocolClass(ratisConsensusProtocolClass) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaReplicationFactor(testReplicationFactor) - .setDataReplicationFactor(testReplicationFactor); + .setDataReplicationFactor(testReplicationFactor) + .setTimePartitionInterval(testTimePartitionInterval); // Init 2C2D cluster environment EnvFactory.getEnv().initClusterEnvironment(testConfigNodeNum, testDataNodeNum); @@ -79,7 +97,52 @@ public class IoTDBClusterRestartIT { } @Test - public void clusterRestartTest() throws InterruptedException { + public void clusterRestartTest() + throws InterruptedException, ClientManagerException, IOException, TException, + IllegalPathException { + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + + // Insert some data into cluster + + TSStatus status = client.setDatabase(new TDatabaseSchema(database)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + final String d0 = database + ".d0.s"; + TSchemaPartitionReq schemaPartitionReq = new TSchemaPartitionReq(); + TSchemaPartitionTableResp schemaPartitionTableResp; + Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable; + ByteBuffer buffer = ConfigNodeTestUtils.generatePatternTreeBuffer(new String[] {d0}); + schemaPartitionReq.setPathPatternTree(buffer); + schemaPartitionTableResp = client.getOrCreateSchemaPartitionTable(schemaPartitionReq); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + schemaPartitionTableResp.getStatus().getCode()); + Assert.assertEquals(1, schemaPartitionTableResp.getSchemaPartitionTableSize()); + schemaPartitionTable = schemaPartitionTableResp.getSchemaPartitionTable(); + Assert.assertEquals(1, schemaPartitionTable.get(database).size()); + + Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = + ConfigNodeTestUtils.constructPartitionSlotsMap( + database, 0, 1, 0, 1, testTimePartitionInterval); + TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap); + TDataPartitionTableResp dataPartitionTableResp = + client.getOrCreateDataPartitionTable(dataPartitionReq); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + dataPartitionTableResp.getStatus().getCode()); + Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable()); + ConfigNodeTestUtils.checkDataPartitionTable( + database, + 0, + 1, + 0, + 1, + testTimePartitionInterval, + dataPartitionTableResp.getDataPartitionTable()); + } + // Shutdown all cluster nodes for (int i = 0; i < testConfigNodeNum; i++) { EnvFactory.getEnv().shutdownConfigNode(i); @@ -100,6 +163,44 @@ public class IoTDBClusterRestartIT { } ((AbstractEnv) EnvFactory.getEnv()).testWorking(); + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + + // The cluster is still writable after restart + final String d1 = database + ".d1.s"; + TSchemaPartitionReq schemaPartitionReq = new TSchemaPartitionReq(); + TSchemaPartitionTableResp schemaPartitionTableResp; + Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable; + ByteBuffer buffer = ConfigNodeTestUtils.generatePatternTreeBuffer(new String[] {d1}); + schemaPartitionReq.setPathPatternTree(buffer); + schemaPartitionTableResp = client.getOrCreateSchemaPartitionTable(schemaPartitionReq); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + schemaPartitionTableResp.getStatus().getCode()); + Assert.assertEquals(1, schemaPartitionTableResp.getSchemaPartitionTableSize()); + schemaPartitionTable = schemaPartitionTableResp.getSchemaPartitionTable(); + Assert.assertEquals(1, schemaPartitionTable.get(database).size()); + + Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = + ConfigNodeTestUtils.constructPartitionSlotsMap( + database, 1, 2, 1, 2, testTimePartitionInterval); + TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap); + TDataPartitionTableResp dataPartitionTableResp = + client.getOrCreateDataPartitionTable(dataPartitionReq); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + dataPartitionTableResp.getStatus().getCode()); + Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable()); + ConfigNodeTestUtils.checkDataPartitionTable( + database, + 1, + 2, + 1, + 2, + testTimePartitionInterval, + dataPartitionTableResp.getDataPartitionTable()); + } } @Test diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index fdec21c1c00..31a8cefd2d9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -224,6 +224,9 @@ public class ConfigRegionStateMachine () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync()); threadPool.submit( () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat()); + + configManager.getConsensusManager().setLeaderServiceReady(true); + } else { LOGGER.info( "Current node [nodeId:{}, ip:port: {}] is not longer the leader, " @@ -232,6 +235,8 @@ public class ConfigRegionStateMachine currentNodeTEndPoint, newLeaderId); + configManager.getConsensusManager().setLeaderServiceReady(false); + // Stop leader scheduling services configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync(); configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java index 6709d487139..30fc7a85dcf 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java @@ -56,6 +56,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS; @@ -73,6 +74,8 @@ public class ConsensusManager { private final IManager configManager; private IConsensus consensusImpl; + private final AtomicBoolean isLeaderServiceReady = new AtomicBoolean(false); + public ConsensusManager(IManager configManager, ConfigRegionStateMachine stateMachine) throws IOException { this.configManager = configManager; @@ -353,7 +356,7 @@ public class ConsensusManager { public TSStatus confirmLeader() { TSStatus result = new TSStatus(); - if (isLeader()) { + if (isLeader() && isLeaderServiceReady.get()) { return result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } else { result.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()); @@ -369,6 +372,10 @@ public class ConsensusManager { } } + public void setLeaderServiceReady(boolean isLeaderServiceReady) { + this.isLeaderServiceReady.set(isLeaderServiceReady); + } + public ConsensusGroupId getConsensusGroupId() { return DEFAULT_CONSENSUS_GROUP_ID; }
