This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 487be6bf67 [core] support automatically clustering historical partition (#6472)
487be6bf67 is described below

commit 487be6bf67c2b8029c0477b82354d11db8db338d
Author: LsomeYeah <[email protected]>
AuthorDate: Tue Oct 28 21:09:42 2025 +0800

    [core] support automatically clustering historical partition (#6472)
---
 .../shortcodes/generated/core_configuration.html   |  12 ++
 .../main/java/org/apache/paimon/CoreOptions.java   |  24 +++
 .../append/cluster/HistoryPartitionCluster.java    | 189 ++++++++++++++++++++
 .../append/cluster/IncrementalClusterManager.java  | 101 +++++++----
 .../paimon/operation/AbstractFileStoreScan.java    |   6 +
 .../org/apache/paimon/operation/FileStoreScan.java |   2 +
 .../apache/paimon/operation/ManifestsReader.java   |  10 ++
 .../table/source/snapshot/SnapshotReader.java      |   3 +
 .../table/source/snapshot/SnapshotReaderImpl.java  |   6 +
 .../apache/paimon/table/system/AuditLogTable.java  |   8 +
 .../cluster/HistoryPartitionClusterTest.java       | 190 +++++++++++++++++++++
 .../cluster/IncrementalClusterManagerTest.java     |  88 +++++++++-
 .../cluster/IncrementalClusterStrategyTest.java    |   2 +-
 .../action/IncrementalClusterActionITCase.java     | 173 ++++++++++++++++++-
 14 files changed, 777 insertions(+), 37 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 6b3a8fbfb5..9a5f3c99ad 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -164,6 +164,18 @@ under the License.
             <td>String</td>
             <td>Specifies the column name(s) used for comparison during range 
partitioning, in the format 'columnName1,columnName2'. If not set or set to an 
empty string, it indicates that the range partitioning feature is not enabled. 
This option will be effective only for append table without primary keys and 
batch execution mode.</td>
         </tr>
+        <tr>
+            <td><h5>clustering.history-partition.idle-to-full-sort</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>The duration after which a partition without new updates is considered a historical partition. Historical partitions will be automatically fully clustered during the cluster operation. This option takes effect when 'clustering.history-partition.auto.enabled' is true.</td>
+        </tr>
+        <tr>
+            <td><h5>clustering.history-partition.limit</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Integer</td>
+            <td>The maximum number of history partitions for which full clustering is automatically performed.</td>
+        </tr>
         <tr>
             <td><h5>clustering.incremental</h5></td>
             <td style="word-wrap: break-word;">false</td>
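For context (not part of the commit): a minimal sketch of how the two new options could be supplied together with the existing incremental-clustering options. The option constants are the ones added or used in this diff; the wrapper class, the column names and the concrete values are illustrative, and the "1 d" duration string assumes Paimon's usual duration format (the tests below use "2s").

import org.apache.paimon.CoreOptions;

import java.util.HashMap;
import java.util.Map;

public class HistoryClusteringOptionsSketch {
    public static Map<String, String> build() {
        Map<String, String> options = new HashMap<>();
        options.put(CoreOptions.CLUSTERING_COLUMNS.key(), "f0,f1");
        options.put(CoreOptions.CLUSTERING_INCREMENTAL.key(), "true");
        // a partition with no new files for one day is treated as historical
        options.put(CoreOptions.CLUSTERING_HISTORY_PARTITION_IDLE_TIME.key(), "1 d");
        // at most 5 historical partitions are fully clustered per run (the default value)
        options.put(CoreOptions.CLUSTERING_HISTORY_PARTITION_LIMIT.key(), "5");
        return options;
    }
}
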
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 267d1255a3..bac0d58fd9 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1957,6 +1957,22 @@ public class CoreOptions implements Serializable {
                     .defaultValue(false)
                     .withDescription("Whether enable incremental clustering.");
 
+    public static final ConfigOption<Integer> 
CLUSTERING_HISTORY_PARTITION_LIMIT =
+            key("clustering.history-partition.limit")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The maximum number of history partitions for which full clustering is automatically performed.");
+
+    public static final ConfigOption<Duration> 
CLUSTERING_HISTORY_PARTITION_IDLE_TIME =
+            key("clustering.history-partition.idle-to-full-sort")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The duration after which a partition without new updates is considered a historical partition. "
+                                    + "Historical partitions will be automatically fully clustered during the cluster operation. "
+                                    + "This option takes effect when 'clustering.history-partition.auto.enabled' is true.");
+
     public static final ConfigOption<Boolean> ROW_TRACKING_ENABLED =
             key("row-tracking.enabled")
                     .booleanType()
@@ -3074,6 +3090,14 @@ public class CoreOptions implements Serializable {
         return options.get(CLUSTERING_INCREMENTAL);
     }
 
+    public Duration clusteringHistoryPartitionIdleTime() {
+        return options.get(CLUSTERING_HISTORY_PARTITION_IDLE_TIME);
+    }
+
+    public int clusteringHistoryPartitionLimit() {
+        return options.get(CLUSTERING_HISTORY_PARTITION_LIMIT);
+    }
+
     public OrderType clusteringStrategy(int columnSize) {
         return clusteringStrategy(options.get(CLUSTERING_STRATEGY), 
columnSize);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java
new file mode 100644
index 0000000000..cccecdd2be
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.compact.CompactUnit;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.mergetree.LevelSortedRun;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.BiFilter;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.paimon.append.cluster.IncrementalClusterManager.constructPartitionLevels;
+import static 
org.apache.paimon.append.cluster.IncrementalClusterManager.logForPartitionLevel;
+
+/** Handle historical partition for full clustering. */
+public class HistoryPartitionCluster {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HistoryPartitionCluster.class);
+    private final InternalRowPartitionComputer partitionComputer;
+
+    private final FileStoreTable table;
+    private final IncrementalClusterStrategy incrementalClusterStrategy;
+    private final int maxLevel;
+
+    private final int historyPartitionLimit;
+    @Nullable private final PartitionPredicate specifiedPartitions;
+    @Nullable private final Duration historyPartitionIdleTime;
+    @Nullable private final BiFilter<Integer, Integer> partitionLevelFilter;
+
+    public HistoryPartitionCluster(
+            FileStoreTable table,
+            IncrementalClusterStrategy incrementalClusterStrategy,
+            InternalRowPartitionComputer partitionComputer,
+            int maxLevel,
+            int historyPartitionLimit,
+            @Nullable PartitionPredicate specifiedPartitions,
+            @Nullable Duration historyPartitionIdleTime) {
+        this.table = table;
+        this.incrementalClusterStrategy = incrementalClusterStrategy;
+        this.partitionComputer = partitionComputer;
+        this.maxLevel = maxLevel;
+        this.historyPartitionLimit = historyPartitionLimit;
+        this.specifiedPartitions = specifiedPartitions;
+        this.historyPartitionIdleTime = historyPartitionIdleTime;
+        // (maxLevel + 1) / 2 is used to calculate the ceiling of maxLevel 
divided by 2
+        this.partitionLevelFilter =
+                (partitionMinLevel, partitionMaxLevel) -> partitionMinLevel < 
(maxLevel + 1) / 2;
+    }
+
+    public Map<BinaryRow, Optional<CompactUnit>> pickForHistoryPartitions() {
+        Map<BinaryRow, List<LevelSortedRun>> partitionLevels =
+                constructLevelsForHistoryPartitions();
+        logForPartitionLevel(partitionLevels, partitionComputer);
+
+        return partitionLevels.entrySet().stream()
+                .collect(
+                        Collectors.toMap(
+                                Map.Entry::getKey,
+                                entry ->
+                                        incrementalClusterStrategy.pick(
+                                                maxLevel, entry.getValue(), 
true)));
+    }
+
+    @VisibleForTesting
+    public Map<BinaryRow, List<LevelSortedRun>> 
constructLevelsForHistoryPartitions() {
+        if (specifiedPartitions == null
+                || historyPartitionIdleTime == null
+                || historyPartitionLimit <= 0) {
+            return Collections.emptyMap();
+        }
+
+        long historyMilli =
+                LocalDateTime.now()
+                        .minus(historyPartitionIdleTime)
+                        .atZone(ZoneId.systemDefault())
+                        .toInstant()
+                        .toEpochMilli();
+        // read partitionEntries filtered by partitionLevelFilter and historyPartitionIdleTime,
+        // sort them by lastFileCreationTime, and pick the oldest N partitions
+        List<BinaryRow> historyPartitions =
+                
table.newSnapshotReader().withManifestLevelFilter(partitionLevelFilter)
+                        .partitionEntries().stream()
+                        .filter(entry -> entry.lastFileCreationTime() < 
historyMilli)
+                        
.sorted(Comparator.comparingLong(PartitionEntry::lastFileCreationTime))
+                        .map(PartitionEntry::partition)
+                        .collect(Collectors.toList());
+
+        // read dataFileMeta for history partitions
+        List<DataSplit> historyDataSplits =
+                table.newSnapshotReader()
+                        .withPartitionFilter(historyPartitions)
+                        .read()
+                        .dataSplits();
+        Map<BinaryRow, List<DataFileMeta>> historyPartitionFiles = new 
HashMap<>();
+        for (DataSplit dataSplit : historyDataSplits) {
+            historyPartitionFiles
+                    .computeIfAbsent(dataSplit.partition(), k -> new 
ArrayList<>())
+                    .addAll(dataSplit.dataFiles());
+        }
+
+        // find history partitions which have low-level files
+        Set<BinaryRow> selectedHistoryPartitions =
+                findLowLevelPartitions(historyPartitions, 
historyPartitionFiles);
+        historyPartitionFiles =
+                historyPartitionFiles.entrySet().stream()
+                        .filter(entry -> 
selectedHistoryPartitions.contains(entry.getKey()))
+                        .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+        return historyPartitionFiles.entrySet().stream()
+                .collect(
+                        Collectors.toMap(
+                                Map.Entry::getKey,
+                                entry -> 
constructPartitionLevels(entry.getValue())));
+    }
+
+    @VisibleForTesting
+    protected Set<BinaryRow> findLowLevelPartitions(
+            List<BinaryRow> historyPartitions, Map<BinaryRow, 
List<DataFileMeta>> partitionFiles) {
+        Set<BinaryRow> partitions = new HashSet<>();
+        // 1. the partition is not specified in specifiedPartitions
+        // 2. the min file level in partition should be less than 
Math.ceil(maxLevel/2)
+        for (BinaryRow historyPartition : historyPartitions) {
+            if (specifiedPartitions != null && 
!specifiedPartitions.test(historyPartition)) {
+                List<DataFileMeta> files =
+                        partitionFiles.getOrDefault(historyPartition, 
Collections.emptyList());
+                if (!files.isEmpty()) {
+                    int partitionMinLevel = maxLevel + 1;
+                    for (DataFileMeta file : files) {
+                        partitionMinLevel = Math.min(partitionMinLevel, 
file.level());
+                    }
+                    if (partitionLevelFilter != null
+                            && partitionLevelFilter.test(partitionMinLevel, 
maxLevel)) {
+                        partitions.add(historyPartition);
+                        if (partitions.size() >= historyPartitionLimit) {
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+        LOG.info(
+                "Find {} history partitions for full clustering, the history 
partitions are {}",
+                partitions.size(),
+                partitions.stream()
+                        .map(partitionComputer::generatePartValues)
+                        .collect(Collectors.toSet()));
+        return partitions;
+    }
+}
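
A standalone sketch (JDK types only; java.util.function.BiPredicate stands in for Paimon's BiFilter) of the two checks HistoryPartitionCluster applies above when selecting history partitions. All concrete values are illustrative.

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.function.BiPredicate;

public class HistoryPartitionSelectionSketch {
    public static void main(String[] args) {
        int maxLevel = 5;
        Duration idleTime = Duration.ofDays(1);

        // 1. idle check: a partition whose newest file predates this cutoff is "historical"
        long historyMilli =
                LocalDateTime.now()
                        .minus(idleTime)
                        .atZone(ZoneId.systemDefault())
                        .toInstant()
                        .toEpochMilli();
        long lastFileCreationTime = System.currentTimeMillis() - Duration.ofDays(2).toMillis();
        boolean isHistorical = lastFileCreationTime < historyMilli;

        // 2. level check: only partitions whose minimum file level is below ceil(maxLevel / 2)
        //    still hold enough low-level data to justify a full clustering
        BiPredicate<Integer, Integer> levelFilter =
                (partitionMinLevel, partitionMaxLevel) -> partitionMinLevel < (maxLevel + 1) / 2;

        System.out.println(isHistorical);                  // true for the sample timestamps above
        System.out.println(levelFilter.test(1, maxLevel)); // true: low-level files remain
        System.out.println(levelFilter.test(4, maxLevel)); // false: already near the top level
    }
}
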
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
index 1aa4d2b13e..8142f9dbf0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
@@ -30,6 +30,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,6 +52,7 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
 public class IncrementalClusterManager {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(IncrementalClusterManager.class);
+    private final InternalRowPartitionComputer partitionComputer;
 
     private final SnapshotReader snapshotReader;
 
@@ -58,6 +60,8 @@ public class IncrementalClusterManager {
     private final CoreOptions.OrderType clusterCurve;
     private final List<String> clusterKeys;
 
+    private final HistoryPartitionCluster historyPartitionCluster;
+
     private int maxLevel;
 
     public IncrementalClusterManager(FileStoreTable table) {
@@ -69,14 +73,20 @@ public class IncrementalClusterManager {
         checkArgument(
                 table.bucketMode() == BucketMode.BUCKET_UNAWARE,
                 "only append unaware-bucket table support incremental 
clustering.");
-        // drop stats to reduce memory usage
-        this.snapshotReader =
-                
table.newSnapshotReader().withPartitionFilter(specifiedPartitions).dropStats();
         CoreOptions options = table.coreOptions();
         checkArgument(
                 options.clusteringIncrementalEnabled(),
                 "Only support incremental clustering when '%s' is true.",
                 CLUSTERING_INCREMENTAL.key());
+        this.maxLevel = options.numLevels();
+        this.partitionComputer =
+                new InternalRowPartitionComputer(
+                        table.coreOptions().partitionDefaultName(),
+                        table.store().partitionType(),
+                        table.partitionKeys().toArray(new String[0]),
+                        table.coreOptions().legacyPartitionName());
+        this.snapshotReader =
+                
table.newSnapshotReader().dropStats().withPartitionFilter(specifiedPartitions);
         this.incrementalClusterStrategy =
                 new IncrementalClusterStrategy(
                         table.schemaManager(),
@@ -86,31 +96,22 @@ public class IncrementalClusterManager {
                         options.numSortedRunCompactionTrigger());
         this.clusterCurve = 
options.clusteringStrategy(options.clusteringColumns().size());
         this.clusterKeys = options.clusteringColumns();
-        this.maxLevel = options.numLevels();
+
+        this.historyPartitionCluster =
+                new HistoryPartitionCluster(
+                        table,
+                        incrementalClusterStrategy,
+                        partitionComputer,
+                        maxLevel,
+                        options.clusteringHistoryPartitionLimit(),
+                        specifiedPartitions,
+                        options.clusteringHistoryPartitionIdleTime());
     }
 
     public Map<BinaryRow, CompactUnit> prepareForCluster(boolean 
fullCompaction) {
         // 1. construct LSM structure for each partition
         Map<BinaryRow, List<LevelSortedRun>> partitionLevels = 
constructLevels();
-        if (LOG.isDebugEnabled()) {
-            partitionLevels.forEach(
-                    (partition, levelSortedRuns) -> {
-                        String runsInfo =
-                                levelSortedRuns.stream()
-                                        .map(
-                                                lsr ->
-                                                        String.format(
-                                                                "level-%s:%s",
-                                                                lsr.level(),
-                                                                
lsr.run().files().size()))
-                                        .collect(Collectors.joining(","));
-                        LOG.debug(
-                                "Partition {} has {} runs: [{}]",
-                                partition,
-                                levelSortedRuns.size(),
-                                runsInfo);
-                    });
-        }
+        logForPartitionLevel(partitionLevels, partitionComputer);
 
         // 2. pick files to be clustered for each partition
         Map<BinaryRow, Optional<CompactUnit>> units =
@@ -124,6 +125,10 @@ public class IncrementalClusterManager {
                                                         entry.getValue(),
                                                         fullCompaction)));
 
+        Map<BinaryRow, Optional<CompactUnit>> historyUnits =
+                historyPartitionCluster.pickForHistoryPartitions();
+        units.putAll(historyUnits);
+
         // 3. filter out empty units
         Map<BinaryRow, CompactUnit> filteredUnits =
                 units.entrySet().stream()
@@ -146,7 +151,7 @@ public class IncrementalClusterManager {
                                         .collect(Collectors.joining(", "));
                         LOG.debug(
                                 "Partition {}, outputLevel:{}, clustered with 
{} files: [{}]",
-                                partition,
+                                
partitionComputer.generatePartValues(partition),
                                 compactUnit.outputLevel(),
                                 compactUnit.files().size(),
                                 filesInfo);
@@ -169,13 +174,7 @@ public class IncrementalClusterManager {
                                 + 1);
         checkArgument(maxLevel > 1, "Number of levels must be at least 2.");
 
-        Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>();
-        for (DataSplit dataSplit : dataSplits) {
-            partitionFiles
-                    .computeIfAbsent(dataSplit.partition(), k -> new 
ArrayList<>())
-                    .addAll(dataSplit.dataFiles());
-        }
-
+        Map<BinaryRow, List<DataFileMeta>> partitionFiles = 
getPartitionFiles(dataSplits);
         return partitionFiles.entrySet().stream()
                 .collect(
                         Collectors.toMap(
@@ -183,7 +182,7 @@ public class IncrementalClusterManager {
                                 entry -> 
constructPartitionLevels(entry.getValue())));
     }
 
-    public List<LevelSortedRun> constructPartitionLevels(List<DataFileMeta> 
partitionFiles) {
+    public static List<LevelSortedRun> 
constructPartitionLevels(List<DataFileMeta> partitionFiles) {
         List<LevelSortedRun> partitionLevels = new ArrayList<>();
         Map<Integer, List<DataFileMeta>> levelMap =
                 
partitionFiles.stream().collect(Collectors.groupingBy(DataFileMeta::level));
@@ -209,6 +208,16 @@ public class IncrementalClusterManager {
         return partitionLevels;
     }
 
+    private Map<BinaryRow, List<DataFileMeta>> 
getPartitionFiles(List<DataSplit> dataSplits) {
+        Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>();
+        for (DataSplit dataSplit : dataSplits) {
+            partitionFiles
+                    .computeIfAbsent(dataSplit.partition(), k -> new 
ArrayList<>())
+                    .addAll(dataSplit.dataFiles());
+        }
+        return partitionFiles;
+    }
+
     public List<DataSplit> toSplits(BinaryRow partition, List<DataFileMeta> 
files) {
         List<DataSplit> splits = new ArrayList<>();
 
@@ -242,6 +251,30 @@ public class IncrementalClusterManager {
                 .collect(Collectors.toList());
     }
 
+    public static void logForPartitionLevel(
+            Map<BinaryRow, List<LevelSortedRun>> partitionLevels,
+            InternalRowPartitionComputer partitionComputer) {
+        if (LOG.isDebugEnabled()) {
+            partitionLevels.forEach(
+                    (partition, levelSortedRuns) -> {
+                        String runsInfo =
+                                levelSortedRuns.stream()
+                                        .map(
+                                                lsr ->
+                                                        String.format(
+                                                                "level-%s:%s",
+                                                                lsr.level(),
+                                                                
lsr.run().files().size()))
+                                        .collect(Collectors.joining(","));
+                        LOG.debug(
+                                "Partition {} has {} runs: [{}]",
+                                
partitionComputer.generatePartValues(partition),
+                                levelSortedRuns.size(),
+                                runsInfo);
+                    });
+        }
+    }
+
     public CoreOptions.OrderType clusterCurve() {
         return clusterCurve;
     }
@@ -249,4 +282,8 @@ public class IncrementalClusterManager {
     public List<String> clusterKeys() {
         return clusterKeys;
     }
+
+    public HistoryPartitionCluster historyPartitionCluster() {
+        return historyPartitionCluster;
+    }
 }
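
A simplified sketch of how prepareForCluster above combines the two pick maps: the history-partition picks are merged over the regular incremental picks, then partitions whose strategy returned an empty Optional are dropped. Strings stand in for BinaryRow partitions and CompactUnit picks; this is illustrative only.

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class MergePicksSketch {

    static <K, V> Map<K, V> merge(Map<K, Optional<V>> incrementalPicks, Map<K, Optional<V>> historyPicks) {
        Map<K, Optional<V>> units = new HashMap<>(incrementalPicks);
        // history-partition picks take precedence for the same partition
        units.putAll(historyPicks);
        // drop partitions for which nothing was picked
        return units.entrySet().stream()
                .filter(e -> e.getValue().isPresent())
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
    }

    public static void main(String[] args) {
        Map<String, Optional<String>> incremental = new HashMap<>();
        incremental.put("pt=3", Optional.of("minor-cluster"));
        incremental.put("pt=2", Optional.empty()); // nothing to cluster in pt=2
        Map<String, Optional<String>> history = new HashMap<>();
        history.put("pt=0", Optional.of("full-cluster"));

        // result contains pt=0 -> full-cluster and pt=3 -> minor-cluster; pt=2 is dropped
        System.out.println(merge(incremental, history));
    }
}
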
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 0e3f39715d..736bc41d7b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -196,6 +196,12 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         return this;
     }
 
+    @Override
+    public FileStoreScan withManifestLevelFilter(BiFilter<Integer, Integer> 
manifestLevelFilter) {
+        manifestsReader.withManifestLevelFilter(manifestLevelFilter);
+        return this;
+    }
+
     @Override
     public FileStoreScan enableValueFilter() {
         return this;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 5fa3127237..354c43e019 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -75,6 +75,8 @@ public interface FileStoreScan {
 
     FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
 
+    FileStoreScan withManifestLevelFilter(BiFilter<Integer, Integer> 
manifestLevelFilter);
+
     FileStoreScan enableValueFilter();
 
     FileStoreScan withManifestEntryFilter(Filter<ManifestEntry> filter);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
index d58bb797e3..dbf5140583 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
@@ -27,6 +27,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BiFilter;
 import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
@@ -52,6 +53,7 @@ public class ManifestsReader {
     @Nullable private Integer specifiedBucket = null;
     @Nullable private Integer specifiedLevel = null;
     @Nullable private PartitionPredicate partitionFilter = null;
+    @Nullable private BiFilter<Integer, Integer> manifestLevelFilter = null;
 
     public ManifestsReader(
             RowType partitionType,
@@ -79,6 +81,11 @@ public class ManifestsReader {
         return this;
     }
 
+    public ManifestsReader withManifestLevelFilter(BiFilter<Integer, Integer> 
manifestLevelFilter) {
+        this.manifestLevelFilter = manifestLevelFilter;
+        return this;
+    }
+
     public ManifestsReader withPartitionFilter(Predicate predicate) {
         this.partitionFilter = PartitionPredicate.fromPredicate(partitionType, 
predicate);
         return this;
@@ -160,6 +167,9 @@ public class ManifestsReader {
                     && (specifiedLevel < minLevel || specifiedLevel > 
maxLevel)) {
                 return false;
             }
+            if (manifestLevelFilter != null && 
!manifestLevelFilter.test(minLevel, maxLevel)) {
+                return false;
+            }
         }
 
         if (partitionFilter == null) {
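
A small sketch (again using the JDK BiPredicate in place of Paimon's BiFilter) of the manifest pruning this hunk adds: each manifest carries min/max data-file levels, and a manifest whose level range is rejected by the caller-supplied filter is skipped entirely. The level values are illustrative.

import java.util.function.BiPredicate;

public class ManifestLevelPruningSketch {

    static boolean keepManifest(BiPredicate<Integer, Integer> manifestLevelFilter, int minLevel, int maxLevel) {
        // mirrors ManifestsReader above: a non-null filter rejecting (minLevel, maxLevel) prunes the manifest
        return manifestLevelFilter == null || manifestLevelFilter.test(minLevel, maxLevel);
    }

    public static void main(String[] args) {
        // the filter HistoryPartitionCluster installs: keep manifests that still reference low-level files
        int numLevels = 5;
        BiPredicate<Integer, Integer> lowLevelOnly = (min, max) -> min < (numLevels + 1) / 2;
        System.out.println(keepManifest(lowLevelOnly, 0, 5)); // true: contains level-0 files
        System.out.println(keepManifest(lowLevelOnly, 4, 5)); // false: only high-level files, pruned
    }
}
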
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 82112f5fd4..91505ffcf9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.utils.BiFilter;
 import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Filter;
@@ -87,6 +88,8 @@ public interface SnapshotReader {
 
     SnapshotReader withLevelFilter(Filter<Integer> levelFilter);
 
+    SnapshotReader withManifestLevelFilter(BiFilter<Integer, Integer> 
manifestLevelFilter);
+
     SnapshotReader enableValueFilter();
 
     SnapshotReader withManifestEntryFilter(Filter<ManifestEntry> filter);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 942c5bc41c..22cd5cad6f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -49,6 +49,7 @@ import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.table.source.PlanImpl;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.utils.BiFilter;
 import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.DVMetaCache;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -261,6 +262,11 @@ public class SnapshotReaderImpl implements SnapshotReader {
         return this;
     }
 
+    public SnapshotReader withManifestLevelFilter(BiFilter<Integer, Integer> 
manifestLevelFilter) {
+        scan.withManifestLevelFilter(manifestLevelFilter);
+        return this;
+    }
+
     @Override
     public SnapshotReader enableValueFilter() {
         scan.enableValueFilter();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index ab3b13a5ab..f71eba944d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -59,6 +59,7 @@ import 
org.apache.paimon.table.source.snapshot.StartingContext;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BiFilter;
 import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -359,6 +360,13 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
             return this;
         }
 
+        @Override
+        public SnapshotReader withManifestLevelFilter(
+                BiFilter<Integer, Integer> manifestLevelFilter) {
+            wrapped.withManifestLevelFilter(manifestLevelFilter);
+            return this;
+        }
+
         @Override
         public SnapshotReader enableValueFilter() {
             wrapped.enableValueFilter();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java
new file mode 100644
index 0000000000..4692b50fbb
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.mergetree.LevelSortedRun;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.assertj.core.util.Lists;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.paimon.append.cluster.IncrementalClusterManagerTest.writeOnce;
+import static 
org.apache.paimon.append.cluster.IncrementalClusterStrategyTest.createFile;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link HistoryPartitionCluster}. */
+public class HistoryPartitionClusterTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void testFindLowLevelPartitions() throws Exception {
+        FileStoreTable table = createTable(Collections.emptyMap(), 
Collections.emptyList());
+        long now = 
LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+        Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>();
+
+        // specified partition, has low-level files
+        BinaryRow partition1 = BinaryRow.singleColumn(1);
+        PartitionEntry partitionEntry1 = new PartitionEntry(partition1, 0, 0, 
0, now);
+        partitionFiles.put(
+                partition1, Lists.newArrayList(createFile(100, 1, 3), 
createFile(100, 1, 5)));
+        // has no low-level files
+        BinaryRow partition2 = BinaryRow.singleColumn(2);
+        PartitionEntry partitionEntry2 = new PartitionEntry(partition2, 0, 0, 
0, now);
+        partitionFiles.put(partition2, Lists.newArrayList(createFile(100, 1, 
0)));
+        // has low-level files
+        BinaryRow partition3 = BinaryRow.singleColumn(3);
+        PartitionEntry partitionEntry3 = new PartitionEntry(partition3, 0, 0, 
0, now);
+        partitionFiles.put(
+                partition3, Lists.newArrayList(createFile(100, 1, 0), 
createFile(100, 1, 2)));
+        // has no low-level files
+        BinaryRow partition4 = BinaryRow.singleColumn(4);
+        PartitionEntry partitionEntry4 = new PartitionEntry(partition4, 0, 0, 0, now);
+        partitionFiles.put(partition4, Lists.newArrayList(createFile(100, 1, 
0)));
+
+        IncrementalClusterManager incrementalClusterManager =
+                new IncrementalClusterManager(
+                        table,
+                        PartitionPredicate.fromMultiple(
+                                RowType.of(DataTypes.INT()), 
Lists.newArrayList(partition1)));
+        HistoryPartitionCluster historyPartitionCluster =
+                incrementalClusterManager.historyPartitionCluster();
+        Set<BinaryRow> selectedPartitions =
+                historyPartitionCluster.findLowLevelPartitions(
+                        Lists.newArrayList(
+                                        partitionEntry1,
+                                        partitionEntry2,
+                                        partitionEntry3,
+                                        partitionEntry4)
+                                .stream()
+                                .map(PartitionEntry::partition)
+                                .collect(Collectors.toList()),
+                        partitionFiles);
+
+        assertThat(selectedPartitions).contains(partition2);
+    }
+
+    @Test
+    public void testHistoryPartitionAutoClustering() throws Exception {
+        FileStoreTable table = createTable(Collections.emptyMap(), 
Collections.singletonList("f2"));
+        writeOnce(
+                table,
+                GenericRow.of(
+                        1, 1, BinaryString.fromString("pt1"), 
BinaryString.fromString("test")));
+        writeOnce(
+                table,
+                GenericRow.of(
+                        1, 1, BinaryString.fromString("pt2"), 
BinaryString.fromString("test")));
+
+        Thread.sleep(2000);
+        writeOnce(
+                table,
+                GenericRow.of(
+                        1, 1, BinaryString.fromString("pt3"), 
BinaryString.fromString("test")));
+        writeOnce(
+                table,
+                GenericRow.of(
+                        1, 1, BinaryString.fromString("pt4"), 
BinaryString.fromString("test")));
+
+        // test: specify a history partition, with history-partition auto clustering enabled
+        HistoryPartitionCluster historyPartitionCluster =
+                new IncrementalClusterManager(
+                                table,
+                                PartitionPredicate.fromMultiple(
+                                        RowType.of(DataTypes.INT()),
+                                        
Lists.newArrayList(BinaryRow.singleColumn("pt1"))))
+                        .historyPartitionCluster();
+        Map<BinaryRow, List<LevelSortedRun>> partitionLevels =
+                historyPartitionCluster.constructLevelsForHistoryPartitions();
+        assertThat(partitionLevels.size()).isEqualTo(1);
+        
assertThat(partitionLevels.get(BinaryRow.singleColumn("pt2"))).isNotEmpty();
+
+        // test: specify a non-history partition, with history-partition auto clustering enabled
+        historyPartitionCluster =
+                new IncrementalClusterManager(
+                                table,
+                                PartitionPredicate.fromMultiple(
+                                        RowType.of(DataTypes.INT()),
+                                        
Lists.newArrayList(BinaryRow.singleColumn("pt3"))))
+                        .historyPartitionCluster();
+        partitionLevels = 
historyPartitionCluster.constructLevelsForHistoryPartitions();
+        assertThat(partitionLevels.size()).isEqualTo(1);
+        
assertThat(partitionLevels.get(BinaryRow.singleColumn("pt1"))).isNotEmpty();
+
+        // test: no partition specified, so history-partition auto clustering does not apply
+        historyPartitionCluster = new 
IncrementalClusterManager(table).historyPartitionCluster();
+        partitionLevels = 
historyPartitionCluster.constructLevelsForHistoryPartitions();
+        assertThat(partitionLevels.isEmpty()).isTrue();
+    }
+
+    protected FileStoreTable createTable(
+            Map<String, String> customOptions, List<String> partitionKeys) 
throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.BUCKET.key(), "-1");
+        options.put(CoreOptions.CLUSTERING_COLUMNS.key(), "f0,f1");
+        options.put(CoreOptions.CLUSTERING_INCREMENTAL.key(), "true");
+        options.put(CoreOptions.CLUSTERING_HISTORY_PARTITION_IDLE_TIME.key(), 
"2s");
+        options.put(CoreOptions.CLUSTERING_HISTORY_PARTITION_LIMIT.key(), "1");
+        options.putAll(customOptions);
+
+        Schema schema =
+                new Schema(
+                        RowType.of(
+                                        DataTypes.INT(),
+                                        DataTypes.INT(),
+                                        DataTypes.STRING(),
+                                        DataTypes.STRING())
+                                .getFields(),
+                        partitionKeys,
+                        Collections.emptyList(),
+                        options,
+                        "");
+
+        SchemaManager schemaManager =
+                new SchemaManager(LocalFileIO.create(), new 
Path(tempDir.toString()));
+        return FileStoreTableFactory.create(
+                LocalFileIO.create(),
+                new Path(tempDir.toString()),
+                schemaManager.createTable(schema));
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
index 8b37aff0b8..37cc006673 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
@@ -19,18 +19,27 @@
 package org.apache.paimon.append.cluster;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.compact.CompactUnit;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.mergetree.LevelSortedRun;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
+import org.assertj.core.util.Lists;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -143,7 +152,7 @@ public class IncrementalClusterManagerTest {
         // Test upgrading to level 3
         int outputLevel = 3;
         List<DataFileMeta> upgradedFiles =
-                incrementalClusterManager.upgrade(filesAfterCluster, 
outputLevel);
+                IncrementalClusterManager.upgrade(filesAfterCluster, 
outputLevel);
 
         // Verify the results
         assertThat(upgradedFiles).hasSize(3);
@@ -154,7 +163,67 @@ public class IncrementalClusterManagerTest {
         }
     }
 
-    private FileStoreTable createTable(
+    @Test
+    public void testHistoryPartitionAutoClustering() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.CLUSTERING_HISTORY_PARTITION_IDLE_TIME.key(), 
"2s");
+        options.put(CoreOptions.CLUSTERING_HISTORY_PARTITION_LIMIT.key(), "1");
+
+        FileStoreTable table = createTable(options, 
Collections.singletonList("f2"));
+        writeOnce(
+                table,
+                GenericRow.of(
+                        1, 1, BinaryString.fromString("pt1"), 
BinaryString.fromString("test")));
+        writeOnce(
+                table,
+                GenericRow.of(
+                        1, 1, BinaryString.fromString("pt2"), 
BinaryString.fromString("test")));
+
+        Thread.sleep(2000);
+        writeOnce(
+                table,
+                GenericRow.of(
+                        1, 1, BinaryString.fromString("pt3"), 
BinaryString.fromString("test")));
+        writeOnce(
+                table,
+                GenericRow.of(
+                        1, 1, BinaryString.fromString("pt4"), 
BinaryString.fromString("test")));
+
+        // test specify partition and enable history partition auto clustering
+        IncrementalClusterManager incrementalClusterManager =
+                new IncrementalClusterManager(
+                        table,
+                        PartitionPredicate.fromMultiple(
+                                RowType.of(DataTypes.INT()),
+                                
Lists.newArrayList(BinaryRow.singleColumn("pt3"))));
+        Map<BinaryRow, CompactUnit> partitionLevels =
+                incrementalClusterManager.prepareForCluster(true);
+        assertThat(partitionLevels.size()).isEqualTo(2);
+        
assertThat(partitionLevels.get(BinaryRow.singleColumn("pt1"))).isNotNull();
+        
assertThat(partitionLevels.get(BinaryRow.singleColumn("pt3"))).isNotNull();
+
+        // test: no partition specified, with history partition auto clustering enabled
+        incrementalClusterManager = new IncrementalClusterManager(table);
+        partitionLevels = incrementalClusterManager.prepareForCluster(true);
+        assertThat(partitionLevels.size()).isEqualTo(4);
+
+        // test specify partition and disable history partition auto clustering
+        SchemaChange schemaChange =
+                
SchemaChange.removeOption(CoreOptions.CLUSTERING_HISTORY_PARTITION_IDLE_TIME.key());
+        incrementalClusterManager =
+                new IncrementalClusterManager(
+                        table.copy(
+                                table.schemaManager()
+                                        
.commitChanges(Collections.singletonList(schemaChange))),
+                        PartitionPredicate.fromMultiple(
+                                RowType.of(DataTypes.INT()),
+                                
Lists.newArrayList(BinaryRow.singleColumn("pt3"))));
+        partitionLevels = incrementalClusterManager.prepareForCluster(true);
+        assertThat(partitionLevels.size()).isEqualTo(1);
+        
assertThat(partitionLevels.get(BinaryRow.singleColumn("pt3"))).isNotNull();
+    }
+
+    protected FileStoreTable createTable(
             Map<String, String> customOptions, List<String> partitionKeys) 
throws Exception {
         Map<String, String> options = new HashMap<>();
         options.put(CoreOptions.BUCKET.key(), "-1");
@@ -183,7 +252,20 @@ public class IncrementalClusterManagerTest {
                 schemaManager.createTable(schema));
     }
 
-    private static DataFileMeta createFile(long size, long schemaId, int 
level) {
+    protected static void writeOnce(FileStoreTable table, GenericRow... rows) {
+        String commitUser = "test_user";
+        try (BatchTableWrite write = table.newWrite(commitUser);
+                BatchTableCommit commit = table.newCommit(commitUser)) {
+            for (GenericRow row : rows) {
+                write.write(row);
+            }
+            commit.commit(write.prepareCommit());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected static DataFileMeta createFile(long size, long schemaId, int 
level) {
         return DataFileMeta.create(
                 "",
                 size,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterStrategyTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterStrategyTest.java
index 1061c50c9e..a00f2442ae 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterStrategyTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterStrategyTest.java
@@ -199,7 +199,7 @@ public class IncrementalClusterStrategyTest {
                 SchemaChange.setOption(CoreOptions.CLUSTERING_COLUMNS.key(), 
"f2,f3"));
     }
 
-    private static DataFileMeta createFile(long size, long schemaId, int 
level) {
+    protected static DataFileMeta createFile(long size, long schemaId, int 
level) {
         return DataFileMeta.create(
                 "",
                 size,
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
index 7629ebf81d..48e0f1fa30 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -18,10 +18,12 @@
 
 package org.apache.paimon.flink.action;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.BatchTableCommit;
@@ -41,8 +43,11 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
@@ -361,6 +366,158 @@ public class IncrementalClusterActionITCase extends 
ActionITCaseBase {
         }
     }
 
+    @Test
+    public void testClusterHistoryPartition() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.CLUSTERING_HISTORY_PARTITION_IDLE_TIME.key(), 
"3s");
+        FileStoreTable table = createTable("pt", 1, options);
+
+        BinaryString randomStr = BinaryString.fromString(randomString(150));
+        List<CommitMessage> messages = new ArrayList<>();
+
+        // first write
+        List<String> expected1 = new ArrayList<>();
+        for (int pt = 0; pt < 4; pt++) {
+            for (int i = 0; i < 3; i++) {
+                for (int j = 0; j < 3; j++) {
+                    messages.addAll(write(GenericRow.of(i, j, randomStr, pt)));
+                    expected1.add(String.format("+I[%s, %s, %s]", i, j, pt));
+                }
+            }
+        }
+        commit(messages);
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new 
int[] {0, 1, 3});
+        List<String> result1 =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        readBuilder.readType());
+        assertThat(result1).containsExactlyElementsOf(expected1);
+
+        // first cluster, files in four partitions will be in top level
+        runAction(Collections.emptyList());
+        checkSnapshot(table);
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        assertThat(splits.size()).isEqualTo(4);
+        assertThat(((DataSplit) 
splits.get(0)).dataFiles().size()).isEqualTo(1);
+        assertThat(((DataSplit) 
splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+        List<String> result2 = getResult(readBuilder.newRead(), splits, 
readBuilder.readType());
+        List<String> expected2 = new ArrayList<>();
+        for (int pt = 0; pt < 4; pt++) {
+            expected2.add(String.format("+I[0, 0, %s]", pt));
+            expected2.add(String.format("+I[0, 1, %s]", pt));
+            expected2.add(String.format("+I[1, 0, %s]", pt));
+            expected2.add(String.format("+I[1, 1, %s]", pt));
+            expected2.add(String.format("+I[0, 2, %s]", pt));
+            expected2.add(String.format("+I[1, 2, %s]", pt));
+            expected2.add(String.format("+I[2, 0, %s]", pt));
+            expected2.add(String.format("+I[2, 1, %s]", pt));
+            expected2.add(String.format("+I[2, 2, %s]", pt));
+        }
+        assertThat(result2).containsExactlyElementsOf(expected2);
+
+        // second write
+        messages.clear();
+        for (int pt = 0; pt < 4; pt++) {
+            messages.addAll(
+                    write(
+                            GenericRow.of(0, 3, null, pt),
+                            GenericRow.of(1, 3, null, pt),
+                            GenericRow.of(2, 3, null, pt)));
+            messages.addAll(
+                    write(
+                            GenericRow.of(3, 0, null, pt),
+                            GenericRow.of(3, 1, null, pt),
+                            GenericRow.of(3, 2, null, pt),
+                            GenericRow.of(3, 3, null, pt)));
+            // pt-0, pt-1 will be history partitions
+            if (pt == 1) {
+                Thread.sleep(3000);
+            }
+        }
+        commit(messages);
+
+        List<String> result3 =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        readBuilder.readType());
+        List<String> expected3 = new ArrayList<>();
+        for (int pt = 0; pt < 4; pt++) {
+            expected3.addAll(expected2.subList(9 * pt, 9 * pt + 9));
+            expected3.add(String.format("+I[0, 3, %s]", pt));
+            expected3.add(String.format("+I[1, 3, %s]", pt));
+            expected3.add(String.format("+I[2, 3, %s]", pt));
+            expected3.add(String.format("+I[3, 0, %s]", pt));
+            expected3.add(String.format("+I[3, 1, %s]", pt));
+            expected3.add(String.format("+I[3, 2, %s]", pt));
+            expected3.add(String.format("+I[3, 3, %s]", pt));
+        }
+        assertThat(result3).containsExactlyElementsOf(expected3);
+
+        // second cluster
+        runAction(Lists.newArrayList("--partition", "pt=3"));
+        checkSnapshot(table);
+        splits = readBuilder.newScan().plan().splits();
+        List<String> result4 = getResult(readBuilder.newRead(), splits, 
readBuilder.readType());
+        List<String> expected4 = new ArrayList<>();
+        assertThat(splits.size()).isEqualTo(4);
+        // for pt-0 and pt-1: history partitions, full clustering, all files will be
+        // picked for clustering, outputLevel is 5.
+        for (int pt = 0; pt <= 1; pt++) {
+            expected4.add(String.format("+I[0, 0, %s]", pt));
+            expected4.add(String.format("+I[0, 1, %s]", pt));
+            expected4.add(String.format("+I[1, 0, %s]", pt));
+            expected4.add(String.format("+I[1, 1, %s]", pt));
+            expected4.add(String.format("+I[0, 2, %s]", pt));
+            expected4.add(String.format("+I[0, 3, %s]", pt));
+            expected4.add(String.format("+I[1, 2, %s]", pt));
+            expected4.add(String.format("+I[1, 3, %s]", pt));
+            expected4.add(String.format("+I[2, 0, %s]", pt));
+            expected4.add(String.format("+I[2, 1, %s]", pt));
+            expected4.add(String.format("+I[3, 0, %s]", pt));
+            expected4.add(String.format("+I[3, 1, %s]", pt));
+            expected4.add(String.format("+I[2, 2, %s]", pt));
+            expected4.add(String.format("+I[2, 3, %s]", pt));
+            expected4.add(String.format("+I[3, 2, %s]", pt));
+            expected4.add(String.format("+I[3, 3, %s]", pt));
+            // the table has enabled 'scan.plan-sort-partition', so the splits have been sorted by partition
+            assertThat(((DataSplit) 
splits.get(pt)).dataFiles().size()).isEqualTo(1);
+            assertThat(((DataSplit) 
splits.get(pt)).dataFiles().get(0).level()).isEqualTo(5);
+        }
+        // for pt-2: neither a history partition nor a specified partition, so nothing happens
+        expected4.addAll(expected3.subList(32, 48));
+        assertThat(((DataSplit) 
splits.get(2)).dataFiles().size()).isEqualTo(3);
+        // for pt-3: minor clustering, only the level-0 files will be picked for clustering,
+        // outputLevel is 4
+        expected4.add("+I[0, 0, 3]");
+        expected4.add("+I[0, 1, 3]");
+        expected4.add("+I[1, 0, 3]");
+        expected4.add("+I[1, 1, 3]");
+        expected4.add("+I[0, 2, 3]");
+        expected4.add("+I[1, 2, 3]");
+        expected4.add("+I[2, 0, 3]");
+        expected4.add("+I[2, 1, 3]");
+        expected4.add("+I[2, 2, 3]");
+        expected4.add("+I[0, 3, 3]");
+        expected4.add("+I[1, 3, 3]");
+        expected4.add("+I[3, 0, 3]");
+        expected4.add("+I[3, 1, 3]");
+        expected4.add("+I[2, 3, 3]");
+        expected4.add("+I[3, 2, 3]");
+        expected4.add("+I[3, 3, 3]");
+        assertThat(((DataSplit) 
splits.get(3)).dataFiles().size()).isEqualTo(2);
+        assertThat(
+                        ((DataSplit) splits.get(3))
+                                .dataFiles().stream()
+                                        .map(DataFileMeta::level)
+                                        .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(4, 5);
+
+        assertThat(result4).containsExactlyElementsOf(expected4);
+    }
+
     @Test
     public void testClusterOnEmptyData() throws Exception {
         createTable("pt", 1);
@@ -410,8 +567,14 @@ public class IncrementalClusterActionITCase extends 
ActionITCaseBase {
 
     protected FileStoreTable createTable(String partitionKeys, int 
sinkParallelism)
             throws Exception {
+        return createTable(partitionKeys, sinkParallelism, 
Collections.emptyMap());
+    }
+
+    protected FileStoreTable createTable(
+            String partitionKeys, int sinkParallelism, Map<String, String> 
options)
+            throws Exception {
         catalog.createDatabase(database, true);
-        catalog.createTable(identifier(), schema(partitionKeys, 
sinkParallelism), true);
+        catalog.createTable(identifier(), schema(partitionKeys, 
sinkParallelism, options), true);
         return (FileStoreTable) catalog.getTable(identifier());
     }
 
@@ -440,6 +603,11 @@ public class IncrementalClusterActionITCase extends 
ActionITCaseBase {
     }
 
     private static Schema schema(String partitionKeys, int sinkParallelism) {
+        return schema(partitionKeys, sinkParallelism, Collections.emptyMap());
+    }
+
+    private static Schema schema(
+            String partitionKeys, int sinkParallelism, Map<String, String> 
options) {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("a", DataTypes.INT());
         schemaBuilder.column("b", DataTypes.INT());
@@ -454,6 +622,9 @@ public class IncrementalClusterActionITCase extends 
ActionITCaseBase {
         schemaBuilder.option("clustering.incremental", "true");
         schemaBuilder.option("scan.parallelism", "1");
         schemaBuilder.option("sink.parallelism", 
String.valueOf(sinkParallelism));
+        for (String key : options.keySet()) {
+            schemaBuilder.option(key, options.get(key));
+        }
         if (!StringUtils.isNullOrWhitespaceOnly(partitionKeys)) {
             schemaBuilder.partitionKeys(partitionKeys);
         }
