hbgstc123 opened a new pull request, #7595:
URL: https://github.com/apache/paimon/pull/7595
### Purpose
## Problem
Sort Compact reads data from snapshot S_read, sorts it, and then commits the result as an OVERWRITE. However, `tryOverwritePartition` builds the DELETE list from `latestSnapshot`, which may be S_read+N (N snapshots later). If concurrent writes add new files between S_read and `latestSnapshot`, those files are deleted even though their data was never included in the sorted output, causing **silent data loss**.
## Root Cause
Timeline:
1. SortCompact reads from snapshot S0 (files A, B, C)
2. Concurrent writer adds file D -> S1
3. SortCompact sorts A+B+C -> produces X, Y
4. SortCompact calls overwritePartition(new files X, Y)
5. Commit builds DELETE list from latestSnapshot = S1 (contains A, B, C, D)
6. Commit issues: DELETE A, B, C, D + ADD X, Y
7. File D is deleted but X, Y don't contain D's data -> DATA LOSS
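The timeline above can be reproduced with a toy model. This is a minimal sketch, not Paimon code: `OverwriteRaceDemo` and its methods are hypothetical, each snapshot is reduced to the set of file names in a single partition, and a commit simply applies its DELETE and ADD lists to the latest snapshot.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy model only (not Paimon code): snapshot id -> live data files of one partition.
final class OverwriteRaceDemo {

    /** Files that remain after an OVERWRITE deletes `toDelete` from `latest` and adds `toAdd`. */
    static Set<String> applyOverwrite(Set<String> latest, Set<String> toDelete, Set<String> toAdd) {
        Set<String> result = new TreeSet<>(latest);
        result.removeAll(toDelete);
        result.addAll(toAdd);
        return result;
    }

    /** Pre-fix behaviour: the DELETE list is every file of the latest snapshot. */
    static Set<String> commitAgainstLatest(
            Map<Long, Set<String>> snapshots, long latestId, Set<String> sortedOutput) {
        Set<String> latest = snapshots.get(latestId);
        return applyOverwrite(latest, latest, sortedOutput);
    }

    public static void main(String[] args) {
        Map<Long, Set<String>> snapshots = new LinkedHashMap<>();
        snapshots.put(0L, Set.of("A", "B", "C"));      // S0: what sort compact read
        snapshots.put(1L, Set.of("A", "B", "C", "D")); // S1: concurrent writer added D
        // X and Y contain only the sorted rows of A, B and C.
        Set<String> live = commitAgainstLatest(snapshots, 1L, Set.of("X", "Y"));
        System.out.println(live); // prints [X, Y]: D's rows are gone
    }
}
```

D appears in the DELETE list because it is in S1, yet nothing in X or Y carries its rows.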
## Solution
### Part 1: Pin DELETE list to base snapshot
Build the DELETE list from the snapshot the sort compact actually read from (the base snapshot) instead of `latestSnapshot`. This ensures that only files whose data is included in the sorted output are deleted.
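Pinning the DELETE list can be sketched in the same toy model. The class and method names are hypothetical; the point is only that the files to delete are taken from the base snapshot, while the result is still applied on top of the latest one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy model only (not Paimon code): snapshot id -> live data files of one partition.
final class PinnedDeleteListDemo {

    /** Part 1: the DELETE list comes from the snapshot the sort compact read (the base snapshot). */
    static Set<String> deleteList(Map<Long, Set<String>> snapshots, long baseSnapshotId) {
        return new TreeSet<>(snapshots.get(baseSnapshotId));
    }

    /** Applies DELETE(base files) + ADD(sorted output) on top of the latest snapshot. */
    static Set<String> commit(
            Map<Long, Set<String>> snapshots, long baseId, long latestId, Set<String> sortedOutput) {
        Set<String> result = new TreeSet<>(snapshots.get(latestId));
        result.removeAll(deleteList(snapshots, baseId));
        result.addAll(sortedOutput);
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Set<String>> snapshots = new HashMap<>();
        snapshots.put(0L, Set.of("A", "B", "C"));
        snapshots.put(1L, Set.of("A", "B", "C", "D"));
        System.out.println(commit(snapshots, 0L, 1L, Set.of("X", "Y"))); // prints [D, X, Y]
    }
}
```

With Part 1 alone, D survives next to X and Y; Part 2 then rejects such a commit so the sort can be re-run over all files.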
### Part 2: Concurrent write detection
When the base snapshot differs from the latest snapshot, use `readIncrementalChanges(baseSnapshot, latestSnapshot)` to detect new files added to the overwritten partitions. If any new files exist, fail the commit with a clear conflict error instead of silently losing data.
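The detection step can be sketched as follows. This is an illustration under assumptions: `ConcurrentWriteCheck` is hypothetical, snapshot ids are assumed to be consecutive, and the per-snapshot map of added files only stands in for what `readIncrementalChanges(baseSnapshot, latestSnapshot)` returns in the PR; the real method is not called here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only (not Paimon code). addedBySnapshot maps a snapshot id to
// (partition -> files ADDed by that snapshot), standing in for the incremental changes.
final class ConcurrentWriteCheck {

    /** Files added to any overwritten partition by snapshots baseId+1 .. latestId. */
    static List<String> newFilesInPartitions(
            Map<Long, Map<String, List<String>>> addedBySnapshot,
            long baseId,
            long latestId,
            Set<String> overwrittenPartitions) {
        List<String> found = new ArrayList<>();
        for (long id = baseId + 1; id <= latestId; id++) {
            Map<String, List<String>> added = addedBySnapshot.getOrDefault(id, Map.of());
            for (String partition : overwrittenPartitions) {
                for (String file : added.getOrDefault(partition, List.of())) {
                    found.add(partition + "/" + file);
                }
            }
        }
        return found;
    }

    /** Fails the commit instead of silently deleting files the sort never read. */
    static void checkNoConcurrentWrites(
            Map<Long, Map<String, List<String>>> addedBySnapshot,
            long baseId,
            long latestId,
            Set<String> overwrittenPartitions) {
        if (baseId == latestId) {
            return; // nothing was committed after the read
        }
        List<String> added =
                newFilesInPartitions(addedBySnapshot, baseId, latestId, overwrittenPartitions);
        if (!added.isEmpty()) {
            throw new IllegalStateException(
                    "Concurrent write detected between snapshot " + baseId
                            + " and " + latestId + ": " + added);
        }
    }

    public static void main(String[] args) {
        Map<Long, Map<String, List<String>>> added =
                Map.of(1L, Map.of("dt=2024-01-01", List.of("D")));
        checkNoConcurrentWrites(added, 0L, 1L, Set.of("dt=2024-01-02")); // other partition: passes
        try {
            checkNoConcurrentWrites(added, 0L, 1L, Set.of("dt=2024-01-01"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Only partitions being overwritten are inspected, so concurrent appends to unrelated partitions do not block the sort compact.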
## Changes
### Core
- **`FileStoreCommit.java`**: Add a `@Nullable Long baseSnapshotId` parameter to `overwritePartition()`
- **`FileStoreCommitImpl.java`**:
  - Modify `tryOverwritePartition()` to build the DELETE list from `baseSnapshot`
  - Add concurrent write detection that fails fast if new files were added
- **`InnerTableCommit.java`**: Add a `withOverwriteBaseSnapshot(@Nullable Long snapshotId)` method
- **`TableCommitImpl.java`**: Implement `withOverwriteBaseSnapshot()` and pass `baseSnapshotId` to the commit
### Flink
- **`SortCompactAction.java`**: Capture the read snapshot ID before building the source and pass it to the sink builder
- **`FlinkSinkBuilder.java`**: Add an `overwriteBaseSnapshotId` field and setter
- **`FlinkWriteSink.java`**: Add an `overwriteBaseSnapshotId` field and setter, and pass it to the committer
### Spark
- **`CompactProcedure.java`**: Capture the read snapshot ID for sort compact operations
- **`PaimonSparkWriter.scala`**: Add a `withOverwriteBaseSnapshot()` method
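The way the base snapshot ID is threaded from the table-level commit down to the file-store commit can be sketched with simplified stand-ins. Everything here is hypothetical: `ToyTableCommit` and `FileStoreCommitLike` only mirror the shape of the changes listed above, their real signatures differ, and a plain nullable `Long` replaces `@Nullable`.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for TableCommitImpl / FileStoreCommit (not real signatures).
final class ToyTableCommit {

    /** baseSnapshotId == null means "use the latest snapshot", as before the fix. */
    interface FileStoreCommitLike {
        void overwritePartition(Map<String, String> partition, List<String> newFiles, Long baseSnapshotId);
    }

    private final FileStoreCommitLike fileStoreCommit;
    private Map<String, String> overwritePartition = Map.of();
    private Long overwriteBaseSnapshotId; // stays null for INSERT OVERWRITE, TRUNCATE, ...

    ToyTableCommit(FileStoreCommitLike fileStoreCommit) {
        this.fileStoreCommit = fileStoreCommit;
    }

    ToyTableCommit withOverwrite(Map<String, String> partition) {
        this.overwritePartition = partition;
        return this;
    }

    /** Only the sort compact path sets this, with the snapshot id it read from. */
    ToyTableCommit withOverwriteBaseSnapshot(Long snapshotId) {
        this.overwriteBaseSnapshotId = snapshotId;
        return this;
    }

    void commit(List<String> newFiles) {
        fileStoreCommit.overwritePartition(overwritePartition, newFiles, overwriteBaseSnapshotId);
    }

    public static void main(String[] args) {
        ToyTableCommit commit = new ToyTableCommit(
                (partition, files, base) ->
                        System.out.println("overwrite " + partition + " with " + files + ", base=" + base));
        commit.withOverwrite(Map.of("dt", "2024-01-01"))
                .withOverwriteBaseSnapshot(0L)
                .commit(List.of("X", "Y"));
    }
}
```

Keeping the field `null` by default is what lets every caller other than sort compact keep its existing behavior.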
## Backward Compatibility
When `baseSnapshotId == null`, `tryOverwritePartition` falls back to the current behavior (`latestSnapshot`), so existing code paths (SQL INSERT OVERWRITE, DROP PARTITION, TRUNCATE, etc.) are **unaffected**.
The concurrent write detection only activates when:
1. `baseSnapshotId != null` (explicitly set by sort compact)
2. New files were actually added to the overwritten partitions between the base and latest snapshots
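The fallback and the two activation conditions reduce to a pair of small predicates. This is a sketch with hypothetical names (`OverwriteBaseRules`, `deleteListSnapshotId`, `shouldRejectCommit`), not code from the PR.

```java
// Minimal sketch of the backward-compatibility rules; names are hypothetical, not Paimon's.
final class OverwriteBaseRules {

    /** Snapshot whose files form the DELETE list: the base if set, else the latest (old behaviour). */
    static long deleteListSnapshotId(Long baseSnapshotId, long latestSnapshotId) {
        return baseSnapshotId == null ? latestSnapshotId : baseSnapshotId;
    }

    /** The conflict check fires only for an explicit base AND real new files in the overwritten partitions. */
    static boolean shouldRejectCommit(Long baseSnapshotId, int newFilesSinceBase) {
        return baseSnapshotId != null && newFilesSinceBase > 0;
    }

    public static void main(String[] args) {
        System.out.println(deleteListSnapshotId(null, 5L)); // 5: INSERT OVERWRITE, TRUNCATE, ...
        System.out.println(deleteListSnapshotId(3L, 5L));   // 3: sort compact read snapshot 3
        System.out.println(shouldRejectCommit(null, 2));    // false
        System.out.println(shouldRejectCommit(3L, 2));      // true
    }
}
```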
## Data Flow After Fix
1. SortCompact reads snapshot S0 (files A, B, C) and records S0.id
2. Concurrent writer adds file D -> S1
3. SortCompact sorts A+B+C -> produces X, Y
4. SortCompact calls overwritePartition(files X, Y, baseSnapshotId=S0)
5. Commit reads S0 (base snapshot) and builds the change list: DELETE A, B, C + ADD X, Y
6. Commit reads S1 (latest snapshot) and detects that D was added between S0 and S1
7. Commit FAILS with a clear error ("concurrent write detected"), so nothing is deleted and D survives
8. User retries the sort compact safely (D is included in the next read)
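The whole flow, including the retry in step 8, can be replayed end to end in a toy model. This is a minimal sketch under stated assumptions: `ToyTable` is hypothetical, a snapshot is just the set of files in one partition, and a set difference between the base and latest snapshots stands in for the incremental-changes read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Toy model only (not Paimon code): snapshots.get(i) = live files of one partition in snapshot i.
final class ToyTable {
    private final List<Set<String>> snapshots = new ArrayList<>();

    ToyTable(Set<String> initialFiles) {
        snapshots.add(new TreeSet<>(initialFiles));
    }

    long latestId() {
        return snapshots.size() - 1;
    }

    Set<String> read(long snapshotId) {
        return new TreeSet<>(snapshots.get((int) snapshotId));
    }

    /** A concurrent writer appends one file and produces a new snapshot. */
    void append(String file) {
        Set<String> next = read(latestId());
        next.add(file);
        snapshots.add(next);
    }

    /** OVERWRITE pinned to baseId: reject on concurrent adds, else DELETE(base) + ADD(sorted). */
    void overwrite(long baseId, Set<String> sortedOutput) {
        Set<String> base = read(baseId);
        Set<String> latest = read(latestId());
        // Files live now but absent from the base: a stand-in for the incremental-changes read.
        Set<String> added = new TreeSet<>(latest);
        added.removeAll(base);
        if (!added.isEmpty()) {
            throw new IllegalStateException("concurrent write detected: " + added);
        }
        latest.removeAll(base);      // DELETE list comes from the base snapshot
        latest.addAll(sortedOutput); // ADD the sorted files
        snapshots.add(latest);
    }

    public static void main(String[] args) {
        ToyTable table = new ToyTable(Set.of("A", "B", "C"));
        long base = table.latestId(); // step 1: record S0
        table.append("D");            // step 2: concurrent write -> S1
        try {
            table.overwrite(base, Set.of("X", "Y"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "concurrent write detected: [D]"
        }
        // Step 8: retry from the latest snapshot; the new sorted files now also cover D's rows.
        table.overwrite(table.latestId(), Set.of("X2", "Y2"));
        System.out.println(table.read(table.latestId())); // prints [X2, Y2]
    }
}
```

The failed attempt leaves S1 untouched, which is why the retry can safely read D.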
### Tests