This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fa040c8813 [core] Refactor HistoryPartitionCluster to load less 
history partitions
fa040c8813 is described below

commit fa040c8813ab7dd59df77bed0a78ceb6910440f5
Author: JingsongLi <[email protected]>
AuthorDate: Tue Oct 28 21:48:59 2025 +0800

    [core] Refactor HistoryPartitionCluster to load less history partitions
---
 .../append/cluster/HistoryPartitionCluster.java    | 139 ++++++++++-----------
 .../append/cluster/IncrementalClusterManager.java  |  91 +++++---------
 .../paimon/operation/AbstractFileStoreScan.java    |   4 +-
 .../org/apache/paimon/operation/FileStoreScan.java |   2 +-
 .../apache/paimon/operation/ManifestsReader.java   |   8 +-
 .../table/source/snapshot/SnapshotReader.java      |   2 +-
 .../table/source/snapshot/SnapshotReaderImpl.java  |   5 +-
 .../apache/paimon/table/system/AuditLogTable.java  |   5 +-
 .../cluster/HistoryPartitionClusterTest.java       |  57 +--------
 .../cluster/IncrementalClusterManagerTest.java     |   2 +-
 10 files changed, 110 insertions(+), 205 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java
index cccecdd2be..83f92aa9c7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java
@@ -27,7 +27,6 @@ import org.apache.paimon.mergetree.LevelSortedRun;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.utils.BiFilter;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 
 import org.slf4j.Logger;
@@ -39,14 +38,11 @@ import java.time.Duration;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.append.cluster.IncrementalClusterManager.constructPartitionLevels;
@@ -56,70 +52,86 @@ import static 
org.apache.paimon.append.cluster.IncrementalClusterManager.logForP
 public class HistoryPartitionCluster {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(HistoryPartitionCluster.class);
-    private final InternalRowPartitionComputer partitionComputer;
 
     private final FileStoreTable table;
     private final IncrementalClusterStrategy incrementalClusterStrategy;
-    private final int maxLevel;
-
+    private final InternalRowPartitionComputer partitionComputer;
+    private final PartitionPredicate specifiedPartitions;
+    private final Duration historyPartitionIdleTime;
     private final int historyPartitionLimit;
-    @Nullable private final PartitionPredicate specifiedPartitions;
-    @Nullable private final Duration historyPartitionIdleTime;
-    @Nullable private final BiFilter<Integer, Integer> partitionLevelFilter;
+    private final int maxLevel;
 
     public HistoryPartitionCluster(
             FileStoreTable table,
             IncrementalClusterStrategy incrementalClusterStrategy,
             InternalRowPartitionComputer partitionComputer,
-            int maxLevel,
-            int historyPartitionLimit,
-            @Nullable PartitionPredicate specifiedPartitions,
-            @Nullable Duration historyPartitionIdleTime) {
+            PartitionPredicate specifiedPartitions,
+            Duration historyPartitionIdleTime,
+            int historyPartitionLimit) {
         this.table = table;
         this.incrementalClusterStrategy = incrementalClusterStrategy;
         this.partitionComputer = partitionComputer;
-        this.maxLevel = maxLevel;
-        this.historyPartitionLimit = historyPartitionLimit;
         this.specifiedPartitions = specifiedPartitions;
         this.historyPartitionIdleTime = historyPartitionIdleTime;
-        // (maxLevel + 1) / 2 is used to calculate the ceiling of maxLevel 
divided by 2
-        this.partitionLevelFilter =
-                (partitionMinLevel, partitionMaxLevel) -> partitionMinLevel < 
(maxLevel + 1) / 2;
+        this.historyPartitionLimit = historyPartitionLimit;
+        this.maxLevel = table.coreOptions().numLevels() - 1;
+    }
+
+    @Nullable
+    public static HistoryPartitionCluster create(
+            FileStoreTable table,
+            IncrementalClusterStrategy incrementalClusterStrategy,
+            InternalRowPartitionComputer partitionComputer,
+            @Nullable PartitionPredicate specifiedPartitions) {
+        if (table.schema().partitionKeys().isEmpty()) {
+            return null;
+        }
+        if (specifiedPartitions == null) {
+            return null;
+        }
+
+        Duration idleTime = 
table.coreOptions().clusteringHistoryPartitionIdleTime();
+        if (idleTime == null) {
+            return null;
+        }
+
+        int limit = table.coreOptions().clusteringHistoryPartitionLimit();
+        return new HistoryPartitionCluster(
+                table,
+                incrementalClusterStrategy,
+                partitionComputer,
+                specifiedPartitions,
+                idleTime,
+                limit);
     }
 
-    public Map<BinaryRow, Optional<CompactUnit>> pickForHistoryPartitions() {
+    public Map<BinaryRow, CompactUnit> pickForHistoryPartitions() {
         Map<BinaryRow, List<LevelSortedRun>> partitionLevels =
                 constructLevelsForHistoryPartitions();
         logForPartitionLevel(partitionLevels, partitionComputer);
 
-        return partitionLevels.entrySet().stream()
-                .collect(
-                        Collectors.toMap(
-                                Map.Entry::getKey,
-                                entry ->
-                                        incrementalClusterStrategy.pick(
-                                                maxLevel, entry.getValue(), 
true)));
+        Map<BinaryRow, CompactUnit> units = new HashMap<>();
+        partitionLevels.forEach(
+                (k, v) -> {
+                    Optional<CompactUnit> pick =
+                            incrementalClusterStrategy.pick(maxLevel + 1, v, 
true);
+                    pick.ifPresent(compactUnit -> units.put(k, compactUnit));
+                });
+        return units;
     }
 
     @VisibleForTesting
     public Map<BinaryRow, List<LevelSortedRun>> 
constructLevelsForHistoryPartitions() {
-        if (specifiedPartitions == null
-                || historyPartitionIdleTime == null
-                || historyPartitionLimit <= 0) {
-            return Collections.emptyMap();
-        }
-
         long historyMilli =
                 LocalDateTime.now()
                         .minus(historyPartitionIdleTime)
                         .atZone(ZoneId.systemDefault())
                         .toInstant()
                         .toEpochMilli();
-        // read partitionEntries filter by partitionLevelFilter 
historyPartitionIdleTime
-        // sort partitionEntries by lastFileCreation time, and we will pick 
the oldest N partitions
+
         List<BinaryRow> historyPartitions =
-                
table.newSnapshotReader().withManifestLevelFilter(partitionLevelFilter)
-                        .partitionEntries().stream()
+                table.newSnapshotReader().withLevelMinMaxFilter((min, max) -> 
min < maxLevel)
+                        .withLevelFilter(level -> level < 
maxLevel).partitionEntries().stream()
                         .filter(entry -> entry.lastFileCreationTime() < 
historyMilli)
                         
.sorted(Comparator.comparingLong(PartitionEntry::lastFileCreationTime))
                         .map(PartitionEntry::partition)
@@ -131,6 +143,7 @@ public class HistoryPartitionCluster {
                         .withPartitionFilter(historyPartitions)
                         .read()
                         .dataSplits();
+
         Map<BinaryRow, List<DataFileMeta>> historyPartitionFiles = new 
HashMap<>();
         for (DataSplit dataSplit : historyDataSplits) {
             historyPartitionFiles
@@ -138,52 +151,34 @@ public class HistoryPartitionCluster {
                     .addAll(dataSplit.dataFiles());
         }
 
-        // find history partitions which have low-level files
-        Set<BinaryRow> selectedHistoryPartitions =
-                findLowLevelPartitions(historyPartitions, 
historyPartitionFiles);
-        historyPartitionFiles =
-                historyPartitionFiles.entrySet().stream()
-                        .filter(entry -> 
selectedHistoryPartitions.contains(entry.getKey()))
-                        .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
-
-        return historyPartitionFiles.entrySet().stream()
+        return filterPartitions(historyPartitionFiles).entrySet().stream()
                 .collect(
                         Collectors.toMap(
                                 Map.Entry::getKey,
                                 entry -> 
constructPartitionLevels(entry.getValue())));
     }
 
-    @VisibleForTesting
-    protected Set<BinaryRow> findLowLevelPartitions(
-            List<BinaryRow> historyPartitions, Map<BinaryRow, 
List<DataFileMeta>> partitionFiles) {
-        Set<BinaryRow> partitions = new HashSet<>();
-        // 1. the partition is not specified in specifiedPartitions
-        // 2. the min file level in partition should be less than 
Math.ceil(maxLevel/2)
-        for (BinaryRow historyPartition : historyPartitions) {
-            if (specifiedPartitions != null && 
!specifiedPartitions.test(historyPartition)) {
-                List<DataFileMeta> files =
-                        partitionFiles.getOrDefault(historyPartition, 
Collections.emptyList());
-                if (!files.isEmpty()) {
-                    int partitionMinLevel = maxLevel + 1;
-                    for (DataFileMeta file : files) {
-                        partitionMinLevel = Math.min(partitionMinLevel, 
file.level());
+    private Map<BinaryRow, List<DataFileMeta>> filterPartitions(
+            Map<BinaryRow, List<DataFileMeta>> partitionFiles) {
+        Map<BinaryRow, List<DataFileMeta>> result = new HashMap<>();
+        partitionFiles.forEach(
+                (part, files) -> {
+                    if (specifiedPartitions.test(part)) {
+                        // already contain in specified partitions
+                        return;
                     }
-                    if (partitionLevelFilter != null
-                            && partitionLevelFilter.test(partitionMinLevel, 
maxLevel)) {
-                        partitions.add(historyPartition);
-                        if (partitions.size() >= historyPartitionLimit) {
-                            break;
-                        }
+
+                    if (result.size() < historyPartitionLimit) {
+                        // in limit, can be picked
+                        result.put(part, files);
                     }
-                }
-            }
-        }
+                });
         LOG.info(
                 "Find {} history partitions for full clustering, the history 
partitions are {}",
-                partitions.size(),
-                partitions.stream()
+                result.size(),
+                result.keySet().stream()
                         .map(partitionComputer::generatePartValues)
                         .collect(Collectors.toSet()));
-        return partitions;
+        return result;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
index 8142f9dbf0..6865b4b82e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.append.cluster;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
@@ -52,17 +53,14 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
 public class IncrementalClusterManager {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(IncrementalClusterManager.class);
-    private final InternalRowPartitionComputer partitionComputer;
 
+    private final InternalRowPartitionComputer partitionComputer;
     private final SnapshotReader snapshotReader;
-
     private final IncrementalClusterStrategy incrementalClusterStrategy;
     private final CoreOptions.OrderType clusterCurve;
     private final List<String> clusterKeys;
-
-    private final HistoryPartitionCluster historyPartitionCluster;
-
-    private int maxLevel;
+    private final int numLevels;
+    private final @Nullable HistoryPartitionCluster historyPartitionCluster;
 
     public IncrementalClusterManager(FileStoreTable table) {
         this(table, null);
@@ -78,7 +76,7 @@ public class IncrementalClusterManager {
                 options.clusteringIncrementalEnabled(),
                 "Only support incremental clustering when '%s' is true.",
                 CLUSTERING_INCREMENTAL.key());
-        this.maxLevel = options.numLevels();
+        this.numLevels = options.numLevels();
         this.partitionComputer =
                 new InternalRowPartitionComputer(
                         table.coreOptions().partitionDefaultName(),
@@ -96,16 +94,9 @@ public class IncrementalClusterManager {
                         options.numSortedRunCompactionTrigger());
         this.clusterCurve = 
options.clusteringStrategy(options.clusteringColumns().size());
         this.clusterKeys = options.clusteringColumns();
-
         this.historyPartitionCluster =
-                new HistoryPartitionCluster(
-                        table,
-                        incrementalClusterStrategy,
-                        partitionComputer,
-                        maxLevel,
-                        options.clusteringHistoryPartitionLimit(),
-                        specifiedPartitions,
-                        options.clusteringHistoryPartitionIdleTime());
+                HistoryPartitionCluster.create(
+                        table, incrementalClusterStrategy, partitionComputer, 
specifiedPartitions);
     }
 
     public Map<BinaryRow, CompactUnit> prepareForCluster(boolean 
fullCompaction) {
@@ -114,30 +105,20 @@ public class IncrementalClusterManager {
         logForPartitionLevel(partitionLevels, partitionComputer);
 
         // 2. pick files to be clustered for each partition
-        Map<BinaryRow, Optional<CompactUnit>> units =
-                partitionLevels.entrySet().stream()
-                        .collect(
-                                Collectors.toMap(
-                                        Map.Entry::getKey,
-                                        entry ->
-                                                
incrementalClusterStrategy.pick(
-                                                        maxLevel,
-                                                        entry.getValue(),
-                                                        fullCompaction)));
-
-        Map<BinaryRow, Optional<CompactUnit>> historyUnits =
-                historyPartitionCluster.pickForHistoryPartitions();
-        units.putAll(historyUnits);
+        Map<BinaryRow, CompactUnit> units = new HashMap<>();
+        partitionLevels.forEach(
+                (k, v) -> {
+                    Optional<CompactUnit> pick =
+                            incrementalClusterStrategy.pick(numLevels, v, 
fullCompaction);
+                    pick.ifPresent(compactUnit -> units.put(k, compactUnit));
+                });
+
+        if (historyPartitionCluster != null) {
+            units.putAll(historyPartitionCluster.pickForHistoryPartitions());
+        }
 
-        // 3. filter out empty units
-        Map<BinaryRow, CompactUnit> filteredUnits =
-                units.entrySet().stream()
-                        .filter(entry -> entry.getValue().isPresent())
-                        .collect(
-                                Collectors.toMap(
-                                        Map.Entry::getKey, entry -> 
entry.getValue().get()));
         if (LOG.isDebugEnabled()) {
-            filteredUnits.forEach(
+            units.forEach(
                     (partition, compactUnit) -> {
                         String filesInfo =
                                 compactUnit.files().stream()
@@ -157,24 +138,17 @@ public class IncrementalClusterManager {
                                 filesInfo);
                     });
         }
-        return filteredUnits;
+        return units;
     }
 
     public Map<BinaryRow, List<LevelSortedRun>> constructLevels() {
         List<DataSplit> dataSplits = snapshotReader.read().dataSplits();
-
-        maxLevel =
-                Math.max(
-                        maxLevel,
-                        dataSplits.stream()
-                                        .flatMap(split -> 
split.dataFiles().stream())
-                                        .mapToInt(DataFileMeta::level)
-                                        .max()
-                                        .orElse(-1)
-                                + 1);
-        checkArgument(maxLevel > 1, "Number of levels must be at least 2.");
-
-        Map<BinaryRow, List<DataFileMeta>> partitionFiles = 
getPartitionFiles(dataSplits);
+        Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>();
+        for (DataSplit dataSplit : dataSplits) {
+            partitionFiles
+                    .computeIfAbsent(dataSplit.partition(), k -> new 
ArrayList<>())
+                    .addAll(dataSplit.dataFiles());
+        }
         return partitionFiles.entrySet().stream()
                 .collect(
                         Collectors.toMap(
@@ -208,16 +182,6 @@ public class IncrementalClusterManager {
         return partitionLevels;
     }
 
-    private Map<BinaryRow, List<DataFileMeta>> 
getPartitionFiles(List<DataSplit> dataSplits) {
-        Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>();
-        for (DataSplit dataSplit : dataSplits) {
-            partitionFiles
-                    .computeIfAbsent(dataSplit.partition(), k -> new 
ArrayList<>())
-                    .addAll(dataSplit.dataFiles());
-        }
-        return partitionFiles;
-    }
-
     public List<DataSplit> toSplits(BinaryRow partition, List<DataFileMeta> 
files) {
         List<DataSplit> splits = new ArrayList<>();
 
@@ -283,7 +247,8 @@ public class IncrementalClusterManager {
         return clusterKeys;
     }
 
-    public HistoryPartitionCluster historyPartitionCluster() {
+    @VisibleForTesting
+    HistoryPartitionCluster historyPartitionCluster() {
         return historyPartitionCluster;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 736bc41d7b..2c69b2ce80 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -197,8 +197,8 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     }
 
     @Override
-    public FileStoreScan withManifestLevelFilter(BiFilter<Integer, Integer> 
manifestLevelFilter) {
-        manifestsReader.withManifestLevelFilter(manifestLevelFilter);
+    public FileStoreScan withLevelMinMaxFilter(BiFilter<Integer, Integer> 
minMaxFilter) {
+        manifestsReader.withLevelMinMaxFilter(minMaxFilter);
         return this;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 354c43e019..047b1c3f5d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -75,7 +75,7 @@ public interface FileStoreScan {
 
     FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
 
-    FileStoreScan withManifestLevelFilter(BiFilter<Integer, Integer> 
manifestLevelFilter);
+    FileStoreScan withLevelMinMaxFilter(BiFilter<Integer, Integer> 
minMaxFilter);
 
     FileStoreScan enableValueFilter();
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
index dbf5140583..b3b89e72aa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
@@ -53,7 +53,7 @@ public class ManifestsReader {
     @Nullable private Integer specifiedBucket = null;
     @Nullable private Integer specifiedLevel = null;
     @Nullable private PartitionPredicate partitionFilter = null;
-    @Nullable private BiFilter<Integer, Integer> manifestLevelFilter = null;
+    @Nullable private BiFilter<Integer, Integer> levelMinMaxFilter = null;
 
     public ManifestsReader(
             RowType partitionType,
@@ -81,8 +81,8 @@ public class ManifestsReader {
         return this;
     }
 
-    public ManifestsReader withManifestLevelFilter(BiFilter<Integer, Integer> 
manifestLevelFilter) {
-        this.manifestLevelFilter = manifestLevelFilter;
+    public ManifestsReader withLevelMinMaxFilter(BiFilter<Integer, Integer> 
minMaxFilter) {
+        this.levelMinMaxFilter = minMaxFilter;
         return this;
     }
 
@@ -167,7 +167,7 @@ public class ManifestsReader {
                     && (specifiedLevel < minLevel || specifiedLevel > 
maxLevel)) {
                 return false;
             }
-            if (manifestLevelFilter != null && 
!manifestLevelFilter.test(minLevel, maxLevel)) {
+            if (levelMinMaxFilter != null && !levelMinMaxFilter.test(minLevel, 
maxLevel)) {
                 return false;
             }
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 91505ffcf9..ed80f3f92a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -88,7 +88,7 @@ public interface SnapshotReader {
 
     SnapshotReader withLevelFilter(Filter<Integer> levelFilter);
 
-    SnapshotReader withManifestLevelFilter(BiFilter<Integer, Integer> 
manifestLevelFilter);
+    SnapshotReader withLevelMinMaxFilter(BiFilter<Integer, Integer> 
minMaxFilter);
 
     SnapshotReader enableValueFilter();
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 22cd5cad6f..038f4fec18 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -262,8 +262,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
         return this;
     }
 
-    public SnapshotReader withManifestLevelFilter(BiFilter<Integer, Integer> 
manifestLevelFilter) {
-        scan.withManifestLevelFilter(manifestLevelFilter);
+    @Override
+    public SnapshotReader withLevelMinMaxFilter(BiFilter<Integer, Integer> 
minMaxFilter) {
+        scan.withLevelMinMaxFilter(minMaxFilter);
         return this;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index f71eba944d..3fd6a8d8af 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -361,9 +361,8 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
         }
 
         @Override
-        public SnapshotReader withManifestLevelFilter(
-                BiFilter<Integer, Integer> manifestLevelFilter) {
-            wrapped.withManifestLevelFilter(manifestLevelFilter);
+        public SnapshotReader withLevelMinMaxFilter(BiFilter<Integer, Integer> 
minMaxFilter) {
+            wrapped.withLevelMinMaxFilter(minMaxFilter);
             return this;
         }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java
index 4692b50fbb..f1f93ca989 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java
@@ -24,8 +24,6 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.mergetree.LevelSortedRun;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.schema.Schema;
@@ -39,17 +37,12 @@ import org.assertj.core.util.Lists;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
-import java.time.LocalDateTime;
-import java.time.ZoneId;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.append.cluster.IncrementalClusterManagerTest.writeOnce;
-import static 
org.apache.paimon.append.cluster.IncrementalClusterStrategyTest.createFile;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link HistoryPartitionCluster}. */
@@ -57,53 +50,6 @@ public class HistoryPartitionClusterTest {
 
     @TempDir java.nio.file.Path tempDir;
 
-    @Test
-    public void testFindLowLevelPartitions() throws Exception {
-        FileStoreTable table = createTable(Collections.emptyMap(), 
Collections.emptyList());
-        long now = 
LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
-        Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>();
-
-        // specified partition, has low-level files
-        BinaryRow partition1 = BinaryRow.singleColumn(1);
-        PartitionEntry partitionEntry1 = new PartitionEntry(partition1, 0, 0, 
0, now);
-        partitionFiles.put(
-                partition1, Lists.newArrayList(createFile(100, 1, 3), 
createFile(100, 1, 5)));
-        // has no low-level files
-        BinaryRow partition2 = BinaryRow.singleColumn(2);
-        PartitionEntry partitionEntry2 = new PartitionEntry(partition2, 0, 0, 
0, now);
-        partitionFiles.put(partition2, Lists.newArrayList(createFile(100, 1, 
0)));
-        // has low-level files
-        BinaryRow partition3 = BinaryRow.singleColumn(3);
-        PartitionEntry partitionEntry3 = new PartitionEntry(partition3, 0, 0, 
0, now);
-        partitionFiles.put(
-                partition3, Lists.newArrayList(createFile(100, 1, 0), 
createFile(100, 1, 2)));
-        // has no low-level files
-        BinaryRow partition4 = BinaryRow.singleColumn(4);
-        PartitionEntry partitionEntry4 = new PartitionEntry(partition3, 0, 0, 
0, now);
-        partitionFiles.put(partition4, Lists.newArrayList(createFile(100, 1, 
0)));
-
-        IncrementalClusterManager incrementalClusterManager =
-                new IncrementalClusterManager(
-                        table,
-                        PartitionPredicate.fromMultiple(
-                                RowType.of(DataTypes.INT()), 
Lists.newArrayList(partition1)));
-        HistoryPartitionCluster historyPartitionCluster =
-                incrementalClusterManager.historyPartitionCluster();
-        Set<BinaryRow> selectedPartitions =
-                historyPartitionCluster.findLowLevelPartitions(
-                        Lists.newArrayList(
-                                        partitionEntry1,
-                                        partitionEntry2,
-                                        partitionEntry3,
-                                        partitionEntry4)
-                                .stream()
-                                .map(PartitionEntry::partition)
-                                .collect(Collectors.toList()),
-                        partitionFiles);
-
-        assertThat(selectedPartitions).contains(partition2);
-    }
-
     @Test
     public void testHistoryPartitionAutoClustering() throws Exception {
         FileStoreTable table = createTable(Collections.emptyMap(), 
Collections.singletonList("f2"));
@@ -153,8 +99,7 @@ public class HistoryPartitionClusterTest {
 
         // test not specify partition and disable history partition auto 
clustering
         historyPartitionCluster = new 
IncrementalClusterManager(table).historyPartitionCluster();
-        partitionLevels = 
historyPartitionCluster.constructLevelsForHistoryPartitions();
-        assertThat(partitionLevels.isEmpty()).isTrue();
+        assertThat(historyPartitionCluster).isNull();
     }
 
     protected FileStoreTable createTable(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
index 37cc006673..4b62caab88 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
@@ -58,7 +58,7 @@ public class IncrementalClusterManagerTest {
     @TempDir java.nio.file.Path tempDir;
 
     @Test
-    public void testNonUnAwareBucketTable() throws Exception {
+    public void testNonUnAwareBucketTable() {
         Map<String, String> options = new HashMap<>();
         options.put(CoreOptions.BUCKET.key(), "1");
         options.put(CoreOptions.BUCKET_KEY.key(), "f0");

Reply via email to