This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/fix_procedure_bug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit afe82dfe0688f8d05c1afc142700f6bef2fd6f4a
Author: Beyyes <[email protected]>
AuthorDate: Tue Sep 6 14:58:47 2022 +0800

    perfect the RegionMigrateService
---
 .../consensus/request/ConfigPhysicalPlan.java      |  5 ++++
 .../iotdb/confignode/manager/ConfigManager.java    |  4 +--
 .../iotdb/confignode/manager/ConsensusManager.java | 33 ++++++++++++----------
 .../apache/iotdb/confignode/manager/IManager.java  |  4 +--
 .../procedure/impl/RegionMigrateProcedure.java     |  4 +--
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  2 +-
 .../request/ConfigPhysicalPlanSerDeTest.java       |  3 ++
 .../iotdb/db/service/RegionMigrateService.java     |  9 +++---
 8 files changed, 38 insertions(+), 26 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 701fcdf67f..f2e4edf976 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -104,6 +104,7 @@ public abstract class ConfigPhysicalPlan implements 
IConsensusRequest {
       if (typeNum >= ConfigPhysicalPlanType.values().length) {
         throw new IOException("unrecognized log type " + typeNum);
       }
+
       ConfigPhysicalPlanType type = ConfigPhysicalPlanType.values()[typeNum];
       ConfigPhysicalPlan req;
       switch (type) {
@@ -241,6 +242,10 @@ public abstract class ConfigPhysicalPlan implements 
IConsensusRequest {
           throw new IOException("unknown PhysicalPlan type: " + typeNum);
       }
       req.deserializeImpl(buffer);
+      LOGGER.info(
+          "invoking create method in ConfigPhysicalPlan, planType: {}, req: 
{}",
+          ConfigPhysicalPlanType.values()[typeNum],
+          req instanceof UpdateProcedurePlan ? ((UpdateProcedurePlan) 
req).getProcedure() : null);
       return req;
     }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index dcf3149530..56f051f328 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -691,8 +691,8 @@ public class ConfigManager implements IManager {
   }
 
   @Override
-  public TSStatus addConsensusGroup(List<TConfigNodeLocation> 
configNodeLocations) {
-    consensusManager.addConsensusGroup(configNodeLocations);
+  public TSStatus createPeerForConsensusGroup(List<TConfigNodeLocation> 
configNodeLocations) {
+    consensusManager.createPeerForConsensusGroup(configNodeLocations);
     return StatusUtils.OK;
   }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 3ba78e601d..1322a498c8 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -47,17 +47,19 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /** ConsensusManager maintains consensus class, request will redirect to 
consensus layer */
 public class ConsensusManager {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsensusManager.class);
-  private static final ConfigNodeConfig conf = 
ConfigNodeDescriptor.getInstance().getConf();
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
 
   private final IManager configManager;
 
   private ConsensusGroupId consensusGroupId;
   private IConsensus consensusImpl;
+  private final int seedConfigNodeId = 0;
 
   public ConsensusManager(IManager configManager, PartitionRegionStateMachine 
stateMachine)
       throws IOException {
@@ -72,15 +74,15 @@ public class ConsensusManager {
   /** ConsensusLayer local implementation */
   private void setConsensusLayer(PartitionRegionStateMachine stateMachine) 
throws IOException {
     // There is only one ConfigNodeGroup
-    consensusGroupId = new PartitionRegionId(conf.getPartitionRegionId());
+    consensusGroupId = new PartitionRegionId(CONF.getPartitionRegionId());
 
     // Implement local ConsensusLayer by ConfigNodeConfig
     consensusImpl =
         ConsensusFactory.getConsensusImpl(
-                conf.getConfigNodeConsensusProtocolClass(),
+                CONF.getConfigNodeConsensusProtocolClass(),
                 ConsensusConfig.newBuilder()
-                    .setThisNode(new TEndPoint(conf.getInternalAddress(), 
conf.getConsensusPort()))
-                    .setStorageDir(conf.getConsensusDir())
+                    .setThisNode(new TEndPoint(CONF.getInternalAddress(), 
CONF.getConsensusPort()))
+                    .setStorageDir(CONF.getConsensusDir())
                     .build(),
                 gid -> stateMachine)
             .orElseThrow(
@@ -88,41 +90,42 @@ public class ConsensusManager {
                     new IllegalArgumentException(
                         String.format(
                             ConsensusFactory.CONSTRUCT_FAILED_MSG,
-                            conf.getConfigNodeConsensusProtocolClass())));
+                            CONF.getConfigNodeConsensusProtocolClass())));
     consensusImpl.start();
 
     if (SystemPropertiesUtils.isRestarted()) {
       try {
         // Create ConsensusGroup from confignode-system.properties file when 
restart
         // TODO: Check and notify if current ConfigNode's ip or port has 
changed
-        addConsensusGroup(SystemPropertiesUtils.loadConfigNodeList());
+        
createPeerForConsensusGroup(SystemPropertiesUtils.loadConfigNodeList());
       } catch (BadNodeUrlException e) {
         throw new IOException(e);
       }
     } else if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
       // Create ConsensusGroup that contains only itself
       // if the current ConfigNode is Seed-ConfigNode
-      addConsensusGroup(
+      createPeerForConsensusGroup(
           Collections.singletonList(
               new TConfigNodeLocation(
-                  0,
-                  new TEndPoint(conf.getInternalAddress(), 
conf.getInternalPort()),
-                  new TEndPoint(conf.getInternalAddress(), 
conf.getConsensusPort()))));
+                      seedConfigNodeId,
+                  new TEndPoint(CONF.getInternalAddress(), 
CONF.getInternalPort()),
+                  new TEndPoint(CONF.getInternalAddress(), 
CONF.getConsensusPort()))));
     }
   }
 
   /**
-   * Add the current ConfigNode to the ConsensusGroup
+   * Create peer in new node to build consensus group
    *
    * @param configNodeLocations All registered ConfigNodes
    */
-  public void addConsensusGroup(List<TConfigNodeLocation> configNodeLocations) 
{
+  public void createPeerForConsensusGroup(List<TConfigNodeLocation> 
configNodeLocations) {
     if (configNodeLocations.size() == 0) {
-      LOGGER.warn("configNodeLocations is null, addConsensusGroup failed.");
+      LOGGER.warn("configNodeLocations is empty, createPeerForConsensusGroup 
failed.");
       return;
     }
 
-    LOGGER.info("Set ConfigNode consensus group {}...", configNodeLocations);
+    LOGGER.info("createPeerForConsensusGroup {}...", configNodeLocations);
+
     List<Peer> peerList = new ArrayList<>();
     for (TConfigNodeLocation configNodeLocation : configNodeLocations) {
       peerList.add(new Peer(consensusGroupId, 
configNodeLocation.getConsensusEndPoint()));
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index db3a802be2..b989356542 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -254,11 +254,11 @@ public interface IManager {
   TSStatus registerConfigNode(TConfigNodeRegisterReq req);
 
   /**
-   * Add Consensus Group in new node.
+   * Create peer in new node to build consensus group.
    *
    * @return status
    */
-  TSStatus addConsensusGroup(List<TConfigNodeLocation> configNodeLocations);
+  TSStatus createPeerForConsensusGroup(List<TConfigNodeLocation> 
configNodeLocations);
 
   /**
    * Remove ConfigNode
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
index 937130a636..da53d002a8 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
@@ -157,12 +157,12 @@ public class RegionMigrateProcedure
     configNodeProcedureEnv.getSchedulerLock().lock();
     try {
       if (configNodeProcedureEnv.getRegionMigrateLock().tryLock(this)) {
-        LOG.info("{} acquire lock.", getProcId());
+        LOG.info("procedureId {} acquire lock.", getProcId());
         return ProcedureLockState.LOCK_ACQUIRED;
       }
       configNodeProcedureEnv.getRegionMigrateLock().waitProcedure(this);
 
-      LOG.info("{} wait for lock.", getProcId());
+      LOG.info("procedureId {} wait for lock.", getProcId());
       return ProcedureLockState.LOCK_EVENT_WAIT;
     } finally {
       configNodeProcedureEnv.getSchedulerLock().unlock();
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 0064e6b372..e8ee727a35 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -393,7 +393,7 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
 
   @Override
   public TSStatus addConsensusGroup(TAddConsensusGroupReq registerResp) {
-    return configManager.addConsensusGroup(registerResp.getConfigNodeList());
+    return 
configManager.createPeerForConsensusGroup(registerResp.getConfigNodeList());
   }
 
   @Override
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 32429cec1f..08a2302dfd 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -148,6 +148,7 @@ public class ConfigPhysicalPlanSerDeTest {
   public void SetTTLPlanTest() throws IOException {
     SetTTLPlan req0 = new SetTTLPlan(Arrays.asList("root", "sg0"), 
Long.MAX_VALUE);
     SetTTLPlan req1 = (SetTTLPlan) 
ConfigPhysicalPlan.Factory.create(req0.serializeToByteBuffer());
+    System.out.println(req1);
     Assert.assertEquals(req0, req1);
   }
 
@@ -619,6 +620,7 @@ public class ConfigPhysicalPlanSerDeTest {
                 0, new TEndPoint("0.0.0.0", 22277), new TEndPoint("0.0.0.0", 
22278)));
     ApplyConfigNodePlan req1 =
         (ApplyConfigNodePlan) 
ConfigPhysicalPlan.Factory.create(req0.serializeToByteBuffer());
+    System.out.println(req1.toString());
     Assert.assertEquals(req0, req1);
   }
 
@@ -644,6 +646,7 @@ public class ConfigPhysicalPlanSerDeTest {
     UpdateProcedurePlan reqNew =
         (UpdateProcedurePlan)
             
ConfigPhysicalPlan.Factory.create(updateProcedurePlan.serializeToByteBuffer());
+    System.out.println(reqNew.getProcedure());
     Procedure proc = reqNew.getProcedure();
     Assert.assertEquals(proc, procedure);
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java 
b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index d4170f4dfe..996da840bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -55,6 +55,7 @@ public class RegionMigrateService implements IService {
   private static final int RETRY = 5;
 
   private static final int SLEEP_MILLIS = 5000;
+
   private RegionMigratePool regionMigratePool;
 
   private RegionMigrateService() {}
@@ -64,10 +65,10 @@ public class RegionMigrateService implements IService {
   }
 
   /**
-   * add a region peer
+   * submit AddRegionPeerTask
    *
    * @param req TMigrateRegionReq
-   * @return submit task succeed?
+   * @return if the submit task succeed
    */
   public synchronized boolean submitAddRegionPeerTask(TMigrateRegionReq req) {
 
@@ -86,10 +87,10 @@ public class RegionMigrateService implements IService {
   }
 
   /**
-   * remove a region peer
+   * submit RemoveRegionPeerTask
    *
    * @param req TMigrateRegionReq
-   * @return submit task succeed?
+   * @return if the submit task succeed
    */
   public synchronized boolean submitRemoveRegionPeerTask(TMigrateRegionReq 
req) {
 

Reply via email to