This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6850329895 [core] chain table support special partition expire. (#7643)
6850329895 is described below
commit 685032989510e55f2c34a29031f8b5c82d20804d
Author: Wenchao Wu <[email protected]>
AuthorDate: Thu May 28 10:17:16 2026 +0800
[core] chain table support special partition expire. (#7643)
This PR implements partition expiration for chain tables. Chain tables
store data across snapshot
and delta branches, where delta partitions depend on their nearest
earlier snapshot partition as an
anchor for merge-on-read. Standard partition expiration cannot be
applied directly because dropping
a snapshot partition without considering its dependent deltas would
break the chain integrity.
---
docs/docs/primary-key-table/chain-table.md | 64 ++
.../java/org/apache/paimon/AbstractFileStore.java | 72 +-
.../metastore/ChainTableCommitPreCallback.java | 98 ++-
.../operation/ChainTablePartitionExpire.java | 446 ++++++++++++
...itionExpire.java => NormalPartitionExpire.java} | 11 +-
.../apache/paimon/operation/PartitionExpire.java | 222 +-----
.../org/apache/paimon/schema/SchemaValidation.java | 8 +
.../operation/ChainTablePartitionExpireTest.java | 786 +++++++++++++++++++++
.../paimon/operation/PartitionExpireTest.java | 18 +-
.../flink/action/ExpirePartitionsAction.java | 6 +-
.../flink/procedure/ExpirePartitionsProcedure.java | 2 -
11 files changed, 1495 insertions(+), 238 deletions(-)
diff --git a/docs/docs/primary-key-table/chain-table.md
b/docs/docs/primary-key-table/chain-table.md
index b5890d428b..b061efd36a 100644
--- a/docs/docs/primary-key-table/chain-table.md
+++ b/docs/docs/primary-key-table/chain-table.md
@@ -204,3 +204,67 @@ partition keys:
This treats `(dt, hour)` as the composite chain dimension and everything
before it (e.g., `region`) as
the group dimension.
+
+## Partition Expiration
+
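+Chain tables support automatic partition expiration via the standard `partition.expiration-time`
+option. However, the expiration algorithm differs from that of normal tables in order to preserve
+chain integrity. Chain table expiration is always based on partition values, i.e. the `values-time`
+strategy.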
+
+### How It Works
+
+In a normal table, every partition older than the cutoff (`now -
partition.expiration-time`) is dropped
+independently. Chain tables cannot do this because a delta partition depends
on its nearest earlier
+snapshot partition as an anchor for merge-on-read. Dropping the anchor would
break the chain.
+
+Chain table expiration works in **segments**. A segment consists of one
snapshot partition and all the
+delta partitions whose time falls between that snapshot and the next snapshot
in sorted order. The
+segment is the atomic unit of expiration: either the entire segment is
expired, or nothing in it is.
+
+The algorithm per group:
+
+1. List all snapshot branch partitions sorted by their chain partition values.
+2. Keep only those strictly before the cutoff (`now - partition.expiration-time`).
+3. If fewer than 2 snapshots are before the cutoff, nothing is expired: a single such snapshot must
+   be kept as the anchor.
+4. The most recent snapshot before the cutoff is the **anchor** (kept). Every earlier snapshot,
+   together with the delta partitions between it and the next snapshot, forms an expirable segment.
+   Delta partitions older than the earliest snapshot are expired along with the first segment.
+5. At most `partition.expiration-max-num` segments are expired per group in a single run.
+6. Delta partitions are dropped before snapshot partitions so that the commit pre-check always
+   passes.
+
+For tables with group partitions, each group is processed independently. A
group with many expired
+snapshots can have segments expired while another group with only one snapshot
before the cutoff retains
+all of its data.
+
+### Example
+
+```sql
+ALTER TABLE default.t SET TBLPROPERTIES (
+ 'partition.expiration-time' = '30 d',
+ 'partition.expiration-check-interval' = '1 d'
+);
+ALTER TABLE `default`.`t$branch_snapshot` SET TBLPROPERTIES (
+ 'partition.expiration-time' = '30 d',
+ 'partition.expiration-check-interval' = '1 d'
+);
+ALTER TABLE `default`.`t$branch_delta` SET TBLPROPERTIES (
+ 'partition.expiration-time' = '30 d',
+ 'partition.expiration-check-interval' = '1 d'
+);
+```
+
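+Suppose the snapshot branch has partitions `S(0101)`, `S(0201)`, `S(0301)` and the delta branch has
+`D(0110)`, `D(0210)`, `D(0315)`. When expiration runs at `2025-03-31 00:00` with a 30-day retention,
+the cutoff is `2025-03-01 00:00`. Only partitions strictly before the cutoff count, so `S(0301)`
+does not:
+
+- Snapshots before cutoff: `S(0101)`, `S(0201)`. Anchor = `S(0201)` (kept).
+- Segment 1 expired: `S(0101)` + `D(0110)` (the delta between `S(0101)` and `S(0201)`).
+- Remaining: `S(0201)`, `S(0301)`, `D(0210)`, `D(0315)`.
+
+Note that the cutoff keeps the time of day at which expiration runs. A run at `2025-03-31 10:00`
+gives a cutoff of `2025-03-01 10:00`. Assuming the partition times resolve to midnight, `S(0301)`
+then also falls before the cutoff and becomes the anchor, and `S(0201)` + `D(0210)` are expired as
+well.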
+
+### Important Notes
+
+- **Delta-only groups are not expired.** If a group has delta partitions but no snapshot partition,
+  its deltas are the only copy of that group's data, so partition expiration does not touch them.
+  They become eligible for expiration once at least two of the group's snapshot partitions fall
+  before the cutoff.
+- **Conflict detection is anchor-aware.** When `partition.expiration-strategy`
is `values-time`, the
+ conflict detection during writes correctly recognizes that anchor partitions
are retained and does not
+ reject writes to them.
+- The `partition.expiration-time` and `partition.expiration-check-interval`
options should be set
+ consistently across the main table and both branches.
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 9aad0259cd..cb3bae1dbe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -19,6 +19,7 @@
package org.apache.paimon;
import org.apache.paimon.CoreOptions.ExternalPathStrategy;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.RenamingSnapshotCommit;
import org.apache.paimon.catalog.SnapshotCommit;
import org.apache.paimon.catalog.TableRollback;
@@ -38,16 +39,19 @@ import
org.apache.paimon.metastore.ChainTableCommitPreCallback;
import org.apache.paimon.metastore.ChainTableOverwriteCommitCallback;
import org.apache.paimon.metastore.TagPreviewCommitCallback;
import org.apache.paimon.metastore.VisibilityWaitCallback;
+import org.apache.paimon.operation.ChainTablePartitionExpire;
import org.apache.paimon.operation.ChangelogDeletion;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.operation.ManifestsReader;
+import org.apache.paimon.operation.NormalPartitionExpire;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.operation.commit.CommitRollback;
import org.apache.paimon.operation.commit.ConflictDetection;
import org.apache.paimon.partition.PartitionExpireStrategy;
+import org.apache.paimon.partition.PartitionValuesTimeExpireStrategy;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.service.ServiceManager;
@@ -65,6 +69,7 @@ import org.apache.paimon.tag.SuccessFileTagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ChainTableUtils;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IndexFilePathFactories;
@@ -440,6 +445,10 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
return null;
}
+ if (options.isChainTable()) {
+ return newChainTablePartitionExpire(table);
+ }
+
return newPartitionExpire(
commitUser,
table,
@@ -459,12 +468,19 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
Duration expirationTime,
Duration checkInterval,
PartitionExpireStrategy expireStrategy) {
+ if (options.isChainTable()) {
+ checkArgument(
+ expireStrategy instanceof
PartitionValuesTimeExpireStrategy,
+ "Chain table only supports 'values-time' partition
expiration strategy.");
+ return newChainTablePartitionExpire(table, expirationTime,
checkInterval);
+ }
+
PartitionModification partitionModification = null;
if (options.partitionedTableInMetastore()) {
partitionModification = catalogEnvironment.partitionModification();
}
- return new PartitionExpire(
+ return new NormalPartitionExpire(
expirationTime,
checkInterval,
expireStrategy,
@@ -476,6 +492,60 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
options.partitionExpireBatchSize());
}
+    /**
+     * Creates the chain-table partition expirer configured from this store's options.
+     *
+     * @return the expirer, or null when {@code partition.expiration-time} is not configured (or
+     *     when the overload below returns null)
+     */
+    @Nullable
+    private ChainTablePartitionExpire newChainTablePartitionExpire(FileStoreTable table) {
+        Duration expireAfter = options.partitionExpireTime();
+        return expireAfter == null
+                ? null
+                : newChainTablePartitionExpire(
+                        table, expireAfter, options.partitionExpireCheckInterval());
+    }
+
+    /**
+     * Creates a chain-table partition expirer over the snapshot and delta branches of the chain
+     * primary table resolved from {@code table}.
+     *
+     * @param table the table the expirer is created for
+     * @param expirationTime partitions older than {@code now - expirationTime} are candidates
+     * @param checkInterval minimum interval between two expiration checks
+     * @return the expirer, or null when the table has no partition fields
+     */
+    @Nullable
+    private ChainTablePartitionExpire newChainTablePartitionExpire(
+            FileStoreTable table, Duration expirationTime, Duration checkInterval) {
+        if (partitionType().getFieldCount() == 0) {
+            return null;
+        }
+        String snapshotBranch = options.scanFallbackSnapshotBranch();
+        String deltaBranch = options.scanFallbackDeltaBranch();
+        FileStoreTable chainPrimary = ChainTableUtils.resolveChainPrimaryTable(table);
+        FileStoreTable snapshotBranchTable = chainPrimary.switchToBranch(snapshotBranch);
+        FileStoreTable deltaBranchTable = chainPrimary.switchToBranch(deltaBranch);
+        return new ChainTablePartitionExpire(
+                expirationTime,
+                checkInterval,
+                snapshotBranchTable,
+                deltaBranchTable,
+                options,
+                partitionType(),
+                options.endInputCheckPartitionExpire(),
+                options.partitionExpireMaxNum(),
+                options.partitionExpireBatchSize(),
+                newPartitionModificationForBranch(snapshotBranch),
+                newPartitionModificationForBranch(deltaBranch));
+    }
+
+    /**
+     * Returns the metastore partition modification for the given branch of this table.
+     *
+     * @param branchName branch whose partitions will be modified
+     * @return null when partitions are not tracked in the metastore; this table's own partition
+     *     modification when the catalog environment has no identifier; otherwise the partition
+     *     modification of a catalog environment copied onto the branch identifier
+     */
+    @Nullable
+    private PartitionModification newPartitionModificationForBranch(String branchName) {
+        if (!options.partitionedTableInMetastore()) {
+            return null;
+        }
+        Identifier current = catalogEnvironment.identifier();
+        if (current != null) {
+            Identifier branchIdentifier =
+                    new Identifier(
+                            current.getDatabaseName(),
+                            current.getTableName(),
+                            branchName,
+                            current.getSystemTableName());
+            return catalogEnvironment.copy(branchIdentifier).partitionModification();
+        }
+        return catalogEnvironment.partitionModification();
+    }
+
@Override
public TagAutoManager newTagAutoManager(FileStoreTable table) {
return TagAutoManager.create(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java
b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java
index 0b26cb6375..03399a4fde 100644
---
a/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java
@@ -36,6 +36,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitPreCallback;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ChainPartitionProjector;
import org.apache.paimon.utils.ChainTableUtils;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
@@ -43,6 +44,7 @@ import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -111,33 +113,59 @@ public class ChainTableCommitPreCallback implements
CommitPreCallback {
partitionType,
table.schema().partitionKeys().toArray(new String[0]),
coreOptions.legacyPartitionName());
- RecordComparator partitionComparator =
-
CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());
+
+ List<String> chainKeys =
+ ChainTableUtils.chainPartitionKeys(coreOptions,
table.schema().partitionKeys());
+ int chainFieldCount = chainKeys.size();
+ ChainPartitionProjector projector =
+ new ChainPartitionProjector(partitionType, chainFieldCount);
+ int groupFieldCount = projector.groupFieldCount();
+ RecordComparator chainComparator =
+
CodeGenUtils.newRecordComparator(projector.chainPartitionType().getFieldTypes());
+
List<BinaryRow> snapshotPartitions =
table.newSnapshotReader().partitionEntries().stream()
.map(PartitionEntry::partition)
- .sorted(partitionComparator)
.collect(Collectors.toList());
SnapshotReader deltaSnapshotReader = deltaTable.newSnapshotReader();
PredicateBuilder builder = new PredicateBuilder(partitionType);
for (BinaryRow partition : changedPartitions) {
+ BinaryRow partitionGroup =
projector.extractGroupPartition(partition);
+ BinaryRow partitionChain =
projector.extractChainPartition(partition);
+
+ List<BinaryRow> sameGroupSnapshots =
+ filterSameGroup(snapshotPartitions, partitionGroup,
projector);
+ sameGroupSnapshots.sort(
+ (a, b) ->
+ chainComparator.compare(
+ projector.extractChainPartition(a),
+ projector.extractChainPartition(b)));
+
Optional<BinaryRow> preSnapshotPartition =
- findPreSnapshotPartition(snapshotPartitions, partition,
partitionComparator);
+ findPreSnapshotInGroup(
+ sameGroupSnapshots, partitionChain,
chainComparator, projector);
Optional<BinaryRow> nextSnapshotPartition =
- findNextSnapshotPartition(snapshotPartitions, partition,
partitionComparator);
+ findNextSnapshotInGroup(
+ sameGroupSnapshots, partitionChain,
chainComparator, projector);
+
Predicate deltaFollowingPredicate =
- ChainTableUtils.createTriangularPredicate(
- partition, partitionConverter, builder::equal,
builder::greaterThan);
+ ChainTableUtils.createGroupChainPredicate(
+ partition,
+ partitionConverter,
+ groupFieldCount,
+ builder::equal,
+ builder::greaterThan);
List<BinaryRow> deltaFollowingPartitions =
deltaSnapshotReader.withPartitionFilter(deltaFollowingPredicate)
.partitionEntries().stream()
.map(PartitionEntry::partition)
.filter(
deltaPartition ->
- isBeforeNextSnapshotPartition(
+ isBeforeNextSnapshotInGroup(
deltaPartition,
nextSnapshotPartition,
- partitionComparator))
+ chainComparator,
+ projector))
.collect(Collectors.toList());
boolean canDrop =
deltaFollowingPartitions.isEmpty() ||
preSnapshotPartition.isPresent();
@@ -159,13 +187,26 @@ public class ChainTableCommitPreCallback implements
CommitPreCallback {
&& indexFiles.stream().allMatch(f -> f.kind() ==
FileKind.DELETE);
}
- private Optional<BinaryRow> findPreSnapshotPartition(
- List<BinaryRow> snapshotPartitions,
- BinaryRow partition,
- RecordComparator partitionComparator) {
+    /**
+     * Selects the partitions whose group part equals {@code groupKey}.
+     *
+     * @return a new mutable list in input order (the caller sorts it in place)
+     */
+    private List<BinaryRow> filterSameGroup(
+            List<BinaryRow> partitions, BinaryRow groupKey, ChainPartitionProjector projector) {
+        return partitions.stream()
+                .filter(candidate -> projector.extractGroupPartition(candidate).equals(groupKey))
+                .collect(Collectors.toCollection(ArrayList::new));
+    }
+
+ private Optional<BinaryRow> findPreSnapshotInGroup(
+ List<BinaryRow> sortedSameGroupPartitions,
+ BinaryRow targetChain,
+ RecordComparator chainComparator,
+ ChainPartitionProjector projector) {
BinaryRow pre = null;
- for (BinaryRow snapshotPartition : snapshotPartitions) {
- if (partitionComparator.compare(snapshotPartition, partition) < 0)
{
+ for (BinaryRow snapshotPartition : sortedSameGroupPartitions) {
+ BinaryRow chain =
projector.extractChainPartition(snapshotPartition);
+ if (chainComparator.compare(chain, targetChain) < 0) {
pre = snapshotPartition;
} else {
break;
@@ -174,24 +215,31 @@ public class ChainTableCommitPreCallback implements
CommitPreCallback {
return Optional.ofNullable(pre);
}
- private Optional<BinaryRow> findNextSnapshotPartition(
- List<BinaryRow> snapshotPartitions,
- BinaryRow partition,
- RecordComparator partitionComparator) {
- for (BinaryRow snapshotPartition : snapshotPartitions) {
- if (partitionComparator.compare(snapshotPartition, partition) > 0)
{
+ /**
+ * Returns the first partition in {@code sortedSameGroupPartitions} whose chain part compares
+ * greater than {@code targetChain} under {@code chainComparator}, or empty if there is none.
+ * Assumes the list is sorted ascending by the same comparator, so the first match is the
+ * nearest following snapshot.
+ */
+ private Optional<BinaryRow> findNextSnapshotInGroup(
+ List<BinaryRow> sortedSameGroupPartitions,
+ BinaryRow targetChain,
+ RecordComparator chainComparator,
+ ChainPartitionProjector projector) {
+ for (BinaryRow snapshotPartition : sortedSameGroupPartitions) {
+ BinaryRow chain =
projector.extractChainPartition(snapshotPartition);
+ if (chainComparator.compare(chain, targetChain) > 0) {
return Optional.of(snapshotPartition);
}
}
return Optional.empty();
}
- private boolean isBeforeNextSnapshotPartition(
+ /**
+ * Returns true when there is no next snapshot, or when the chain part of {@code partition}
+ * compares strictly less than the chain part of {@code nextSnapshotPartition}. Only the chain
+ * parts are compared; group parts are ignored.
+ */
+ private boolean isBeforeNextSnapshotInGroup(
BinaryRow partition,
Optional<BinaryRow> nextSnapshotPartition,
- RecordComparator partitionComparator) {
- return !nextSnapshotPartition.isPresent()
- || partitionComparator.compare(partition,
nextSnapshotPartition.get()) < 0;
+ RecordComparator chainComparator,
+ ChainPartitionProjector projector) {
+ if (!nextSnapshotPartition.isPresent()) {
+ return true;
+ }
+ BinaryRow partitionChain = projector.extractChainPartition(partition);
+ BinaryRow nextChain =
projector.extractChainPartition(nextSnapshotPartition.get());
+ return chainComparator.compare(partitionChain, nextChain) < 0;
}
private String generatePartitionValues(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/ChainTablePartitionExpire.java
b/paimon-core/src/main/java/org/apache/paimon/operation/ChainTablePartitionExpire.java
new file mode 100644
index 0000000000..0e70854363
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/ChainTablePartitionExpire.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.partition.PartitionTimeExtractor;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.PartitionModification;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ChainPartitionProjector;
+import org.apache.paimon.utils.ChainTableUtils;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+/**
+ * Partition expiration for chain tables.
+ *
+ * <p>Chain tables store data across snapshot and delta branches. A delta partition depends on its
+ * nearest earlier snapshot partition as an anchor for merge-on-read. This class expires partitions
+ * in "segments" defined by consecutive snapshot partitions to maintain chain integrity.
+ *
+ * <p>A segment consists of one snapshot partition and all delta partitions whose time falls in
+ * {@code [snapshot time, next snapshot time)}. The segment is the atomic unit of expiration: either
+ * the entire segment (snapshot + deltas) is expired, or nothing in it is.
+ *
+ * <p>Algorithm per group:
+ *
+ * <ol>
+ *   <li>List all snapshot branch partitions sorted by their chain partition values.
+ *   <li>Keep those whose partition time is strictly before the cutoff ({@code now -
+ *       expirationTime}).
+ *   <li>If fewer than 2 snapshots are before the cutoff, nothing can be expired (a single such
+ *       snapshot must be kept as anchor).
+ *   <li>The most recent snapshot before the cutoff is the anchor (kept). All earlier snapshots form
+ *       expirable segments together with their associated delta partitions; delta partitions older
+ *       than the earliest snapshot are expired along with the first segment.
+ *   <li>At most {@code maxExpireNum} segments are expired per group in one run.
+ *   <li>Delta partitions are dropped first, then snapshot partitions, so that {@code
+ *       ChainTableCommitPreCallback} validation passes.
+ * </ol>
+ *
+ * <p>Groups that have delta partitions but no snapshot partition are never expired.
+ */
+public class ChainTablePartitionExpire implements PartitionExpire {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ChainTablePartitionExpire.class);
+
+    private final Duration expirationTime;
+    private final Duration checkInterval;
+    private final FileStoreTable snapshotTable;
+    private final FileStoreTable deltaTable;
+    private final PartitionTimeExtractor timeExtractor;
+    private final ChainPartitionProjector projector;
+    private final RecordComparator chainPartitionComparator;
+    private final InternalRowPartitionComputer partitionComputer;
+    /** All partition keys of the table, in partition-type order. */
+    private final List<String> partitionKeys;
+    /** The subset of partition keys forming the chain (time) dimension. */
+    private final List<String> chainPartitionKeys;
+    private final boolean endInputCheckPartitionExpire;
+    /** Maximum number of segments expired per group in a single run. */
+    private final int maxExpireNum;
+    /** When positive and smaller than the number of specs, drops are issued in batches. */
+    private final int expireBatchSize;
+    @Nullable private final PartitionModification snapshotPartitionModification;
+    @Nullable private final PartitionModification deltaPartitionModification;
+    private LocalDateTime lastCheck;
+
+    public ChainTablePartitionExpire(
+            Duration expirationTime,
+            Duration checkInterval,
+            FileStoreTable snapshotTable,
+            FileStoreTable deltaTable,
+            CoreOptions options,
+            RowType partitionType,
+            boolean endInputCheckPartitionExpire,
+            int maxExpireNum,
+            int expireBatchSize,
+            @Nullable PartitionModification snapshotPartitionModification,
+            @Nullable PartitionModification deltaPartitionModification) {
+        this.expirationTime = expirationTime;
+        this.checkInterval = checkInterval;
+        this.snapshotTable = snapshotTable;
+        this.deltaTable = deltaTable;
+        this.endInputCheckPartitionExpire = endInputCheckPartitionExpire;
+        this.maxExpireNum = maxExpireNum;
+        this.expireBatchSize = expireBatchSize;
+        this.snapshotPartitionModification = snapshotPartitionModification;
+        this.deltaPartitionModification = deltaPartitionModification;
+
+        this.partitionKeys = partitionType.getFieldNames();
+        this.chainPartitionKeys = ChainTableUtils.chainPartitionKeys(options, partitionKeys);
+        this.projector = new ChainPartitionProjector(partitionType, chainPartitionKeys.size());
+        this.chainPartitionComparator =
+                CodeGenUtils.newRecordComparator(projector.chainPartitionType().getFieldTypes());
+        this.timeExtractor =
+                new PartitionTimeExtractor(
+                        options.partitionTimestampPattern(), options.partitionTimestampFormatter());
+        this.partitionComputer =
+                new InternalRowPartitionComputer(
+                        options.partitionDefaultName(),
+                        partitionType,
+                        partitionKeys.toArray(new String[0]),
+                        options.legacyPartitionName());
+
+        // Start with a randomized last-check time within one check interval (presumably to spread
+        // expiration checks of different writers; mirrors the normal expirer).
+        long rndSeconds = 0;
+        long checkIntervalSeconds = checkInterval.toMillis() / 1000;
+        if (checkIntervalSeconds > 0) {
+            rndSeconds = ThreadLocalRandom.current().nextLong(checkIntervalSeconds);
+        }
+        this.lastCheck = LocalDateTime.now().minusSeconds(rndSeconds);
+    }
+
+    @Override
+    public List<Map<String, String>> expire(long commitIdentifier) {
+        return expire(LocalDateTime.now(), commitIdentifier);
+    }
+
+    @Override
+    public boolean isValueExpiration() {
+        return true;
+    }
+
+    @Override
+    public boolean isValueAllExpired(Collection<BinaryRow> partitions) {
+        return isValueAllExpired(partitions, LocalDateTime.now());
+    }
+
+    /**
+     * Returns whether every partition in {@code partitions} would be removed by chain expiration at
+     * {@code now}: its time must be before the cutoff AND before its group's retained anchor.
+     * Partitions whose time cannot be extracted, or whose group has no snapshot partitions, are
+     * never considered expired.
+     */
+    @VisibleForTesting
+    boolean isValueAllExpired(Collection<BinaryRow> partitions, LocalDateTime now) {
+        if (partitions.isEmpty()) {
+            // Vacuously true; skip scanning the snapshot branch.
+            return true;
+        }
+
+        LocalDateTime expireDateTime = now.minus(expirationTime);
+        List<LocalDateTime> partTimes = new ArrayList<>(partitions.size());
+        for (BinaryRow partition : partitions) {
+            LocalDateTime partTime = extractPartitionTime(partition);
+            if (partTime == null || !expireDateTime.isAfter(partTime)) {
+                return false;
+            }
+            partTimes.add(partTime);
+        }
+
+        // All partitions are time-wise before the cutoff, but a chain table retains the anchor (the
+        // most recent snapshot before the cutoff per group) and everything after it.
+        Map<BinaryRow, LocalDateTime> retainBoundary = computeGroupRetainBoundary(expireDateTime);
+        int index = 0;
+        for (BinaryRow partition : partitions) {
+            LocalDateTime partTime = partTimes.get(index++);
+            LocalDateTime boundary = retainBoundary.get(projector.extractGroupPartition(partition));
+            if (boundary == null || !partTime.isBefore(boundary)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * For each group that has snapshot partitions, computes the time at or after which partitions
+     * are retained (not expired). The boundary is the time of the group's latest snapshot before
+     * {@code cutoffTime}, or {@link LocalDateTime#MIN} (retain everything) when fewer than 2
+     * snapshots fall before the cutoff. Groups with no snapshot partitions at all (delta-only) are
+     * absent from the result; without a snapshot anchor, their earlier deltas may still be required
+     * by later delta-only chain reads.
+     *
+     * <p>The per-run {@code maxExpireNum} cap applied by {@link #doExpire} is not reflected here.
+     */
+    private Map<BinaryRow, LocalDateTime> computeGroupRetainBoundary(LocalDateTime cutoffTime) {
+        Map<BinaryRow, List<BinaryRow>> groupedSnapshots =
+                groupByGroupKey(snapshotTable.newSnapshotReader().partitionEntries());
+
+        Map<BinaryRow, LocalDateTime> boundaries = new HashMap<>();
+        for (Map.Entry<BinaryRow, List<BinaryRow>> entry : groupedSnapshots.entrySet()) {
+            int countBeforeCutoff = 0;
+            LocalDateTime latestBeforeCutoff = null;
+            for (BinaryRow snapshot : entry.getValue()) {
+                LocalDateTime time = extractPartitionTime(snapshot);
+                if (time != null && cutoffTime.isAfter(time)) {
+                    countBeforeCutoff++;
+                    if (latestBeforeCutoff == null || time.isAfter(latestBeforeCutoff)) {
+                        latestBeforeCutoff = time;
+                    }
+                }
+            }
+            boundaries.put(
+                    entry.getKey(), countBeforeCutoff < 2 ? LocalDateTime.MIN : latestBeforeCutoff);
+        }
+        return boundaries;
+    }
+
+    @VisibleForTesting
+    void setLastCheck(LocalDateTime time) {
+        lastCheck = time;
+    }
+
+    /**
+     * Runs expiration if the check interval has elapsed since the last check, if the interval is
+     * zero, or at end of input when {@code endInputCheckPartitionExpire} is enabled.
+     *
+     * @return the expired partition specs (deltas first, then snapshots), or null when this call
+     *     skipped the check
+     */
+    @VisibleForTesting
+    List<Map<String, String>> expire(LocalDateTime now, long commitIdentifier) {
+        if (checkInterval.isZero()
+                || now.isAfter(lastCheck.plus(checkInterval))
+                || (endInputCheckPartitionExpire && Long.MAX_VALUE == commitIdentifier)) {
+            List<Map<String, String>> expired = doExpire(now.minus(expirationTime));
+            lastCheck = now;
+            return expired;
+        }
+        return null;
+    }
+
+    private List<Map<String, String>> doExpire(LocalDateTime cutoffTime) {
+        List<PartitionEntry> snapshotPartitions =
+                snapshotTable.newSnapshotReader().partitionEntries();
+        List<PartitionEntry> deltaPartitions = deltaTable.newSnapshotReader().partitionEntries();
+
+        Map<BinaryRow, List<BinaryRow>> groupedSnapshots = groupByGroupKey(snapshotPartitions);
+        Map<BinaryRow, List<BinaryRow>> groupedDeltas = groupByGroupKey(deltaPartitions);
+
+        List<BinaryRow> snapshotPartitionsToExpire = new ArrayList<>();
+        List<BinaryRow> deltaPartitionsToExpire = new ArrayList<>();
+        for (Map.Entry<BinaryRow, List<BinaryRow>> entry : groupedSnapshots.entrySet()) {
+            collectExpiredSegments(
+                    cutoffTime,
+                    entry.getValue(),
+                    groupedDeltas.get(entry.getKey()),
+                    snapshotPartitionsToExpire,
+                    deltaPartitionsToExpire);
+        }
+
+        if (snapshotPartitionsToExpire.isEmpty() && deltaPartitionsToExpire.isEmpty()) {
+            return new ArrayList<>();
+        }
+
+        List<Map<String, String>> deltaSpecs = toPartitionSpecs(deltaPartitionsToExpire);
+        List<Map<String, String>> snapshotSpecs = toPartitionSpecs(snapshotPartitionsToExpire);
+        List<Map<String, String>> allExpired = new ArrayList<>();
+
+        // Deltas go first so that ChainTableCommitPreCallback validation passes for the snapshots.
+        if (!deltaSpecs.isEmpty()) {
+            LOG.info("Chain table expire delta partitions: {}", deltaSpecs);
+            batchDropPartitions(deltaTable, deltaSpecs, deltaPartitionModification);
+            allExpired.addAll(deltaSpecs);
+        }
+
+        if (!snapshotSpecs.isEmpty()) {
+            LOG.info("Chain table expire snapshot partitions: {}", snapshotSpecs);
+            batchDropPartitions(snapshotTable, snapshotSpecs, snapshotPartitionModification);
+            allExpired.addAll(snapshotSpecs);
+        }
+
+        return allExpired;
+    }
+
+    /**
+     * Collects the snapshot and delta partitions of one group that belong to expirable segments.
+     *
+     * @param cutoffTime only snapshots strictly before this time are candidates
+     * @param groupSnapshots the group's snapshot partitions; sorted in place by chain value
+     * @param groupDeltas the group's delta partitions, or null if the group has none
+     * @param snapshotsToExpire receives expired snapshot partitions, oldest first
+     * @param deltasToExpire receives expired delta partitions, segment by segment, then orphans
+     */
+    private void collectExpiredSegments(
+            LocalDateTime cutoffTime,
+            List<BinaryRow> groupSnapshots,
+            @Nullable List<BinaryRow> groupDeltas,
+            List<BinaryRow> snapshotsToExpire,
+            List<BinaryRow> deltasToExpire) {
+        groupSnapshots.sort(
+                (a, b) ->
+                        chainPartitionComparator.compare(
+                                projector.extractChainPartition(a),
+                                projector.extractChainPartition(b)));
+
+        // Snapshots strictly before the cutoff (in sorted order) and their extracted times.
+        List<BinaryRow> snapshotsBeforeCutoff = new ArrayList<>();
+        List<LocalDateTime> snapshotTimes = new ArrayList<>();
+        for (BinaryRow partition : groupSnapshots) {
+            LocalDateTime partTime = extractPartitionTime(partition);
+            if (partTime != null && cutoffTime.isAfter(partTime)) {
+                snapshotsBeforeCutoff.add(partition);
+                snapshotTimes.add(partTime);
+            }
+        }
+
+        // The last snapshot before the cutoff is the anchor and is always kept; every earlier one
+        // starts an expirable segment.
+        int anchorIndex = snapshotsBeforeCutoff.size() - 1;
+        int segmentsToExpire = Math.min(anchorIndex, maxExpireNum);
+        if (segmentsToExpire <= 0) {
+            return;
+        }
+
+        snapshotsToExpire.addAll(snapshotsBeforeCutoff.subList(0, segmentsToExpire));
+        if (groupDeltas == null) {
+            return;
+        }
+
+        // Extract each delta's time once, rather than once per segment.
+        List<BinaryRow> timedDeltas = new ArrayList<>(groupDeltas.size());
+        List<LocalDateTime> deltaTimes = new ArrayList<>(groupDeltas.size());
+        for (BinaryRow deltaPartition : groupDeltas) {
+            LocalDateTime deltaTime = extractPartitionTime(deltaPartition);
+            if (deltaTime != null) {
+                timedDeltas.add(deltaPartition);
+                deltaTimes.add(deltaTime);
+            }
+        }
+
+        // Segment i owns the deltas in [time(S_i), time(S_i+1)).
+        for (int i = 0; i < segmentsToExpire; i++) {
+            LocalDateTime segmentStart = snapshotTimes.get(i);
+            LocalDateTime segmentEnd = snapshotTimes.get(i + 1);
+            for (int j = 0; j < timedDeltas.size(); j++) {
+                LocalDateTime deltaTime = deltaTimes.get(j);
+                if (!deltaTime.isBefore(segmentStart) && deltaTime.isBefore(segmentEnd)) {
+                    deltasToExpire.add(timedDeltas.get(j));
+                }
+            }
+        }
+
+        // Also expire orphan deltas that precede the earliest expired snapshot.
+        LocalDateTime firstSnapshotTime = snapshotTimes.get(0);
+        for (int j = 0; j < timedDeltas.size(); j++) {
+            if (deltaTimes.get(j).isBefore(firstSnapshotTime)) {
+                deltasToExpire.add(timedDeltas.get(j));
+            }
+        }
+    }
+
+    /**
+     * Drops the given partitions, through the metastore when {@code partitionModification} is
+     * present (also dropping the matching {@code .done} partitions), otherwise by truncating them in
+     * {@code table}.
+     */
+    private void batchDropPartitions(
+            FileStoreTable table,
+            List<Map<String, String>> partitionSpecs,
+            @Nullable PartitionModification partitionModification) {
+        for (List<Map<String, String>> batch : toBatches(partitionSpecs)) {
+            if (partitionModification != null) {
+                try {
+                    partitionModification.dropPartitions(batch);
+                    partitionModification.dropPartitions(toDonePartitions(batch));
+                } catch (Catalog.TableNotExistException e) {
+                    throw new RuntimeException(e);
+                }
+            } else {
+                dropPartitions(table, batch);
+            }
+        }
+    }
+
+    /** Splits specs into batches of {@code expireBatchSize} when batching applies. */
+    private List<List<Map<String, String>>> toBatches(List<Map<String, String>> partitionSpecs) {
+        if (expireBatchSize > 0 && expireBatchSize < partitionSpecs.size()) {
+            return Lists.partition(partitionSpecs, expireBatchSize);
+        }
+        List<List<Map<String, String>>> single = new ArrayList<>(1);
+        single.add(partitionSpecs);
+        return single;
+    }
+
+    /** Copies each spec with {@code ".done"} appended to the value of its last partition key. */
+    private List<Map<String, String>> toDonePartitions(
+            List<Map<String, String>> expiredPartitions) {
+        List<Map<String, String>> donePartitions = new ArrayList<>(expiredPartitions.size());
+        for (Map<String, String> partition : expiredPartitions) {
+            LinkedHashMap<String, String> donePartition = new LinkedHashMap<>(partition);
+            Map.Entry<String, String> lastEntry = null;
+            for (Map.Entry<String, String> entry : donePartition.entrySet()) {
+                lastEntry = entry;
+            }
+            if (lastEntry != null) {
+                donePartition.put(lastEntry.getKey(), lastEntry.getValue() + ".done");
+                donePartitions.add(donePartition);
+            }
+        }
+        return donePartitions;
+    }
+
+    /** Groups full partitions by their group part, preserving first-seen group order. */
+    private Map<BinaryRow, List<BinaryRow>> groupByGroupKey(List<PartitionEntry> partitionEntries) {
+        Map<BinaryRow, List<BinaryRow>> grouped = new LinkedHashMap<>();
+        for (PartitionEntry entry : partitionEntries) {
+            BinaryRow fullPartition = entry.partition();
+            BinaryRow groupKey = projector.extractGroupPartition(fullPartition);
+            grouped.computeIfAbsent(groupKey, k -> new ArrayList<>()).add(fullPartition);
+        }
+        return grouped;
+    }
+
+    /**
+     * Extracts the partition time from the chain partition fields of {@code partition}.
+     *
+     * @return the extracted time, or null when extraction fails (a warning is logged)
+     */
+    @Nullable
+    private LocalDateTime extractPartitionTime(BinaryRow partition) {
+        try {
+            LinkedHashMap<String, String> partValues =
+                    partitionComputer.generatePartValues(partition);
+            List<String> chainValues = new ArrayList<>(chainPartitionKeys.size());
+            for (String key : chainPartitionKeys) {
+                chainValues.add(partValues.get(key));
+            }
+            return timeExtractor.extract(chainPartitionKeys, chainValues);
+        } catch (Exception e) {
+            LOG.warn("Failed to extract partition time from {}", partition, e);
+            return null;
+        }
+    }
+
+    /** Converts partitions to ordered key/value specs, skipping keys with no generated value. */
+    private List<Map<String, String>> toPartitionSpecs(List<BinaryRow> partitions) {
+        return partitions.stream()
+                .map(
+                        p -> {
+                            LinkedHashMap<String, String> values =
+                                    partitionComputer.generatePartValues(p);
+                            Map<String, String> spec = new LinkedHashMap<>();
+                            for (String key : partitionKeys) {
+                                String value = values.get(key);
+                                if (value != null) {
+                                    spec.put(key, value);
+                                }
+                            }
+                            return spec;
+                        })
+                .collect(Collectors.toList());
+    }
+
+    /** Truncates the given partitions of {@code table} through a batch commit. */
+    private void dropPartitions(FileStoreTable table, List<Map<String, String>> partitionSpecs) {
+        try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
+            commit.truncatePartitions(partitionSpecs);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Failed to drop partitions from %s: %s.", table.name(), partitionSpecs),
+                    e);
+        }
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
b/paimon-core/src/main/java/org/apache/paimon/operation/NormalPartitionExpire.java
similarity index 97%
copy from
paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
copy to
paimon-core/src/main/java/org/apache/paimon/operation/NormalPartitionExpire.java
index 74b7850ed7..5a23f538a9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/NormalPartitionExpire.java
@@ -44,9 +44,9 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
/** Expire partitions. */
-public class PartitionExpire {
+public class NormalPartitionExpire implements PartitionExpire {
- private static final Logger LOG =
LoggerFactory.getLogger(PartitionExpire.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(NormalPartitionExpire.class);
private static final String DELIMITER = ",";
@@ -61,7 +61,7 @@ public class PartitionExpire {
private final int maxExpireNum;
private final int expireBatchSize;
- public PartitionExpire(
+ public NormalPartitionExpire(
Duration expirationTime,
Duration checkInterval,
PartitionExpireStrategy strategy,
@@ -90,7 +90,7 @@ public class PartitionExpire {
this.expireBatchSize = expireBatchSize;
}
- public PartitionExpire(
+ public NormalPartitionExpire(
Duration expirationTime,
Duration checkInterval,
PartitionExpireStrategy strategy,
@@ -111,14 +111,17 @@ public class PartitionExpire {
expireBatchSize);
}
+ @Override
public List<Map<String, String>> expire(long commitIdentifier) {
return expire(LocalDateTime.now(), commitIdentifier);
}
+ @Override
public boolean isValueExpiration() {
return strategy instanceof PartitionValuesTimeExpireStrategy;
}
+ @Override
public boolean isValueAllExpired(Collection<BinaryRow> partitions) {
PartitionValuesTimeExpireStrategy valuesStrategy =
(PartitionValuesTimeExpireStrategy) strategy;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index 74b7850ed7..bb1487735b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -18,208 +18,38 @@
package org.apache.paimon.operation;
-import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.manifest.PartitionEntry;
-import org.apache.paimon.partition.PartitionExpireStrategy;
-import org.apache.paimon.partition.PartitionValuesTimeExpireStrategy;
-import org.apache.paimon.table.PartitionModification;
-
-import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.time.Duration;
-import java.time.LocalDateTime;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-
-/** Expire partitions. */
-public class PartitionExpire {
-
- private static final Logger LOG =
LoggerFactory.getLogger(PartitionExpire.class);
-
- private static final String DELIMITER = ",";
-
- private final Duration expirationTime;
- private final Duration checkInterval;
- private final FileStoreScan scan;
- private final FileStoreCommit commit;
- @Nullable private final PartitionModification partitionModification;
- private LocalDateTime lastCheck;
- private final PartitionExpireStrategy strategy;
- private final boolean endInputCheckPartitionExpire;
- private final int maxExpireNum;
- private final int expireBatchSize;
-
- public PartitionExpire(
- Duration expirationTime,
- Duration checkInterval,
- PartitionExpireStrategy strategy,
- FileStoreScan scan,
- FileStoreCommit commit,
- @Nullable PartitionModification partitionModification,
- boolean endInputCheckPartitionExpire,
- int maxExpireNum,
- int expireBatchSize) {
- this.expirationTime = expirationTime;
- this.checkInterval = checkInterval;
- this.strategy = strategy;
- this.scan = scan;
- this.commit = commit;
- this.partitionModification = partitionModification;
- // Avoid the execution time of stream jobs from being too short and
preventing partition
- // expiration
- long rndSeconds = 0;
- long checkIntervalSeconds = checkInterval.toMillis() / 1000;
- if (checkIntervalSeconds > 0) {
- rndSeconds =
ThreadLocalRandom.current().nextLong(checkIntervalSeconds);
- }
- this.lastCheck = LocalDateTime.now().minusSeconds(rndSeconds);
- this.endInputCheckPartitionExpire = endInputCheckPartitionExpire;
- this.maxExpireNum = maxExpireNum;
- this.expireBatchSize = expireBatchSize;
- }
-
- public PartitionExpire(
- Duration expirationTime,
- Duration checkInterval,
- PartitionExpireStrategy strategy,
- FileStoreScan scan,
- FileStoreCommit commit,
- @Nullable PartitionModification partitionModification,
- int maxExpireNum,
- int expireBatchSize) {
- this(
- expirationTime,
- checkInterval,
- strategy,
- scan,
- commit,
- partitionModification,
- false,
- maxExpireNum,
- expireBatchSize);
- }
-
- public List<Map<String, String>> expire(long commitIdentifier) {
- return expire(LocalDateTime.now(), commitIdentifier);
- }
-
- public boolean isValueExpiration() {
- return strategy instanceof PartitionValuesTimeExpireStrategy;
- }
- public boolean isValueAllExpired(Collection<BinaryRow> partitions) {
- PartitionValuesTimeExpireStrategy valuesStrategy =
- (PartitionValuesTimeExpireStrategy) strategy;
- LocalDateTime expireDateTime =
LocalDateTime.now().minus(expirationTime);
- for (BinaryRow partition : partitions) {
- if (!valuesStrategy.isExpired(expireDateTime, partition)) {
- return false;
- }
- }
- return true;
- }
-
- @VisibleForTesting
- void setLastCheck(LocalDateTime time) {
- lastCheck = time;
- }
-
- @VisibleForTesting
- List<Map<String, String>> expire(LocalDateTime now, long commitIdentifier)
{
- if (checkInterval.isZero()
- || now.isAfter(lastCheck.plus(checkInterval))
- || (endInputCheckPartitionExpire && Long.MAX_VALUE ==
commitIdentifier)) {
- List<Map<String, String>> expired =
- doExpire(now.minus(expirationTime), commitIdentifier);
- lastCheck = now;
- return expired;
- }
- return null;
- }
-
- private List<Map<String, String>> doExpire(
- LocalDateTime expireDateTime, long commitIdentifier) {
- List<PartitionEntry> partitionEntries =
- strategy.selectExpiredPartitions(scan, expireDateTime);
- List<List<String>> expiredPartValues = new
ArrayList<>(partitionEntries.size());
- for (PartitionEntry partition : partitionEntries) {
- Object[] array = strategy.convertPartition(partition.partition());
- expiredPartValues.add(strategy.toPartitionValue(array));
- }
-
- List<Map<String, String>> expired = new ArrayList<>();
- if (!expiredPartValues.isEmpty()) {
- // convert partition value to partition string, and limit the
partition num
- expired = convertToPartitionString(expiredPartValues);
- LOG.info("Expire Partitions: {}", expired);
- if (expireBatchSize > 0 && expireBatchSize < expired.size()) {
- Lists.partition(expired, expireBatchSize)
- .forEach(
- expiredBatchPartitions ->
- doBatchExpire(expiredBatchPartitions,
commitIdentifier));
- } else {
- doBatchExpire(expired, commitIdentifier);
- }
- }
- return expired;
- }
-
- private void doBatchExpire(
- List<Map<String, String>> expiredBatchPartitions, long
commitIdentifier) {
- if (partitionModification != null) {
- try {
- partitionModification.dropPartitions(expiredBatchPartitions);
- // also drop corresponding .done partitions
-
partitionModification.dropPartitions(toDonePartitions(expiredBatchPartitions));
- } catch (Catalog.TableNotExistException e) {
- throw new RuntimeException(e);
- }
- } else {
- // .done partitions only exist when partitionModification != null
- // (metastore.partitioned-table = true), so no need to handle them
here
- commit.dropPartitions(expiredBatchPartitions, commitIdentifier);
- }
- }
-
- private List<Map<String, String>> toDonePartitions(
- List<Map<String, String>> expiredPartitions) {
- List<Map<String, String>> donePartitions = new
ArrayList<>(expiredPartitions.size());
- for (Map<String, String> partition : expiredPartitions) {
- LinkedHashMap<String, String> donePartition = new
LinkedHashMap<>(partition);
- // append .done suffix to the last partition field value
- Map.Entry<String, String> lastEntry = null;
- for (Map.Entry<String, String> entry : donePartition.entrySet()) {
- lastEntry = entry;
- }
- if (lastEntry != null) {
- donePartition.put(lastEntry.getKey(), lastEntry.getValue() +
".done");
- donePartitions.add(donePartition);
- }
- }
- return donePartitions;
- }
-
- private List<Map<String, String>> convertToPartitionString(
- List<List<String>> expiredPartValues) {
- return expiredPartValues.stream()
- .map(values -> String.join(DELIMITER, values))
- .sorted()
- // Use split(DELIMITER, -1) to preserve trailing empty strings
- .map(s -> s.split(DELIMITER, -1))
- .map(strategy::toPartitionString)
- .limit(Math.min(expiredPartValues.size(), maxExpireNum))
- .collect(Collectors.toList());
- }
+/**
+ * Common interface for partition expiration strategies.
+ *
+ * <p>Implementations include {@link NormalPartitionExpire} for normal tables and {@link
+ * ChainTablePartitionExpire} for chain tables that require segment-based expiration across
+ * snapshot and delta branches.
+ */
+public interface PartitionExpire {
+
+ /**
+ * Expire partitions that are older than the configured expiration time.
+ *
+ * @param commitIdentifier identifier of the current commit; implementations may use it when
+ *     committing the partition drop
+ * @return the list of expired partition specs, or null if the check interval has not elapsed
+ */
+ @Nullable
+ List<Map<String, String>> expire(long commitIdentifier);
+
+ /**
+ * Whether this expiration uses values-time strategy.
+ *
+ * @return {@code true} if partitions are judged expired by the time parsed from their
+ *     partition values
+ */
+ boolean isValueExpiration();
+
+ /**
+ * Check whether all given partitions are expired according to the values-time strategy.
+ *
+ * <p>Only valid when {@link #isValueExpiration()} returns true.
+ *
+ * @param partitions partitions to check
+ * @return {@code true} only if every given partition is considered expired
+ */
+ boolean isValueAllExpired(Collection<BinaryRow> partitions);
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 3d02ede617..c230b9048c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -947,6 +947,14 @@ public class SchemaValidation {
options.partitionTimestampFormatter() != null,
"Partition timestamp formatter is required for chain
table.");
+ if (options.partitionExpireTime() != null) {
+ Preconditions.checkArgument(
+
"values-time".equals(options.partitionExpireStrategy()),
+ "Chain table only supports 'values-time' partition
expiration strategy, "
+ + "but found '%s'.",
+ options.partitionExpireStrategy());
+ }
+
// validate chain-table.chain-partition-keys
List<String> chainPartKeys =
options.chainTableChainPartitionKeys();
if (chainPartKeys != null) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/ChainTablePartitionExpireTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/ChainTablePartitionExpireTest.java
new file mode 100644
index 0000000000..5ff5edbf71
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/ChainTablePartitionExpireTest.java
@@ -0,0 +1,786 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionStatistics;
+import org.apache.paimon.partition.PartitionUpdateTimeExpireStrategy;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.PartitionModification;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link ChainTablePartitionExpire}. */
+public class ChainTablePartitionExpireTest {
+
+ /** Per-test temporary directory injected by JUnit; presumably the root for table paths. */
+ @TempDir java.nio.file.Path tempDir;
+
+ /** Commit user handed to partition expiration; regenerated for every test. */
+ private String commitUser;
+
+ /** Assigns a fresh random commit user before each test. */
+ @BeforeEach
+ public void before() {
+ commitUser = UUID.randomUUID().toString();
+ }
+
+ /**
+ * Verifies that requesting partition expiration on a chain table with an update-time strategy
+ * fails with an {@link IllegalArgumentException}, because chain tables only support the
+ * 'values-time' strategy.
+ */
+ @Test
+ public void testExplicitPartitionExpireRejectsUpdateTimeStrategy() throws
Exception {
+ Path tablePath = tablePath("explicit_update_time_strategy");
+ createChainTable(tablePath, false);
+ FileStoreTable mainTable = loadTable(tablePath);
+
+ assertThatThrownBy(
+ () ->
+ mainTable
+ .store()
+ .newPartitionExpire(
+ commitUser,
+ mainTable,
+ Duration.ofDays(1),
+ Duration.ZERO,
+ new
PartitionUpdateTimeExpireStrategy(
+
CoreOptions.fromMap(Collections.emptyMap()),
+
mainTable.schema().logicalPartitionType())))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "Chain table only supports 'values-time' partition
expiration strategy");
+ }
+
+ /**
+ * Verifies segment-based expiration with a single chain partition key. Of the three snapshots
+ * before the cutoff, the newest is kept as the anchor. The two older segments (each snapshot
+ * plus the deltas before the next snapshot) are dropped from both branches.
+ */
+ @Test
+ public void testExpireWithSinglePartitionKey() throws Exception {
+ Path tablePath = tablePath("simple_expire");
+ createChainTable(tablePath, false);
+ FileStoreTable mainTable = loadTable(tablePath);
+ FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot");
+ FileStoreTable deltaTable = mainTable.switchToBranch("delta");
+
+ write(snapshotTable, "20250101", "v1");
+ write(snapshotTable, "20250201", "v2");
+ write(snapshotTable, "20250301", "v3");
+
+ write(deltaTable, "20250110", "v4");
+ write(deltaTable, "20250115", "v5");
+ write(deltaTable, "20250210", "v6");
+ write(deltaTable, "20250215", "v7");
+ write(deltaTable, "20250315", "v8");
+
+ snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+ deltaTable = loadTable(tablePath).switchToBranch("delta");
+ assertThat(listPartitions(snapshotTable))
+ .containsExactlyInAnyOrder("20250101", "20250201", "20250301");
+ assertThat(listPartitions(deltaTable))
+ .containsExactlyInAnyOrder(
+ "20250110", "20250115", "20250210", "20250215",
"20250315");
+
+ // cutoff = 2025-03-31 - 20d = 2025-03-11
+ // Snapshots before cutoff: 20250101, 20250201, 20250301 (3 snapshots)
+ // Anchor = 20250301 (kept), expire 2 segments:
+ // Segment0: S(20250101), d(20250110), d(20250115)
+ // Segment1: S(20250201), d(20250210), d(20250215)
+ ChainTablePartitionExpire expire =
+ newChainExpire(snapshotTable, deltaTable, Duration.ofDays(20),
false);
+ expire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0));
+ List<Map<String, String>> expired =
+ expire.expire(LocalDateTime.of(2025, 3, 31, 0, 0),
Long.MAX_VALUE);
+
+ assertThat(expired).isNotNull();
+ assertThat(expired).isNotEmpty();
+
+ // Only the anchor snapshot and the delta after it remain.
+ snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+ deltaTable = loadTable(tablePath).switchToBranch("delta");
+
assertThat(listPartitions(snapshotTable)).containsExactlyInAnyOrder("20250301");
+
assertThat(listPartitions(deltaTable)).containsExactlyInAnyOrder("20250315");
+ }
+
+ /**
+ * Verifies that nothing is expired when only one snapshot partition lies before the cutoff.
+ * That snapshot must stay as the anchor, so both branches are left unchanged.
+ */
+ @Test
+ public void testNoExpireWhenOnlyOneSnapshotBeforeCutoff() throws Exception
{
+ Path tablePath = tablePath("no_expire_one_snapshot");
+ createChainTable(tablePath, false);
+ FileStoreTable mainTable = loadTable(tablePath);
+ FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot");
+ FileStoreTable deltaTable = mainTable.switchToBranch("delta");
+
+ write(snapshotTable, "20250201", "v1");
+ write(snapshotTable, "20250315", "v2");
+ write(deltaTable, "20250205", "v3");
+
+ snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+ deltaTable = loadTable(tablePath).switchToBranch("delta");
+
+ // cutoff = 2025-03-31 - 30d = 2025-03-01
+ // Only S(20250201) is before cutoff, so it is the anchor and nothing can expire.
+ ChainTablePartitionExpire expire =
+ newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30),
false);
+ expire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0));
+ List<Map<String, String>> expired =
+ expire.expire(LocalDateTime.of(2025, 3, 31, 0, 0),
Long.MAX_VALUE);
+
+ assertThat(expired).isEmpty();
+
+ snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+ deltaTable = loadTable(tablePath).switchToBranch("delta");
+
assertThat(listPartitions(snapshotTable)).containsExactlyInAnyOrder("20250201",
"20250315");
+
assertThat(listPartitions(deltaTable)).containsExactlyInAnyOrder("20250205");
+ }
+
+ /**
+ * Verifies that several older segments are expired in one run. The anchor snapshot, later
+ * snapshots, and deltas after the anchor are all kept.
+ */
+ @Test
+ public void testExpireMultipleSegments() throws Exception {
+ Path tablePath = tablePath("multi_segments");
+ createChainTable(tablePath, false);
+ FileStoreTable mainTable = loadTable(tablePath);
+ FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot");
+ FileStoreTable deltaTable = mainTable.switchToBranch("delta");
+
+ write(snapshotTable, "20250101", "v1");
+ write(snapshotTable, "20250115", "v2");
+ write(snapshotTable, "20250201", "v3");
+ write(snapshotTable, "20250315", "v4");
+
+ write(deltaTable, "20250105", "v5");
+ write(deltaTable, "20250120", "v6");
+ write(deltaTable, "20250210", "v7");
+
+ snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+ deltaTable = loadTable(tablePath).switchToBranch("delta");
+
+ // cutoff = 2025-03-31 - 30d = 2025-03-01
+ // Snapshots before cutoff: 20250101, 20250115, 20250201 (3 snapshots)
+ // Anchor = 20250201 (kept), expire S(20250101), S(20250115)
+ // Delta before anchor: d(20250105), d(20250120)
+ ChainTablePartitionExpire expire =
+ newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30),
false);
+ expire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0));
+ expire.expire(LocalDateTime.of(2025, 3, 31, 0, 0), Long.MAX_VALUE);
+
+ snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+ deltaTable = loadTable(tablePath).switchToBranch("delta");
+
assertThat(listPartitions(snapshotTable)).containsExactlyInAnyOrder("20250201",
"20250315");
+
assertThat(listPartitions(deltaTable)).containsExactlyInAnyOrder("20250210");
+ }
+
+ /** Verifies an empty result when no snapshot partition is older than the cutoff. */
+ @Test
+ public void testNoExpireWhenNoSnapshotsBeforeCutoff() throws Exception {
+ Path tablePath = tablePath("no_expire_no_snapshot");
+ createChainTable(tablePath, false);
+ FileStoreTable mainTable = loadTable(tablePath);
+ FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot");
+ FileStoreTable deltaTable = mainTable.switchToBranch("delta");
+
+ write(snapshotTable, "20250315", "v1");
+ write(snapshotTable, "20250320", "v2");
+ write(deltaTable, "20250316", "v3");
+
+ snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+ deltaTable = loadTable(tablePath).switchToBranch("delta");
+
+ // cutoff = 2025-03-31 - 30d = 2025-03-01
+ // No snapshots before cutoff
+ ChainTablePartitionExpire expire =
+ newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30),
false);
+ expire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0));
+ List<Map<String, String>> expired =
+ expire.expire(LocalDateTime.of(2025, 3, 31, 0, 0),
Long.MAX_VALUE);
+
+ assertThat(expired).isEmpty();
+ }
+
+ /**
+ * Verifies that {@code expire} returns {@code null} (no check performed) when the check
+ * interval of one day has not elapsed since the last check.
+ */
+ @Test
+ public void testCheckIntervalPreventsExpire() throws Exception {
+ Path tablePath = tablePath("check_interval");
+ createChainTable(tablePath, false);
+ FileStoreTable mainTable = loadTable(tablePath);
+ FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot");
+ FileStoreTable deltaTable = mainTable.switchToBranch("delta");
+
+ write(snapshotTable, "20250101", "v1");
+ write(snapshotTable, "20250201", "v2");
+
+ snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+ deltaTable = loadTable(tablePath).switchToBranch("delta");
+
+ // NOTE(review): the `false` argument presumably disables end-input checking, so the
+ // Long.MAX_VALUE commit identifier below does not bypass the interval; confirm against
+ // the constructor signature.
+ ChainTablePartitionExpire expire =
+ new ChainTablePartitionExpire(
+ Duration.ofDays(30),
+ Duration.ofDays(1),
+ snapshotTable,
+ deltaTable,
+ CoreOptions.fromMap(buildOptions(Duration.ofDays(30),
false)),
+ snapshotTable.schema().logicalPartitionType(),
+ false,
+ Integer.MAX_VALUE,
+ 0,
+ null,
+ null);
+ // Last check equals "now", so the one-day interval has not elapsed.
+ expire.setLastCheck(LocalDateTime.of(2025, 3, 31, 0, 0));
+
+ List<Map<String, String>> expired =
+ expire.expire(LocalDateTime.of(2025, 3, 31, 0, 0),
Long.MAX_VALUE);
+
+ assertThat(expired).isNull();
+ }
+
+ /**
+ * Verifies that {@code maxExpireNum} limits the number of segments expired per run, not the
+ * number of partitions. With a limit of 1, only the oldest segment (one snapshot plus its
+ * delta) is dropped.
+ */
+ @Test
+ public void testMaxExpireNumLimitsSegments() throws Exception {
+ Path tablePath = tablePath("max_expire_num");
+ createChainTable(tablePath, false);
+ FileStoreTable mainTable = loadTable(tablePath);
+ FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot");
+ FileStoreTable deltaTable = mainTable.switchToBranch("delta");
+
+ // Snapshots: S(0101), S(0115), S(0201), S(0315)
+ // cutoff = 03-01, anchor = S(0201)
+ // Segments to expire: Segment1={S(0101), d(0105)}, Segment2={S(0115),
d(0120)}
+ write(snapshotTable, "20250101", "v1");
+ write(snapshotTable, "20250115", "v2");
+ write(snapshotTable, "20250201", "v3");
+ write(snapshotTable, "20250315", "v4");
+
+ write(deltaTable, "20250105", "v5");
+ write(deltaTable, "20250120", "v6");
+ write(deltaTable, "20250210", "v7");
+
+ snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+ deltaTable = loadTable(tablePath).switchToBranch("delta");
+
+ // maxExpireNum=1 means only 1 segment: Segment1={S(0101), d(0105)}
+ ChainTablePartitionExpire expire =
+ new ChainTablePartitionExpire(
+ Duration.ofDays(30),
+ Duration.ZERO,
+ snapshotTable,
+ deltaTable,
+ CoreOptions.fromMap(buildOptions(Duration.ofDays(30),
false)),
+ snapshotTable.schema().logicalPartitionType(),
+ false,
+ 1,
+ 0,
+ null,
+ null);
+ expire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0));
+ List<Map<String, String>> expired =
+ expire.expire(LocalDateTime.of(2025, 3, 31, 0, 0),
Long.MAX_VALUE);
+
+ assertThat(expired).isNotNull();
+ // 1 segment = S(0101) + d(0105) = 2 partitions
+ assertThat(expired).hasSize(2);
+
+ // Verify: S(0101) expired, S(0115) still exists (not expired, was in
segment 2)
+ snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+ deltaTable = loadTable(tablePath).switchToBranch("delta");
+ assertThat(listPartitions(snapshotTable))
+ .containsExactlyInAnyOrder("20250115", "20250201", "20250315");
+ // d(0105) expired, d(0120) and d(0210) kept
+
assertThat(listPartitions(deltaTable)).containsExactlyInAnyOrder("20250120",
"20250210");
+ }
+
+ /**
+ * Verifies that expiration is evaluated independently per group partition. Group "US" drops
+ * its oldest segment. Group "EU", with only one snapshot before the cutoff, keeps everything.
+ */
+ @Test
+ public void testExpireWithGroupPartition() throws Exception {
+ Path tablePath = tablePath("group_partition");
+ createChainTable(tablePath, true);
+ FileStoreTable mainTable = loadTable(tablePath);
+ FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot");
+ FileStoreTable deltaTable = mainTable.switchToBranch("delta");
+
+ // Group "US": snapshots 0101, 0201, 0301
+ writeGrouped(snapshotTable, "US", "20250101", "v1");
+ writeGrouped(snapshotTable, "US", "20250201", "v2");
+ writeGrouped(snapshotTable, "US", "20250301", "v3");
+ // Group "US": deltas 0110, 0210
+ writeGrouped(deltaTable, "US", "20250110", "d1");
+ writeGrouped(deltaTable, "US", "20250210", "d2");
+
+ // Group "EU": only one snapshot before cutoff, so nothing should
expire
+ writeGrouped(snapshotTable, "EU", "20250215", "v4");
+ writeGrouped(snapshotTable, "EU", "20250320", "v5");
+ writeGrouped(deltaTable, "EU", "20250220", "d3");
+
+ snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+ deltaTable = loadTable(tablePath).switchToBranch("delta");
+
+ // cutoff = 2025-03-31 - 30d = 2025-03-01
+ // Group "US": snapshots before cutoff = [0101, 0201]. Anchor = 0201
(kept).
+ // Expire: S(0101), delta(0110) (before anchor 0201)
+ // Keep: S(0201), S(0301), delta(0210)
+ // Group "EU": snapshots before cutoff = [0215] (only 1). Nothing
expired.
+ ChainTablePartitionExpire expire =
+ newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30),
true);
+ expire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0));
+ List<Map<String, String>> expired =
+ expire.expire(LocalDateTime.of(2025, 3, 31, 0, 0),
Long.MAX_VALUE);
+
+ assertThat(expired).isNotNull();
+ assertThat(expired).hasSize(2);
+
+ snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+ deltaTable = loadTable(tablePath).switchToBranch("delta");
+
+ List<String> snapshotParts = listGroupedPartitions(snapshotTable);
+ List<String> deltaParts = listGroupedPartitions(deltaTable);
+
+ assertThat(snapshotParts).contains("US|20250201", "US|20250301");
+ assertThat(snapshotParts).doesNotContain("US|20250101");
+ assertThat(snapshotParts).contains("EU|20250215", "EU|20250320");
+
+ assertThat(deltaParts).contains("US|20250210");
+ assertThat(deltaParts).doesNotContain("US|20250110");
+ assertThat(deltaParts).contains("EU|20250220");
+ }
+
+ /**
+ * Verifies that, when partition modifications are supplied, snapshot partitions are dropped
+ * through the snapshot-branch modification and delta partitions through the delta-branch
+ * modification. In both cases the matching ".done" partitions are dropped as well.
+ */
+ @Test
+ public void testUsesBranchSpecificPartitionModifications() throws
Exception {
+ Path tablePath = tablePath("branch_specific_partition_modification");
+ createChainTable(tablePath, false);
+ FileStoreTable mainTable = loadTable(tablePath);
+ FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot");
+ FileStoreTable deltaTable = mainTable.switchToBranch("delta");
+
+ write(snapshotTable, "20250101", "v1");
+ write(snapshotTable, "20250201", "v2");
+ write(snapshotTable, "20250301", "v3");
+ write(deltaTable, "20250110", "d1");
+ write(deltaTable, "20250210", "d2");
+
+ snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+ deltaTable = loadTable(tablePath).switchToBranch("delta");
+
+ RecordingPartitionModification snapshotModification = new
RecordingPartitionModification();
+ RecordingPartitionModification deltaModification = new
RecordingPartitionModification();
+
+ ChainTablePartitionExpire expire =
+ new ChainTablePartitionExpire(
+ Duration.ofDays(30),
+ Duration.ZERO,
+ snapshotTable,
+ deltaTable,
+ CoreOptions.fromMap(buildOptions(Duration.ofDays(30),
false)),
+ snapshotTable.schema().logicalPartitionType(),
+ false,
+ Integer.MAX_VALUE,
+ 0,
+ snapshotModification,
+ deltaModification);
+ expire.setLastCheck(LocalDateTime.of(2025, 1, 1, 0, 0));
+ expire.expire(LocalDateTime.of(2025, 3, 31, 0, 0), Long.MAX_VALUE);
+
+ // Each branch's modification must only see its own partitions.
+ assertThat(deltaModification.droppedValues("dt"))
+ .contains("20250110", "20250110.done")
+ .doesNotContain("20250101", "20250101.done");
+ assertThat(snapshotModification.droppedValues("dt"))
+ .contains("20250101", "20250101.done")
+ .doesNotContain("20250110", "20250110.done");
+ }
+
+ /**
+ * Verifies that {@code isValueAllExpired} treats the anchor snapshot as retained. A partition
+ * in an expirable segment counts as expired, and any retained partition in the input makes
+ * the result false.
+ */
+ @Test
+ public void testIsValueAllExpiredReturnsFalseForAnchor() throws Exception {
+ Path tablePath = tablePath("value_expired_anchor");
+ createChainTable(tablePath, false);
+ FileStoreTable mainTable = loadTable(tablePath);
+ FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot");
+ FileStoreTable deltaTable = mainTable.switchToBranch("delta");
+
+ // S(0101), S(0201), S(0301)
+ write(snapshotTable, "20250101", "v1");
+ write(snapshotTable, "20250201", "v2");
+ write(snapshotTable, "20250301", "v3");
+
+ snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+ deltaTable = loadTable(tablePath).switchToBranch("delta");
+
+ // expirationTime = 30d, "now" = 2025-03-31 → cutoff = 2025-03-01
+ // Snapshots before cutoff: S(0101), S(0201). Anchor = S(0201) (kept).
+ ChainTablePartitionExpire expire =
+ newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30),
false);
+ LocalDateTime now = LocalDateTime.of(2025, 3, 31, 0, 0);
+
+ BinaryRow anchor0201 = findPartition(snapshotTable, "20250201");
+ BinaryRow expired0101 = findPartition(snapshotTable, "20250101");
+
+ // Anchor partition alone → not all expired (anchor is retained)
+
assertThat(expire.isValueAllExpired(Collections.singletonList(anchor0201),
now)).isFalse();
+
+ // Truly expired partition alone → all expired
+
assertThat(expire.isValueAllExpired(Collections.singletonList(expired0101),
now)).isTrue();
+
+ // Mix of anchor + expired → not all expired
+ assertThat(expire.isValueAllExpired(Arrays.asList(expired0101,
anchor0201), now)).isFalse();
+ }
+
+ /**
+ * Verifies that {@code isValueAllExpired} returns false when fewer than two snapshots precede
+ * the cutoff. In that case the single earlier snapshot is the anchor and nothing can expire.
+ */
+ @Test
+ public void testIsValueAllExpiredReturnsFalseWhenTooFewSnapshots() throws
Exception {
+ Path tablePath = tablePath("value_expired_few_snapshots");
+ createChainTable(tablePath, false);
+ FileStoreTable mainTable = loadTable(tablePath);
+ FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot");
+ FileStoreTable deltaTable = mainTable.switchToBranch("delta");
+
+ // Only 1 snapshot before cutoff
+ write(snapshotTable, "20250201", "v1");
+ write(snapshotTable, "20250315", "v2");
+
+ snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+ deltaTable = loadTable(tablePath).switchToBranch("delta");
+
+ // cutoff = 2025-03-31 - 30d = 2025-03-01
+ // Only S(0201) before cutoff → < 2, nothing can expire
+ ChainTablePartitionExpire expire =
+ newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30),
false);
+ LocalDateTime now = LocalDateTime.of(2025, 3, 31, 0, 0);
+
+ BinaryRow partition0201 = findPartition(snapshotTable, "20250201");
+
assertThat(expire.isValueAllExpired(Collections.singletonList(partition0201),
now))
+ .isFalse();
+ }
+
+ /**
+ * Verifies that {@code isValueAllExpired} returns false for a delta partition when the table
+ * has no snapshot partitions at all. Without a snapshot boundary, delta-only data is retained.
+ */
+ @Test
+ public void testIsValueAllExpiredReturnsFalseForDeltaOnlyGroup() throws
Exception {
+ Path tablePath = tablePath("value_expired_delta_only");
+ createChainTable(tablePath, false);
+ FileStoreTable mainTable = loadTable(tablePath);
+ FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot");
+ FileStoreTable deltaTable = mainTable.switchToBranch("delta");
+
+ write(deltaTable, "20250101", "d1");
+ write(deltaTable, "20250201", "d2");
+
+ snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+ deltaTable = loadTable(tablePath).switchToBranch("delta");
+
+ // cutoff = 2025-03-31 - 30d = 2025-03-01
+ // No snapshot boundary exists, so delta-only partitions are retained.
+ ChainTablePartitionExpire expire =
+ newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30),
false);
+ LocalDateTime now = LocalDateTime.of(2025, 3, 31, 0, 0);
+
+ BinaryRow deltaPartition = findPartition(deltaTable, "20250101");
+
assertThat(expire.isValueAllExpired(Collections.singletonList(deltaPartition),
now))
+ .isFalse();
+ }
+
+ /**
+ * Verifies that {@code isValueAllExpired} evaluates each group partition independently. Only
+ * the expirable "US" partition counts as expired. The "US" anchor and the "EU" partition
+ * (its group has fewer than two snapshots before the cutoff) are retained, and mixing them
+ * with an expired partition yields false.
+ */
+ @Test
+ public void testIsValueAllExpiredWithGroupPartitions() throws Exception {
+ Path tablePath = tablePath("value_expired_groups");
+ createChainTable(tablePath, true);
+ FileStoreTable mainTable = loadTable(tablePath);
+ FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot");
+ FileStoreTable deltaTable = mainTable.switchToBranch("delta");
+
+ // Group "US": 3 snapshots, anchor = S(US,0201)
+ writeGrouped(snapshotTable, "US", "20250101", "v1");
+ writeGrouped(snapshotTable, "US", "20250201", "v2");
+ writeGrouped(snapshotTable, "US", "20250301", "v3");
+
+ // Group "EU": only 1 snapshot before cutoff → nothing expires
+ writeGrouped(snapshotTable, "EU", "20250215", "v4");
+ writeGrouped(snapshotTable, "EU", "20250320", "v5");
+
+ snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+ deltaTable = loadTable(tablePath).switchToBranch("delta");
+
+ // cutoff = 2025-03-31 - 30d = 2025-03-01
+ ChainTablePartitionExpire expire =
+ newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30),
true);
+ LocalDateTime now = LocalDateTime.of(2025, 3, 31, 0, 0);
+
+ BinaryRow usExpired = findGroupedPartition(snapshotTable, "US",
"20250101");
+ BinaryRow usAnchor = findGroupedPartition(snapshotTable, "US",
"20250201");
+ BinaryRow euRetained = findGroupedPartition(snapshotTable, "EU",
"20250215");
+
+ // US expired partition → truly expired
+
assertThat(expire.isValueAllExpired(Collections.singletonList(usExpired),
now)).isTrue();
+
+ // US anchor → retained
+
assertThat(expire.isValueAllExpired(Collections.singletonList(usAnchor),
now)).isFalse();
+
+ // EU partition (< 2 snapshots before cutoff) → retained
+
assertThat(expire.isValueAllExpired(Collections.singletonList(euRetained),
now)).isFalse();
+
+ // Mix across groups: US expired + EU retained
+ assertThat(expire.isValueAllExpired(Arrays.asList(usExpired,
euRetained), now)).isFalse();
+ }
+
+ @Test
+ public void testIsValueAllExpiredReturnsFalseForPartitionsAfterCutoff()
throws Exception {
+ Path tablePath = tablePath("value_expired_after_cutoff");
+ createChainTable(tablePath, false);
+ FileStoreTable mainTable = loadTable(tablePath);
+ FileStoreTable snapshotTable = mainTable.switchToBranch("snapshot");
+ FileStoreTable deltaTable = mainTable.switchToBranch("delta");
+
+ write(snapshotTable, "20250101", "v1");
+ write(snapshotTable, "20250315", "v2");
+
+ snapshotTable = loadTable(tablePath).switchToBranch("snapshot");
+ deltaTable = loadTable(tablePath).switchToBranch("delta");
+
+ // cutoff = 2025-03-31 - 30d = 2025-03-01
+ // S(0315) is after cutoff → not expired at all
+ ChainTablePartitionExpire expire =
+ newChainExpire(snapshotTable, deltaTable, Duration.ofDays(30),
false);
+ LocalDateTime now = LocalDateTime.of(2025, 3, 31, 0, 0);
+
+ BinaryRow afterCutoff = findPartition(snapshotTable, "20250315");
+
assertThat(expire.isValueAllExpired(Collections.singletonList(afterCutoff),
now)).isFalse();
+ }
+
+ // ========== Helper methods ==========
+
+    /**
+     * Returns the first partition of {@code table} whose first partition field equals {@code
+     * dtValue}, or throws a {@link RuntimeException} if no such partition exists.
+     */
+    private BinaryRow findPartition(FileStoreTable table, String dtValue) {
+        for (PartitionEntry entry : table.newSnapshotReader().partitionEntries()) {
+            BinaryRow candidate = entry.partition();
+            if (candidate.getString(0).toString().equals(dtValue)) {
+                return candidate;
+            }
+        }
+        throw new RuntimeException("Partition " + dtValue + " not found");
+    }
+
+    /**
+     * Returns the first partition of {@code table} whose first two partition fields equal
+     * {@code region} and {@code dt}, or throws a {@link RuntimeException} if none matches.
+     */
+    private BinaryRow findGroupedPartition(FileStoreTable table, String region, String dt) {
+        for (PartitionEntry entry : table.newSnapshotReader().partitionEntries()) {
+            BinaryRow candidate = entry.partition();
+            boolean regionMatches = candidate.getString(0).toString().equals(region);
+            if (regionMatches && candidate.getString(1).toString().equals(dt)) {
+                return candidate;
+            }
+        }
+        throw new RuntimeException("Partition " + region + "|" + dt + " not found");
+    }
+
+    /** Resolves {@code tableName} as a child of the test's temporary directory. */
+    private Path tablePath(String tableName) {
+        String root = tempDir.toUri().toString();
+        return new Path(root, tableName);
+    }
+
+    /**
+     * Creates a primary-key chain table at {@code tablePath}, creates its "snapshot" and "delta"
+     * branches, and applies the same chain-table options to the main, snapshot and delta schemas.
+     *
+     * @param tablePath location of the table
+     * @param withGroupPartition if true the table is partitioned by (region, dt) with "dt" as the
+     *     chain partition key; otherwise it is partitioned by dt only
+     */
+    private void createChainTable(Path tablePath, boolean withGroupPartition) throws Exception {
+        LocalFileIO fileIO = LocalFileIO.create();
+        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.BUCKET.key(), "1");
+        options.put("merge-engine", "deduplicate");
+        options.put("sequence.field", "v");
+
+        // Both layouts use STRING columns only; the grouped one adds a leading "region" key.
+        String[] fieldNames =
+                withGroupPartition
+                        ? new String[] {"region", "dt", "pk", "v"}
+                        : new String[] {"dt", "pk", "v"};
+        org.apache.paimon.types.DataType[] fieldTypes =
+                new org.apache.paimon.types.DataType[fieldNames.length];
+        for (int i = 0; i < fieldTypes.length; i++) {
+            fieldTypes[i] = DataTypes.STRING();
+        }
+        List<String> partitionKeys =
+                withGroupPartition
+                        ? Arrays.asList("region", "dt")
+                        : Collections.singletonList("dt");
+        List<String> primaryKeys =
+                withGroupPartition
+                        ? Arrays.asList("pk", "region", "dt")
+                        : Arrays.asList("pk", "dt");
+
+        Schema schema =
+                new Schema(
+                        RowType.of(fieldTypes, fieldNames).getFields(),
+                        partitionKeys,
+                        primaryKeys,
+                        options,
+                        "");
+        schemaManager.createTable(schema);
+
+        FileStoreTable mainTable = loadTable(tablePath);
+        mainTable.createBranch("snapshot");
+        mainTable.createBranch("delta");
+
+        // Built as a mutable list up front so the optional group-key option can be appended.
+        List<SchemaChange> chainTableOptions = new ArrayList<>();
+        chainTableOptions.add(SchemaChange.setOption("chain-table.enabled", "true"));
+        chainTableOptions.add(SchemaChange.setOption("scan.fallback-snapshot-branch", "snapshot"));
+        chainTableOptions.add(SchemaChange.setOption("scan.fallback-delta-branch", "delta"));
+        chainTableOptions.add(SchemaChange.setOption("partition.timestamp-pattern", "$dt"));
+        chainTableOptions.add(
+                SchemaChange.setOption("partition.timestamp-formatter", "yyyyMMdd"));
+        if (withGroupPartition) {
+            chainTableOptions.add(
+                    SchemaChange.setOption("chain-table.chain-partition-keys", "dt"));
+        }
+
+        // The same option changes are committed to the main, snapshot and delta schemas.
+        schemaManager.commitChanges(chainTableOptions);
+        new SchemaManager(fileIO, tablePath, "snapshot").commitChanges(chainTableOptions);
+        new SchemaManager(fileIO, tablePath, "delta").commitChanges(chainTableOptions);
+    }
+
+    /**
+     * Loads the latest schema of the table at {@code tablePath} and builds a {@link
+     * FileStoreTable} from it.
+     *
+     * @throws IllegalStateException if no schema has been committed for the resolved branch
+     */
+    private FileStoreTable loadTable(Path tablePath) {
+        LocalFileIO fileIO = LocalFileIO.create();
+        Options options = new Options();
+        options.set(CoreOptions.PATH, tablePath.toString());
+        // Only PATH is set, so this presumably resolves to the default branch — confirm.
+        String branchName = CoreOptions.branch(options.toMap());
+        TableSchema tableSchema =
+                new SchemaManager(fileIO, tablePath, branchName)
+                        .latest()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "No schema found for table "
+                                                        + tablePath
+                                                        + " on branch "
+                                                        + branchName));
+        return FileStoreTableFactory.create(
+                fileIO, tablePath, tableSchema, CatalogEnvironment.empty());
+    }
+
+    /** Writes and commits a single (dt, pk=v, v=v) row to {@code table}. */
+    private void write(FileStoreTable table, String dt, String v) throws Exception {
+        writeAndCommitRow(
+                table,
+                GenericRow.of(
+                        BinaryString.fromString(dt),
+                        BinaryString.fromString(v),
+                        BinaryString.fromString(v)));
+    }
+
+    /** Writes and commits a single (region, dt, pk=v, v=v) row to {@code table}. */
+    private void writeGrouped(FileStoreTable table, String region, String dt, String v)
+            throws Exception {
+        writeAndCommitRow(
+                table,
+                GenericRow.of(
+                        BinaryString.fromString(region),
+                        BinaryString.fromString(dt),
+                        BinaryString.fromString(v),
+                        BinaryString.fromString(v)));
+    }
+
+    /**
+     * Writes {@code row} through a write-only copy of {@code table} and commits it with
+     * identifier 0. The writer and committer are closed even if writing or committing fails.
+     */
+    private void writeAndCommitRow(FileStoreTable table, GenericRow row) throws Exception {
+        // Resources close in reverse order: the writer first, then the committer.
+        try (TableCommitImpl commit = table.newCommit(commitUser);
+                StreamTableWrite write =
+                        table.copy(
+                                        Collections.singletonMap(
+                                                CoreOptions.WRITE_ONLY.key(), "true"))
+                                .newWrite(commitUser)) {
+            write.write(row);
+            List<CommitMessage> commitMessages = write.prepareCommit(true, 0);
+            commit.commit(0, commitMessages);
+        }
+    }
+
+    /** Returns the first partition field of every partition in {@code table}, sorted. */
+    private List<String> listPartitions(FileStoreTable table) {
+        List<String> values = new ArrayList<>();
+        for (PartitionEntry entry : table.newSnapshotReader().partitionEntries()) {
+            values.add(entry.partition().getString(0).toString());
+        }
+        Collections.sort(values);
+        return values;
+    }
+
+    /** Returns "field0|field1" for every partition in {@code table}, sorted. */
+    private List<String> listGroupedPartitions(FileStoreTable table) {
+        List<String> values = new ArrayList<>();
+        for (PartitionEntry entry : table.newSnapshotReader().partitionEntries()) {
+            BinaryRow partition = entry.partition();
+            String group = partition.getString(0).toString();
+            String chain = partition.getString(1).toString();
+            values.add(group + "|" + chain);
+        }
+        Collections.sort(values);
+        return values;
+    }
+
+    /**
+     * Builds the chain-table options used by {@link #newChainExpire}.
+     *
+     * @param expirationTime partition expiration time; encoded losslessly in milliseconds so
+     *     sub-day durations are not truncated to whole days
+     * @param withGroupPartition whether to declare "dt" as the chain partition key
+     */
+    private Map<String, String> buildOptions(Duration expirationTime, boolean withGroupPartition) {
+        Map<String, String> opts = new HashMap<>();
+        opts.put("partition.timestamp-pattern", "$dt");
+        opts.put("partition.timestamp-formatter", "yyyyMMdd");
+        opts.put("scan.fallback-snapshot-branch", "snapshot");
+        opts.put("scan.fallback-delta-branch", "delta");
+        // toDays() would silently turn e.g. 12h into "0 d"; milliseconds keep full precision.
+        opts.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), expirationTime.toMillis() + " ms");
+        if (withGroupPartition) {
+            opts.put("chain-table.chain-partition-keys", "dt");
+        }
+        return opts;
+    }
+
+    /**
+     * Creates a {@link ChainTablePartitionExpire} over the given snapshot/delta branches, using
+     * the options from {@link #buildOptions} and the snapshot branch's partition type.
+     */
+    private ChainTablePartitionExpire newChainExpire(
+            FileStoreTable snapshotTable,
+            FileStoreTable deltaTable,
+            Duration expirationTime,
+            boolean withGroupPartition) {
+        CoreOptions chainOptions =
+                CoreOptions.fromMap(buildOptions(expirationTime, withGroupPartition));
+        RowType partitionType = snapshotTable.schema().logicalPartitionType();
+        return new ChainTablePartitionExpire(
+                expirationTime,
+                Duration.ZERO,
+                snapshotTable,
+                deltaTable,
+                chainOptions,
+                partitionType,
+                false,
+                Integer.MAX_VALUE,
+                0,
+                null,
+                null);
+    }
+
+    /** {@link PartitionModification} stub that records a copy of every dropped partition spec. */
+    private static class RecordingPartitionModification implements PartitionModification {
+
+        /** Copies of each partition spec passed to {@link #dropPartitions}, in call order. */
+        private final List<Map<String, String>> droppedPartitions = new ArrayList<>();
+
+        @Override
+        public void createPartitions(List<Map<String, String>> partitions)
+                throws Catalog.TableNotExistException {
+            // Intentionally a no-op.
+        }
+
+        @Override
+        public void dropPartitions(List<Map<String, String>> partitions)
+                throws Catalog.TableNotExistException {
+            // Store copies so later mutation of the caller's maps cannot alter the record.
+            partitions.forEach(spec -> droppedPartitions.add(new HashMap<>(spec)));
+        }
+
+        @Override
+        public void alterPartitions(List<PartitionStatistics> partitions)
+                throws Catalog.TableNotExistException {
+            // Intentionally a no-op.
+        }
+
+        @Override
+        public void close() throws Exception {
+            // Nothing to release.
+        }
+
+        /** Returns the value of {@code key} in each recorded dropped partition, in drop order. */
+        private List<String> droppedValues(String key) {
+            List<String> values = new ArrayList<>(droppedPartitions.size());
+            for (Map<String, String> spec : droppedPartitions) {
+                values.add(spec.get(key));
+            }
+            return values;
+        }
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 6af293bea1..e6abe773eb 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -86,7 +86,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Test for {@link PartitionExpire}. */
+/** Test for {@link NormalPartitionExpire}. */
public class PartitionExpireTest {
@TempDir java.nio.file.Path tempDir;
@@ -189,7 +189,7 @@ public class PartitionExpireTest {
write("20230103", "31");
write("20230103", "32");
write("20230105", "51");
- PartitionExpire expire = newExpire();
+ NormalPartitionExpire expire = newExpire();
expire.setLastCheck(date(1));
Assertions.assertDoesNotThrow(() -> expire.expire(date(8),
Long.MAX_VALUE));
assertThat(read()).containsExactlyInAnyOrder("abcd:12");
@@ -215,7 +215,7 @@ public class PartitionExpireTest {
write("20230103", "31");
write("20230103", "32");
write("20230105", "51");
- PartitionExpire expire = newExpire();
+ NormalPartitionExpire expire = newExpire();
expire.setLastCheck(date(1));
Assertions.assertDoesNotThrow(() -> expire.expire(date(8),
Long.MAX_VALUE));
@@ -246,7 +246,7 @@ public class PartitionExpireTest {
write("20230103", "32");
write("20230105", "51");
- PartitionExpire expire = newExpire();
+ NormalPartitionExpire expire = newExpire();
expire.setLastCheck(date(1));
Assertions.assertDoesNotThrow(() -> expire.expire(date(6),
Long.MAX_VALUE));
@@ -272,7 +272,7 @@ public class PartitionExpireTest {
write("20230103", "32");
write("20230105", "51");
- PartitionExpire expire = newExpire();
+ NormalPartitionExpire expire = newExpire();
expire.setLastCheck(date(1));
expire.expire(date(3), Long.MAX_VALUE);
@@ -319,7 +319,7 @@ public class PartitionExpireTest {
doneAction.markDone("f0=20230103");
doneAction.markDone("f0=20230108");
- PartitionExpire expire = newExpire();
+ NormalPartitionExpire expire = newExpire();
expire.setLastCheck(date(1));
expire.expire(date(8), Long.MAX_VALUE);
@@ -423,7 +423,7 @@ public class PartitionExpireTest {
List<CommitMessage> commitMessages = write("20230101", "11");
write("20230105", "51");
- PartitionExpire expire = newExpire();
+ NormalPartitionExpire expire = newExpire();
expire.setLastCheck(date(1));
expire.expire(date(5), Long.MAX_VALUE);
assertThat(read()).containsExactlyInAnyOrder("20230105:51");
@@ -471,9 +471,9 @@ public class PartitionExpireTest {
return commitMessages;
}
- private PartitionExpire newExpire() {
+ private NormalPartitionExpire newExpire() {
FileStoreTable table = newExpireTable();
- return table.store().newPartitionExpire("", table);
+ return (NormalPartitionExpire) table.store().newPartitionExpire("",
table);
}
private FileStoreTable newExpireTable() {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
index ff4cb93426..7efb65d2d6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
@@ -23,6 +23,7 @@ import org.apache.paimon.FileStore;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.TimeUtils;
import java.time.Duration;
@@ -64,6 +65,7 @@ public class ExpirePartitionsAction extends TableActionBase
implements LocalActi
public void executeLocally() throws Exception {
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStore<?> fileStore = fileStoreTable.store();
+
PartitionExpire partitionExpire =
fileStore.newPartitionExpire(
"",
@@ -76,7 +78,9 @@ public class ExpirePartitionsAction extends TableActionBase
implements LocalActi
catalogLoader(),
new Identifier(
identifier.getDatabaseName(),
identifier.getTableName())));
-
+ Preconditions.checkNotNull(
+ partitionExpire,
+ "Both the partition expiration time and partition field can
not be null.");
partitionExpire.expire(Long.MAX_VALUE);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
index 8a6528bd02..ecddcf11e1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -90,11 +90,9 @@ public class ExpirePartitionsProcedure extends ProcedureBase
{
FileStore fileStore = fileStoreTable.store();
PartitionExpire partitionExpire = fileStore.newPartitionExpire("",
fileStoreTable);
-
Preconditions.checkNotNull(
partitionExpire,
"Both the partition expiration time and partition field can
not be null.");
-
List<Map<String, String>> expired =
partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
? new Row[] {Row.of("No expired partitions.")}