This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/fix_procedure_bug in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit afe82dfe0688f8d05c1afc142700f6bef2fd6f4a Author: Beyyes <[email protected]> AuthorDate: Tue Sep 6 14:58:47 2022 +0800 perfect the RegionMigrateService --- .../consensus/request/ConfigPhysicalPlan.java | 5 ++++ .../iotdb/confignode/manager/ConfigManager.java | 4 +-- .../iotdb/confignode/manager/ConsensusManager.java | 33 ++++++++++++---------- .../apache/iotdb/confignode/manager/IManager.java | 4 +-- .../procedure/impl/RegionMigrateProcedure.java | 4 +-- .../thrift/ConfigNodeRPCServiceProcessor.java | 2 +- .../request/ConfigPhysicalPlanSerDeTest.java | 3 ++ .../iotdb/db/service/RegionMigrateService.java | 9 +++--- 8 files changed, 38 insertions(+), 26 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java index 701fcdf67f..f2e4edf976 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java @@ -104,6 +104,7 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest { if (typeNum >= ConfigPhysicalPlanType.values().length) { throw new IOException("unrecognized log type " + typeNum); } + ConfigPhysicalPlanType type = ConfigPhysicalPlanType.values()[typeNum]; ConfigPhysicalPlan req; switch (type) { @@ -241,6 +242,10 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest { throw new IOException("unknown PhysicalPlan type: " + typeNum); } req.deserializeImpl(buffer); + LOGGER.info( + "invoking create method in ConfigPhysicalPlan, planType: {}, req: {}", + ConfigPhysicalPlanType.values()[typeNum], + req instanceof UpdateProcedurePlan ? ((UpdateProcedurePlan) req).getProcedure() : null); return req; } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index dcf3149530..56f051f328 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -691,8 +691,8 @@ public class ConfigManager implements IManager { } @Override - public TSStatus addConsensusGroup(List<TConfigNodeLocation> configNodeLocations) { - consensusManager.addConsensusGroup(configNodeLocations); + public TSStatus createPeerForConsensusGroup(List<TConfigNodeLocation> configNodeLocations) { + consensusManager.createPeerForConsensusGroup(configNodeLocations); return StatusUtils.OK; } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java index 3ba78e601d..1322a498c8 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java @@ -47,17 +47,19 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** ConsensusManager maintains consensus class, request will redirect to consensus layer */ public class ConsensusManager { private static final Logger LOGGER = LoggerFactory.getLogger(ConsensusManager.class); - private static final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf(); + private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); private final IManager configManager; private ConsensusGroupId consensusGroupId; private IConsensus consensusImpl; + private final int seedConfigNodeId = 0; public ConsensusManager(IManager configManager, PartitionRegionStateMachine stateMachine) throws IOException { @@ -72,15 +74,15 @@ public class ConsensusManager { /** ConsensusLayer local implementation */ private void setConsensusLayer(PartitionRegionStateMachine stateMachine) throws IOException { // There is only one ConfigNodeGroup - consensusGroupId = new PartitionRegionId(conf.getPartitionRegionId()); + consensusGroupId = new PartitionRegionId(CONF.getPartitionRegionId()); // Implement local ConsensusLayer by ConfigNodeConfig consensusImpl = ConsensusFactory.getConsensusImpl( - conf.getConfigNodeConsensusProtocolClass(), + CONF.getConfigNodeConsensusProtocolClass(), ConsensusConfig.newBuilder() - .setThisNode(new TEndPoint(conf.getInternalAddress(), conf.getConsensusPort())) - .setStorageDir(conf.getConsensusDir()) + .setThisNode(new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())) + .setStorageDir(CONF.getConsensusDir()) .build(), gid -> stateMachine) .orElseThrow( @@ -88,41 +90,42 @@ public class ConsensusManager { new IllegalArgumentException( String.format( ConsensusFactory.CONSTRUCT_FAILED_MSG, - conf.getConfigNodeConsensusProtocolClass()))); + CONF.getConfigNodeConsensusProtocolClass()))); consensusImpl.start(); if (SystemPropertiesUtils.isRestarted()) { try { // Create ConsensusGroup from confignode-system.properties file when restart // TODO: Check and notify if current ConfigNode's ip or port has changed - addConsensusGroup(SystemPropertiesUtils.loadConfigNodeList()); + createPeerForConsensusGroup(SystemPropertiesUtils.loadConfigNodeList()); } catch (BadNodeUrlException e) { throw new IOException(e); } } else if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) { // Create ConsensusGroup that contains only itself // if the current ConfigNode is Seed-ConfigNode - addConsensusGroup( + createPeerForConsensusGroup( Collections.singletonList( new TConfigNodeLocation( - 0, - new TEndPoint(conf.getInternalAddress(), conf.getInternalPort()), - new TEndPoint(conf.getInternalAddress(), conf.getConsensusPort())))); + seedConfigNodeId, + new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()), + new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())))); } } /** - * Add the current ConfigNode to the ConsensusGroup + * Create peer in new node to build consensus group * * @param configNodeLocations All registered ConfigNodes */ - public void addConsensusGroup(List<TConfigNodeLocation> configNodeLocations) { + public void createPeerForConsensusGroup(List<TConfigNodeLocation> configNodeLocations) { if (configNodeLocations.size() == 0) { - LOGGER.warn("configNodeLocations is null, addConsensusGroup failed."); + LOGGER.warn("configNodeLocations is empty, createPeerForConsensusGroup failed."); return; } - LOGGER.info("Set ConfigNode consensus group {}...", configNodeLocations); + LOGGER.info("createPeerForConsensusGroup {}...", configNodeLocations); + List<Peer> peerList = new ArrayList<>(); for (TConfigNodeLocation configNodeLocation : configNodeLocations) { peerList.add(new Peer(consensusGroupId, configNodeLocation.getConsensusEndPoint())); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index db3a802be2..b989356542 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -254,11 +254,11 @@ public interface IManager { TSStatus registerConfigNode(TConfigNodeRegisterReq req); /** - * Add Consensus Group in new node. + * Create peer in new node to build consensus group. * * @return status */ - TSStatus addConsensusGroup(List<TConfigNodeLocation> configNodeLocations); + TSStatus createPeerForConsensusGroup(List<TConfigNodeLocation> configNodeLocations); /** * Remove ConfigNode diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java index 937130a636..da53d002a8 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java @@ -157,12 +157,12 @@ public class RegionMigrateProcedure configNodeProcedureEnv.getSchedulerLock().lock(); try { if (configNodeProcedureEnv.getRegionMigrateLock().tryLock(this)) { - LOG.info("{} acquire lock.", getProcId()); + LOG.info("procedureId {} acquire lock.", getProcId()); return ProcedureLockState.LOCK_ACQUIRED; } configNodeProcedureEnv.getRegionMigrateLock().waitProcedure(this); - LOG.info("{} wait for lock.", getProcId()); + LOG.info("procedureId {} wait for lock.", getProcId()); return ProcedureLockState.LOCK_EVENT_WAIT; } finally { configNodeProcedureEnv.getSchedulerLock().unlock(); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 0064e6b372..e8ee727a35 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -393,7 +393,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac @Override public TSStatus addConsensusGroup(TAddConsensusGroupReq registerResp) { - return configManager.addConsensusGroup(registerResp.getConfigNodeList()); + return configManager.createPeerForConsensusGroup(registerResp.getConfigNodeList()); } @Override diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java index 32429cec1f..08a2302dfd 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java @@ -148,6 +148,7 @@ public class ConfigPhysicalPlanSerDeTest { public void SetTTLPlanTest() throws IOException { SetTTLPlan req0 = new SetTTLPlan(Arrays.asList("root", "sg0"), Long.MAX_VALUE); SetTTLPlan req1 = (SetTTLPlan) ConfigPhysicalPlan.Factory.create(req0.serializeToByteBuffer()); + System.out.println(req1); Assert.assertEquals(req0, req1); } @@ -619,6 +620,7 @@ public class ConfigPhysicalPlanSerDeTest { 0, new TEndPoint("0.0.0.0", 22277), new TEndPoint("0.0.0.0", 22278))); ApplyConfigNodePlan req1 = (ApplyConfigNodePlan) ConfigPhysicalPlan.Factory.create(req0.serializeToByteBuffer()); + System.out.println(req1.toString()); Assert.assertEquals(req0, req1); } @@ -644,6 +646,7 @@ public class ConfigPhysicalPlanSerDeTest { UpdateProcedurePlan reqNew = (UpdateProcedurePlan) ConfigPhysicalPlan.Factory.create(updateProcedurePlan.serializeToByteBuffer()); + System.out.println(reqNew.getProcedure()); Procedure proc = reqNew.getProcedure(); Assert.assertEquals(proc, procedure); } diff --git a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java index d4170f4dfe..996da840bf 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java +++ b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java @@ -55,6 +55,7 @@ public class RegionMigrateService implements IService { private static final int RETRY = 5; private static final int SLEEP_MILLIS = 5000; + private RegionMigratePool regionMigratePool; private RegionMigrateService() {} @@ -64,10 +65,10 @@ public class RegionMigrateService implements IService { } /** - * add a region peer + * submit AddRegionPeerTask * * @param req TMigrateRegionReq - * @return submit task succeed? + * @return if the submit task succeed */ public synchronized boolean submitAddRegionPeerTask(TMigrateRegionReq req) { @@ -86,10 +87,10 @@ public class RegionMigrateService implements IService { } /** - * remove a region peer + * submit RemoveRegionPeerTask * * @param req TMigrateRegionReq - * @return submit task succeed? + * @return if the submit task succeed */ public synchronized boolean submitRemoveRegionPeerTask(TMigrateRegionReq req) {
