This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch Fix-ConfigNode-restart-bug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fdbfc5402a864cdcfcf0765691da5a6bbb2e5f1f
Author: YongzaoDan <[email protected]>
AuthorDate: Thu Aug 17 14:55:59 2023 +0800

    Finish
---
 .../it/cluster/IoTDBClusterRestartIT.java          | 105 ++++++++++++++++++++-
 .../statemachine/ConfigRegionStateMachine.java     |   5 +
 .../manager/consensus/ConsensusManager.java        |   9 +-
 3 files changed, 116 insertions(+), 3 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
index 80c0da1c387..1bcfe5ac0a1 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
@@ -19,11 +19,21 @@
 
 package org.apache.iotdb.confignode.it.cluster;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.env.cluster.EnvUtils;
@@ -34,17 +44,21 @@ import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 @RunWith(IoTDBTestRunner.class)
@@ -57,6 +71,9 @@ public class IoTDBClusterRestartIT {
   private static final int testDataNodeNum = 2;
   private static final int testReplicationFactor = 2;
 
+  private static final String database = "root.db";
+  private static final long testTimePartitionInterval = 604800000;
+
   @Before
   public void setUp() throws Exception {
     EnvFactory.getEnv()
@@ -67,7 +84,8 @@ public class IoTDBClusterRestartIT {
         .setDataRegionConsensusProtocolClass(ratisConsensusProtocolClass)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setSchemaReplicationFactor(testReplicationFactor)
-        .setDataReplicationFactor(testReplicationFactor);
+        .setDataReplicationFactor(testReplicationFactor)
+        .setTimePartitionInterval(testTimePartitionInterval);
 
     // Init 2C2D cluster environment
     EnvFactory.getEnv().initClusterEnvironment(testConfigNodeNum, 
testDataNodeNum);
@@ -79,7 +97,52 @@ public class IoTDBClusterRestartIT {
   }
 
   @Test
-  public void clusterRestartTest() throws InterruptedException {
+  public void clusterRestartTest()
+      throws InterruptedException, ClientManagerException, IOException, 
TException,
+          IllegalPathException {
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+
+      // Insert some data into cluster
+
+      TSStatus status = client.setDatabase(new TDatabaseSchema(database));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+      final String d0 = database + ".d0.s";
+      TSchemaPartitionReq schemaPartitionReq = new TSchemaPartitionReq();
+      TSchemaPartitionTableResp schemaPartitionTableResp;
+      Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> 
schemaPartitionTable;
+      ByteBuffer buffer = ConfigNodeTestUtils.generatePatternTreeBuffer(new 
String[] {d0});
+      schemaPartitionReq.setPathPatternTree(buffer);
+      schemaPartitionTableResp = 
client.getOrCreateSchemaPartitionTable(schemaPartitionReq);
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          schemaPartitionTableResp.getStatus().getCode());
+      Assert.assertEquals(1, 
schemaPartitionTableResp.getSchemaPartitionTableSize());
+      schemaPartitionTable = 
schemaPartitionTableResp.getSchemaPartitionTable();
+      Assert.assertEquals(1, schemaPartitionTable.get(database).size());
+
+      Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
+          ConfigNodeTestUtils.constructPartitionSlotsMap(
+              database, 0, 1, 0, 1, testTimePartitionInterval);
+      TDataPartitionReq dataPartitionReq = new 
TDataPartitionReq(partitionSlotsMap);
+      TDataPartitionTableResp dataPartitionTableResp =
+          client.getOrCreateDataPartitionTable(dataPartitionReq);
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          dataPartitionTableResp.getStatus().getCode());
+      Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
+      ConfigNodeTestUtils.checkDataPartitionTable(
+          database,
+          0,
+          1,
+          0,
+          1,
+          testTimePartitionInterval,
+          dataPartitionTableResp.getDataPartitionTable());
+    }
+
     // Shutdown all cluster nodes
     for (int i = 0; i < testConfigNodeNum; i++) {
       EnvFactory.getEnv().shutdownConfigNode(i);
@@ -100,6 +163,44 @@ public class IoTDBClusterRestartIT {
     }
 
     ((AbstractEnv) EnvFactory.getEnv()).testWorking();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+
+      // The cluster is still writable after restart
+      final String d1 = database + ".d1.s";
+      TSchemaPartitionReq schemaPartitionReq = new TSchemaPartitionReq();
+      TSchemaPartitionTableResp schemaPartitionTableResp;
+      Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> 
schemaPartitionTable;
+      ByteBuffer buffer = ConfigNodeTestUtils.generatePatternTreeBuffer(new 
String[] {d1});
+      schemaPartitionReq.setPathPatternTree(buffer);
+      schemaPartitionTableResp = 
client.getOrCreateSchemaPartitionTable(schemaPartitionReq);
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          schemaPartitionTableResp.getStatus().getCode());
+      Assert.assertEquals(1, 
schemaPartitionTableResp.getSchemaPartitionTableSize());
+      schemaPartitionTable = 
schemaPartitionTableResp.getSchemaPartitionTable();
+      Assert.assertEquals(1, schemaPartitionTable.get(database).size());
+
+      Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
+          ConfigNodeTestUtils.constructPartitionSlotsMap(
+              database, 1, 2, 1, 2, testTimePartitionInterval);
+      TDataPartitionReq dataPartitionReq = new 
TDataPartitionReq(partitionSlotsMap);
+      TDataPartitionTableResp dataPartitionTableResp =
+          client.getOrCreateDataPartitionTable(dataPartitionReq);
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          dataPartitionTableResp.getStatus().getCode());
+      Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
+      ConfigNodeTestUtils.checkDataPartitionTable(
+          database,
+          1,
+          2,
+          1,
+          2,
+          testTimePartitionInterval,
+          dataPartitionTableResp.getDataPartitionTable());
+    }
   }
 
   @Test
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index fdec21c1c00..31a8cefd2d9 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -224,6 +224,9 @@ public class ConfigRegionStateMachine
           () -> 
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync());
       threadPool.submit(
           () -> 
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat());
+
+      configManager.getConsensusManager().setLeaderServiceReady(true);
+
     } else {
       LOGGER.info(
           "Current node [nodeId:{}, ip:port: {}] is not longer the leader, "
@@ -232,6 +235,8 @@ public class ConfigRegionStateMachine
           currentNodeTEndPoint,
           newLeaderId);
 
+      configManager.getConsensusManager().setLeaderServiceReady(false);
+
       // Stop leader scheduling services
       
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync();
       
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat();
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index 6709d487139..30fc7a85dcf 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -56,6 +56,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
 
@@ -73,6 +74,8 @@ public class ConsensusManager {
   private final IManager configManager;
   private IConsensus consensusImpl;
 
+  private final AtomicBoolean isLeaderServiceReady = new AtomicBoolean(false);
+
   public ConsensusManager(IManager configManager, ConfigRegionStateMachine 
stateMachine)
       throws IOException {
     this.configManager = configManager;
@@ -353,7 +356,7 @@ public class ConsensusManager {
   public TSStatus confirmLeader() {
     TSStatus result = new TSStatus();
 
-    if (isLeader()) {
+    if (isLeader() && isLeaderServiceReady.get()) {
       return result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } else {
       result.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
@@ -369,6 +372,10 @@ public class ConsensusManager {
     }
   }
 
+  public void setLeaderServiceReady(boolean isLeaderServiceReady) {
+    this.isLeaderServiceReady.set(isLeaderServiceReady);
+  }
+
   public ConsensusGroupId getConsensusGroupId() {
     return DEFAULT_CONSENSUS_GROUP_ID;
   }

Reply via email to