This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.2-hotfix in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1a0fdb02709dded9a13c1780ccba0acdcd871350 Author: Yongzao <[email protected]> AuthorDate: Tue May 13 17:17:04 2025 +0800 Use reference time position for PartitionTableAutoCleaner (cherry picked from commit d4818124bc1a6bd5ebf932396458ff608122ce01) --- .../partition/IoTDBPartitionTableAutoCleanIT.java | 7 +- .../IoTDBPartitionTableAutoCleanUSIT.java | 185 +++++++++++++++++++++ .../procedure/PartitionTableAutoCleaner.java | 23 ++- .../iotdb/commons/utils/TimePartitionUtils.java | 13 -- 4 files changed, 210 insertions(+), 18 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionTableAutoCleanIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionTableAutoCleanIT.java index 6cad5508316..1fcb7eee08b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionTableAutoCleanIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionTableAutoCleanIT.java @@ -21,7 +21,6 @@ package org.apache.iotdb.confignode.it.partition; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; -import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq; import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; import org.apache.iotdb.it.env.EnvFactory; @@ -50,7 +49,11 @@ public class IoTDBPartitionTableAutoCleanIT { private static final long TEST_TTL_CHECK_INTERVAL = 5_000; private static final TTimePartitionSlot TEST_CURRENT_TIME_SLOT = - TimePartitionUtils.getCurrentTimePartitionSlot(); + new TTimePartitionSlot() + .setStartTime( + System.currentTimeMillis() + / TEST_TIME_PARTITION_INTERVAL + * TEST_TIME_PARTITION_INTERVAL); private static final long TEST_TTL = 7 * TEST_TIME_PARTITION_INTERVAL; @Before diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionTableAutoCleanUSIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionTableAutoCleanUSIT.java new file mode 100644 index 00000000000..69ab3a03d28 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionTableAutoCleanUSIT.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.it.partition; + +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.Statement; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +@RunWith(IoTDBTestRunner.class) +@Category({ClusterIT.class}) +public class IoTDBPartitionTableAutoCleanUSIT { + + private static final String TREE_DATABASE_PREFIX = "root.db.g_"; + private static final String TABLE_DATABASE_PREFIX = "database_"; + + private static final int TEST_REPLICATION_FACTOR = 1; + private static final long TEST_TIME_PARTITION_INTERVAL_IN_MS = 604800_000; + private static final long TEST_TTL_CHECK_INTERVAL = 5_000; + + private static final TTimePartitionSlot TEST_CURRENT_TIME_SLOT = + new TTimePartitionSlot() + .setStartTime( + System.currentTimeMillis() + * 1000L + / TEST_TIME_PARTITION_INTERVAL_IN_MS + * TEST_TIME_PARTITION_INTERVAL_IN_MS); + private static final long TEST_TTL_IN_MS = 7 * TEST_TIME_PARTITION_INTERVAL_IN_MS; + + @Before + public void setUp() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setSchemaReplicationFactor(TEST_REPLICATION_FACTOR) + .setDataReplicationFactor(TEST_REPLICATION_FACTOR) + .setTimePartitionInterval(TEST_TIME_PARTITION_INTERVAL_IN_MS) + .setTTLCheckInterval(TEST_TTL_CHECK_INTERVAL) + // Note that 
the time precision of IoTDB is us in this IT + .setTimestampPrecision("us"); + + // Init 1C1D environment + EnvFactory.getEnv().initClusterEnvironment(1, 1); + } + + @After + public void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testAutoCleanPartitionTableForTreeModel() throws Exception { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + // Create databases and insert test data + for (int i = 0; i < 3; i++) { + String databaseName = String.format("%s%d", TREE_DATABASE_PREFIX, i); + statement.execute(String.format("CREATE DATABASE %s", databaseName)); + statement.execute( + String.format( + "CREATE TIMESERIES %s.s WITH DATATYPE=INT64,ENCODING=PLAIN", databaseName)); + // Insert expired data + statement.execute( + String.format( + "INSERT INTO %s(timestamp, s) VALUES (%d, %d)", + databaseName, TEST_CURRENT_TIME_SLOT.getStartTime() - TEST_TTL_IN_MS * 2000, -1)); + // Insert existed data + statement.execute( + String.format( + "INSERT INTO %s(timestamp, s) VALUES (%d, %d)", + databaseName, TEST_CURRENT_TIME_SLOT.getStartTime(), 1)); + } + // Let db0.TTL > device.TTL, the valid TTL should be the bigger one + statement.execute(String.format("SET TTL TO %s0 %d", TREE_DATABASE_PREFIX, TEST_TTL_IN_MS)); + statement.execute(String.format("SET TTL TO %s0.s %d", TREE_DATABASE_PREFIX, 10)); + // Let db1.TTL < device.TTL, the valid TTL should be the bigger one + statement.execute(String.format("SET TTL TO %s1 %d", TREE_DATABASE_PREFIX, 10)); + statement.execute(String.format("SET TTL TO %s1.s %d", TREE_DATABASE_PREFIX, TEST_TTL_IN_MS)); + // Set TTL to path db2.** + statement.execute( + String.format("SET TTL TO %s2.** %d", TREE_DATABASE_PREFIX, TEST_TTL_IN_MS)); + } + TDataPartitionReq req = new TDataPartitionReq(); + for (int i = 0; i < 3; i++) { + req.putToPartitionSlotsMap(String.format("%s%d", TREE_DATABASE_PREFIX, i), new 
TreeMap<>()); + } + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + for (int retry = 0; retry < 120; retry++) { + boolean partitionTableAutoCleaned = true; + TDataPartitionTableResp resp = client.getDataPartitionTable(req); + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode()) { + partitionTableAutoCleaned = + resp.getDataPartitionTable().entrySet().stream() + .flatMap(e1 -> e1.getValue().entrySet().stream()) + .allMatch(e2 -> e2.getValue().size() == 1); + } + if (partitionTableAutoCleaned) { + return; + } + TimeUnit.SECONDS.sleep(1); + } + } + Assert.fail("The PartitionTable in the ConfigNode is not auto cleaned!"); + } + + @Test + public void testAutoCleanPartitionTableForTableModel() throws Exception { + try (final Connection connection = + EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + final Statement statement = connection.createStatement()) { + // Create databases and insert test data + String databaseName = TABLE_DATABASE_PREFIX; + statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", databaseName)); + statement.execute(String.format("USE %s", databaseName)); + statement.execute("CREATE TABLE tb (time TIMESTAMP TIME, s int64 FIELD)"); + // Insert expired data + statement.execute( + String.format( + "INSERT INTO tb(time, s) VALUES (%d, %d)", + TEST_CURRENT_TIME_SLOT.getStartTime() - TEST_TTL_IN_MS * 2000, -1)); + // Insert existed data + statement.execute( + String.format( + "INSERT INTO tb(time, s) VALUES (%d, %d)", TEST_CURRENT_TIME_SLOT.getStartTime(), 1)); + statement.execute(String.format("USE %s", TABLE_DATABASE_PREFIX)); + statement.execute(String.format("ALTER TABLE tb SET PROPERTIES TTL=%d", TEST_TTL_IN_MS)); + } + + TDataPartitionReq req = new TDataPartitionReq(); + req.putToPartitionSlotsMap(TABLE_DATABASE_PREFIX, new TreeMap<>()); + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + for (int retry = 0; retry < 120; retry++) { + boolean partitionTableAutoCleaned = true; + TDataPartitionTableResp resp = client.getDataPartitionTable(req); + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode()) { + partitionTableAutoCleaned = + resp.getDataPartitionTable().entrySet().stream() + .flatMap(e1 -> e1.getValue().entrySet().stream()) + .allMatch(e2 -> e2.getValue().size() == 1); + } + if (partitionTableAutoCleaned) { + return; + } + TimeUnit.SECONDS.sleep(1); + } + } + Assert.fail("The PartitionTable in the ConfigNode is not auto cleaned!"); + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java index 3c9690d3e29..85771f90804 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java @@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.utils.PathUtils; -import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.confignode.consensus.request.write.partition.AutoCleanPartitionTablePlan; import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.consensus.exception.ConsensusException; @@ -43,6 +42,10 @@ public class PartitionTableAutoCleaner<Env> extends InternalProcedure<Env> { private static final Logger LOGGER = LoggerFactory.getLogger(PartitionTableAutoCleaner.class); private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); + + private static final String timestampPrecision = + 
CommonDescriptor.getInstance().getConfig().getTimestampPrecision(); + private final ConfigManager configManager; public PartitionTableAutoCleaner(ConfigManager configManager) { @@ -75,8 +78,7 @@ public class PartitionTableAutoCleaner<Env> extends InternalProcedure<Env> { "[PartitionTableCleaner] Periodically activate PartitionTableAutoCleaner for: {}", databaseTTLMap); // Only clean the partition table when necessary - TTimePartitionSlot currentTimePartitionSlot = - TimePartitionUtils.getCurrentTimePartitionSlot(); + TTimePartitionSlot currentTimePartitionSlot = getCurrentTimePartitionSlot(); try { configManager .getConsensusManager() @@ -86,4 +88,19 @@ public class PartitionTableAutoCleaner<Env> extends InternalProcedure<Env> { } } } + + /** + * @return The time partition slot corresponding to current timestamp. Note that we do not shift + * the start time to the correct starting point, since this interface only constructs a time + * reference position for the partition table cleaner. + */ + private static TTimePartitionSlot getCurrentTimePartitionSlot() { + if ("ms".equals(timestampPrecision)) { + return new TTimePartitionSlot(System.currentTimeMillis()); + } else if ("us".equals(timestampPrecision)) { + return new TTimePartitionSlot(System.currentTimeMillis() * 1000); + } else { + return new TTimePartitionSlot(System.currentTimeMillis() * 1000_000); + } + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java index 058f7433929..eb53cdb2798 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java @@ -34,9 +34,6 @@ public class TimePartitionUtils { private static long timePartitionOrigin = CommonDescriptor.getInstance().getConfig().getTimePartitionOrigin(); - private 
static String timestampPrecision = - CommonDescriptor.getInstance().getConfig().getTimestampPrecision(); - /** Time range for dividing database, the time unit is the same with IoTDB's TimestampPrecision */ private static long timePartitionInterval = CommonDescriptor.getInstance().getConfig().getTimePartitionInterval(); @@ -74,16 +71,6 @@ public class TimePartitionUtils { } } - public static TTimePartitionSlot getCurrentTimePartitionSlot() { - if ("ms".equals(timestampPrecision)) { - return getTimePartitionSlot(System.currentTimeMillis()); - } else if ("us".equals(timestampPrecision)) { - return getTimePartitionSlot(System.currentTimeMillis() * 1000); - } else { - return getTimePartitionSlot(System.currentTimeMillis() * 1000_000); - } - } - public static TTimePartitionSlot getTimePartitionSlot(long time) { TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot(); timePartitionSlot.setStartTime(getTimePartitionLowerBound(time));
