This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch Computing-resource-balancing_cp in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit deecdbd9292d199c0917c94928923dbda3c4e36a Author: YongzaoDan <[email protected]> AuthorDate: Sat Jul 22 19:04:06 2023 +0800 Pass IT --- .../iotdb/it/env/cluster/MppCommonConfig.java | 14 +- .../it/env/cluster/MppSharedCommonConfig.java | 15 +- .../iotdb/it/env/remote/RemoteCommonConfig.java | 12 +- .../org/apache/iotdb/itbase/env/CommonConfig.java | 4 +- .../partition/IoTDBPartitionInheritPolicyIT.java | 283 ++++++++++++++------- .../confignode/it/utils/ConfigNodeTestUtils.java | 117 +++++++++ .../db/it/last/IoTDBLastQueryLastCacheIT.java | 3 +- .../manager/load/balancer/PartitionBalancer.java | 22 +- .../load/balancer/partition/DataAllotTable.java | 68 ++--- .../manager/partition/PartitionManager.java | 7 +- .../partition/DatabasePartitionTable.java | 8 +- .../persistence/partition/PartitionInfo.java | 7 +- .../balancer/partition/DataAllotTableTest.java | 43 +++- .../commons/partition/DataPartitionEntry.java | 87 +++++++ .../commons/partition/DataPartitionTable.java | 10 +- .../commons/partition/SeriesPartitionTable.java | 12 +- .../commons/partition/DataPartitionEntryTest.java | 58 +++++ 17 files changed, 579 insertions(+), 191 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java index 6c01de2e787..4dbc3453fb8 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java @@ -199,14 +199,6 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig { return this; } - @Override - public CommonConfig setEnableDataPartitionInheritPolicy( - boolean enableDataPartitionInheritPolicy) { - setProperty( - "enable_data_partition_inherit_policy", String.valueOf(enableDataPartitionInheritPolicy)); - return this; - } - @Override public CommonConfig setDataReplicationFactor(int dataReplicationFactor) { setProperty("data_replication_factor", String.valueOf(dataReplicationFactor)); @@ -359,4 +351,10 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig { setProperty("database_limit_threshold", String.valueOf(databaseLimitThreshold)); return this; } + + @Override + public CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode) { + setProperty("data_region_per_data_node", String.valueOf(dataRegionPerDataNode)); + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java index 7af4f01fa63..b362a02d97b 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java @@ -172,14 +172,6 @@ public class MppSharedCommonConfig implements CommonConfig { return this; } - @Override - public CommonConfig setEnableDataPartitionInheritPolicy( - boolean enableDataPartitionInheritPolicy) { - cnConfig.setEnableDataPartitionInheritPolicy(enableDataPartitionInheritPolicy); - dnConfig.setEnableDataPartitionInheritPolicy(enableDataPartitionInheritPolicy); - return this; - } - @Override public CommonConfig setSchemaRegionGroupExtensionPolicy(String schemaRegionGroupExtensionPolicy) { cnConfig.setSchemaRegionGroupExtensionPolicy(schemaRegionGroupExtensionPolicy); @@ -372,4 +364,11 @@ public class MppSharedCommonConfig implements CommonConfig { cnConfig.setDatabaseLimitThreshold(databaseLimitThreshold); return this; } + + @Override + public CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode) { + dnConfig.setDataRegionPerDataNode(dataRegionPerDataNode); + cnConfig.setDataRegionPerDataNode(dataRegionPerDataNode); + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java index a578a6f4cf5..e181d5e1f73 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.it.env.remote; import org.apache.iotdb.itbase.env.CommonConfig; @@ -122,12 +123,6 @@ public class RemoteCommonConfig implements CommonConfig { return this; } - @Override - public CommonConfig setEnableDataPartitionInheritPolicy( - boolean enableDataPartitionInheritPolicy) { - return this; - } - @Override public CommonConfig setSchemaRegionGroupExtensionPolicy(String schemaRegionGroupExtensionPolicy) { return this; @@ -266,4 +261,9 @@ public class RemoteCommonConfig implements CommonConfig { public CommonConfig setDatabaseLimitThreshold(long databaseLimitThreshold) { return this; } + + @Override + public CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode) { + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index 2a08304221d..08161c36bc3 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -62,8 +62,6 @@ public interface CommonConfig { CommonConfig setDataRegionConsensusProtocolClass(String dataRegionConsensusProtocolClass); - CommonConfig setEnableDataPartitionInheritPolicy(boolean enableDataPartitionInheritPolicy); - CommonConfig setSchemaRegionGroupExtensionPolicy(String schemaRegionGroupExtensionPolicy); CommonConfig setDefaultSchemaRegionGroupNumPerDatabase(int schemaRegionGroupPerDatabase); @@ -119,4 +117,6 @@ public interface CommonConfig { CommonConfig setSortBufferSize(long sortBufferSize); CommonConfig setMaxTsBlockSizeInByte(long maxTsBlockSizeInByte); + + CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode); } diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java index 8b49948f5f7..aecd22375f5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java @@ -16,18 +16,17 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.confignode.it.partition; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils; -import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq; import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; -import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; -import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.framework.IoTDBTestRunner; @@ -40,27 +39,26 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.HashSet; import java.util.Map; -import java.util.concurrent.TimeUnit; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; @RunWith(IoTDBTestRunner.class) @Category({ClusterIT.class}) public class IoTDBPartitionInheritPolicyIT { - private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBPartitionInheritPolicyIT.class); - private static final boolean testEnableDataPartitionInheritPolicy = true; private static final String testDataRegionConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS; - private static final int testReplicationFactor = 3; + private static final int testReplicationFactor = 1; + private static final int testSeriesSlotNum = 1000; private static final long testTimePartitionInterval = 604800000; + private static final double testDataRegionPerDataNode = 5.0; - private static final String sg = "root.sg"; - private static final int storageGroupNum = 2; - private static final int testSeriesPartitionSlotNum = 100; - private static final int seriesPartitionBatchSize = 10; + private static final String database = "root.database"; + private static final int seriesPartitionSlotBatchSize = 100; private static final int testTimePartitionSlotsNum = 100; private static final int timePartitionBatchSize = 10; @@ -70,21 +68,19 @@ public class IoTDBPartitionInheritPolicyIT { .getConfig() .getCommonConfig() .setDataRegionConsensusProtocolClass(testDataRegionConsensusProtocolClass) - .setEnableDataPartitionInheritPolicy(testEnableDataPartitionInheritPolicy) .setDataReplicationFactor(testReplicationFactor) .setTimePartitionInterval(testTimePartitionInterval) - .setSeriesSlotNum(testSeriesPartitionSlotNum * 10); + .setSeriesSlotNum(testSeriesSlotNum) + .setDataRegionPerDataNode(testDataRegionPerDataNode); - // Init 1C3D environment - EnvFactory.getEnv().initClusterEnvironment(1, 3); + // Init 1C1D environment + EnvFactory.getEnv().initClusterEnvironment(1, 1); - // Set StorageGroups + // Set Database try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { - for (int i = 0; i < storageGroupNum; i++) { - TSStatus status = client.setDatabase(new TDatabaseSchema(sg + i)); - Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); - } + TSStatus status = client.setDatabase(new TDatabaseSchema(database)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); } } @@ -95,76 +91,185 @@ public class IoTDBPartitionInheritPolicyIT { @Test public void testDataPartitionInheritPolicy() throws Exception { + final long baseStartTime = 1000; + Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable1 = new ConcurrentHashMap<>(); - try (SyncConfigNodeIServiceClient client = - (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { - TDataPartitionReq dataPartitionReq = new TDataPartitionReq(); - TDataPartitionTableResp dataPartitionTableResp; - Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap; - - for (int i = 0; i < storageGroupNum; i++) { - String storageGroup = sg + i; - for (int j = 0; j < testSeriesPartitionSlotNum; j += seriesPartitionBatchSize) { - // Test inherit predecessor or successor - boolean isAscending = (j / 10) % 2 == 0; - int step = isAscending ? timePartitionBatchSize : -timePartitionBatchSize; - int k = isAscending ? 0 : testTimePartitionSlotsNum - timePartitionBatchSize; - while (0 <= k && k < testTimePartitionSlotsNum) { - partitionSlotsMap = - ConfigNodeTestUtils.constructPartitionSlotsMap( - storageGroup, - j, - j + seriesPartitionBatchSize, - k, - k + timePartitionBatchSize, - testTimePartitionInterval); - // Let ConfigNode create DataPartition - dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap); - for (int retry = 0; retry < 5; retry++) { - // Build new Client since it's unstable - try (SyncConfigNodeIServiceClient configNodeClient = - (SyncConfigNodeIServiceClient) - EnvFactory.getEnv().getLeaderConfigNodeConnection()) { - dataPartitionTableResp = - configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq); - if (dataPartitionTableResp != null - && dataPartitionTableResp.getStatus().getCode() - == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - ConfigNodeTestUtils.checkDataPartitionTable( - storageGroup, - j, - j + seriesPartitionBatchSize, - k, - k + timePartitionBatchSize, - testTimePartitionInterval, - configNodeClient - .getDataPartitionTable(dataPartitionReq) - .getDataPartitionTable()); - break; - } - } catch (Exception e) { - // Retry sometimes in order to avoid request timeout - LOGGER.error(e.getMessage()); - TimeUnit.SECONDS.sleep(1); + // Test1: divide and inherit DataPartitions from scratch + // Notice: create all DataRegionGroups as soon as possible + // Otherwise, the allocation might be slightly unbalanced + ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry( + database, 0, 10, baseStartTime, baseStartTime + 1, testTimePartitionInterval); + ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry( + database, + 10, + testSeriesSlotNum, + baseStartTime, + baseStartTime + 1, + testTimePartitionInterval); + + for (long timePartitionSlot = baseStartTime + 1; + timePartitionSlot < baseStartTime + testTimePartitionSlotsNum; + timePartitionSlot++) { + for (int seriesPartitionSlot = 0; + seriesPartitionSlot < testSeriesSlotNum; + seriesPartitionSlot += seriesPartitionSlotBatchSize) { + ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry( + database, + seriesPartitionSlot, + seriesPartitionSlot + seriesPartitionSlotBatchSize, + timePartitionSlot, + timePartitionSlot + 1, + testTimePartitionInterval); + } + } + + int mu = (int) (testSeriesSlotNum / testDataRegionPerDataNode); + TDataPartitionTableResp dataPartitionTableResp = + ConfigNodeTestUtils.getDataPartitionWithRetry( + database, + 0, + testSeriesSlotNum, + baseStartTime, + baseStartTime + testTimePartitionSlotsNum, + testTimePartitionInterval); + Assert.assertNotNull(dataPartitionTableResp); + + // All DataRegionGroups divide all SeriesSlots evenly + final int expectedPartitionNum1 = mu * testTimePartitionSlotsNum; + Map<TConsensusGroupId, Integer> counter = + ConfigNodeTestUtils.countDataPartition( + dataPartitionTableResp.getDataPartitionTable().get(database)); + counter.forEach((groupId, num) -> Assert.assertEquals(expectedPartitionNum1, num.intValue())); + + // Test DataPartition inherit policy + dataPartitionTableResp + .getDataPartitionTable() + .get(database) + .forEach( + ((seriesPartitionSlot, timePartitionSlotMap) -> { + // All Timeslots belonging to the same SeriesSlot are allocated to the same + // DataRegionGroup + TConsensusGroupId groupId = + timePartitionSlotMap + .get(new TTimePartitionSlot(baseStartTime * testTimePartitionInterval)) + .get(0); + timePartitionSlotMap.forEach( + (timePartitionSlot, groupIdList) -> + Assert.assertEquals(groupId, groupIdList.get(0))); + dataAllotTable1.put(seriesPartitionSlot, groupId); + })); + + // Register a new DataNode to extend DataRegionGroups + EnvFactory.getEnv().registerNewDataNode(true); + + // Test2: divide and inherit DataPartitions after extension + mu = (int) (testSeriesSlotNum / (testDataRegionPerDataNode * 2)); + dataPartitionTableResp = + ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry( + database, + 0, + testSeriesSlotNum, + baseStartTime + testTimePartitionSlotsNum, + baseStartTime + testTimePartitionSlotsNum + timePartitionBatchSize, + testTimePartitionInterval); + Assert.assertNotNull(dataPartitionTableResp); + + // All DataRegionGroups divide all SeriesSlots evenly + counter = + ConfigNodeTestUtils.countDataPartition( + dataPartitionTableResp.getDataPartitionTable().get(database)); + final int expectedPartitionNum2 = mu * timePartitionBatchSize; + counter.forEach((groupId, num) -> Assert.assertEquals(expectedPartitionNum2, num.intValue())); + + // Test DataPartition inherit policy + Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable2 = new ConcurrentHashMap<>(); + dataPartitionTableResp + .getDataPartitionTable() + .get(database) + .forEach( + ((seriesPartitionSlot, timePartitionSlotMap) -> { + // All Timeslots belonging to the same SeriesSlot are allocated to the same + // DataRegionGroup + TConsensusGroupId groupId = + timePartitionSlotMap + .get( + new TTimePartitionSlot( + (baseStartTime + testTimePartitionSlotsNum) + * testTimePartitionInterval)) + .get(0); + timePartitionSlotMap.forEach( + (timePartitionSlot, groupIdList) -> + Assert.assertEquals(groupId, groupIdList.get(0))); + + if (dataAllotTable1.containsValue(groupId)) { + // The DataRegionGroup has been inherited + Assert.assertTrue(dataAllotTable1.containsKey(seriesPartitionSlot)); + Assert.assertEquals(dataAllotTable1.get(seriesPartitionSlot), groupId); } - } - k += step; - } - } + dataAllotTable2.put(seriesPartitionSlot, groupId); + })); + + // Test3: historical DataPartitions will inherit successor + Random random = new Random(); + Set<Integer> allocatedSlots = new HashSet<>(); + for (int i = 0; i < 10; i++) { + int slot = random.nextInt(testSeriesSlotNum); + while (allocatedSlots.contains(slot)) { + slot = random.nextInt(testSeriesSlotNum); + } + allocatedSlots.add(slot); + dataPartitionTableResp = + ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry( + database, + slot, + slot + 1, + baseStartTime - 1, + baseStartTime, + testTimePartitionInterval); + Assert.assertNotNull(dataPartitionTableResp); + + TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(slot); + TTimePartitionSlot timePartitionSlot = + new TTimePartitionSlot((baseStartTime - 1) * testTimePartitionInterval); + Assert.assertEquals( + dataAllotTable1.get(seriesPartitionSlot), + dataPartitionTableResp + .getDataPartitionTable() + .get(database) + .get(seriesPartitionSlot) + .get(timePartitionSlot) + .get(0)); + } + + // Test4: future DataPartitions will inherit predecessor + allocatedSlots.clear(); + for (int i = 0; i < 10; i++) { + int slot = random.nextInt(testSeriesSlotNum); + while (allocatedSlots.contains(slot)) { + slot = random.nextInt(testSeriesSlotNum); } + allocatedSlots.add(slot); + dataPartitionTableResp = + ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry( + database, + slot, + slot + 1, + baseStartTime + 999, + baseStartTime + 1000, + testTimePartitionInterval); + Assert.assertNotNull(dataPartitionTableResp); - // Test DataPartition inherit policy - TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq()); - showRegionResp - .getRegionInfoList() - .forEach( - regionInfo -> { - // All Timeslots belonging to the same SeriesSlot are allocated to the same - // DataRegionGroup - Assert.assertEquals( - regionInfo.getSeriesSlots() * testTimePartitionSlotsNum, - regionInfo.getTimeSlots()); - }); + TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(slot); + TTimePartitionSlot timePartitionSlot = + new TTimePartitionSlot((baseStartTime + 999) * testTimePartitionInterval); + Assert.assertEquals( + dataAllotTable2.get(seriesPartitionSlot), + dataPartitionTableResp + .getDataPartitionTable() + .get(database) + .get(seriesPartitionSlot) + .get(timePartitionSlot) + .get(0)); } } } diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java index 6f20a25ba79..8f5b46288fb 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java @@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TNodeResource; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternTree; @@ -34,12 +35,18 @@ import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq; import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.env.cluster.ConfigNodeWrapper; import org.apache.iotdb.it.env.cluster.DataNodeWrapper; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; @@ -47,11 +54,17 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.junit.Assert.assertTrue; public class ConfigNodeTestUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNodeTestUtils.class); + public static void checkNodeConfig( List<TConfigNodeLocation> configNodeList, List<TDataNodeLocation> dataNodeList, @@ -169,6 +182,110 @@ public class ConfigNodeTestUtils { } } + public static TDataPartitionTableResp getDataPartitionWithRetry( + String database, + int seriesSlotStart, + int seriesSlotEnd, + long timeSlotStart, + long timeSlotEnd, + long timePartitionInterval) + throws InterruptedException { + Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = + ConfigNodeTestUtils.constructPartitionSlotsMap( + database, + seriesSlotStart, + seriesSlotEnd, + timeSlotStart, + timeSlotEnd, + timePartitionInterval); + TDataPartitionTableResp dataPartitionTableResp; + TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap); + + for (int retry = 0; retry < 5; retry++) { + // Build new Client since it's unstable + try (SyncConfigNodeIServiceClient configNodeClient = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + dataPartitionTableResp = configNodeClient.getDataPartitionTable(dataPartitionReq); + if (dataPartitionTableResp != null + && dataPartitionTableResp.getStatus().getCode() + == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return dataPartitionTableResp; + } + } catch (Exception e) { + // Retry sometimes in order to avoid request timeout + LOGGER.error(e.getMessage()); + TimeUnit.SECONDS.sleep(1); + } + } + Assert.fail("Failed to create DataPartition"); + return null; + } + + public static TDataPartitionTableResp getOrCreateDataPartitionWithRetry( + String database, + int seriesSlotStart, + int seriesSlotEnd, + long timeSlotStart, + long timeSlotEnd, + long timePartitionInterval) + throws InterruptedException { + + Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = + ConfigNodeTestUtils.constructPartitionSlotsMap( + database, + seriesSlotStart, + seriesSlotEnd, + timeSlotStart, + timeSlotEnd, + timePartitionInterval); + TDataPartitionTableResp dataPartitionTableResp; + TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap); + + for (int retry = 0; retry < 5; retry++) { + // Build new Client since it's unstable + try (SyncConfigNodeIServiceClient configNodeClient = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq); + if (dataPartitionTableResp != null + && dataPartitionTableResp.getStatus().getCode() + == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + ConfigNodeTestUtils.checkDataPartitionTable( + database, + seriesSlotStart, + seriesSlotEnd, + timeSlotStart, + timeSlotEnd, + timePartitionInterval, + configNodeClient.getDataPartitionTable(dataPartitionReq).getDataPartitionTable()); + return dataPartitionTableResp; + } + } catch (Exception e) { + // Retry sometimes in order to avoid request timeout + LOGGER.error(e.getMessage()); + TimeUnit.SECONDS.sleep(1); + } + } + Assert.fail("Failed to create DataPartition"); + return null; + } + + public static Map<TConsensusGroupId, Integer> countDataPartition( + Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>> + dataPartitionMap) { + Map<TConsensusGroupId, AtomicInteger> counter = new ConcurrentHashMap<>(); + dataPartitionMap.forEach( + ((seriesPartitionSlot, timePartitionSlotMap) -> + timePartitionSlotMap.forEach( + ((timePartitionSlot, consensusGroupIds) -> + consensusGroupIds.forEach( + (consensusGroupId -> + counter + .computeIfAbsent(consensusGroupId, empty -> new AtomicInteger(0)) + .incrementAndGet())))))); + return counter.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get())); + } + public static TConfigNodeLocation generateTConfigNodeLocation( int nodeId, ConfigNodeWrapper configNodeWrapper) { return new TConfigNodeLocation( diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java index 33f9ef9e151..6d30db00085 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java @@ -119,7 +119,8 @@ public class IoTDBLastQueryLastCacheIT { "1679365910000,root.ln_1.tb_6141.waterInterval_DOUBLE,10.0,DOUBLE,", "1679365910000,root.ln_1.tb_6141.waterTP_DOUBLE,15.0,DOUBLE,", }; - resultSetEqualTest("select last * from root.ln_1.tb_6141;", expectedHeader, retArray); + resultSetEqualTest( + "select last * from root.ln_1.tb_6141 order by timeseries;", expectedHeader, retArray); } @Test diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java index c0c617cd795..efd36a65382 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java @@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.partition.DataPartitionEntry; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.commons.partition.SeriesPartitionTable; @@ -42,6 +43,7 @@ import org.apache.iotdb.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -242,28 +244,22 @@ public class PartitionBalancer { * @param database Database name */ public void updateDataAllotTable(String database) { - TTimePartitionSlot currentTimePartition = - dataAllotTableMap - .computeIfAbsent(database, empty -> new DataAllotTable()) - .getCurrentTimePartition(); - Map<TSeriesPartitionSlot, TConsensusGroupId> allocatedTable = new ConcurrentHashMap<>(); + List<DataPartitionEntry> lastDataPartitions = new ArrayList<>(); for (int i = 0; i < SERIES_SLOT_NUM; i++) { TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); - Pair<TTimePartitionSlot, TConsensusGroupId> lastDataPartition = - getPartitionManager().getLastDataPartition(database, seriesPartitionSlot); - if (lastDataPartition != null - && currentTimePartition.compareTo(lastDataPartition.getLeft()) < 0) { - // Put all future DataPartitions into the allocatedTable - allocatedTable.put(seriesPartitionSlot, lastDataPartition.getRight()); + DataPartitionEntry lastDataPartition = + getPartitionManager().getLastDataPartitionEntry(database, seriesPartitionSlot); + if (lastDataPartition != null) { + lastDataPartitions.add(lastDataPartition); } } try { dataAllotTableMap - .get(database) + .computeIfAbsent(database, empty -> new DataAllotTable()) .updateDataAllotTable( getPartitionManager().getAllRegionGroupIds(database, TConsensusGroupType.DataRegion), - allocatedTable); + lastDataPartitions); } catch (DatabaseNotExistsException e) { LOGGER.error("Database {} not exists when updateDataAllotTable", database); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java index dec450d20d9..02797d45d8d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java @@ -22,15 +22,16 @@ package org.apache.iotdb.confignode.manager.load.balancer.partition; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.partition.DataPartitionEntry; import org.apache.iotdb.commons.structure.BalanceTreeMap; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -72,50 +73,51 @@ public class DataAllotTable { * Update the DataAllotTable according to the current DataRegionGroups and future DataAllotTable. * * @param dataRegionGroups the current DataRegionGroups - * @param allocatedTable the future DataAllotTable, i.e. some SeriesSlots have already allocated + * @param lastDataPartitions the last DataPartition of each SeriesPartitionSlot */ public void updateDataAllotTable( - List<TConsensusGroupId> dataRegionGroups, - Map<TSeriesPartitionSlot, TConsensusGroupId> allocatedTable) { + List<TConsensusGroupId> dataRegionGroups, List<DataPartitionEntry> lastDataPartitions) { dataAllotTableLock.writeLock().lock(); try { // mu is the average number of slots allocated to each regionGroup int mu = SERIES_SLOT_NUM / dataRegionGroups.size(); - // Decide all SeriesSlot randomly - List<TSeriesPartitionSlot> seriesSlotList = new ArrayList<>(); - for (int i = 0; i < SERIES_SLOT_NUM; i++) { - seriesSlotList.add(new TSeriesPartitionSlot(i)); - } - Collections.shuffle(seriesSlotList); // The counter will maintain the number of slots allocated to each regionGroup BalanceTreeMap<TConsensusGroupId, Integer> counter = new BalanceTreeMap<>(); - Map<TConsensusGroupId, AtomicInteger> regionSlotCounter = new HashMap<>(); - allocatedTable.forEach( - (seriesSlot, regionGroupId) -> - regionSlotCounter - .computeIfAbsent(regionGroupId, empty -> new AtomicInteger(0)) - .incrementAndGet()); - dataRegionGroups.forEach( - regionGroupId -> regionSlotCounter.putIfAbsent(regionGroupId, new AtomicInteger(0))); - regionSlotCounter.forEach( - (regionGroupId, slotNum) -> counter.put(regionGroupId, slotNum.get())); + dataRegionGroups.forEach(regionGroupId -> counter.put(regionGroupId, 0)); - Map<TSeriesPartitionSlot, TConsensusGroupId> newAllotTable = new HashMap<>(); - for (TSeriesPartitionSlot seriesPartitionSlot : seriesSlotList) { - if (allocatedTable.containsKey(seriesPartitionSlot)) { - // If the SeriesSlot has already been allocated, keep the allocation - newAllotTable.put(seriesPartitionSlot, allocatedTable.get(seriesPartitionSlot)); - continue; + // Fill unallocated SeriesSlots + Set<TSeriesPartitionSlot> allocatedSeriesSlots = + lastDataPartitions.stream() + .map(DataPartitionEntry::getSeriesPartitionSlot) + .collect(Collectors.toSet()); + for (int i = 0; i < SERIES_SLOT_NUM; i++) { + TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); + if (!allocatedSeriesSlots.contains(seriesPartitionSlot)) { + lastDataPartitions.add( + new DataPartitionEntry( + seriesPartitionSlot, new TTimePartitionSlot(Long.MIN_VALUE), null)); } + } - TConsensusGroupId oldRegionGroupId = dataAllotMap.get(seriesPartitionSlot); - if (oldRegionGroupId != null - && counter.containsKey(oldRegionGroupId) - && counter.get(oldRegionGroupId) < mu) { - // Inherit the oldRegionGroupId when the slotNum of oldRegionGroupId is less than average - newAllotTable.put(seriesPartitionSlot, oldRegionGroupId); - counter.put(oldRegionGroupId, counter.get(oldRegionGroupId) + 1); + // The allocated DataPartitions are sorted as follows: + // 1. Descending order of TimePartitionSlot + // 2. Ascending order of random weight + Collections.sort(lastDataPartitions); + + Map<TSeriesPartitionSlot, TConsensusGroupId> newAllotTable = new HashMap<>(); + // Enumerate all SeriesPartitionSlots in descending order of their TimePartitionSlot + for (DataPartitionEntry entry : lastDataPartitions) { + TSeriesPartitionSlot seriesPartitionSlot = entry.getSeriesPartitionSlot(); + TConsensusGroupId allocatedRegionGroupId = entry.getDataRegionGroup(); + if (allocatedRegionGroupId != null + // Inherit DataRegionGroup if it has been allocated in the future + && (entry.getTimePartitionSlot().getStartTime() + > currentTimePartition.get().getStartTime() + // Inherit DataRegionGroup when the slotNum of oldRegionGroupId is less than average + || counter.get(allocatedRegionGroupId) < mu)) { + newAllotTable.put(seriesPartitionSlot, allocatedRegionGroupId); + counter.put(allocatedRegionGroupId, counter.get(allocatedRegionGroupId) + 1); continue; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index a671150a308..65429f70ec4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -32,6 +32,7 @@ import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.partition.DataPartitionEntry; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; @@ -1311,12 +1312,12 @@ public class PartitionManager { * * @param database The specified Database * @param seriesPartitionSlot The specified SeriesPartitionSlot - * @return The last DataPartition, null if the Database doesn't exist or there are no + * @return The last DataPartitionEntry, null if the Database doesn't exist or there are no * DataPartitions yet */ - public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition( + public DataPartitionEntry getLastDataPartitionEntry( String database, TSeriesPartitionSlot seriesPartitionSlot) { - return partitionInfo.getLastDataPartition(database, seriesPartitionSlot); + return partitionInfo.getLastDataPartitionEntry(database, seriesPartitionSlot); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java index 9782356b595..a27168d8f8f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java @@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.partition.DataPartitionEntry; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan; @@ -585,12 +586,11 @@ public class DatabasePartitionTable { * Get the DataPartition with max TimePartition of the specified the SeriesPartitionSlot. * * @param seriesPartitionSlot The specified SeriesPartitionSlot - * @return The last DataPartition, null if there are no DataPartitions in the specified + * @return The last DataPartitionEntry, null if there are no DataPartitions in the specified * SeriesPartitionSlot */ - public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition( - TSeriesPartitionSlot seriesPartitionSlot) { - return dataPartitionTable.getLastDataPartition(seriesPartitionSlot); + public DataPartitionEntry getLastDataPartitionEntry(TSeriesPartitionSlot seriesPartitionSlot) { + return dataPartitionTable.getLastDataPartitionEntry(seriesPartitionSlot); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java index a0e75d29168..bfd509345eb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java @@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.partition.DataPartitionEntry; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.commons.snapshot.SnapshotProcessor; @@ -835,13 +836,13 @@ public class PartitionInfo implements SnapshotProcessor { * * @param database The specified Database * @param seriesPartitionSlot The specified SeriesPartitionSlot - * @return The last DataPartition, null if the Database doesn't exist or there are no + * @return The last DataPartitionEntry, null if the Database doesn't exist or there are no * DataPartitions in the specified SeriesPartitionSlot */ - public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition( + public DataPartitionEntry getLastDataPartitionEntry( String database, TSeriesPartitionSlot seriesPartitionSlot) { if (isDatabaseExisted(database)) { - return databasePartitionTables.get(database).getLastDataPartition(seriesPartitionSlot); + return databasePartitionTables.get(database).getLastDataPartitionEntry(seriesPartitionSlot); } return null; } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java index 45ee77cecb1..67ff0dc0099 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.partition.DataPartitionEntry; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; @@ -31,9 +32,11 @@ import org.junit.Test; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; public class DataAllotTableTest { @@ -106,7 +109,7 @@ public class DataAllotTableTest { // Test 1: construct DataAllotTable from scratch TConsensusGroupId group1 = new TConsensusGroupId(TConsensusGroupType.DataRegion, 1); dataRegionGroups.add(group1); - dataAllotTable.updateDataAllotTable(dataRegionGroups, new HashMap<>()); + dataAllotTable.updateDataAllotTable(dataRegionGroups, new ArrayList<>()); for (int i = 0; i < SERIES_SLOT_NUM; i++) { TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); // All SeriesPartitionSlots belong to group1 @@ -117,7 +120,7 @@ public class DataAllotTableTest { Map<TSeriesPartitionSlot, TConsensusGroupId> lastDataAllotTable = new HashMap<>(); dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 2)); dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 3)); - dataAllotTable.updateDataAllotTable(dataRegionGroups, new HashMap<>()); + dataAllotTable.updateDataAllotTable(dataRegionGroups, new ArrayList<>()); int mu = SERIES_SLOT_NUM / 3; Map<TConsensusGroupId, AtomicInteger> counter = new HashMap<>(); for (int i = 0; i < SERIES_SLOT_NUM; i++) { @@ -135,19 +138,35 @@ public class DataAllotTableTest { dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 4)); dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 5)); Random random = new Random(); - Map<TSeriesPartitionSlot, TConsensusGroupId> allocatedTable = new HashMap<>(); + Set<TSeriesPartitionSlot> selectedSlots = new HashSet<>(); + List<DataPartitionEntry> lastDataPartitions = new ArrayList<>(); Map<TConsensusGroupId, AtomicInteger> unchangedSlots = new HashMap<>(); for (int i = 0; i < 50; i++) { + // Randomly pre-allocate 50 SeriesPartitionSlots TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(random.nextInt(SERIES_SLOT_NUM)); - while (allocatedTable.containsKey(seriesPartitionSlot)) { + while (selectedSlots.contains(seriesPartitionSlot)) { seriesPartitionSlot = new TSeriesPartitionSlot(random.nextInt(SERIES_SLOT_NUM)); } - allocatedTable.put( - seriesPartitionSlot, - new TConsensusGroupId(TConsensusGroupType.DataRegion, random.nextInt(2) + 4)); + selectedSlots.add(seriesPartitionSlot); + lastDataPartitions.add( + new DataPartitionEntry( + seriesPartitionSlot, + new TTimePartitionSlot(Long.MAX_VALUE), + new TConsensusGroupId(TConsensusGroupType.DataRegion, random.nextInt(2) + 4))); } - dataAllotTable.updateDataAllotTable(dataRegionGroups, allocatedTable); + for (int i = 0; i < SERIES_SLOT_NUM; i++) { + // Record the other allocation result + TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); + if (!selectedSlots.contains(seriesPartitionSlot)) { + lastDataPartitions.add( + new DataPartitionEntry( + seriesPartitionSlot, + new TTimePartitionSlot(Long.MIN_VALUE), + lastDataAllotTable.get(seriesPartitionSlot))); + } + } + dataAllotTable.updateDataAllotTable(dataRegionGroups, lastDataPartitions); mu = SERIES_SLOT_NUM / 5; counter.clear(); for (int i = 0; i < SERIES_SLOT_NUM; i++) { @@ -166,9 +185,11 @@ public class DataAllotTableTest { } // All SeriesPartitionSlots that have been allocated before should be allocated to the same // DataRegionGroup - allocatedTable.forEach( - (seriesPartitionSlot, groupId) -> - Assert.assertEquals(groupId, dataAllotTable.getRegionGroupId(seriesPartitionSlot))); + for (int i = 0; i < 50; i++) { + Assert.assertEquals( + lastDataPartitions.get(i).getDataRegionGroup(), + dataAllotTable.getRegionGroupId(lastDataPartitions.get(i).getSeriesPartitionSlot())); + } // Most of SeriesPartitionSlots in the first three DataRegionGroups should remain unchanged for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : unchangedSlots.entrySet()) { Assert.assertEquals(mu, counterEntry.getValue().get()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionEntry.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionEntry.java new file mode 100644 index 00000000000..449f8e45163 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionEntry.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.partition; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; + +import java.util.Objects; +import java.util.Random; + +public class DataPartitionEntry implements Comparable<DataPartitionEntry> { + + private final TSeriesPartitionSlot seriesPartitionSlot; + private final TTimePartitionSlot timePartitionSlot; + private final TConsensusGroupId dataRegionGroup; + private final int weight; + + public DataPartitionEntry( + TSeriesPartitionSlot seriesPartitionSlot, + TTimePartitionSlot timePartitionSlot, + TConsensusGroupId dataRegionGroup) { + this.seriesPartitionSlot = seriesPartitionSlot; + this.timePartitionSlot = timePartitionSlot; + this.dataRegionGroup = dataRegionGroup; + this.weight = new Random().nextInt(); + } + + public TSeriesPartitionSlot getSeriesPartitionSlot() { + return seriesPartitionSlot; + } + + public TTimePartitionSlot getTimePartitionSlot() { + return timePartitionSlot; + } + + public TConsensusGroupId getDataRegionGroup() { + return dataRegionGroup; + } + + @Override + public int compareTo(DataPartitionEntry o) { + // The timePartitionSlot will be in descending order + // After invoke Collections.sort() + if (!timePartitionSlot.equals(o.timePartitionSlot)) { + return -timePartitionSlot.compareTo(o.timePartitionSlot); + } + return Integer.compare(weight, o.weight); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DataPartitionEntry that = (DataPartitionEntry) o; + return weight == that.weight + && seriesPartitionSlot.equals(that.seriesPartitionSlot) + && timePartitionSlot.equals(that.timePartitionSlot) + && dataRegionGroup.equals(that.dataRegionGroup); + } + + @Override + public int hashCode() { + return Objects.hash(seriesPartitionSlot, timePartitionSlot, dataRegionGroup, weight); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java index 6c2108f5898..497f4085410 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java @@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; -import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.thrift.TException; @@ -281,13 +280,14 @@ public class DataPartitionTable { * SeriesPartitionSlot. * * @param seriesPartitionSlot The specified SeriesPartitionSlot - * @return The last DataPartition, null if there are no DataPartitions in the specified + * @return The last DataPartitionEntry, null if there are no DataPartitions in the specified * SeriesPartitionSlot */ - public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition( - TSeriesPartitionSlot seriesPartitionSlot) { + public DataPartitionEntry getLastDataPartitionEntry(TSeriesPartitionSlot seriesPartitionSlot) { if (dataPartitionMap.containsKey(seriesPartitionSlot)) { - return dataPartitionMap.get(seriesPartitionSlot).getLastDataPartition(); + return dataPartitionMap + .get(seriesPartitionSlot) + .getLastDataPartitionEntry(seriesPartitionSlot); } else { return null; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java index d72157a5037..58f2ba52b9c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java @@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; -import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.thrift.TException; @@ -247,16 +246,19 @@ public class SeriesPartitionTable { * Get the DataPartition with max TimePartition of the specified Database and the * SeriesPartitionSlot. * - * @return The last DataPartition, null if there are no DataPartitions + * @param seriesPartitionSlot The specified SeriesPartitionSlot + * @return The last DataPartitionEntry, null if there are no DataPartitions */ - public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition() { + public DataPartitionEntry getLastDataPartitionEntry(TSeriesPartitionSlot seriesPartitionSlot) { Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry = seriesPartitionMap.lastEntry(); if (lastEntry == null) { return null; } - return new Pair<>( - lastEntry.getKey(), lastEntry.getValue().get(lastEntry.getValue().size() - 1)); + return new DataPartitionEntry( + seriesPartitionSlot, + lastEntry.getKey(), + lastEntry.getValue().get(lastEntry.getValue().size() - 1)); } /** diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/partition/DataPartitionEntryTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/partition/DataPartitionEntryTest.java new file mode 100644 index 00000000000..c92d4564b76 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/partition/DataPartitionEntryTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.partition; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.conf.CommonDescriptor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class DataPartitionEntryTest { + + private static final int SERIES_SLOT_NUM = 1000; + private static final long TIME_PARTITION_INTERVAL = + CommonDescriptor.getInstance().getConfig().getTimePartitionInterval(); + + @Test + public void testOrder() { + List<DataPartitionEntry> entries = new ArrayList<>(); + for (int i = 0; i < SERIES_SLOT_NUM; i++) { + entries.add( + new DataPartitionEntry( + new TSeriesPartitionSlot(i), + new TTimePartitionSlot(TIME_PARTITION_INTERVAL * i), + new TConsensusGroupId(TConsensusGroupType.DataRegion, i))); + } + + List<DataPartitionEntry> sortedEntries = new ArrayList<>(entries); + Collections.sort(sortedEntries); + for (int i = 0; i < SERIES_SLOT_NUM; i++) { + Assert.assertEquals(entries.get(SERIES_SLOT_NUM - i - 1), sortedEntries.get(i)); + } + } +}
