This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 18df9a986be Region status Adding and Removing (#12342)
18df9a986be is described below

commit 18df9a986be234960b12f44dd571620ba30a3978
Author: Li Yu Heng <[email protected]>
AuthorDate: Thu Apr 18 10:01:36 2024 +0800

    Region status Adding and Removing (#12342)
    
    * basically done
    
    * self review
    
    * gg
---
 .../handlers/heartbeat/DataNodeHeartbeatHandler.java      |  3 ++-
 .../apache/iotdb/confignode/manager/load/LoadManager.java |  8 +++++---
 .../iotdb/confignode/manager/load/cache/LoadCache.java    |  7 +++++--
 .../confignode/manager/load/cache/region/RegionCache.java | 15 +++++++++++++++
 .../manager/load/cache/region/RegionGroupCache.java       | 12 ++++++++++--
 .../confignode/procedure/env/RegionMaintainHandler.java   | 15 +++++++++++++--
 .../procedure/impl/region/AddRegionPeerProcedure.java     |  7 ++++++-
 .../procedure/impl/region/RemoveRegionPeerProcedure.java  |  3 +++
 .../org/apache/iotdb/commons/cluster/RegionStatus.java    | 12 +++---------
 9 files changed, 62 insertions(+), 20 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index 09a0e9c6383..49981d72834 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -100,7 +100,8 @@ public class DataNodeHeartbeatHandler implements 
AsyncMethodCallback<TDataNodeHe
                       new RegionHeartbeatSample(
                           heartbeatResp.getHeartbeatTimestamp(),
                           // Region will inherit DataNode's status
-                          RegionStatus.parse(heartbeatResp.getStatus())));
+                          RegionStatus.valueOf(heartbeatResp.getStatus())),
+                      false);
 
               if 
(((TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType())
                           && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE)
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 7aad164be55..895d89246b8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -333,7 +333,7 @@ public class LoadManager {
             regionHeartbeatSampleMap.forEach(
                 (dataNodeId, regionHeartbeatSample) ->
                     loadCache.cacheRegionHeartbeatSample(
-                        regionGroupId, dataNodeId, regionHeartbeatSample)));
+                        regionGroupId, dataNodeId, regionHeartbeatSample, 
false)));
     loadCache.updateRegionGroupStatistics();
     
eventService.checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary();
   }
@@ -344,11 +344,13 @@ public class LoadManager {
    * @param regionGroupId the specified RegionGroup
    * @param dataNodeId the specified DataNode
    */
-  public void forceAddRegionCache(TConsensusGroupId regionGroupId, int 
dataNodeId) {
+  public void forceAddRegionCache(
+      TConsensusGroupId regionGroupId, int dataNodeId, RegionStatus 
regionStatus) {
     loadCache.cacheRegionHeartbeatSample(
         regionGroupId,
         dataNodeId,
-        new RegionHeartbeatSample(System.nanoTime(), RegionStatus.Running));
+        new RegionHeartbeatSample(System.nanoTime(), regionStatus),
+        true);
     loadCache.updateRegionGroupStatistics();
     
eventService.checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary();
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 4a8a1a5f382..6fae8010e22 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -201,10 +201,13 @@ public class LoadCache {
    * @param sample the latest heartbeat sample
    */
   public void cacheRegionHeartbeatSample(
-      TConsensusGroupId regionGroupId, int nodeId, RegionHeartbeatSample 
sample) {
+      TConsensusGroupId regionGroupId,
+      int nodeId,
+      RegionHeartbeatSample sample,
+      boolean overwrite) {
     regionGroupCacheMap
         .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache())
-        .cacheHeartbeatSample(nodeId, sample);
+        .cacheHeartbeatSample(nodeId, sample, overwrite);
   }
 
   /**
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
index ec496b478c7..2e68d370519 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
@@ -57,4 +57,19 @@ public class RegionCache extends AbstractLoadCache {
   public RegionStatistics getCurrentStatistics() {
     return (RegionStatistics) currentStatistics.get();
   }
+
+  public void cacheHeartbeatSample(RegionHeartbeatSample newHeartbeatSample, 
boolean overwrite) {
+    if (overwrite || getLastSample() == null) {
+      super.cacheHeartbeatSample(newHeartbeatSample);
+      return;
+    }
+    RegionStatus lastStatus = ((RegionHeartbeatSample) 
getLastSample()).getStatus();
+    if (lastStatus.equals(RegionStatus.Adding) || 
lastStatus.equals(RegionStatus.Removing)) {
+      RegionHeartbeatSample fakeHeartbeatSample =
+          new 
RegionHeartbeatSample(newHeartbeatSample.getSampleLogicalTimestamp(), 
lastStatus);
+      super.cacheHeartbeatSample(fakeHeartbeatSample);
+    } else {
+      super.cacheHeartbeatSample(newHeartbeatSample);
+    }
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
index 532e955f62e..ae922dc0028 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.manager.load.cache.region;
 
 import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
 
 import java.util.Map;
@@ -51,11 +52,18 @@ public class RegionGroupCache {
    *
    * @param dataNodeId Where the specified Region resides
    * @param newHeartbeatSample The newest RegionHeartbeatSample
+   * @param overwrite Able to overwrite Adding or Removing
    */
-  public void cacheHeartbeatSample(int dataNodeId, RegionHeartbeatSample 
newHeartbeatSample) {
+  public void cacheHeartbeatSample(
+      int dataNodeId, RegionHeartbeatSample newHeartbeatSample, boolean 
overwrite) {
     regionCacheMap
         .computeIfAbsent(dataNodeId, empty -> new RegionCache())
-        .cacheHeartbeatSample(newHeartbeatSample);
+        .cacheHeartbeatSample(newHeartbeatSample, overwrite);
+  }
+
+  @TestOnly
+  public void cacheHeartbeatSample(int dataNodeId, RegionHeartbeatSample 
newHeartbeatSample) {
+    cacheHeartbeatSample(dataNodeId, newHeartbeatSample, false);
   }
 
   /**
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index aa2760cad34..cfad4aefa00 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
@@ -417,7 +418,8 @@ public class RegionMaintainHandler {
     return report;
   }
 
-  public void addRegionLocation(TConsensusGroupId regionId, TDataNodeLocation 
newLocation) {
+  public void addRegionLocation(
+      TConsensusGroupId regionId, TDataNodeLocation newLocation, RegionStatus 
regionStatus) {
     AddRegionLocationPlan req = new AddRegionLocationPlan(regionId, 
newLocation);
     TSStatus status = 
configManager.getPartitionManager().addRegionLocation(req);
     LOGGER.info(
@@ -425,7 +427,16 @@ public class RegionMaintainHandler {
         regionId,
         getIdWithRpcEndpoint(newLocation),
         status);
-    configManager.getLoadManager().forceAddRegionCache(regionId, 
newLocation.getDataNodeId());
+    configManager
+        .getLoadManager()
+        .forceAddRegionCache(regionId, newLocation.getDataNodeId(), 
regionStatus);
+  }
+
+  public void updateRegionCache(
+      TConsensusGroupId regionId, TDataNodeLocation newLocation, RegionStatus 
regionStatus) {
+    configManager
+        .getLoadManager()
+        .forceAddRegionCache(regionId, newLocation.getDataNodeId(), 
regionStatus);
   }
 
   public void removeRegionLocation(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
index 9cefb239489..b380c7ca721 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.procedure.impl.region;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -84,6 +85,7 @@ public class AddRegionPeerProcedure
       outerSwitch:
       switch (state) {
         case CREATE_NEW_REGION_PEER:
+          handler.addRegionLocation(consensusGroupId, destDataNode, 
RegionStatus.Adding);
           TSStatus status = handler.createNewRegionPeer(consensusGroupId, 
destDataNode);
           setKillPoint(state);
           if (status.getCode() != SUCCESS_STATUS.getStatusCode()) {
@@ -92,6 +94,7 @@ public class AddRegionPeerProcedure
           setNextState(AddRegionPeerState.DO_ADD_REGION_PEER);
           break;
         case DO_ADD_REGION_PEER:
+          handler.updateRegionCache(consensusGroupId, destDataNode, 
RegionStatus.Adding);
           // We don't want to re-submit AddRegionPeerTask when leader change 
or ConfigNode reboot
           if (!this.isStateDeserialized()) {
             TSStatus tsStatus =
@@ -127,7 +130,7 @@ public class AddRegionPeerProcedure
               throw new UnsupportedOperationException(msg);
           }
         case UPDATE_REGION_LOCATION_CACHE:
-          handler.addRegionLocation(consensusGroupId, destDataNode);
+          handler.updateRegionCache(consensusGroupId, destDataNode, 
RegionStatus.Running);
           setKillPoint(state);
           LOGGER.info("AddRegionPeer state {} complete", state);
           LOGGER.info(
@@ -147,6 +150,8 @@ public class AddRegionPeerProcedure
   }
 
   private void rollback(ConfigNodeProcedureEnv env, RegionMaintainHandler 
handler) {
+    handler.removeRegionLocation(consensusGroupId, destDataNode);
+
     List<TDataNodeLocation> correctDataNodeLocations =
         
env.getConfigManager().getPartitionManager().getAllReplicaSets().stream()
             .filter(tRegionReplicaSet -> 
tRegionReplicaSet.getRegionId().equals(consensusGroupId))
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
index 1e183a631e6..ef3a58994bd 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionMaintainTaskStatus;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -79,6 +80,7 @@ public class RemoveRegionPeerProcedure
     try {
       switch (state) {
         case REMOVE_REGION_PEER:
+          handler.updateRegionCache(consensusGroupId, targetDataNode, 
RegionStatus.Removing);
           tsStatus =
               handler.submitRemoveRegionPeerTask(
                   this.getProcId(), targetDataNode, consensusGroupId, 
coordinator);
@@ -100,6 +102,7 @@ public class RemoveRegionPeerProcedure
           setNextState(DELETE_OLD_REGION_PEER);
           break;
         case DELETE_OLD_REGION_PEER:
+          handler.updateRegionCache(consensusGroupId, targetDataNode, 
RegionStatus.Removing);
           tsStatus =
               handler.submitDeleteOldRegionPeerTask(
                   this.getProcId(), targetDataNode, consensusGroupId);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
index ade6136fb24..b61a2d6a5f7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
@@ -27,6 +27,9 @@ public enum RegionStatus {
   /** Region connection failure */
   Unknown("Unknown"),
 
+  /** Region is the destination during an AddRegionPeerProcedure */
+  Adding("Adding"),
+
   /** Region is in removing */
   Removing("Removing"),
 
@@ -43,15 +46,6 @@ public enum RegionStatus {
     return status;
   }
 
-  public static RegionStatus parse(String status) {
-    for (RegionStatus regionStatus : RegionStatus.values()) {
-      if (regionStatus.status.equals(status)) {
-        return regionStatus;
-      }
-    }
-    throw new RuntimeException(String.format("RegionStatus %s doesn't exist.", 
status));
-  }
-
   public static boolean isNormalStatus(RegionStatus status) {
     // Currently, the only normal status is Running
     return status != null && status.equals(RegionStatus.Running);

Reply via email to