This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch 429f8f2716b5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 88dc2af157a57e1bcafaaa204700dfb2d39380a3
Author: YongzaoDan <[email protected]>
AuthorDate: Thu Jun 29 10:41:12 2023 +0800

    Finish
---
 .../load/cache/region/RegionGroupStatistics.java   |   6 ++
 .../manager/load/service/StatisticsService.java    |  87 +++++++++++-----
 .../apache/iotdb/it/env/cluster/AbstractEnv.java   |  14 +++
 .../iotdb/it/env/remote/RemoteServerEnv.java       |   6 ++
 .../java/org/apache/iotdb/itbase/env/BaseEnv.java  |   8 ++
 .../load/IoTDBClusterRegionLeaderBalancingIT.java  | 113 ++++++++++++++++++++-
 6 files changed, 210 insertions(+), 24 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupStatistics.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupStatistics.java
index 8dc7a4c1ec9..600605bbcd5 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupStatistics.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupStatistics.java
@@ -27,6 +27,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -66,6 +68,10 @@ public class RegionGroupStatistics {
     return regionStatisticsMap;
   }
 
+  public List<Integer> getRegionIds() {
+    return new ArrayList<>(regionStatisticsMap.keySet());
+  }
+
   public static RegionGroupStatistics generateDefaultRegionGroupStatistics() {
     return new RegionGroupStatistics(RegionGroupStatus.Disabled, new 
ConcurrentHashMap<>());
   }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
index badc08768d6..7d84e16d57a 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
@@ -36,7 +36,6 @@ import 
org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
 import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
 import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
 import 
org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
-import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
 import org.apache.iotdb.confignode.manager.load.subscriber.RouteChangeEvent;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.StatisticsChangeEvent;
@@ -48,6 +47,7 @@ import com.google.common.eventbus.EventBus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
@@ -180,10 +180,13 @@ public class StatisticsService implements 
IClusterStatusSubscriber {
     LOGGER.info("[NodeStatistics] NodeStatisticsMap: ");
     for (Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> 
nodeCacheEntry :
         differentNodeStatisticsMap.entrySet()) {
-      LOGGER.info(
-          "[NodeStatistics]\t {}={}",
-          "nodeId{" + nodeCacheEntry.getKey() + "}",
-          nodeCacheEntry.getValue().getRight());
+      if 
(!nodeCacheEntry.getValue().getRight().equals(nodeCacheEntry.getValue().getLeft()))
 {
+        LOGGER.info(
+            "[NodeStatistics]\t {}: {}->{}",
+            "nodeId{" + nodeCacheEntry.getKey() + "}",
+            nodeCacheEntry.getValue().getLeft(),
+            nodeCacheEntry.getValue().getRight());
+      }
     }
   }
 
@@ -193,14 +196,40 @@ public class StatisticsService implements 
IClusterStatusSubscriber {
     LOGGER.info("[RegionGroupStatistics] RegionGroupStatisticsMap: ");
     for (Map.Entry<TConsensusGroupId, Pair<RegionGroupStatistics, 
RegionGroupStatistics>>
         regionGroupStatisticsEntry : 
differentRegionGroupStatisticsMap.entrySet()) {
-      LOGGER.info("[RegionGroupStatistics]\t RegionGroup: {}", 
regionGroupStatisticsEntry.getKey());
-      LOGGER.info("[RegionGroupStatistics]\t {}", 
regionGroupStatisticsEntry.getValue());
-      for (Map.Entry<Integer, RegionStatistics> regionStatisticsEntry :
-          
regionGroupStatisticsEntry.getValue().getRight().getRegionStatisticsMap().entrySet())
 {
+      if (!regionGroupStatisticsEntry
+          .getValue()
+          .getRight()
+          .equals(regionGroupStatisticsEntry.getValue().getLeft())) {
         LOGGER.info(
-            "[RegionGroupStatistics]\t dataNodeId{}={}",
-            regionStatisticsEntry.getKey(),
-            regionStatisticsEntry.getValue());
+            "[RegionGroupStatistics]\t RegionGroup {}: {} -> {}",
+            regionGroupStatisticsEntry.getKey(),
+            
regionGroupStatisticsEntry.getValue().getLeft().getRegionGroupStatus(),
+            
regionGroupStatisticsEntry.getValue().getRight().getRegionGroupStatus());
+
+        List<Integer> leftIds = 
regionGroupStatisticsEntry.getValue().getLeft().getRegionIds();
+        List<Integer> rightIds = 
regionGroupStatisticsEntry.getValue().getRight().getRegionIds();
+        for (int leftId : leftIds) {
+          if (!rightIds.contains(leftId)) {
+            LOGGER.info(
+                "[RegionGroupStatistics]\t Region in DataNode {}: {} -> null",
+                leftId,
+                
regionGroupStatisticsEntry.getValue().getLeft().getRegionStatus(leftId));
+          } else {
+            LOGGER.info(
+                "[RegionGroupStatistics]\t Region in DataNode {}: {} -> {}",
+                leftId,
+                
regionGroupStatisticsEntry.getValue().getLeft().getRegionStatus(leftId),
+                
regionGroupStatisticsEntry.getValue().getRight().getRegionStatus(leftId));
+          }
+        }
+        for (int rightId : rightIds) {
+          if (!leftIds.contains(rightId)) {
+            LOGGER.info(
+                "[RegionGroupStatistics]\t Region in DataNode {}: null -> {}",
+                rightId,
+                
regionGroupStatisticsEntry.getValue().getRight().getRegionStatus(rightId));
+          }
+        }
       }
     }
   }
@@ -215,11 +244,13 @@ public class StatisticsService implements 
IClusterStatusSubscriber {
     LOGGER.info("[RegionLeader] RegionLeaderMap: ");
     for (Map.Entry<TConsensusGroupId, Pair<Integer, Integer>> 
regionLeaderEntry :
         leaderMap.entrySet()) {
-      LOGGER.info(
-          "[RegionLeader]\t {}: {}->{}",
-          regionLeaderEntry.getKey(),
-          regionLeaderEntry.getValue().getLeft(),
-          regionLeaderEntry.getValue().getRight());
+      if 
(!regionLeaderEntry.getValue().getRight().equals(regionLeaderEntry.getValue().getLeft()))
 {
+        LOGGER.info(
+            "[RegionLeader]\t {}: {}->{}",
+            regionLeaderEntry.getKey(),
+            regionLeaderEntry.getValue().getLeft(),
+            regionLeaderEntry.getValue().getRight());
+      }
     }
   }
 
@@ -228,12 +259,22 @@ public class StatisticsService implements 
IClusterStatusSubscriber {
     LOGGER.info("[RegionPriority] RegionPriorityMap: ");
     for (Map.Entry<TConsensusGroupId, Pair<TRegionReplicaSet, 
TRegionReplicaSet>>
         regionPriorityEntry : priorityMap.entrySet()) {
-      LOGGER.info(
-          "[RegionPriority]\t {}={}",
-          regionPriorityEntry.getKey(),
-          
regionPriorityEntry.getValue().getRight().getDataNodeLocations().stream()
-              .map(TDataNodeLocation::getDataNodeId)
-              .collect(Collectors.toList()));
+      if (!regionPriorityEntry
+          .getValue()
+          .getRight()
+          .equals(regionPriorityEntry.getValue().getLeft())) {
+        LOGGER.info(
+            "[RegionPriority]\t {}: {}->{}",
+            regionPriorityEntry.getKey(),
+            regionPriorityEntry.getValue().getLeft() == null
+                ? "null"
+                : 
regionPriorityEntry.getValue().getLeft().getDataNodeLocations().stream()
+                    .map(TDataNodeLocation::getDataNodeId)
+                    .collect(Collectors.toList()),
+            
regionPriorityEntry.getValue().getRight().getDataNodeLocations().stream()
+                .map(TDataNodeLocation::getDataNodeId)
+                .collect(Collectors.toList()));
+      }
     }
   }
 
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java
index a6600633589..af610a784b4 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java
@@ -314,6 +314,14 @@ public abstract class AbstractEnv implements BaseEnv {
         getWriteConnection(null, username, password), getReadConnections(null, 
username, password));
   }
 
+  @Override
+  public Connection getConnectionWithSpecifiedDataNode(
+      DataNodeWrapper dataNode, String username, String password) throws 
SQLException {
+    return new ClusterTestConnection(
+        getWriteConnectionWithSpecifiedDataNode(dataNode, null, username, 
password),
+        getReadConnections(null, username, password));
+  }
+
   private Connection getConnection(String endpoint, int queryTimeout) throws 
SQLException {
     IoTDBConnection connection =
         (IoTDBConnection)
@@ -387,6 +395,12 @@ public abstract class AbstractEnv implements BaseEnv {
       dataNode = this.dataNodeWrapperList.get(0);
     }
 
+    return getWriteConnectionWithSpecifiedDataNode(dataNode, version, 
username, password);
+  }
+
+  protected NodeConnection getWriteConnectionWithSpecifiedDataNode(
+      DataNodeWrapper dataNode, Constant.Version version, String username, 
String password)
+      throws SQLException {
     String endpoint = dataNode.getIp() + ":" + dataNode.getPort();
     Connection writeConnection =
         DriverManager.getConnection(
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java
index 78e88fc9d28..4bb67c34ad4 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java
@@ -105,6 +105,12 @@ public class RemoteServerEnv implements BaseEnv {
     return connection;
   }
 
+  @Override
+  public Connection getConnectionWithSpecifiedDataNode(
+      DataNodeWrapper dataNode, String username, String password) throws 
SQLException {
+    return getConnection(username, password);
+  }
+
   @Override
   public Connection getConnection(Constant.Version version, String username, 
String password)
       throws SQLException {
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 6947ea63c0c..3ac8eed6657 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -56,8 +56,16 @@ public interface BaseEnv {
     return getConnection("root", "root");
   }
 
+  default Connection getConnectionWithSpecifiedDataNode(DataNodeWrapper 
dataNode)
+      throws SQLException {
+    return getConnectionWithSpecifiedDataNode(dataNode, "root", "root");
+  }
+
   Connection getConnection(String username, String password) throws 
SQLException;
 
+  Connection getConnectionWithSpecifiedDataNode(
+      DataNodeWrapper dataNode, String username, String password) throws 
SQLException;
+
   default Connection getConnection(Constant.Version version) throws 
SQLException {
     return getConnection(version, "root", "root");
   }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
index 2d1f17cb6c7..d7b8ca0e6db 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
@@ -43,11 +43,14 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
+import java.sql.Connection;
+import java.sql.Statement;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 @RunWith(IoTDBTestRunner.class)
@@ -130,7 +133,7 @@ public class IoTDBClusterRegionLeaderBalancingIT {
   }
 
   @Test
-  public void testMCFLeaderDistribution() throws Exception {
+  public void testMCFLeaderDistributionWithUnknownStatus() throws Exception {
     final int retryNum = 50;
 
     TSStatus status;
@@ -234,4 +237,112 @@ public class IoTDBClusterRegionLeaderBalancingIT {
       Assert.assertTrue(isDistributionBalanced);
     }
   }
+
+  @Test
+  public void testMCFLeaderDistributionWithReadOnlyStatus() throws Exception {
+    final int retryNum = 50;
+
+    TSStatus status;
+    final int storageGroupNum = 3;
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+
+      for (int i = 0; i < storageGroupNum; i++) {
+        // Set StorageGroups
+        status = client.setDatabase(new TDatabaseSchema(sg + i));
+        Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+        // Create a DataRegionGroup for each StorageGroup
+        Map<TSeriesPartitionSlot, TTimeSlotList> seriesSlotMap = new 
HashMap<>();
+        seriesSlotMap.put(
+            new TSeriesPartitionSlot(1),
+            new TTimeSlotList()
+                .setTimePartitionSlots(Collections.singletonList(new 
TTimePartitionSlot(100))));
+        Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> sgSlotsMap = new 
HashMap<>();
+        sgSlotsMap.put(sg + i, seriesSlotMap);
+        TDataPartitionTableResp dataPartitionTableResp =
+            client.getOrCreateDataPartitionTable(new 
TDataPartitionReq(sgSlotsMap));
+        Assert.assertEquals(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+            dataPartitionTableResp.getStatus().getCode());
+      }
+
+      // Check leader distribution
+      Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>();
+      TShowRegionResp showRegionResp;
+      boolean isDistributionBalanced = false;
+      for (int retry = 0; retry < retryNum; retry++) {
+        leaderCounter.clear();
+        showRegionResp = client.showRegion(new TShowRegionReq());
+        showRegionResp
+            .getRegionInfoList()
+            .forEach(
+                regionInfo -> {
+                  if 
(RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
+                    leaderCounter
+                        .computeIfAbsent(regionInfo.getDataNodeId(), empty -> 
new AtomicInteger(0))
+                        .getAndIncrement();
+                  }
+                });
+
+        // All DataNodes have Region-leader
+        isDistributionBalanced = leaderCounter.size() == testDataNodeNum;
+        // Each DataNode has exactly 1 Region-leader
+        for (AtomicInteger leaderCount : leaderCounter.values()) {
+          if (leaderCount.get() != 1) {
+            isDistributionBalanced = false;
+          }
+        }
+
+        if (isDistributionBalanced) {
+          break;
+        } else {
+          TimeUnit.SECONDS.sleep(1);
+        }
+      }
+      Assert.assertTrue(isDistributionBalanced);
+    }
+
+    try (Connection connection =
+            EnvFactory.getEnv()
+                
.getConnectionWithSpecifiedDataNode(EnvFactory.getEnv().getDataNodeWrapper(0));
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET SYSTEM TO READONLY ON LOCAL");
+    }
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+
+      // Make sure there exist one ReadOnly DataNode
+      EnvFactory.getEnv()
+          .ensureNodeStatus(
+              
Collections.singletonList(EnvFactory.getEnv().getDataNodeWrapper(0)),
+              Collections.singletonList(NodeStatus.ReadOnly));
+
+      // Check leader distribution
+      TShowRegionResp showRegionResp;
+      AtomicBoolean isDistributionBalanced = new AtomicBoolean();
+      for (int retry = 0; retry < retryNum; retry++) {
+        isDistributionBalanced.set(true);
+        showRegionResp = client.showRegion(new TShowRegionReq());
+        showRegionResp
+            .getRegionInfoList()
+            .forEach(
+                regionInfo -> {
+                  if 
(RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())
+                      && 
NodeStatus.ReadOnly.getStatus().equals(regionInfo.getStatus())) {
+                    // ReadOnly DataNode couldn't have Region-leader
+                    isDistributionBalanced.set(false);
+                  }
+                });
+
+        if (isDistributionBalanced.get()) {
+          break;
+        } else {
+          TimeUnit.SECONDS.sleep(1);
+        }
+      }
+      Assert.assertTrue(isDistributionBalanced.get());
+    }
+  }
 }

Reply via email to