This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch 429f8f2716b5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 88dc2af157a57e1bcafaaa204700dfb2d39380a3 Author: YongzaoDan <[email protected]> AuthorDate: Thu Jun 29 10:41:12 2023 +0800 Finish --- .../load/cache/region/RegionGroupStatistics.java | 6 ++ .../manager/load/service/StatisticsService.java | 87 +++++++++++----- .../apache/iotdb/it/env/cluster/AbstractEnv.java | 14 +++ .../iotdb/it/env/remote/RemoteServerEnv.java | 6 ++ .../java/org/apache/iotdb/itbase/env/BaseEnv.java | 8 ++ .../load/IoTDBClusterRegionLeaderBalancingIT.java | 113 ++++++++++++++++++++- 6 files changed, 210 insertions(+), 24 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupStatistics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupStatistics.java index 8dc7a4c1ec9..600605bbcd5 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupStatistics.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupStatistics.java @@ -27,6 +27,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -66,6 +68,10 @@ public class RegionGroupStatistics { return regionStatisticsMap; } + public List<Integer> getRegionIds() { + return new ArrayList<>(regionStatisticsMap.keySet()); + } + public static RegionGroupStatistics generateDefaultRegionGroupStatistics() { return new RegionGroupStatistics(RegionGroupStatus.Disabled, new ConcurrentHashMap<>()); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java index badc08768d6..7d84e16d57a 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java @@ -36,7 +36,6 @@ import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer; import org.apache.iotdb.confignode.manager.load.cache.LoadCache; import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics; -import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics; import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber; import org.apache.iotdb.confignode.manager.load.subscriber.RouteChangeEvent; import org.apache.iotdb.confignode.manager.load.subscriber.StatisticsChangeEvent; @@ -48,6 +47,7 @@ import com.google.common.eventbus.EventBus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; @@ -180,10 +180,13 @@ public class StatisticsService implements IClusterStatusSubscriber { LOGGER.info("[NodeStatistics] NodeStatisticsMap: "); for (Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> nodeCacheEntry : differentNodeStatisticsMap.entrySet()) { - LOGGER.info( - "[NodeStatistics]\t {}={}", - "nodeId{" + nodeCacheEntry.getKey() + "}", - nodeCacheEntry.getValue().getRight()); + if (!nodeCacheEntry.getValue().getRight().equals(nodeCacheEntry.getValue().getLeft())) { + LOGGER.info( + "[NodeStatistics]\t {}: {}->{}", + "nodeId{" + nodeCacheEntry.getKey() + "}", + nodeCacheEntry.getValue().getLeft(), + nodeCacheEntry.getValue().getRight()); + } } } @@ -193,14 +196,40 @@ public class StatisticsService implements IClusterStatusSubscriber { LOGGER.info("[RegionGroupStatistics] RegionGroupStatisticsMap: "); for (Map.Entry<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>> regionGroupStatisticsEntry : differentRegionGroupStatisticsMap.entrySet()) { - LOGGER.info("[RegionGroupStatistics]\t RegionGroup: {}", regionGroupStatisticsEntry.getKey()); - LOGGER.info("[RegionGroupStatistics]\t {}", regionGroupStatisticsEntry.getValue()); - for (Map.Entry<Integer, RegionStatistics> regionStatisticsEntry : - regionGroupStatisticsEntry.getValue().getRight().getRegionStatisticsMap().entrySet()) { + if (!regionGroupStatisticsEntry + .getValue() + .getRight() + .equals(regionGroupStatisticsEntry.getValue().getLeft())) { LOGGER.info( - "[RegionGroupStatistics]\t dataNodeId{}={}", - regionStatisticsEntry.getKey(), - regionStatisticsEntry.getValue()); + "[RegionGroupStatistics]\t RegionGroup {}: {} -> {}", + regionGroupStatisticsEntry.getKey(), + regionGroupStatisticsEntry.getValue().getLeft().getRegionGroupStatus(), + regionGroupStatisticsEntry.getValue().getRight().getRegionGroupStatus()); + + List<Integer> leftIds = regionGroupStatisticsEntry.getValue().getLeft().getRegionIds(); + List<Integer> rightIds = regionGroupStatisticsEntry.getValue().getRight().getRegionIds(); + for (int leftId : leftIds) { + if (!rightIds.contains(leftId)) { + LOGGER.info( + "[RegionGroupStatistics]\t Region in DataNode {}: {} -> null", + leftId, + regionGroupStatisticsEntry.getValue().getLeft().getRegionStatus(leftId)); + } else { + LOGGER.info( + "[RegionGroupStatistics]\t Region in DataNode {}: {} -> {}", + leftId, + regionGroupStatisticsEntry.getValue().getLeft().getRegionStatus(leftId), + regionGroupStatisticsEntry.getValue().getRight().getRegionStatus(leftId)); + } + } + for (int rightId : rightIds) { + if (!leftIds.contains(rightId)) { + LOGGER.info( + "[RegionGroupStatistics]\t Region in DataNode {}: null -> {}", + rightId, + regionGroupStatisticsEntry.getValue().getRight().getRegionStatus(rightId)); + } + } } } } @@ -215,11 +244,13 @@ public class StatisticsService implements IClusterStatusSubscriber { LOGGER.info("[RegionLeader] RegionLeaderMap: "); for (Map.Entry<TConsensusGroupId, Pair<Integer, Integer>> regionLeaderEntry : leaderMap.entrySet()) { - LOGGER.info( - "[RegionLeader]\t {}: {}->{}", - regionLeaderEntry.getKey(), - regionLeaderEntry.getValue().getLeft(), - regionLeaderEntry.getValue().getRight()); + if (!regionLeaderEntry.getValue().getRight().equals(regionLeaderEntry.getValue().getLeft())) { + LOGGER.info( + "[RegionLeader]\t {}: {}->{}", + regionLeaderEntry.getKey(), + regionLeaderEntry.getValue().getLeft(), + regionLeaderEntry.getValue().getRight()); + } } } @@ -228,12 +259,22 @@ public class StatisticsService implements IClusterStatusSubscriber { LOGGER.info("[RegionPriority] RegionPriorityMap: "); for (Map.Entry<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> regionPriorityEntry : priorityMap.entrySet()) { - LOGGER.info( - "[RegionPriority]\t {}={}", - regionPriorityEntry.getKey(), - regionPriorityEntry.getValue().getRight().getDataNodeLocations().stream() - .map(TDataNodeLocation::getDataNodeId) - .collect(Collectors.toList())); + if (!regionPriorityEntry + .getValue() + .getRight() + .equals(regionPriorityEntry.getValue().getLeft())) { + LOGGER.info( + "[RegionPriority]\t {}: {}->{}", + regionPriorityEntry.getKey(), + regionPriorityEntry.getValue().getLeft() == null + ? "null" + : regionPriorityEntry.getValue().getLeft().getDataNodeLocations().stream() + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toList()), + regionPriorityEntry.getValue().getRight().getDataNodeLocations().stream() + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toList())); + } } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java index a6600633589..af610a784b4 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java @@ -314,6 +314,14 @@ public abstract class AbstractEnv implements BaseEnv { getWriteConnection(null, username, password), getReadConnections(null, username, password)); } + @Override + public Connection getConnectionWithSpecifiedDataNode( + DataNodeWrapper dataNode, String username, String password) throws SQLException { + return new ClusterTestConnection( + getWriteConnectionWithSpecifiedDataNode(dataNode, null, username, password), + getReadConnections(null, username, password)); + } + private Connection getConnection(String endpoint, int queryTimeout) throws SQLException { IoTDBConnection connection = (IoTDBConnection) @@ -387,6 +395,12 @@ public abstract class AbstractEnv implements BaseEnv { dataNode = this.dataNodeWrapperList.get(0); } + return getWriteConnectionWithSpecifiedDataNode(dataNode, version, username, password); + } + + protected NodeConnection getWriteConnectionWithSpecifiedDataNode( + DataNodeWrapper dataNode, Constant.Version version, String username, String password) + throws SQLException { String endpoint = dataNode.getIp() + ":" + dataNode.getPort(); Connection writeConnection = DriverManager.getConnection( diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java index 78e88fc9d28..4bb67c34ad4 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteServerEnv.java @@ -105,6 +105,12 @@ public class RemoteServerEnv implements BaseEnv { return connection; } + @Override + public Connection getConnectionWithSpecifiedDataNode( + DataNodeWrapper dataNode, String username, String password) throws SQLException { + return getConnection(username, password); + } + @Override public Connection getConnection(Constant.Version version, String username, String password) throws SQLException { diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java index 6947ea63c0c..3ac8eed6657 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java @@ -56,8 +56,16 @@ public interface BaseEnv { return getConnection("root", "root"); } + default Connection getConnectionWithSpecifiedDataNode(DataNodeWrapper dataNode) + throws SQLException { + return getConnectionWithSpecifiedDataNode(dataNode, "root", "root"); + } + Connection getConnection(String username, String password) throws SQLException; + Connection getConnectionWithSpecifiedDataNode( + DataNodeWrapper dataNode, String username, String password) throws SQLException; + default Connection getConnection(Constant.Version version) throws SQLException { return getConnection(version, "root", "root"); } diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java index 2d1f17cb6c7..d7b8ca0e6db 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java @@ -43,11 +43,14 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.sql.Connection; +import java.sql.Statement; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @RunWith(IoTDBTestRunner.class) @@ -130,7 +133,7 @@ public class IoTDBClusterRegionLeaderBalancingIT { } @Test - public void testMCFLeaderDistribution() throws Exception { + public void testMCFLeaderDistributionWithUnknownStatus() throws Exception { final int retryNum = 50; TSStatus status; @@ -234,4 +237,112 @@ public class IoTDBClusterRegionLeaderBalancingIT { Assert.assertTrue(isDistributionBalanced); } } + + @Test + public void testMCFLeaderDistributionWithReadOnlyStatus() throws Exception { + final int retryNum = 50; + + TSStatus status; + final int storageGroupNum = 3; + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + + for (int i = 0; i < storageGroupNum; i++) { + // Set StorageGroups + status = client.setDatabase(new TDatabaseSchema(sg + i)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + // Create a DataRegionGroup for each StorageGroup + Map<TSeriesPartitionSlot, TTimeSlotList> seriesSlotMap = new HashMap<>(); + seriesSlotMap.put( + new TSeriesPartitionSlot(1), + new TTimeSlotList() + .setTimePartitionSlots(Collections.singletonList(new TTimePartitionSlot(100)))); + Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> sgSlotsMap = new HashMap<>(); + sgSlotsMap.put(sg + i, seriesSlotMap); + TDataPartitionTableResp dataPartitionTableResp = + client.getOrCreateDataPartitionTable(new TDataPartitionReq(sgSlotsMap)); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + dataPartitionTableResp.getStatus().getCode()); + } + + // Check leader distribution + Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>(); + TShowRegionResp showRegionResp; + boolean isDistributionBalanced = false; + for (int retry = 0; retry < retryNum; retry++) { + leaderCounter.clear(); + showRegionResp = client.showRegion(new TShowRegionReq()); + showRegionResp + .getRegionInfoList() + .forEach( + regionInfo -> { + if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) { + leaderCounter + .computeIfAbsent(regionInfo.getDataNodeId(), empty -> new AtomicInteger(0)) + .getAndIncrement(); + } + }); + + // All DataNodes have Region-leader + isDistributionBalanced = leaderCounter.size() == testDataNodeNum; + // Each DataNode has exactly 1 Region-leader + for (AtomicInteger leaderCount : leaderCounter.values()) { + if (leaderCount.get() != 1) { + isDistributionBalanced = false; + } + } + + if (isDistributionBalanced) { + break; + } else { + TimeUnit.SECONDS.sleep(1); + } + } + Assert.assertTrue(isDistributionBalanced); + } + + try (Connection connection = + EnvFactory.getEnv() + .getConnectionWithSpecifiedDataNode(EnvFactory.getEnv().getDataNodeWrapper(0)); + Statement statement = connection.createStatement()) { + statement.execute("SET SYSTEM TO READONLY ON LOCAL"); + } + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + + // Make sure there exist one ReadOnly DataNode + EnvFactory.getEnv() + .ensureNodeStatus( + Collections.singletonList(EnvFactory.getEnv().getDataNodeWrapper(0)), + Collections.singletonList(NodeStatus.ReadOnly)); + + // Check leader distribution + TShowRegionResp showRegionResp; + AtomicBoolean isDistributionBalanced = new AtomicBoolean(); + for (int retry = 0; retry < retryNum; retry++) { + isDistributionBalanced.set(true); + showRegionResp = client.showRegion(new TShowRegionReq()); + showRegionResp + .getRegionInfoList() + .forEach( + regionInfo -> { + if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType()) + && NodeStatus.ReadOnly.getStatus().equals(regionInfo.getStatus())) { + // ReadOnly DataNode couldn't have Region-leader + isDistributionBalanced.set(false); + } + }); + + if (isDistributionBalanced.get()) { + break; + } else { + TimeUnit.SECONDS.sleep(1); + } + } + Assert.assertTrue(isDistributionBalanced.get()); + } + } }
