This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6850329895 [core] chain table support special partition expire. (#7643)
6850329895 is described below

commit 685032989510e55f2c34a29031f8b5c82d20804d
Author: Wenchao Wu <[email protected]>
AuthorDate: Thu May 28 10:17:16 2026 +0800

    [core] chain table support special partition expire. (#7643)
    
    This PR implements partition expiration for chain tables. Chain tables
    store data across snapshot
    and delta branches, where delta partitions depend on their nearest
    earlier snapshot partition as an
    anchor for merge-on-read. Standard partition expiration cannot be
    applied directly because dropping
    a snapshot partition without considering its dependent deltas would
    break the chain integrity.
---
 docs/docs/primary-key-table/chain-table.md         |  64 ++
 .../java/org/apache/paimon/AbstractFileStore.java  |  72 +-
 .../metastore/ChainTableCommitPreCallback.java     |  98 ++-
 .../operation/ChainTablePartitionExpire.java       | 446 ++++++++++++
 ...itionExpire.java => NormalPartitionExpire.java} |  11 +-
 .../apache/paimon/operation/PartitionExpire.java   | 222 +-----
 .../org/apache/paimon/schema/SchemaValidation.java |   8 +
 .../operation/ChainTablePartitionExpireTest.java   | 786 +++++++++++++++++++++
 .../paimon/operation/PartitionExpireTest.java      |  18 +-
 .../flink/action/ExpirePartitionsAction.java       |   6 +-
 .../flink/procedure/ExpirePartitionsProcedure.java |   2 -
 11 files changed, 1495 insertions(+), 238 deletions(-)

diff --git a/docs/docs/primary-key-table/chain-table.md 
b/docs/docs/primary-key-table/chain-table.md
index b5890d428b..b061efd36a 100644
--- a/docs/docs/primary-key-table/chain-table.md
+++ b/docs/docs/primary-key-table/chain-table.md
@@ -204,3 +204,67 @@ partition keys:
 
 This treats `(dt, hour)` as the composite chain dimension and everything 
before it (e.g., `region`) as
 the group dimension.
+
+## Partition Expiration
+
+Chain tables support automatic partition expiration via the standard 
`partition.expiration-time` option.
+However, the expiration algorithm differs from normal tables to preserve chain 
integrity.
+
+### How It Works
+
+In a normal table, every partition older than the cutoff (`now - 
partition.expiration-time`) is dropped
+independently. Chain tables cannot do this because a delta partition depends 
on its nearest earlier
+snapshot partition as an anchor for merge-on-read. Dropping the anchor would 
break the chain.
+
+Chain table expiration works in **segments**. A segment consists of one 
snapshot partition and all the
+delta partitions whose time falls between that snapshot and the next snapshot 
in sorted order. The
+segment is the atomic unit of expiration: either the entire segment is 
expired, or nothing in it is.
+
+The algorithm per group:
+1. List all snapshot branch partitions sorted by chain partition time.
+2. Filter to those before the cutoff (`now - partition.expiration-time`).
+3. If fewer than 2 snapshots are before the cutoff, nothing can be expired — 
a lone snapshot before the cutoff must be kept
+   as the anchor.
+4. The most recent snapshot before the cutoff is the **anchor** (kept). All 
earlier snapshots and their
+   associated delta partitions form expirable segments.
+5. Delta partitions are dropped before snapshot partitions so that the commit 
pre-check always passes.
+
+For tables with group partitions, each group is processed independently. A 
group with several snapshots
+before the cutoff can have segments expired while another group with only one snapshot 
before the cutoff retains
+all of its data.
+
+### Example
+
+```sql
+ALTER TABLE default.t SET TBLPROPERTIES (
+    'partition.expiration-time' = '30 d',
+    'partition.expiration-check-interval' = '1 d'
+);
+ALTER TABLE `default`.`t$branch_snapshot` SET TBLPROPERTIES (
+    'partition.expiration-time' = '30 d',
+    'partition.expiration-check-interval' = '1 d'
+);
+ALTER TABLE `default`.`t$branch_delta` SET TBLPROPERTIES (
+    'partition.expiration-time' = '30 d',
+    'partition.expiration-check-interval' = '1 d'
+);
+```
+
+Suppose the snapshot branch has partitions `S(0101)`, `S(0201)`, `S(0301)` and 
the delta branch has
+`D(0110)`, `D(0210)`, `D(0315)`. On `2025-03-31` with a 30-day retention the 
cutoff is `2025-03-01`:
+
+- Snapshots before cutoff: `S(0101)`, `S(0201)`. Anchor = `S(0201)` (kept).
+- Segment 1 expired: `S(0101)` + `D(0110)` (delta between `S(0101)` and 
`S(0201)`).
+- Remaining: `S(0201)`, `S(0301)`, `D(0210)`, `D(0315)`.
+
+### Important Notes
+
+- **Delta-only groups are not expired.** If a group has delta partitions but 
no snapshot partition, its
+  deltas are the only copy of that group's data. Partition expiration will not 
touch them. They will
+  start to be expired once at least two snapshot partitions exist for the 
group and fall before the
+  cutoff.
+- **Conflict detection is anchor-aware.** When `partition.expiration-strategy` 
is `values-time`, the
+  conflict detection during writes correctly recognizes that anchor partitions 
are retained and does not
+  reject writes to them.
+- The `partition.expiration-time` and `partition.expiration-check-interval` 
options should be set
+  consistently across the main table and both branches.
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 9aad0259cd..cb3bae1dbe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -19,6 +19,7 @@
 package org.apache.paimon;
 
 import org.apache.paimon.CoreOptions.ExternalPathStrategy;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.RenamingSnapshotCommit;
 import org.apache.paimon.catalog.SnapshotCommit;
 import org.apache.paimon.catalog.TableRollback;
@@ -38,16 +39,19 @@ import 
org.apache.paimon.metastore.ChainTableCommitPreCallback;
 import org.apache.paimon.metastore.ChainTableOverwriteCommitCallback;
 import org.apache.paimon.metastore.TagPreviewCommitCallback;
 import org.apache.paimon.metastore.VisibilityWaitCallback;
+import org.apache.paimon.operation.ChainTablePartitionExpire;
 import org.apache.paimon.operation.ChangelogDeletion;
 import org.apache.paimon.operation.FileStoreCommitImpl;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.operation.ManifestsReader;
+import org.apache.paimon.operation.NormalPartitionExpire;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.operation.commit.CommitRollback;
 import org.apache.paimon.operation.commit.ConflictDetection;
 import org.apache.paimon.partition.PartitionExpireStrategy;
+import org.apache.paimon.partition.PartitionValuesTimeExpireStrategy;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.service.ServiceManager;
@@ -65,6 +69,7 @@ import org.apache.paimon.tag.SuccessFileTagCallback;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.tag.TagPreview;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ChainTableUtils;
 import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.IndexFilePathFactories;
@@ -440,6 +445,10 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
             return null;
         }
 
+        if (options.isChainTable()) {
+            return newChainTablePartitionExpire(table);
+        }
+
         return newPartitionExpire(
                 commitUser,
                 table,
@@ -459,12 +468,19 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
             Duration expirationTime,
             Duration checkInterval,
             PartitionExpireStrategy expireStrategy) {
+        if (options.isChainTable()) {
+            checkArgument(
+                    expireStrategy instanceof 
PartitionValuesTimeExpireStrategy,
+                    "Chain table only supports 'values-time' partition 
expiration strategy.");
+            return newChainTablePartitionExpire(table, expirationTime, 
checkInterval);
+        }
+
         PartitionModification partitionModification = null;
         if (options.partitionedTableInMetastore()) {
             partitionModification = catalogEnvironment.partitionModification();
         }
 
-        return new PartitionExpire(
+        return new NormalPartitionExpire(
                 expirationTime,
                 checkInterval,
                 expireStrategy,
@@ -476,6 +492,60 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
                 options.partitionExpireBatchSize());
     }
 
+    @Nullable
+    private ChainTablePartitionExpire 
newChainTablePartitionExpire(FileStoreTable table) {
+        Duration partitionExpireTime = options.partitionExpireTime();
+        if (partitionExpireTime == null) {
+            return null;
+        }
+        return newChainTablePartitionExpire(
+                table, partitionExpireTime, 
options.partitionExpireCheckInterval());
+    }
+
+    @Nullable
+    private ChainTablePartitionExpire newChainTablePartitionExpire(
+            FileStoreTable table, Duration expirationTime, Duration 
checkInterval) {
+        if (partitionType().getFieldCount() == 0) {
+            return null;
+        }
+        FileStoreTable primaryTable = 
ChainTableUtils.resolveChainPrimaryTable(table);
+        FileStoreTable snapshotTable =
+                
primaryTable.switchToBranch(options.scanFallbackSnapshotBranch());
+        FileStoreTable deltaTable = 
primaryTable.switchToBranch(options.scanFallbackDeltaBranch());
+        return new ChainTablePartitionExpire(
+                expirationTime,
+                checkInterval,
+                snapshotTable,
+                deltaTable,
+                options,
+                partitionType(),
+                options.endInputCheckPartitionExpire(),
+                options.partitionExpireMaxNum(),
+                options.partitionExpireBatchSize(),
+                
newPartitionModificationForBranch(options.scanFallbackSnapshotBranch()),
+                
newPartitionModificationForBranch(options.scanFallbackDeltaBranch()));
+    }
+
+    @Nullable
+    private PartitionModification newPartitionModificationForBranch(String 
branchName) {
+        if (!options.partitionedTableInMetastore()) {
+            return null;
+        }
+
+        Identifier identifier = catalogEnvironment.identifier();
+        if (identifier == null) {
+            return catalogEnvironment.partitionModification();
+        }
+
+        Identifier branchIdentifier =
+                new Identifier(
+                        identifier.getDatabaseName(),
+                        identifier.getTableName(),
+                        branchName,
+                        identifier.getSystemTableName());
+        return 
catalogEnvironment.copy(branchIdentifier).partitionModification();
+    }
+
     @Override
     public TagAutoManager newTagAutoManager(FileStoreTable table) {
         return TagAutoManager.create(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java
index 0b26cb6375..03399a4fde 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java
@@ -36,6 +36,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitPreCallback;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ChainPartitionProjector;
 import org.apache.paimon.utils.ChainTableUtils;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
@@ -43,6 +44,7 @@ import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -111,33 +113,59 @@ public class ChainTableCommitPreCallback implements 
CommitPreCallback {
                         partitionType,
                         table.schema().partitionKeys().toArray(new String[0]),
                         coreOptions.legacyPartitionName());
-        RecordComparator partitionComparator =
-                
CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());
+
+        List<String> chainKeys =
+                ChainTableUtils.chainPartitionKeys(coreOptions, 
table.schema().partitionKeys());
+        int chainFieldCount = chainKeys.size();
+        ChainPartitionProjector projector =
+                new ChainPartitionProjector(partitionType, chainFieldCount);
+        int groupFieldCount = projector.groupFieldCount();
+        RecordComparator chainComparator =
+                
CodeGenUtils.newRecordComparator(projector.chainPartitionType().getFieldTypes());
+
         List<BinaryRow> snapshotPartitions =
                 table.newSnapshotReader().partitionEntries().stream()
                         .map(PartitionEntry::partition)
-                        .sorted(partitionComparator)
                         .collect(Collectors.toList());
         SnapshotReader deltaSnapshotReader = deltaTable.newSnapshotReader();
         PredicateBuilder builder = new PredicateBuilder(partitionType);
         for (BinaryRow partition : changedPartitions) {
+            BinaryRow partitionGroup = 
projector.extractGroupPartition(partition);
+            BinaryRow partitionChain = 
projector.extractChainPartition(partition);
+
+            List<BinaryRow> sameGroupSnapshots =
+                    filterSameGroup(snapshotPartitions, partitionGroup, 
projector);
+            sameGroupSnapshots.sort(
+                    (a, b) ->
+                            chainComparator.compare(
+                                    projector.extractChainPartition(a),
+                                    projector.extractChainPartition(b)));
+
             Optional<BinaryRow> preSnapshotPartition =
-                    findPreSnapshotPartition(snapshotPartitions, partition, 
partitionComparator);
+                    findPreSnapshotInGroup(
+                            sameGroupSnapshots, partitionChain, 
chainComparator, projector);
             Optional<BinaryRow> nextSnapshotPartition =
-                    findNextSnapshotPartition(snapshotPartitions, partition, 
partitionComparator);
+                    findNextSnapshotInGroup(
+                            sameGroupSnapshots, partitionChain, 
chainComparator, projector);
+
             Predicate deltaFollowingPredicate =
-                    ChainTableUtils.createTriangularPredicate(
-                            partition, partitionConverter, builder::equal, 
builder::greaterThan);
+                    ChainTableUtils.createGroupChainPredicate(
+                            partition,
+                            partitionConverter,
+                            groupFieldCount,
+                            builder::equal,
+                            builder::greaterThan);
             List<BinaryRow> deltaFollowingPartitions =
                     
deltaSnapshotReader.withPartitionFilter(deltaFollowingPredicate)
                             .partitionEntries().stream()
                             .map(PartitionEntry::partition)
                             .filter(
                                     deltaPartition ->
-                                            isBeforeNextSnapshotPartition(
+                                            isBeforeNextSnapshotInGroup(
                                                     deltaPartition,
                                                     nextSnapshotPartition,
-                                                    partitionComparator))
+                                                    chainComparator,
+                                                    projector))
                             .collect(Collectors.toList());
             boolean canDrop =
                     deltaFollowingPartitions.isEmpty() || 
preSnapshotPartition.isPresent();
@@ -159,13 +187,26 @@ public class ChainTableCommitPreCallback implements 
CommitPreCallback {
                 && indexFiles.stream().allMatch(f -> f.kind() == 
FileKind.DELETE);
     }
 
-    private Optional<BinaryRow> findPreSnapshotPartition(
-            List<BinaryRow> snapshotPartitions,
-            BinaryRow partition,
-            RecordComparator partitionComparator) {
+    private List<BinaryRow> filterSameGroup(
+            List<BinaryRow> partitions, BinaryRow groupKey, 
ChainPartitionProjector projector) {
+        List<BinaryRow> result = new ArrayList<>();
+        for (BinaryRow partition : partitions) {
+            if (projector.extractGroupPartition(partition).equals(groupKey)) {
+                result.add(partition);
+            }
+        }
+        return result;
+    }
+
+    private Optional<BinaryRow> findPreSnapshotInGroup(
+            List<BinaryRow> sortedSameGroupPartitions,
+            BinaryRow targetChain,
+            RecordComparator chainComparator,
+            ChainPartitionProjector projector) {
         BinaryRow pre = null;
-        for (BinaryRow snapshotPartition : snapshotPartitions) {
-            if (partitionComparator.compare(snapshotPartition, partition) < 0) 
{
+        for (BinaryRow snapshotPartition : sortedSameGroupPartitions) {
+            BinaryRow chain = 
projector.extractChainPartition(snapshotPartition);
+            if (chainComparator.compare(chain, targetChain) < 0) {
                 pre = snapshotPartition;
             } else {
                 break;
@@ -174,24 +215,31 @@ public class ChainTableCommitPreCallback implements 
CommitPreCallback {
         return Optional.ofNullable(pre);
     }
 
-    private Optional<BinaryRow> findNextSnapshotPartition(
-            List<BinaryRow> snapshotPartitions,
-            BinaryRow partition,
-            RecordComparator partitionComparator) {
-        for (BinaryRow snapshotPartition : snapshotPartitions) {
-            if (partitionComparator.compare(snapshotPartition, partition) > 0) 
{
+    private Optional<BinaryRow> findNextSnapshotInGroup(
+            List<BinaryRow> sortedSameGroupPartitions,
+            BinaryRow targetChain,
+            RecordComparator chainComparator,
+            ChainPartitionProjector projector) {
+        for (BinaryRow snapshotPartition : sortedSameGroupPartitions) {
+            BinaryRow chain = 
projector.extractChainPartition(snapshotPartition);
+            if (chainComparator.compare(chain, targetChain) > 0) {
                 return Optional.of(snapshotPartition);
             }
         }
         return Optional.empty();
     }
 
-    private boolean isBeforeNextSnapshotPartition(
+    private boolean isBeforeNextSnapshotInGroup(
             BinaryRow partition,
             Optional<BinaryRow> nextSnapshotPartition,
-            RecordComparator partitionComparator) {
-        return !nextSnapshotPartition.isPresent()
-                || partitionComparator.compare(partition, 
nextSnapshotPartition.get()) < 0;
+            RecordComparator chainComparator,
+            ChainPartitionProjector projector) {
+        if (!nextSnapshotPartition.isPresent()) {
+            return true;
+        }
+        BinaryRow partitionChain = projector.extractChainPartition(partition);
+        BinaryRow nextChain = 
projector.extractChainPartition(nextSnapshotPartition.get());
+        return chainComparator.compare(partitionChain, nextChain) < 0;
     }
 
     private String generatePartitionValues(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/ChainTablePartitionExpire.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/ChainTablePartitionExpire.java
new file mode 100644
index 0000000000..0e70854363
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/ChainTablePartitionExpire.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.partition.PartitionTimeExtractor;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.PartitionModification;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ChainPartitionProjector;
+import org.apache.paimon.utils.ChainTableUtils;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+/**
+ * Partition expiration for chain tables.
+ *
+ * <p>Chain tables store data across snapshot and delta branches. A delta 
partition depends on its
+ * nearest earlier snapshot partition as an anchor for merge-on-read. This 
class expires partitions
+ * in "segments" defined by consecutive snapshot partitions to maintain chain 
integrity.
+ *
+ * <p>A segment consists of one snapshot partition and all delta partitions 
whose time falls between
+ * that snapshot and the next snapshot in sorted order. The segment is the 
atomic unit of
+ * expiration: either the entire segment (snapshot + deltas) is expired, or 
nothing in it is.
+ *
+ * <p>Algorithm per group:
+ *
+ * <ol>
+ *   <li>List all snapshot branch partitions sorted by chain partition time.
+ *   <li>Filter to those before the cutoff ({@code now - expirationTime}).
+ *   <li>If fewer than 2 snapshots are before the cutoff, nothing can be 
expired (the last one must
+ *       be kept as anchor).
+ *   <li>The most recent snapshot before the cutoff is the anchor (kept). All 
earlier snapshots form
+ *       expirable segments together with their associated delta partitions.
+ *   <li>The number of segments expired is limited by {@code maxExpireNum}.
+ *   <li>Delta partitions are dropped first, then snapshot partitions, so that 
{@code
+ *       ChainTableCommitPreCallback} validation passes.
+ * </ol>
+ */
+public class ChainTablePartitionExpire implements PartitionExpire {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ChainTablePartitionExpire.class);
+
+    private final Duration expirationTime;
+    private final Duration checkInterval;
+    private final FileStoreTable snapshotTable;
+    private final FileStoreTable deltaTable;
+    private final PartitionTimeExtractor timeExtractor;
+    private final ChainPartitionProjector projector;
+    private final RecordComparator chainPartitionComparator;
+    private final InternalRowPartitionComputer partitionComputer;
+    private final List<String> partitionKeys;
+    private final List<String> chainPartitionKeys;
+    private final boolean endInputCheckPartitionExpire;
+    private final int maxExpireNum;
+    private final int expireBatchSize;
+    @Nullable private final PartitionModification 
snapshotPartitionModification;
+    @Nullable private final PartitionModification deltaPartitionModification;
+    private LocalDateTime lastCheck;
+
+    public ChainTablePartitionExpire(
+            Duration expirationTime,
+            Duration checkInterval,
+            FileStoreTable snapshotTable,
+            FileStoreTable deltaTable,
+            CoreOptions options,
+            RowType partitionType,
+            boolean endInputCheckPartitionExpire,
+            int maxExpireNum,
+            int expireBatchSize,
+            @Nullable PartitionModification snapshotPartitionModification,
+            @Nullable PartitionModification deltaPartitionModification) {
+        this.expirationTime = expirationTime;
+        this.checkInterval = checkInterval;
+        this.snapshotTable = snapshotTable;
+        this.deltaTable = deltaTable;
+        this.partitionKeys = partitionType.getFieldNames();
+        this.maxExpireNum = maxExpireNum;
+        this.expireBatchSize = expireBatchSize;
+        this.snapshotPartitionModification = snapshotPartitionModification;
+        this.deltaPartitionModification = deltaPartitionModification;
+
+        List<String> allPartitionKeys = partitionType.getFieldNames();
+        this.chainPartitionKeys = ChainTableUtils.chainPartitionKeys(options, 
allPartitionKeys);
+        int chainFieldCount = chainPartitionKeys.size();
+        this.projector = new ChainPartitionProjector(partitionType, 
chainFieldCount);
+        this.chainPartitionComparator =
+                
CodeGenUtils.newRecordComparator(projector.chainPartitionType().getFieldTypes());
+        this.timeExtractor =
+                new PartitionTimeExtractor(
+                        options.partitionTimestampPattern(), 
options.partitionTimestampFormatter());
+        this.partitionComputer =
+                new InternalRowPartitionComputer(
+                        options.partitionDefaultName(),
+                        partitionType,
+                        allPartitionKeys.toArray(new String[0]),
+                        options.legacyPartitionName());
+        this.endInputCheckPartitionExpire = endInputCheckPartitionExpire;
+
+        long rndSeconds = 0;
+        long checkIntervalSeconds = checkInterval.toMillis() / 1000;
+        if (checkIntervalSeconds > 0) {
+            rndSeconds = 
ThreadLocalRandom.current().nextLong(checkIntervalSeconds);
+        }
+        this.lastCheck = LocalDateTime.now().minusSeconds(rndSeconds);
+    }
+
+    @Override
+    public List<Map<String, String>> expire(long commitIdentifier) {
+        return expire(LocalDateTime.now(), commitIdentifier);
+    }
+
+    @Override
+    public boolean isValueExpiration() {
+        return true;
+    }
+
+    @Override
+    public boolean isValueAllExpired(Collection<BinaryRow> partitions) {
+        return isValueAllExpired(partitions, LocalDateTime.now());
+    }
+
+    @VisibleForTesting
+    boolean isValueAllExpired(Collection<BinaryRow> partitions, LocalDateTime 
now) {
+        LocalDateTime expireDateTime = now.minus(expirationTime);
+        for (BinaryRow partition : partitions) {
+            LocalDateTime partTime = extractPartitionTime(partition);
+            if (partTime == null || !expireDateTime.isAfter(partTime)) {
+                return false;
+            }
+        }
+
+        // All partitions are time-wise before cutoff, but chain table retains 
anchors
+        // (the most recent snapshot before cutoff per group) and their 
segment's deltas.
+        // Compute per-group retain boundary: partitions at or after the 
boundary are retained.
+        Map<BinaryRow, LocalDateTime> retainBoundary = 
computeGroupRetainBoundary(expireDateTime);
+        for (BinaryRow partition : partitions) {
+            BinaryRow groupKey = projector.extractGroupPartition(partition);
+            LocalDateTime boundary = retainBoundary.get(groupKey);
+            if (boundary == null) {
+                return false;
+            }
+            LocalDateTime partTime = extractPartitionTime(partition);
+            if (partTime != null && !partTime.isBefore(boundary)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * For each group that has snapshot partitions, compute the time boundary 
at or above which
+     * partitions are retained (not expired). Returns {@link 
LocalDateTime#MIN} for groups where
+     * fewer than 2 snapshots fall before the cutoff (nothing can be expired). 
Groups with no
+     * snapshot partitions at all (delta-only) are not included in the result; 
without a snapshot
+     * anchor, their earlier deltas may still be required by later delta-only 
chain reads.
+     */
+    private Map<BinaryRow, LocalDateTime> 
computeGroupRetainBoundary(LocalDateTime cutoffTime) {
+        List<PartitionEntry> snapshotEntries = 
snapshotTable.newSnapshotReader().partitionEntries();
+        Map<BinaryRow, List<BinaryRow>> groupedSnapshots = 
groupByGroupKey(snapshotEntries);
+
+        Map<BinaryRow, LocalDateTime> boundaries = new HashMap<>();
+        for (Map.Entry<BinaryRow, List<BinaryRow>> entry : 
groupedSnapshots.entrySet()) {
+            BinaryRow groupKey = entry.getKey();
+            int countBeforeCutoff = 0;
+            LocalDateTime latestBeforeCutoff = null;
+            for (BinaryRow snapshot : entry.getValue()) {
+                LocalDateTime time = extractPartitionTime(snapshot);
+                if (time != null && cutoffTime.isAfter(time)) {
+                    countBeforeCutoff++;
+                    if (latestBeforeCutoff == null || 
time.isAfter(latestBeforeCutoff)) {
+                        latestBeforeCutoff = time;
+                    }
+                }
+            }
+            if (countBeforeCutoff < 2) {
+                boundaries.put(groupKey, LocalDateTime.MIN);
+            } else {
+                boundaries.put(groupKey, latestBeforeCutoff);
+            }
+        }
+        return boundaries;
+    }
+
+    @VisibleForTesting
+    void setLastCheck(LocalDateTime time) {
+        lastCheck = time;
+    }
+
+    @VisibleForTesting
+    List<Map<String, String>> expire(LocalDateTime now, long commitIdentifier) 
{
+        if (checkInterval.isZero()
+                || now.isAfter(lastCheck.plus(checkInterval))
+                || (endInputCheckPartitionExpire && Long.MAX_VALUE == 
commitIdentifier)) {
+            List<Map<String, String>> expired = 
doExpire(now.minus(expirationTime));
+            lastCheck = now;
+            return expired;
+        }
+        return null;
+    }
+
+    /**
+     * Selects and drops the chain segments that lie entirely before {@code cutoffTime}.
+     *
+     * <p>Partitions of both branches are grouped by group key. Within a group, the snapshot
+     * partitions older than the cutoff are ordered by chain partition; the most recent one is kept
+     * as the anchor (merge base) and at most {@code maxExpireNum} of the older ones are expired,
+     * oldest first. A delta partition is expired when its time is at or after an expired snapshot
+     * and before the following snapshot, or before the oldest snapshot of the group. Groups with
+     * fewer than two snapshots before the cutoff are left untouched. Delta partitions are dropped
+     * before snapshot partitions.
+     *
+     * @param cutoffTime snapshot partitions must be strictly older than this to be considered
+     * @return the expired partition specs, delta specs first; empty if nothing was expired
+     */
+    private List<Map<String, String>> doExpire(LocalDateTime cutoffTime) {
+        List<PartitionEntry> snapshotPartitions =
+                snapshotTable.newSnapshotReader().partitionEntries();
+        List<PartitionEntry> deltaPartitions = deltaTable.newSnapshotReader().partitionEntries();
+
+        Map<BinaryRow, List<BinaryRow>> groupedSnapshots = groupByGroupKey(snapshotPartitions);
+        Map<BinaryRow, List<BinaryRow>> groupedDeltas = groupByGroupKey(deltaPartitions);
+
+        List<BinaryRow> snapshotPartitionsToExpire = new ArrayList<>();
+        List<BinaryRow> deltaPartitionsToExpire = new ArrayList<>();
+
+        for (Map.Entry<BinaryRow, List<BinaryRow>> entry : groupedSnapshots.entrySet()) {
+            BinaryRow groupKey = entry.getKey();
+            List<BinaryRow> groupSnapshots = entry.getValue();
+
+            groupSnapshots.sort(
+                    (a, b) ->
+                            chainPartitionComparator.compare(
+                                    projector.extractChainPartition(a),
+                                    projector.extractChainPartition(b)));
+
+            // Keep each snapshot's time next to it so it is extracted exactly once.
+            List<BinaryRow> snapshotsBeforeCutoff = new ArrayList<>();
+            List<LocalDateTime> snapshotTimes = new ArrayList<>();
+            for (BinaryRow partition : groupSnapshots) {
+                LocalDateTime partTime = extractPartitionTime(partition);
+                if (partTime != null && cutoffTime.isAfter(partTime)) {
+                    snapshotsBeforeCutoff.add(partition);
+                    snapshotTimes.add(partTime);
+                }
+            }
+
+            if (snapshotsBeforeCutoff.size() < 2) {
+                continue;
+            }
+
+            // Anchor = most recent snapshot before cutoff, kept as merge base
+            int anchorIndex = snapshotsBeforeCutoff.size() - 1;
+
+            // Expirable snapshots: all before anchor, oldest first.
+            // Each forms a segment with its associated deltas.
+            int segmentsToExpire = Math.min(anchorIndex, maxExpireNum);
+            if (segmentsToExpire <= 0) {
+                continue;
+            }
+
+            for (int i = 0; i < segmentsToExpire; i++) {
+                snapshotPartitionsToExpire.add(snapshotsBeforeCutoff.get(i));
+            }
+
+            List<BinaryRow> groupDeltas = groupedDeltas.get(groupKey);
+            if (groupDeltas == null) {
+                continue;
+            }
+
+            // Extract every delta time once per group instead of once per segment; entries are
+            // null where extraction failed and such deltas are never expired.
+            List<LocalDateTime> deltaTimes = new ArrayList<>(groupDeltas.size());
+            for (BinaryRow deltaPartition : groupDeltas) {
+                deltaTimes.add(extractPartitionTime(deltaPartition));
+            }
+
+            // Segment i covers [time of snapshot i, time of snapshot i + 1). Index i + 1 is always
+            // valid because segmentsToExpire never exceeds the anchor index.
+            for (int i = 0; i < segmentsToExpire; i++) {
+                LocalDateTime segmentStart = snapshotTimes.get(i);
+                LocalDateTime segmentEnd = snapshotTimes.get(i + 1);
+                for (int d = 0; d < groupDeltas.size(); d++) {
+                    LocalDateTime deltaTime = deltaTimes.get(d);
+                    if (deltaTime != null
+                            && !deltaTime.isBefore(segmentStart)
+                            && deltaTime.isBefore(segmentEnd)) {
+                        deltaPartitionsToExpire.add(groupDeltas.get(d));
+                    }
+                }
+            }
+
+            // Also collect orphan deltas before the earliest expired snapshot
+            LocalDateTime firstSnapshotTime = snapshotTimes.get(0);
+            for (int d = 0; d < groupDeltas.size(); d++) {
+                LocalDateTime deltaTime = deltaTimes.get(d);
+                if (deltaTime != null && deltaTime.isBefore(firstSnapshotTime)) {
+                    deltaPartitionsToExpire.add(groupDeltas.get(d));
+                }
+            }
+        }
+
+        if (snapshotPartitionsToExpire.isEmpty() && deltaPartitionsToExpire.isEmpty()) {
+            return new ArrayList<>();
+        }
+
+        List<Map<String, String>> deltaSpecs = toPartitionSpecs(deltaPartitionsToExpire);
+        List<Map<String, String>> snapshotSpecs = toPartitionSpecs(snapshotPartitionsToExpire);
+        List<Map<String, String>> allExpired = new ArrayList<>();
+
+        // Deltas are dropped first, before the snapshot partitions they depend on.
+        if (!deltaSpecs.isEmpty()) {
+            LOG.info("Chain table expire delta partitions: {}", deltaSpecs);
+            batchDropPartitions(deltaTable, deltaSpecs, deltaPartitionModification);
+            allExpired.addAll(deltaSpecs);
+        }
+
+        if (!snapshotSpecs.isEmpty()) {
+            LOG.info("Chain table expire snapshot partitions: {}", snapshotSpecs);
+            batchDropPartitions(snapshotTable, snapshotSpecs, snapshotPartitionModification);
+            allExpired.addAll(snapshotSpecs);
+        }
+
+        return allExpired;
+    }
+
+    /**
+     * Drops the given partitions, in chunks of {@code expireBatchSize} when that size is positive
+     * and smaller than the number of partitions, otherwise in a single call.
+     *
+     * <p>With a {@code partitionModification} the partitions and their {@code .done} counterparts
+     * are dropped through it; without one the partitions are truncated via a batch commit on
+     * {@code table}.
+     *
+     * @param table table used for the commit-based drop
+     * @param partitionSpecs specs of the partitions to drop
+     * @param partitionModification optional handler that takes over the drop when present
+     */
+    private void batchDropPartitions(
+            FileStoreTable table,
+            List<Map<String, String>> partitionSpecs,
+            @Nullable PartitionModification partitionModification) {
+        boolean splitIntoBatches =
+                expireBatchSize > 0 && expireBatchSize < partitionSpecs.size();
+
+        if (partitionModification == null) {
+            if (!splitIntoBatches) {
+                dropPartitions(table, partitionSpecs);
+                return;
+            }
+            for (List<Map<String, String>> chunk :
+                    Lists.partition(partitionSpecs, expireBatchSize)) {
+                dropPartitions(table, chunk);
+            }
+            return;
+        }
+
+        try {
+            if (!splitIntoBatches) {
+                partitionModification.dropPartitions(partitionSpecs);
+                partitionModification.dropPartitions(toDonePartitions(partitionSpecs));
+                return;
+            }
+            for (List<Map<String, String>> chunk :
+                    Lists.partition(partitionSpecs, expireBatchSize)) {
+                partitionModification.dropPartitions(chunk);
+                partitionModification.dropPartitions(toDonePartitions(chunk));
+            }
+        } catch (Catalog.TableNotExistException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Builds the {@code .done} counterpart of every given partition spec by appending {@code
+     * ".done"} to the value of its last field. Specs without any field are skipped.
+     *
+     * @param expiredPartitions partition specs that are being expired
+     * @return copies of the specs with the suffix applied to their last value
+     */
+    private List<Map<String, String>> toDonePartitions(
+            List<Map<String, String>> expiredPartitions) {
+        List<Map<String, String>> result = new ArrayList<>(expiredPartitions.size());
+        for (Map<String, String> spec : expiredPartitions) {
+            LinkedHashMap<String, String> copy = new LinkedHashMap<>(spec);
+            if (copy.isEmpty()) {
+                continue;
+            }
+            // Walk the insertion-ordered keys to find the last partition field.
+            String lastKey = null;
+            for (String key : copy.keySet()) {
+                lastKey = key;
+            }
+            copy.put(lastKey, copy.get(lastKey) + ".done");
+            result.add(copy);
+        }
+        return result;
+    }
+
+    /**
+     * Buckets the partitions of the given entries by the group key the projector extracts from
+     * each full partition, keeping first-seen order of groups and of partitions within a group.
+     *
+     * @param partitionEntries partition entries of one branch
+     * @return group key mapped to the full partitions belonging to it
+     */
+    private Map<BinaryRow, List<BinaryRow>> groupByGroupKey(
+            List<PartitionEntry> partitionEntries) {
+        Map<BinaryRow, List<BinaryRow>> result = new LinkedHashMap<>();
+        for (PartitionEntry partitionEntry : partitionEntries) {
+            BinaryRow partition = partitionEntry.partition();
+            BinaryRow key = projector.extractGroupPartition(partition);
+            List<BinaryRow> members = result.get(key);
+            if (members == null) {
+                members = new ArrayList<>();
+                result.put(key, members);
+            }
+            members.add(partition);
+        }
+        return result;
+    }
+
+    /**
+     * Extracts the time a partition represents from the values of its chain partition keys.
+     *
+     * <p>Failures are deliberately not propagated: any exception raised while computing the
+     * partition values or extracting the time is logged as a warning and reported as {@code
+     * null}, so callers must treat {@code null} as "time unknown".
+     *
+     * @param partition the full partition row
+     * @return the extracted partition time, or {@code null} if it could not be extracted
+     */
+    @Nullable
+    private LocalDateTime extractPartitionTime(BinaryRow partition) {
+        try {
+            LinkedHashMap<String, String> partValues =
+                    partitionComputer.generatePartValues(partition);
+            List<String> chainValues = new ArrayList<>(chainPartitionKeys.size());
+            for (String key : chainPartitionKeys) {
+                chainValues.add(partValues.get(key));
+            }
+            return timeExtractor.extract(chainPartitionKeys, chainValues);
+        } catch (Exception e) {
+            LOG.warn("Failed to extract partition time from {}", partition, e);
+            return null;
+        }
+    }
+
+    /**
+     * Converts partition rows into partition specs (partition key to string value), following the
+     * order of {@code partitionKeys}. Keys without a generated value are left out of the spec.
+     *
+     * @param partitions partition rows to convert
+     * @return one spec per partition, in the same order
+     */
+    private List<Map<String, String>> toPartitionSpecs(List<BinaryRow> partitions) {
+        List<Map<String, String>> specs = new ArrayList<>(partitions.size());
+        for (BinaryRow partition : partitions) {
+            LinkedHashMap<String, String> partValues =
+                    partitionComputer.generatePartValues(partition);
+            Map<String, String> spec = new LinkedHashMap<>();
+            for (String key : partitionKeys) {
+                String value = partValues.get(key);
+                if (value == null) {
+                    continue;
+                }
+                spec.put(key, value);
+            }
+            specs.add(spec);
+        }
+        return specs;
+    }
+
+    /**
+     * Truncates the given partitions of {@code table} through a batch commit.
+     *
+     * @param table the table whose partitions are truncated
+     * @param partitionSpecs specs of the partitions to truncate
+     * @throws RuntimeException wrapping any failure raised while creating, using or closing the
+     *     commit
+     */
+    private void dropPartitions(FileStoreTable table, List<Map<String, String>> partitionSpecs) {
+        try (BatchTableCommit batchCommit = table.newBatchWriteBuilder().newCommit()) {
+            batchCommit.truncatePartitions(partitionSpecs);
+        } catch (Exception e) {
+            String message =
+                    String.format(
+                            "Failed to drop partitions from %s: %s.", table.name(), partitionSpecs);
+            throw new RuntimeException(message, e);
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/NormalPartitionExpire.java
similarity index 97%
copy from 
paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
copy to 
paimon-core/src/main/java/org/apache/paimon/operation/NormalPartitionExpire.java
index 74b7850ed7..5a23f538a9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/NormalPartitionExpire.java
@@ -44,9 +44,9 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
 /** Expire partitions. */
-public class PartitionExpire {
+public class NormalPartitionExpire implements PartitionExpire {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(PartitionExpire.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(NormalPartitionExpire.class);
 
     private static final String DELIMITER = ",";
 
@@ -61,7 +61,7 @@ public class PartitionExpire {
     private final int maxExpireNum;
     private final int expireBatchSize;
 
-    public PartitionExpire(
+    public NormalPartitionExpire(
             Duration expirationTime,
             Duration checkInterval,
             PartitionExpireStrategy strategy,
@@ -90,7 +90,7 @@ public class PartitionExpire {
         this.expireBatchSize = expireBatchSize;
     }
 
-    public PartitionExpire(
+    public NormalPartitionExpire(
             Duration expirationTime,
             Duration checkInterval,
             PartitionExpireStrategy strategy,
@@ -111,14 +111,17 @@ public class PartitionExpire {
                 expireBatchSize);
     }
 
+    @Override
     public List<Map<String, String>> expire(long commitIdentifier) {
         return expire(LocalDateTime.now(), commitIdentifier);
     }
 
+    @Override
     public boolean isValueExpiration() {
         return strategy instanceof PartitionValuesTimeExpireStrategy;
     }
 
+    @Override
     public boolean isValueAllExpired(Collection<BinaryRow> partitions) {
         PartitionValuesTimeExpireStrategy valuesStrategy =
                 (PartitionValuesTimeExpireStrategy) strategy;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index 74b7850ed7..bb1487735b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -18,208 +18,38 @@
 
 package org.apache.paimon.operation;
 
-import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.manifest.PartitionEntry;
-import org.apache.paimon.partition.PartitionExpireStrategy;
-import org.apache.paimon.partition.PartitionValuesTimeExpireStrategy;
-import org.apache.paimon.table.PartitionModification;
-
-import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.time.Duration;
-import java.time.LocalDateTime;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-
-/** Expire partitions. */
-public class PartitionExpire {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(PartitionExpire.class);
-
-    private static final String DELIMITER = ",";
-
-    private final Duration expirationTime;
-    private final Duration checkInterval;
-    private final FileStoreScan scan;
-    private final FileStoreCommit commit;
-    @Nullable private final PartitionModification partitionModification;
-    private LocalDateTime lastCheck;
-    private final PartitionExpireStrategy strategy;
-    private final boolean endInputCheckPartitionExpire;
-    private final int maxExpireNum;
-    private final int expireBatchSize;
-
-    public PartitionExpire(
-            Duration expirationTime,
-            Duration checkInterval,
-            PartitionExpireStrategy strategy,
-            FileStoreScan scan,
-            FileStoreCommit commit,
-            @Nullable PartitionModification partitionModification,
-            boolean endInputCheckPartitionExpire,
-            int maxExpireNum,
-            int expireBatchSize) {
-        this.expirationTime = expirationTime;
-        this.checkInterval = checkInterval;
-        this.strategy = strategy;
-        this.scan = scan;
-        this.commit = commit;
-        this.partitionModification = partitionModification;
-        // Avoid the execution time of stream jobs from being too short and 
preventing partition
-        // expiration
-        long rndSeconds = 0;
-        long checkIntervalSeconds = checkInterval.toMillis() / 1000;
-        if (checkIntervalSeconds > 0) {
-            rndSeconds = 
ThreadLocalRandom.current().nextLong(checkIntervalSeconds);
-        }
-        this.lastCheck = LocalDateTime.now().minusSeconds(rndSeconds);
-        this.endInputCheckPartitionExpire = endInputCheckPartitionExpire;
-        this.maxExpireNum = maxExpireNum;
-        this.expireBatchSize = expireBatchSize;
-    }
-
-    public PartitionExpire(
-            Duration expirationTime,
-            Duration checkInterval,
-            PartitionExpireStrategy strategy,
-            FileStoreScan scan,
-            FileStoreCommit commit,
-            @Nullable PartitionModification partitionModification,
-            int maxExpireNum,
-            int expireBatchSize) {
-        this(
-                expirationTime,
-                checkInterval,
-                strategy,
-                scan,
-                commit,
-                partitionModification,
-                false,
-                maxExpireNum,
-                expireBatchSize);
-    }
-
-    public List<Map<String, String>> expire(long commitIdentifier) {
-        return expire(LocalDateTime.now(), commitIdentifier);
-    }
-
-    public boolean isValueExpiration() {
-        return strategy instanceof PartitionValuesTimeExpireStrategy;
-    }
 
-    public boolean isValueAllExpired(Collection<BinaryRow> partitions) {
-        PartitionValuesTimeExpireStrategy valuesStrategy =
-                (PartitionValuesTimeExpireStrategy) strategy;
-        LocalDateTime expireDateTime = 
LocalDateTime.now().minus(expirationTime);
-        for (BinaryRow partition : partitions) {
-            if (!valuesStrategy.isExpired(expireDateTime, partition)) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    @VisibleForTesting
-    void setLastCheck(LocalDateTime time) {
-        lastCheck = time;
-    }
-
-    @VisibleForTesting
-    List<Map<String, String>> expire(LocalDateTime now, long commitIdentifier) 
{
-        if (checkInterval.isZero()
-                || now.isAfter(lastCheck.plus(checkInterval))
-                || (endInputCheckPartitionExpire && Long.MAX_VALUE == 
commitIdentifier)) {
-            List<Map<String, String>> expired =
-                    doExpire(now.minus(expirationTime), commitIdentifier);
-            lastCheck = now;
-            return expired;
-        }
-        return null;
-    }
-
-    private List<Map<String, String>> doExpire(
-            LocalDateTime expireDateTime, long commitIdentifier) {
-        List<PartitionEntry> partitionEntries =
-                strategy.selectExpiredPartitions(scan, expireDateTime);
-        List<List<String>> expiredPartValues = new 
ArrayList<>(partitionEntries.size());
-        for (PartitionEntry partition : partitionEntries) {
-            Object[] array = strategy.convertPartition(partition.partition());
-            expiredPartValues.add(strategy.toPartitionValue(array));
-        }
-
-        List<Map<String, String>> expired = new ArrayList<>();
-        if (!expiredPartValues.isEmpty()) {
-            // convert partition value to partition string, and limit the 
partition num
-            expired = convertToPartitionString(expiredPartValues);
-            LOG.info("Expire Partitions: {}", expired);
-            if (expireBatchSize > 0 && expireBatchSize < expired.size()) {
-                Lists.partition(expired, expireBatchSize)
-                        .forEach(
-                                expiredBatchPartitions ->
-                                        doBatchExpire(expiredBatchPartitions, 
commitIdentifier));
-            } else {
-                doBatchExpire(expired, commitIdentifier);
-            }
-        }
-        return expired;
-    }
-
-    private void doBatchExpire(
-            List<Map<String, String>> expiredBatchPartitions, long 
commitIdentifier) {
-        if (partitionModification != null) {
-            try {
-                partitionModification.dropPartitions(expiredBatchPartitions);
-                // also drop corresponding .done partitions
-                
partitionModification.dropPartitions(toDonePartitions(expiredBatchPartitions));
-            } catch (Catalog.TableNotExistException e) {
-                throw new RuntimeException(e);
-            }
-        } else {
-            // .done partitions only exist when partitionModification != null
-            // (metastore.partitioned-table = true), so no need to handle them 
here
-            commit.dropPartitions(expiredBatchPartitions, commitIdentifier);
-        }
-    }
-
-    private List<Map<String, String>> toDonePartitions(
-            List<Map<String, String>> expiredPartitions) {
-        List<Map<String, String>> donePartitions = new 
ArrayList<>(expiredPartitions.size());
-        for (Map<String, String> partition : expiredPartitions) {
-            LinkedHashMap<String, String> donePartition = new 
LinkedHashMap<>(partition);
-            // append .done suffix to the last partition field value
-            Map.Entry<String, String> lastEntry = null;
-            for (Map.Entry<String, String> entry : donePartition.entrySet()) {
-                lastEntry = entry;
-            }
-            if (lastEntry != null) {
-                donePartition.put(lastEntry.getKey(), lastEntry.getValue() + 
".done");
-                donePartitions.add(donePartition);
-            }
-        }
-        return donePartitions;
-    }
-
-    private List<Map<String, String>> convertToPartitionString(
-            List<List<String>> expiredPartValues) {
-        return expiredPartValues.stream()
-                .map(values -> String.join(DELIMITER, values))
-                .sorted()
-                // Use split(DELIMITER, -1) to preserve trailing empty strings
-                .map(s -> s.split(DELIMITER, -1))
-                .map(strategy::toPartitionString)
-                .limit(Math.min(expiredPartValues.size(), maxExpireNum))
-                .collect(Collectors.toList());
-    }
+/**
+ * Common interface for partition expiration strategies.
+ *
+ * <p>Implementations include {@link NormalPartitionExpire} for normal tables and {@link
+ * ChainTablePartitionExpire} for chain tables that require segment-based expiration across snapshot
+ * and delta branches.
+ */
+public interface PartitionExpire {
+
+    /**
+     * Expire partitions that are older than the configured expiration time.
+     *
+     * @param commitIdentifier identifier of the commit that triggers the check; implementations
+     *     configured with an end-input check run the expiration regardless of the check interval
+     *     when this is {@code Long.MAX_VALUE}
+     * @return the list of expired partition specs, or null if the check interval has not elapsed
+     */
+    @Nullable
+    List<Map<String, String>> expire(long commitIdentifier);
+
+    /** Whether this expiration uses values-time strategy. */
+    boolean isValueExpiration();
+
+    /**
+     * Check whether all given partitions are expired according to the values-time strategy.
+     *
+     * <p>Only valid when {@link #isValueExpiration()} returns true.
+     *
+     * @param partitions the partitions to check
+     * @return true if every given partition is expired
+     */
+    boolean isValueAllExpired(Collection<BinaryRow> partitions);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 3d02ede617..c230b9048c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -947,6 +947,14 @@ public class SchemaValidation {
                     options.partitionTimestampFormatter() != null,
                     "Partition timestamp formatter is required for chain 
table.");
 
+            if (options.partitionExpireTime() != null) {
+                Preconditions.checkArgument(
+                        
"values-time".equals(options.partitionExpireStrategy()),
+                        "Chain table only supports 'values-time' partition 
expiration strategy, "
+                                + "but found '%s'.",
+                        options.partitionExpireStrategy());
+            }
+
             // validate chain-table.chain-partition-keys
             List<String> chainPartKeys = 
options.chainTableChainPartitionKeys();
             if (chainPartKeys != null) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/ChainTablePartitionExpireTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/ChainTablePartitionExpireTest.java
new file mode 100644
index 0000000000..5ff5edbf71
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/ChainTablePartitionExpireTest.java
@@ -0,0 +1,786 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionStatistics;
+import org.apache.paimon.partition.PartitionUpdateTimeExpireStrategy;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.PartitionModification;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link ChainTablePartitionExpire}. */
+public class ChainTablePartitionExpireTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private String commitUser;
+
+    /** Generates a fresh random commit user before every test. */
+    @BeforeEach
+    public void before() {
+        this.commitUser = UUID.randomUUID().toString();
+    }
+
+    /** Explicitly requesting an update-time strategy on a chain table must be rejected. */
+    @Test
+    public void testExplicitPartitionExpireRejectsUpdateTimeStrategy() throws Exception {
+        Path tablePath = tablePath("explicit_update_time_strategy");
+        createChainTable(tablePath, false);
+        FileStoreTable mainTable = loadTable(tablePath);
+
+        assertThatThrownBy(
+                        () -> {
+                            // Built inside the callable so that any failure is captured by the
+                            // assertion as well.
+                            PartitionUpdateTimeExpireStrategy updateTimeStrategy =
+                                    new PartitionUpdateTimeExpireStrategy(
+                                            CoreOptions.fromMap(Collections.emptyMap()),
+                                            mainTable.schema().logicalPartitionType());
+                            mainTable
+                                    .store()
+                                    .newPartitionExpire(
+                                            commitUser,
+                                            mainTable,
+                                            Duration.ofDays(1),
+                                            Duration.ZERO,
+                                            updateTimeStrategy);
+                        })
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(
+                        "Chain table only supports 'values-time' partition expiration strategy");
+    }
+
+    /**
+     * Three snapshots lie before the cutoff: the latest is kept as anchor and the two older
+     * segments (snapshot plus the deltas up to the next snapshot) are expired.
+     */
+    @Test
+    public void testExpireWithSinglePartitionKey() throws Exception {
+        Path tablePath = tablePath("simple_expire");
+        createChainTable(tablePath, false);
+        FileStoreTable mainTable = loadTable(tablePath);
+        FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot");
+        FileStoreTable deltaTable = mainTable.switchToBranch("delta");
+
+        write(snapshotTable, "20250101", "v1");
+        write(snapshotTable, "20250201", "v2");
+        write(snapshotTable, "20250301", "v3");
+
+        write(deltaTable, "20250110", "v4");
+        write(deltaTable, "20250115", "v5");
+        write(deltaTable, "20250210", "v6");
+        write(deltaTable, "20250215", "v7");
+        write(deltaTable, "20250315", "v8");
+
+        snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+        deltaTable = loadTable(tablePath).switchToBranch("delta");
+        assertThat(listPartitions(snapshotTable))
+                .containsExactlyInAnyOrder("20250101", "20250201", "20250301");
+        assertThat(listPartitions(deltaTable))
+                .containsExactlyInAnyOrder(
+                        "20250110", "20250115", "20250210", "20250215", "20250315");
+
+        // cutoff = 2025-03-31 - 20d = 2025-03-11
+        // Snapshots before cutoff: 20250101, 20250201, 20250301 (3 snapshots)
+        // Anchor = 20250301 (kept), expire 2 segments:
+        //   Segment0: S(20250101), d(20250110), d(20250115)
+        //   Segment1: S(20250201), d(20250210), d(20250215)
+        ChainTablePartitionExpire expire =
+                newChainExpire(snapshotTable, deltaTable, Duration.ofDays(20), false);
+        expire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0));
+        List<Map<String, String>> expired =
+                expire.expire(LocalDateTime.of(2025, 3, 31, 0, 0), Long.MAX_VALUE);
+
+        // Every expired partition is reported exactly once: 4 deltas + 2 snapshots.
+        assertThat(expired).isNotNull().hasSize(6);
+
+        snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+        deltaTable = loadTable(tablePath).switchToBranch("delta");
+        assertThat(listPartitions(snapshotTable)).containsExactlyInAnyOrder("20250301");
+        assertThat(listPartitions(deltaTable)).containsExactlyInAnyOrder("20250315");
+    }
+
+    /** A single snapshot before the cutoff is the anchor itself, so nothing may be expired. */
+    @Test
+    public void testNoExpireWhenOnlyOneSnapshotBeforeCutoff() throws Exception {
+        Path tablePath = tablePath("no_expire_one_snapshot");
+        createChainTable(tablePath, false);
+        FileStoreTable mainTable = loadTable(tablePath);
+        FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot");
+        FileStoreTable deltaTable = mainTable.switchToBranch("delta");
+
+        write(snapshotTable, "20250201", "v1");
+        write(snapshotTable, "20250315", "v2");
+        write(deltaTable, "20250205", "v3");
+
+        snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+        deltaTable = loadTable(tablePath).switchToBranch("delta");
+
+        // cutoff = 2025-03-31 - 30d = 2025-03-01, only S(20250201) lies before it
+        LocalDateTime now = LocalDateTime.of(2025, 3, 31, 0, 0);
+        ChainTablePartitionExpire partitionExpire =
+                newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30), false);
+        partitionExpire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0));
+        List<Map<String, String>> result = partitionExpire.expire(now, Long.MAX_VALUE);
+
+        assertThat(result).isEmpty();
+
+        snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+        deltaTable = loadTable(tablePath).switchToBranch("delta");
+        assertThat(listPartitions(snapshotTable)).containsExactlyInAnyOrder("20250201", "20250315");
+        assertThat(listPartitions(deltaTable)).containsExactlyInAnyOrder("20250205");
+    }
+
+    /** Several segments before the anchor are expired in one pass; later data is untouched. */
+    @Test
+    public void testExpireMultipleSegments() throws Exception {
+        Path tablePath = tablePath("multi_segments");
+        createChainTable(tablePath, false);
+        FileStoreTable mainTable = loadTable(tablePath);
+        FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot");
+        FileStoreTable deltaTable = mainTable.switchToBranch("delta");
+
+        write(snapshotTable, "20250101", "v1");
+        write(snapshotTable, "20250115", "v2");
+        write(snapshotTable, "20250201", "v3");
+        write(snapshotTable, "20250315", "v4");
+
+        write(deltaTable, "20250105", "v5");
+        write(deltaTable, "20250120", "v6");
+        write(deltaTable, "20250210", "v7");
+
+        snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+        deltaTable = loadTable(tablePath).switchToBranch("delta");
+
+        // cutoff = 2025-03-31 - 30d = 2025-03-01
+        // Snapshots before cutoff: 20250101, 20250115, 20250201 (3 snapshots)
+        // Anchor = 20250201 (kept), expired segments:
+        //   Segment0: S(20250101), d(20250105)
+        //   Segment1: S(20250115), d(20250120)
+        LocalDateTime now = LocalDateTime.of(2025, 3, 31, 0, 0);
+        ChainTablePartitionExpire partitionExpire =
+                newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30), false);
+        partitionExpire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0));
+        partitionExpire.expire(now, Long.MAX_VALUE);
+
+        snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+        deltaTable = loadTable(tablePath).switchToBranch("delta");
+        assertThat(listPartitions(snapshotTable)).containsExactlyInAnyOrder("20250201", "20250315");
+        assertThat(listPartitions(deltaTable)).containsExactlyInAnyOrder("20250210");
+    }
+
+    /**
+     * When every snapshot partition is newer than the cutoff, the expire call must report an
+     * empty result and must leave every partition of both branches in place.
+     *
+     * <p>Besides the returned list, the table state is re-read after the call so that a
+     * regression which drops data while still returning an empty list is caught.
+     */
+    @Test
+    public void testNoExpireWhenNoSnapshotsBeforeCutoff() throws Exception {
+        Path tablePath = tablePath("no_expire_no_snapshot");
+        createChainTable(tablePath, false);
+        FileStoreTable mainTable = loadTable(tablePath);
+        FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot");
+        FileStoreTable deltaTable = mainTable.switchToBranch("delta");
+
+        write(snapshotTable, "20250315", "v1");
+        write(snapshotTable, "20250320", "v2");
+        write(deltaTable, "20250316", "v3");
+
+        snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+        deltaTable = loadTable(tablePath).switchToBranch("delta");
+
+        // cutoff = 2025-03-31 - 30d = 2025-03-01
+        // No snapshots before cutoff
+        ChainTablePartitionExpire expire =
+                newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30), false);
+        expire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0));
+        List<Map<String, String>> expired =
+                expire.expire(LocalDateTime.of(2025, 3, 31, 0, 0), Long.MAX_VALUE);
+
+        assertThat(expired).isEmpty();
+
+        // Nothing was reported as expired, so nothing may have been removed either.
+        snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+        deltaTable = loadTable(tablePath).switchToBranch("delta");
+        assertThat(listPartitions(snapshotTable))
+                .containsExactlyInAnyOrder("20250315", "20250320");
+        assertThat(listPartitions(deltaTable)).containsExactlyInAnyOrder("20250316");
+    }
+
+    /**
+     * Builds an expire instance with a one-day check interval, sets the last check to the same
+     * instant that is then passed to {@code expire}, and asserts that the call returns
+     * {@code null}.
+     */
+    @Test
+    public void testCheckIntervalPreventsExpire() throws Exception {
+        Path path = tablePath("check_interval");
+        createChainTable(path, false);
+        FileStoreTable main = loadTable(path);
+        FileStoreTable snapshotBranch = main.switchToBranch("snapshot");
+        FileStoreTable deltaBranch = main.switchToBranch("delta");
+
+        write(snapshotBranch, "20250101", "v1");
+        write(snapshotBranch, "20250201", "v2");
+
+        snapshotBranch = loadTable(path).switchToBranch("snapshot");
+        deltaBranch = loadTable(path).switchToBranch("delta");
+
+        Duration retention = Duration.ofDays(30);
+        Duration checkInterval = Duration.ofDays(1);
+        ChainTablePartitionExpire chainExpire =
+                new ChainTablePartitionExpire(
+                        retention,
+                        checkInterval,
+                        snapshotBranch,
+                        deltaBranch,
+                        CoreOptions.fromMap(buildOptions(Duration.ofDays(30), false)),
+                        snapshotBranch.schema().logicalPartitionType(),
+                        false,
+                        Integer.MAX_VALUE,
+                        0,
+                        null,
+                        null);
+
+        // Last check and "now" are the same instant, i.e. zero time has elapsed.
+        LocalDateTime checkTime = LocalDateTime.of(2025, 3, 31, 0, 0);
+        chainExpire.setLastCheck(checkTime);
+
+        List<Map<String, String>> result = chainExpire.expire(checkTime, Long.MAX_VALUE);
+
+        assertThat(result).isNull();
+    }
+
+    /**
+     * Same data layout as the multi-segment case, but the expire instance is created with a
+     * max-expire-num of 1. The test asserts that two partitions are reported and that only
+     * snapshot 20250101 and delta 20250105 disappear.
+     */
+    @Test
+    public void testMaxExpireNumLimitsSegments() throws Exception {
+        Path path = tablePath("max_expire_num");
+        createChainTable(path, false);
+        FileStoreTable main = loadTable(path);
+        FileStoreTable snapshotBranch = main.switchToBranch("snapshot");
+        FileStoreTable deltaBranch = main.switchToBranch("delta");
+
+        // Snapshots S(0101), S(0115), S(0201), S(0315); cutoff 03-01 makes S(0201) the anchor.
+        // Expirable segments: {S(0101), d(0105)} and {S(0115), d(0120)}.
+        write(snapshotBranch, "20250101", "v1");
+        write(snapshotBranch, "20250115", "v2");
+        write(snapshotBranch, "20250201", "v3");
+        write(snapshotBranch, "20250315", "v4");
+
+        write(deltaBranch, "20250105", "v5");
+        write(deltaBranch, "20250120", "v6");
+        write(deltaBranch, "20250210", "v7");
+
+        snapshotBranch = loadTable(path).switchToBranch("snapshot");
+        deltaBranch = loadTable(path).switchToBranch("delta");
+
+        // A limit of 1 restricts the run to the first segment only.
+        int maxExpireNum = 1;
+        ChainTablePartitionExpire chainExpire =
+                new ChainTablePartitionExpire(
+                        Duration.ofDays(30),
+                        Duration.ZERO,
+                        snapshotBranch,
+                        deltaBranch,
+                        CoreOptions.fromMap(buildOptions(Duration.ofDays(30), false)),
+                        snapshotBranch.schema().logicalPartitionType(),
+                        false,
+                        maxExpireNum,
+                        0,
+                        null,
+                        null);
+        chainExpire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0));
+        LocalDateTime now = LocalDateTime.of(2025, 3, 31, 0, 0);
+        List<Map<String, String>> result = chainExpire.expire(now, Long.MAX_VALUE);
+
+        // One segment = S(0101) + d(0105) = two partitions.
+        assertThat(result).isNotNull().hasSize(2);
+
+        FileStoreTable snapshotAfter = loadTable(path).switchToBranch("snapshot");
+        FileStoreTable deltaAfter = loadTable(path).switchToBranch("delta");
+        // S(0115) belonged to the second segment and therefore is still present.
+        assertThat(listPartitions(snapshotAfter))
+                .containsExactlyInAnyOrder("20250115", "20250201", "20250315");
+        // d(0105) is gone, d(0120) and d(0210) remain.
+        assertThat(listPartitions(deltaAfter))
+                .containsExactlyInAnyOrder("20250120", "20250210");
+    }
+
+    /**
+     * Expiration on a table partitioned by {@code (region, dt)} where {@code dt} is the chain
+     * partition key: each region is evaluated on its own.
+     *
+     * <p>The final state of both branches is asserted exactly (not merely "contains"), so an
+     * unexpected extra drop or an unexpected surviving partition in either region fails the
+     * test.
+     */
+    @Test
+    public void testExpireWithGroupPartition() throws Exception {
+        Path tablePath = tablePath("group_partition");
+        createChainTable(tablePath, true);
+        FileStoreTable mainTable = loadTable(tablePath);
+        FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot");
+        FileStoreTable deltaTable = mainTable.switchToBranch("delta");
+
+        // Group "US": snapshots 0101, 0201, 0301
+        writeGrouped(snapshotTable, "US", "20250101", "v1");
+        writeGrouped(snapshotTable, "US", "20250201", "v2");
+        writeGrouped(snapshotTable, "US", "20250301", "v3");
+        // Group "US": deltas 0110, 0210
+        writeGrouped(deltaTable, "US", "20250110", "d1");
+        writeGrouped(deltaTable, "US", "20250210", "d2");
+
+        // Group "EU": only one snapshot before cutoff, so nothing should expire
+        writeGrouped(snapshotTable, "EU", "20250215", "v4");
+        writeGrouped(snapshotTable, "EU", "20250320", "v5");
+        writeGrouped(deltaTable, "EU", "20250220", "d3");
+
+        snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+        deltaTable = loadTable(tablePath).switchToBranch("delta");
+
+        // cutoff = 2025-03-31 - 30d = 2025-03-01
+        // Group "US": snapshots before cutoff = [0101, 0201]. Anchor = 0201 (kept).
+        //   Expire: S(0101), delta(0110) (before anchor 0201)
+        //   Keep: S(0201), S(0301), delta(0210)
+        // Group "EU": snapshots before cutoff = [0215] (only 1). Nothing expired.
+        ChainTablePartitionExpire expire =
+                newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30), true);
+        expire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0));
+        List<Map<String, String>> expired =
+                expire.expire(LocalDateTime.of(2025, 3, 31, 0, 0), Long.MAX_VALUE);
+
+        assertThat(expired).isNotNull();
+        assertThat(expired).hasSize(2);
+
+        snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+        deltaTable = loadTable(tablePath).switchToBranch("delta");
+
+        // Exactly S(US,0101) and delta(US,0110) are gone; everything else must survive.
+        assertThat(listGroupedPartitions(snapshotTable))
+                .containsExactlyInAnyOrder(
+                        "US|20250201", "US|20250301", "EU|20250215", "EU|20250320");
+        assertThat(listGroupedPartitions(deltaTable))
+                .containsExactlyInAnyOrder("US|20250210", "EU|20250220");
+    }
+
+    /**
+     * Passes one recording {@code PartitionModification} per branch into the expire instance and
+     * asserts that each recorder saw the {@code dt} values of its own branch only.
+     */
+    @Test
+    public void testUsesBranchSpecificPartitionModifications() throws Exception {
+        Path path = tablePath("branch_specific_partition_modification");
+        createChainTable(path, false);
+        FileStoreTable main = loadTable(path);
+        FileStoreTable snapshotBranch = main.switchToBranch("snapshot");
+        FileStoreTable deltaBranch = main.switchToBranch("delta");
+
+        write(snapshotBranch, "20250101", "v1");
+        write(snapshotBranch, "20250201", "v2");
+        write(snapshotBranch, "20250301", "v3");
+        write(deltaBranch, "20250110", "d1");
+        write(deltaBranch, "20250210", "d2");
+
+        snapshotBranch = loadTable(path).switchToBranch("snapshot");
+        deltaBranch = loadTable(path).switchToBranch("delta");
+
+        RecordingPartitionModification snapshotRecorder = new RecordingPartitionModification();
+        RecordingPartitionModification deltaRecorder = new RecordingPartitionModification();
+
+        ChainTablePartitionExpire chainExpire =
+                new ChainTablePartitionExpire(
+                        Duration.ofDays(30),
+                        Duration.ZERO,
+                        snapshotBranch,
+                        deltaBranch,
+                        CoreOptions.fromMap(buildOptions(Duration.ofDays(30), false)),
+                        snapshotBranch.schema().logicalPartitionType(),
+                        false,
+                        Integer.MAX_VALUE,
+                        0,
+                        snapshotRecorder,
+                        deltaRecorder);
+        chainExpire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0));
+        chainExpire.expire(LocalDateTime.of(2025, 3, 31, 0, 0), Long.MAX_VALUE);
+
+        // The delta recorder must have seen the delta partition (and its ".done" twin) and
+        // nothing that belongs to the snapshot branch.
+        List<String> droppedOnDelta = deltaRecorder.droppedValues("dt");
+        assertThat(droppedOnDelta)
+                .contains("20250110", "20250110.done")
+                .doesNotContain("20250101", "20250101.done");
+
+        // And the mirror image for the snapshot recorder.
+        List<String> droppedOnSnapshot = snapshotRecorder.droppedValues("dt");
+        assertThat(droppedOnSnapshot)
+                .contains("20250101", "20250101.done")
+                .doesNotContain("20250110", "20250110.done");
+    }
+
+    /**
+     * Checks {@code isValueAllExpired} for the anchor snapshot, for an older snapshot, and for a
+     * list holding both.
+     */
+    @Test
+    public void testIsValueAllExpiredReturnsFalseForAnchor() throws Exception {
+        Path path = tablePath("value_expired_anchor");
+        createChainTable(path, false);
+        FileStoreTable main = loadTable(path);
+        FileStoreTable snapshotBranch = main.switchToBranch("snapshot");
+        FileStoreTable deltaBranch = main.switchToBranch("delta");
+
+        // Snapshots: S(0101), S(0201), S(0301).
+        write(snapshotBranch, "20250101", "v1");
+        write(snapshotBranch, "20250201", "v2");
+        write(snapshotBranch, "20250301", "v3");
+
+        snapshotBranch = loadTable(path).switchToBranch("snapshot");
+        deltaBranch = loadTable(path).switchToBranch("delta");
+
+        // 30d retention evaluated at 2025-03-31 → cutoff 2025-03-01.
+        // S(0101) and S(0201) lie before the cutoff; S(0201) is the anchor and is kept.
+        ChainTablePartitionExpire chainExpire =
+                newChainExpire(snapshotBranch, deltaBranch, Duration.ofDays(30), false);
+        LocalDateTime evaluationTime = LocalDateTime.of(2025, 3, 31, 0, 0);
+
+        BinaryRow anchor = findPartition(snapshotBranch, "20250201");
+        BinaryRow older = findPartition(snapshotBranch, "20250101");
+
+        // Only the anchor → it is retained, so the answer is false.
+        List<BinaryRow> anchorOnly = Collections.singletonList(anchor);
+        assertThat(chainExpire.isValueAllExpired(anchorOnly, evaluationTime)).isFalse();
+
+        // Only the older snapshot → everything in the list is expired.
+        List<BinaryRow> olderOnly = Collections.singletonList(older);
+        assertThat(chainExpire.isValueAllExpired(olderOnly, evaluationTime)).isTrue();
+
+        // Older snapshot plus anchor → one retained element makes the answer false.
+        List<BinaryRow> both = Arrays.asList(older, anchor);
+        assertThat(chainExpire.isValueAllExpired(both, evaluationTime)).isFalse();
+    }
+
+    /**
+     * With a single snapshot partition older than the cutoff, {@code isValueAllExpired} is
+     * expected to answer {@code false} for that partition.
+     */
+    @Test
+    public void testIsValueAllExpiredReturnsFalseWhenTooFewSnapshots() throws Exception {
+        Path path = tablePath("value_expired_few_snapshots");
+        createChainTable(path, false);
+        FileStoreTable main = loadTable(path);
+        FileStoreTable snapshotBranch = main.switchToBranch("snapshot");
+        FileStoreTable deltaBranch = main.switchToBranch("delta");
+
+        // Of these two, only 20250201 is older than the cutoff used below.
+        write(snapshotBranch, "20250201", "v1");
+        write(snapshotBranch, "20250315", "v2");
+
+        snapshotBranch = loadTable(path).switchToBranch("snapshot");
+        deltaBranch = loadTable(path).switchToBranch("delta");
+
+        // 30d retention at 2025-03-31 → cutoff 2025-03-01.
+        // A lone snapshot before the cutoff (fewer than two) means nothing can expire.
+        ChainTablePartitionExpire chainExpire =
+                newChainExpire(snapshotBranch, deltaBranch, Duration.ofDays(30), false);
+        LocalDateTime evaluationTime = LocalDateTime.of(2025, 3, 31, 0, 0);
+
+        List<BinaryRow> candidate =
+                Collections.singletonList(findPartition(snapshotBranch, "20250201"));
+        assertThat(chainExpire.isValueAllExpired(candidate, evaluationTime)).isFalse();
+    }
+
+    /**
+     * Writes delta partitions only (no snapshot partition at all) and asserts that
+     * {@code isValueAllExpired} answers {@code false} for a delta partition older than the
+     * cutoff.
+     */
+    @Test
+    public void testIsValueAllExpiredReturnsFalseForDeltaOnlyGroup() throws Exception {
+        Path path = tablePath("value_expired_delta_only");
+        createChainTable(path, false);
+        FileStoreTable main = loadTable(path);
+        FileStoreTable snapshotBranch = main.switchToBranch("snapshot");
+        FileStoreTable deltaBranch = main.switchToBranch("delta");
+
+        write(deltaBranch, "20250101", "d1");
+        write(deltaBranch, "20250201", "d2");
+
+        snapshotBranch = loadTable(path).switchToBranch("snapshot");
+        deltaBranch = loadTable(path).switchToBranch("delta");
+
+        // 30d retention at 2025-03-31 → cutoff 2025-03-01.
+        // There is no snapshot boundary, so the delta-only partitions are retained.
+        ChainTablePartitionExpire chainExpire =
+                newChainExpire(snapshotBranch, deltaBranch, Duration.ofDays(30), false);
+        LocalDateTime evaluationTime = LocalDateTime.of(2025, 3, 31, 0, 0);
+
+        List<BinaryRow> candidate =
+                Collections.singletonList(findPartition(deltaBranch, "20250101"));
+        assertThat(chainExpire.isValueAllExpired(candidate, evaluationTime)).isFalse();
+    }
+
+    /**
+     * Exercises {@code isValueAllExpired} on a {@code (region, dt)} partitioned table with two
+     * regions that have a different number of snapshots before the cutoff.
+     */
+    @Test
+    public void testIsValueAllExpiredWithGroupPartitions() throws Exception {
+        Path path = tablePath("value_expired_groups");
+        createChainTable(path, true);
+        FileStoreTable main = loadTable(path);
+        FileStoreTable snapshotBranch = main.switchToBranch("snapshot");
+        FileStoreTable deltaBranch = main.switchToBranch("delta");
+
+        // Region "US": three snapshots; S(US,0201) becomes the anchor.
+        writeGrouped(snapshotBranch, "US", "20250101", "v1");
+        writeGrouped(snapshotBranch, "US", "20250201", "v2");
+        writeGrouped(snapshotBranch, "US", "20250301", "v3");
+
+        // Region "EU": a single snapshot before the cutoff, hence nothing expires there.
+        writeGrouped(snapshotBranch, "EU", "20250215", "v4");
+        writeGrouped(snapshotBranch, "EU", "20250320", "v5");
+
+        snapshotBranch = loadTable(path).switchToBranch("snapshot");
+        deltaBranch = loadTable(path).switchToBranch("delta");
+
+        // 30d retention at 2025-03-31 → cutoff 2025-03-01.
+        ChainTablePartitionExpire chainExpire =
+                newChainExpire(snapshotBranch, deltaBranch, Duration.ofDays(30), true);
+        LocalDateTime evaluationTime = LocalDateTime.of(2025, 3, 31, 0, 0);
+
+        BinaryRow usOld = findGroupedPartition(snapshotBranch, "US", "20250101");
+        BinaryRow usAnchorRow = findGroupedPartition(snapshotBranch, "US", "20250201");
+        BinaryRow euKept = findGroupedPartition(snapshotBranch, "EU", "20250215");
+
+        // The old US snapshot is genuinely expired.
+        List<BinaryRow> onlyUsOld = Collections.singletonList(usOld);
+        assertThat(chainExpire.isValueAllExpired(onlyUsOld, evaluationTime)).isTrue();
+
+        // The US anchor is retained.
+        List<BinaryRow> onlyUsAnchor = Collections.singletonList(usAnchorRow);
+        assertThat(chainExpire.isValueAllExpired(onlyUsAnchor, evaluationTime)).isFalse();
+
+        // EU has fewer than two snapshots before the cutoff, so its partition is retained.
+        List<BinaryRow> onlyEu = Collections.singletonList(euKept);
+        assertThat(chainExpire.isValueAllExpired(onlyEu, evaluationTime)).isFalse();
+
+        // A list that mixes an expired US partition with a retained EU one is not all-expired.
+        List<BinaryRow> acrossRegions = Arrays.asList(usOld, euKept);
+        assertThat(chainExpire.isValueAllExpired(acrossRegions, evaluationTime)).isFalse();
+    }
+
+    /**
+     * A snapshot partition dated after the cutoff must make {@code isValueAllExpired} answer
+     * {@code false}.
+     */
+    @Test
+    public void testIsValueAllExpiredReturnsFalseForPartitionsAfterCutoff() throws Exception {
+        Path path = tablePath("value_expired_after_cutoff");
+        createChainTable(path, false);
+        FileStoreTable main = loadTable(path);
+        FileStoreTable snapshotBranch = main.switchToBranch("snapshot");
+        FileStoreTable deltaBranch = main.switchToBranch("delta");
+
+        write(snapshotBranch, "20250101", "v1");
+        write(snapshotBranch, "20250315", "v2");
+
+        snapshotBranch = loadTable(path).switchToBranch("snapshot");
+        deltaBranch = loadTable(path).switchToBranch("delta");
+
+        // 30d retention at 2025-03-31 → cutoff 2025-03-01; S(0315) is newer than that.
+        ChainTablePartitionExpire chainExpire =
+                newChainExpire(snapshotBranch, deltaBranch, Duration.ofDays(30), false);
+        LocalDateTime evaluationTime = LocalDateTime.of(2025, 3, 31, 0, 0);
+
+        List<BinaryRow> recent =
+                Collections.singletonList(findPartition(snapshotBranch, "20250315"));
+        assertThat(chainExpire.isValueAllExpired(recent, evaluationTime)).isFalse();
+    }
+
+    // ========== Helper methods ==========
+
+    /**
+     * Returns the first partition of {@code table} whose first partition field equals
+     * {@code dtValue}.
+     *
+     * @throws RuntimeException if no partition entry matches
+     */
+    private BinaryRow findPartition(FileStoreTable table, String dtValue) {
+        for (PartitionEntry entry : table.newSnapshotReader().partitionEntries()) {
+            BinaryRow candidate = entry.partition();
+            if (candidate.getString(0).toString().equals(dtValue)) {
+                return candidate;
+            }
+        }
+        throw new RuntimeException("Partition " + dtValue + " not found");
+    }
+
+    /**
+     * Returns the first partition of {@code table} whose first partition field equals
+     * {@code region} and whose second partition field equals {@code dt}.
+     *
+     * @throws RuntimeException if no partition entry matches
+     */
+    private BinaryRow findGroupedPartition(FileStoreTable table, String region, String dt) {
+        for (PartitionEntry entry : table.newSnapshotReader().partitionEntries()) {
+            BinaryRow candidate = entry.partition();
+            // Region is checked first; the date is only read when the region matches.
+            if (!candidate.getString(0).toString().equals(region)) {
+                continue;
+            }
+            if (candidate.getString(1).toString().equals(dt)) {
+                return candidate;
+            }
+        }
+        throw new RuntimeException("Partition " + region + "|" + dt + " not found");
+    }
+
+    /** Resolves {@code tableName} as a child of the test's temporary directory. */
+    private Path tablePath(String tableName) {
+        String parentUri = tempDir.toUri().toString();
+        return new Path(parentUri, tableName);
+    }
+
+    /**
+     * Creates a primary-key table at {@code tablePath}, adds the {@code snapshot} and
+     * {@code delta} branches, and commits the chain-table options to the main schema and to both
+     * branch schemas.
+     *
+     * @param tablePath location of the table to create
+     * @param withGroupPartition when {@code true} the table is partitioned by
+     *     {@code (region, dt)} and {@code chain-table.chain-partition-keys} is set to
+     *     {@code dt}; otherwise it is partitioned by {@code dt} only
+     */
+    private void createChainTable(Path tablePath, boolean withGroupPartition) throws Exception {
+        LocalFileIO io = LocalFileIO.create();
+        SchemaManager mainSchemas = new SchemaManager(io, tablePath);
+
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.BUCKET.key(), "1");
+        tableOptions.put("merge-engine", "deduplicate");
+        tableOptions.put("sequence.field", "v");
+
+        // Both layouts consist of STRING columns only; they differ in the leading "region".
+        String[] columnNames;
+        List<String> partitionKeys;
+        List<String> primaryKeys;
+        if (withGroupPartition) {
+            columnNames = new String[] {"region", "dt", "pk", "v"};
+            partitionKeys = Arrays.asList("region", "dt");
+            primaryKeys = Arrays.asList("pk", "region", "dt");
+        } else {
+            columnNames = new String[] {"dt", "pk", "v"};
+            partitionKeys = Collections.singletonList("dt");
+            primaryKeys = Arrays.asList("pk", "dt");
+        }
+        org.apache.paimon.types.DataType[] columnTypes =
+                new org.apache.paimon.types.DataType[columnNames.length];
+        for (int i = 0; i < columnTypes.length; i++) {
+            columnTypes[i] = DataTypes.STRING();
+        }
+        mainSchemas.createTable(
+                new Schema(
+                        RowType.of(columnTypes, columnNames).getFields(),
+                        partitionKeys,
+                        primaryKeys,
+                        tableOptions,
+                        ""));
+
+        FileStoreTable main = loadTable(tablePath);
+        main.createBranch("snapshot");
+        main.createBranch("delta");
+
+        List<SchemaChange> chainChanges = new ArrayList<>();
+        chainChanges.add(SchemaChange.setOption("chain-table.enabled", "true"));
+        chainChanges.add(SchemaChange.setOption("scan.fallback-snapshot-branch", "snapshot"));
+        chainChanges.add(SchemaChange.setOption("scan.fallback-delta-branch", "delta"));
+        chainChanges.add(SchemaChange.setOption("partition.timestamp-pattern", "$dt"));
+        chainChanges.add(SchemaChange.setOption("partition.timestamp-formatter", "yyyyMMdd"));
+        if (withGroupPartition) {
+            chainChanges.add(SchemaChange.setOption("chain-table.chain-partition-keys", "dt"));
+        }
+
+        // The same option changes are applied to main first, then to each branch schema.
+        mainSchemas.commitChanges(chainChanges);
+        SchemaManager snapshotSchemas = new SchemaManager(io, tablePath, "snapshot");
+        snapshotSchemas.commitChanges(chainChanges);
+        SchemaManager deltaSchemas = new SchemaManager(io, tablePath, "delta");
+        deltaSchemas.commitChanges(chainChanges);
+    }
+
+    /**
+     * Loads the table at {@code tablePath} using the latest schema of the branch that
+     * {@code CoreOptions.branch} resolves from an option map containing only the path.
+     */
+    private FileStoreTable loadTable(Path tablePath) {
+        Options pathOnly = new Options();
+        pathOnly.set(CoreOptions.PATH, tablePath.toString());
+        String branch = CoreOptions.branch(pathOnly.toMap());
+
+        LocalFileIO io = LocalFileIO.create();
+        SchemaManager schemas = new SchemaManager(io, tablePath, branch);
+        TableSchema latest = schemas.latest().get();
+        return FileStoreTableFactory.create(io, tablePath, latest, CatalogEnvironment.empty());
+    }
+
+    /**
+     * Writes a single row {@code (dt, v, v)} to {@code table} and commits it with commit
+     * identifier 0.
+     *
+     * <p>The writer is created from a write-only copy of the table. Writer and committer are
+     * managed by try-with-resources so that both are closed even when writing, preparing or
+     * committing throws; otherwise a failing test would leak them.
+     *
+     * @param table branch table to write to
+     * @param dt value of the first column
+     * @param v value used for both the second and the third column
+     */
+    private void write(FileStoreTable table, String dt, String v) throws Exception {
+        try (StreamTableWrite write =
+                table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"))
+                        .newWrite(commitUser)) {
+            write.write(
+                    GenericRow.of(
+                            BinaryString.fromString(dt),
+                            BinaryString.fromString(v),
+                            BinaryString.fromString(v)));
+            try (TableCommitImpl commit = table.newCommit(commitUser)) {
+                List<CommitMessage> commitMessages = write.prepareCommit(true, 0);
+                commit.commit(0, commitMessages);
+            }
+        }
+    }
+
+    /**
+     * Writes a single row {@code (region, dt, v, v)} to {@code table} and commits it with commit
+     * identifier 0.
+     *
+     * <p>The writer is created from a write-only copy of the table. Writer and committer are
+     * managed by try-with-resources so that both are closed even when writing, preparing or
+     * committing throws; otherwise a failing test would leak them.
+     *
+     * @param table branch table to write to
+     * @param region value of the first column
+     * @param dt value of the second column
+     * @param v value used for both the third and the fourth column
+     */
+    private void writeGrouped(FileStoreTable table, String region, String dt, String v)
+            throws Exception {
+        try (StreamTableWrite write =
+                table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"))
+                        .newWrite(commitUser)) {
+            write.write(
+                    GenericRow.of(
+                            BinaryString.fromString(region),
+                            BinaryString.fromString(dt),
+                            BinaryString.fromString(v),
+                            BinaryString.fromString(v)));
+            try (TableCommitImpl commit = table.newCommit(commitUser)) {
+                List<CommitMessage> commitMessages = write.prepareCommit(true, 0);
+                commit.commit(0, commitMessages);
+            }
+        }
+    }
+
+    /**
+     * Returns the first partition field of every partition entry of {@code table} as a string,
+     * sorted in natural order.
+     */
+    private List<String> listPartitions(FileStoreTable table) {
+        List<String> values = new ArrayList<>();
+        for (PartitionEntry entry : table.newSnapshotReader().partitionEntries()) {
+            values.add(entry.partition().getString(0).toString());
+        }
+        Collections.sort(values);
+        return values;
+    }
+
+    /**
+     * Returns every partition entry of {@code table} rendered as {@code first|second} (the first
+     * two partition fields joined by a pipe), sorted in natural order.
+     */
+    private List<String> listGroupedPartitions(FileStoreTable table) {
+        List<String> values = new ArrayList<>();
+        for (PartitionEntry entry : table.newSnapshotReader().partitionEntries()) {
+            BinaryRow row = entry.partition();
+            values.add(row.getString(0).toString() + "|" + row.getString(1).toString());
+        }
+        Collections.sort(values);
+        return values;
+    }
+
+    /**
+     * Builds the option map handed to {@code CoreOptions.fromMap} by the tests.
+     *
+     * @param expirationTime retention; only its whole-day part is written, as {@code "<days> d"}
+     * @param withGroupPartition whether to add {@code chain-table.chain-partition-keys=dt}
+     * @return a new mutable map with the timestamp pattern/formatter, both fallback branch
+     *     names and the partition expiration time
+     */
+    private Map<String, String> buildOptions(Duration expirationTime, boolean withGroupPartition) {
+        String retention = expirationTime.toDays() + " d";
+
+        Map<String, String> result = new HashMap<>();
+        result.put("partition.timestamp-pattern", "$dt");
+        result.put("partition.timestamp-formatter", "yyyyMMdd");
+        result.put("scan.fallback-snapshot-branch", "snapshot");
+        result.put("scan.fallback-delta-branch", "delta");
+        result.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), retention);
+        if (!withGroupPartition) {
+            return result;
+        }
+        result.put("chain-table.chain-partition-keys", "dt");
+        return result;
+    }
+
+    /**
+     * Creates a {@code ChainTablePartitionExpire} for the given branch tables with a zero check
+     * interval, {@code Integer.MAX_VALUE} as max-expire-num and no partition-modification
+     * callbacks. Options come from {@code buildOptions}; the partition type is taken from the
+     * snapshot table's schema.
+     */
+    private ChainTablePartitionExpire newChainExpire(
+            FileStoreTable snapshotTable,
+            FileStoreTable deltaTable,
+            Duration expirationTime,
+            boolean withGroupPartition) {
+        Duration noCheckInterval = Duration.ZERO;
+        CoreOptions coreOptions =
+                CoreOptions.fromMap(buildOptions(expirationTime, withGroupPartition));
+        return new ChainTablePartitionExpire(
+                expirationTime,
+                noCheckInterval,
+                snapshotTable,
+                deltaTable,
+                coreOptions,
+                snapshotTable.schema().logicalPartitionType(),
+                false,
+                Integer.MAX_VALUE,
+                0,
+                null,
+                null);
+    }
+
+    /**
+     * {@code PartitionModification} test double that remembers a copy of every partition spec
+     * passed to {@link #dropPartitions}; all other operations do nothing.
+     */
+    private static class RecordingPartitionModification implements PartitionModification {
+
+        /** Copies of the dropped partition specs, in call order. */
+        private final List<Map<String, String>> droppedPartitions = new ArrayList<>();
+
+        @Override
+        public void createPartitions(List<Map<String, String>> partitions)
+                throws Catalog.TableNotExistException {
+            // intentionally empty
+        }
+
+        @Override
+        public void dropPartitions(List<Map<String, String>> partitions)
+                throws Catalog.TableNotExistException {
+            // Store a copy so later changes to the caller's maps cannot alter the record.
+            partitions.forEach(spec -> droppedPartitions.add(new HashMap<>(spec)));
+        }
+
+        @Override
+        public void alterPartitions(List<PartitionStatistics> partitions)
+                throws Catalog.TableNotExistException {
+            // intentionally empty
+        }
+
+        @Override
+        public void close() throws Exception {
+            // nothing to release
+        }
+
+        /**
+         * Returns, for each recorded spec in call order, the value stored under {@code key}
+         * ({@code null} where a spec has no such key).
+         */
+        private List<String> droppedValues(String key) {
+            List<String> values = new ArrayList<>();
+            for (Map<String, String> spec : droppedPartitions) {
+                values.add(spec.get(key));
+            }
+            return values;
+        }
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 6af293bea1..e6abe773eb 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -86,7 +86,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Test for {@link PartitionExpire}. */
+/** Test for {@link NormalPartitionExpire}. */
 public class PartitionExpireTest {
 
     @TempDir java.nio.file.Path tempDir;
@@ -189,7 +189,7 @@ public class PartitionExpireTest {
         write("20230103", "31");
         write("20230103", "32");
         write("20230105", "51");
-        PartitionExpire expire = newExpire();
+        NormalPartitionExpire expire = newExpire();
         expire.setLastCheck(date(1));
         Assertions.assertDoesNotThrow(() -> expire.expire(date(8), 
Long.MAX_VALUE));
         assertThat(read()).containsExactlyInAnyOrder("abcd:12");
@@ -215,7 +215,7 @@ public class PartitionExpireTest {
         write("20230103", "31");
         write("20230103", "32");
         write("20230105", "51");
-        PartitionExpire expire = newExpire();
+        NormalPartitionExpire expire = newExpire();
         expire.setLastCheck(date(1));
         Assertions.assertDoesNotThrow(() -> expire.expire(date(8), 
Long.MAX_VALUE));
 
@@ -246,7 +246,7 @@ public class PartitionExpireTest {
         write("20230103", "32");
         write("20230105", "51");
 
-        PartitionExpire expire = newExpire();
+        NormalPartitionExpire expire = newExpire();
         expire.setLastCheck(date(1));
         Assertions.assertDoesNotThrow(() -> expire.expire(date(6), 
Long.MAX_VALUE));
 
@@ -272,7 +272,7 @@ public class PartitionExpireTest {
         write("20230103", "32");
         write("20230105", "51");
 
-        PartitionExpire expire = newExpire();
+        NormalPartitionExpire expire = newExpire();
         expire.setLastCheck(date(1));
 
         expire.expire(date(3), Long.MAX_VALUE);
@@ -319,7 +319,7 @@ public class PartitionExpireTest {
         doneAction.markDone("f0=20230103");
         doneAction.markDone("f0=20230108");
 
-        PartitionExpire expire = newExpire();
+        NormalPartitionExpire expire = newExpire();
         expire.setLastCheck(date(1));
         expire.expire(date(8), Long.MAX_VALUE);
 
@@ -423,7 +423,7 @@ public class PartitionExpireTest {
         List<CommitMessage> commitMessages = write("20230101", "11");
         write("20230105", "51");
 
-        PartitionExpire expire = newExpire();
+        NormalPartitionExpire expire = newExpire();
         expire.setLastCheck(date(1));
         expire.expire(date(5), Long.MAX_VALUE);
         assertThat(read()).containsExactlyInAnyOrder("20230105:51");
@@ -471,9 +471,9 @@ public class PartitionExpireTest {
         return commitMessages;
     }
 
-    private PartitionExpire newExpire() {
+    private NormalPartitionExpire newExpire() {
         FileStoreTable table = newExpireTable();
-        return table.store().newPartitionExpire("", table);
+        return (NormalPartitionExpire) table.store().newPartitionExpire("", 
table);
     }
 
     private FileStoreTable newExpireTable() {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
index ff4cb93426..7efb65d2d6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
@@ -23,6 +23,7 @@ import org.apache.paimon.FileStore;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.TimeUtils;
 
 import java.time.Duration;
@@ -64,6 +65,7 @@ public class ExpirePartitionsAction extends TableActionBase implements LocalActi
     public void executeLocally() throws Exception {
         FileStoreTable fileStoreTable = (FileStoreTable) table;
         FileStore<?> fileStore = fileStoreTable.store();
+
         PartitionExpire partitionExpire =
                 fileStore.newPartitionExpire(
                         "",
@@ -76,7 +78,9 @@ public class ExpirePartitionsAction extends TableActionBase implements LocalActi
                                 catalogLoader(),
                                 new Identifier(
                                         identifier.getDatabaseName(), identifier.getTableName())));
-
+        Preconditions.checkNotNull(
+                partitionExpire,
+                "Both the partition expiration time and partition field can not be null.");
         partitionExpire.expire(Long.MAX_VALUE);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
index 8a6528bd02..ecddcf11e1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -90,11 +90,9 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
         FileStore fileStore = fileStoreTable.store();
 
         PartitionExpire partitionExpire = fileStore.newPartitionExpire("", fileStoreTable);
-
         Preconditions.checkNotNull(
                 partitionExpire,
                 "Both the partition expiration time and partition field can not be null.");
-
         List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
         return expired == null || expired.isEmpty()
                 ? new Row[] {Row.of("No expired partitions.")}


Reply via email to