wzhero1 opened a new pull request, #7027:
URL: https://github.com/apache/paimon/pull/7027
### Purpose
This PR implements parallel snapshot expiration to improve the performance
of large-scale cleanup operations.
**Motivation:**
- Serial file deletion becomes a performance bottleneck for tables with
large amounts of data
- The current implementation cannot leverage Flink's distributed computing
capabilities
**Changes:**
1. **Core module refactoring** - Split the original serial expiration logic
into a Planner/Executor architecture (in `paimon-core`, package
`org.apache.paimon.operation.expire`):
   - `ExpireSnapshotsPlanner`: Computes the expiration plan, including the
     snapshot range and four types of tasks
   - `ExpireSnapshotsExecutor`: Executes deletion tasks based on task type
   - `ExpireSnapshotsPlan`: Data structure containing the task lists and the
     protection set
   - `SnapshotExpireTask`: Represents a single expiration task with a
     `TaskType` enum
   - `ProtectionSet`: Immutable set of protected manifests and tagged
     snapshots
   - `DeletionReport`: Execution report for each task
2. **Flink Action parallel mode** - `ExpireSnapshotsAction`
(`org.apache.paimon.flink.action`, in `paimon-flink-common`) supports a
`--parallelism` parameter:
   - **Worker Phase**: Parallel deletion of data files and changelog files
     using `RangePartitionedExpireFunction`
   - **Sink Phase**: Serial deletion of manifests and snapshot metadata
     using `SnapshotExpireSink`
   - Tasks are partitioned by snapshot ID range to maximize cache locality
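To illustrate the idea of partitioning by snapshot ID range, here is a minimal sketch that splits a contiguous range of snapshot IDs into at most `parallelism` contiguous sub-ranges of near-equal size. It is not the code in `ExpireSnapshotsPlan`: the class `SnapshotRangePartitioner`, the `Range` type, and the `partition` signature are hypothetical names chosen for this example, and the PR's actual partitioning may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of range partitioning; not the PR's implementation. */
final class SnapshotRangePartitioner {

    /** Inclusive range of snapshot IDs assigned to one worker (assumed shape). */
    static final class Range {
        final long start;
        final long end;

        Range(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    /**
     * Splits [beginInclusive, endExclusive) into at most {@code parallelism}
     * contiguous ranges, so that each worker handles neighbouring snapshots.
     */
    static List<Range> partition(long beginInclusive, long endExclusive, int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        List<Range> ranges = new ArrayList<>();
        long total = endExclusive - beginInclusive;
        if (total <= 0) {
            return ranges; // nothing to expire
        }
        // Never create more ranges than there are snapshots.
        int buckets = (int) Math.min(parallelism, total);
        long base = total / buckets;
        long remainder = total % buckets;
        long start = beginInclusive;
        for (int i = 0; i < buckets; i++) {
            // The first `remainder` ranges take one extra snapshot.
            long size = base + (i < remainder ? 1 : 0);
            ranges.add(new Range(start, start + size - 1));
            start += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Ten snapshots (IDs 1..10) split across three workers.
        System.out.println(partition(1, 11, 3));
    }
}
```

Keeping each worker on a contiguous range means consecutive snapshots, which tend to share manifests, are handled by the same worker; that is the cache-locality argument made above.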
**Execution modes:**
- Serial mode (default): `--parallelism` unset or ≤ 1 → uses
`ExpireSnapshotsImpl`
- Parallel mode: `--parallelism` > 1 together with `--force_start_flink_job`
→ uses Flink distributed execution
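The mode selection described above can be sketched as a small decision function. This is an illustration only: `ExpireModeSelector`, `Mode`, and `select` are hypothetical names, and the fallback to serial mode when `--parallelism` is greater than 1 but `--force_start_flink_job` is absent is an assumption, since the description does not state what happens in that case.

```java
/** Hypothetical sketch of the serial/parallel decision; not the PR's code. */
final class ExpireModeSelector {

    enum Mode { SERIAL, PARALLEL }

    /**
     * @param parallelism value of --parallelism, or null when the flag is not given
     * @param forceStartFlinkJob whether --force_start_flink_job was passed
     */
    static Mode select(Integer parallelism, boolean forceStartFlinkJob) {
        boolean parallelRequested = parallelism != null && parallelism > 1;
        // Assumption: without --force_start_flink_job the action stays serial.
        return (parallelRequested && forceStartFlinkJob) ? Mode.PARALLEL : Mode.SERIAL;
    }

    public static void main(String[] args) {
        System.out.println(select(null, false)); // default: serial
        System.out.println(select(4, true));     // both conditions met: parallel
    }
}
```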
### Tests
**Unit Tests:**
- `ExpireSnapshotsPlanTest` - Tests task partitioning logic
(`ExpireSnapshotsPlan.partitionTasksBySnapshotRange`)
- `DeletionReportTest` - Tests deletion report serialization
- `ExpireSnapshotsTest` - Core expiration logic tests (refactored to use new
Planner/Executor)
**Integration Tests:**
- `ExpireSnapshotsActionITCase` - Flink parallel mode integration tests
- `ExpireSnapshotsProcedureITCase` - Procedure integration tests
### API and Format
**New CLI parameters for `expire-snapshots` action:**
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `--parallelism` | Integer | null | Parallelism for parallel mode (requires >1) |
**No storage format changes.**
**Backward compatible:** Default behavior (serial mode) remains unchanged.
### Documentation
Yes, this introduces a new feature: parallel snapshot expiration.
Documentation should be added to describe:
- New `--parallelism` parameter for `expire-snapshots` action
- Requirement: parallel mode needs both `--parallelism` set to a value
greater than 1 and `--force_start_flink_job`
- Performance recommendations for large-scale cleanup
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.