This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch Fix-Partition-Cleaner-IT in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 93f65e3716c73b96412ce352f8832e72f98e31ab Author: YongzaoDan <[email protected]> AuthorDate: Mon Feb 24 14:33:06 2025 +0800 Finished --- .../persistence/partition/DatabasePartitionTable.java | 14 +++++++++++++- .../confignode/procedure/PartitionTableAutoCleaner.java | 3 +++ .../apache/iotdb/commons/partition/DataPartitionTable.java | 10 ++++++++-- .../iotdb/commons/partition/SeriesPartitionTable.java | 12 +++++++++++- 4 files changed, 35 insertions(+), 4 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java index fda7e291315..6544f1a7c7c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java @@ -617,7 +617,19 @@ public class DatabasePartitionTable { * @param currentTimeSlot The current TimeSlot */ public void autoCleanPartitionTable(long TTL, TTimePartitionSlot currentTimeSlot) { - dataPartitionTable.autoCleanPartitionTable(TTL, currentTimeSlot); + long[] removedTimePartitionSlots = + dataPartitionTable.autoCleanPartitionTable(TTL, currentTimeSlot).stream() + .map(TTimePartitionSlot::getStartTime) + .collect(Collectors.toList()) + .stream() + .mapToLong(Long::longValue) + .toArray(); + if (removedTimePartitionSlots.length > 0) { + LOGGER.info( + "[PartitionTableCleaner] The TimePartitions: {} are removed from Database: {}", + removedTimePartitionSlots, + databaseName); + } } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java index 4363610f7ca..8466c06b1a7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java @@ -46,6 +46,9 @@ public class PartitionTableAutoCleaner<Env> extends InternalProcedure<Env> { public PartitionTableAutoCleaner(ConfigManager configManager) { super(COMMON_CONFIG.getTTLCheckInterval()); this.configManager = configManager; + LOGGER.info( + "[PartitionTableCleaner] The PartitionTableAutoCleaner is started with cycle={}ms", + COMMON_CONFIG.getTTLCheckInterval()); } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java index 64e1233daec..91346f0c69c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java @@ -35,9 +35,11 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -270,10 +272,14 @@ public class DataPartitionTable { * @param TTL The Time To Live * @param currentTimeSlot The current TimeSlot */ - public void autoCleanPartitionTable(long TTL, TTimePartitionSlot currentTimeSlot) { + public Set<TTimePartitionSlot> autoCleanPartitionTable( + long TTL, TTimePartitionSlot currentTimeSlot) { + Set<TTimePartitionSlot> removedTimePartitionSlots = new HashSet<>(); dataPartitionMap.forEach( (seriesPartitionSlot, seriesPartitionTable) -> - seriesPartitionTable.autoCleanPartitionTable(TTL, currentTimeSlot)); + removedTimePartitionSlots.addAll( + seriesPartitionTable.autoCleanPartitionTable(TTL, currentTimeSlot))); + return removedTimePartitionSlots; } public void serialize(OutputStream outputStream, TProtocol protocol) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java index 579d2fa99ef..450dc7db92a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java @@ -35,6 +35,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -245,10 +246,19 @@ public class SeriesPartitionTable { * @param TTL The Time To Live * @param currentTimeSlot The current TimeSlot */ - public void autoCleanPartitionTable(long TTL, TTimePartitionSlot currentTimeSlot) { + public List<TTimePartitionSlot> autoCleanPartitionTable( + long TTL, TTimePartitionSlot currentTimeSlot) { + List<TTimePartitionSlot> removedTimePartitions = new ArrayList<>(); + seriesPartitionMap.forEach( + (timePartitionSlot, consensusGroupIds) -> { + if (timePartitionSlot.getStartTime() + TTL < currentTimeSlot.getStartTime()) { + removedTimePartitions.add(timePartitionSlot); + } + }); seriesPartitionMap .entrySet() .removeIf(entry -> entry.getKey().getStartTime() + TTL < currentTimeSlot.getStartTime()); + return removedTimePartitions; } public void serialize(OutputStream outputStream, TProtocol protocol)
