hbgstc123 opened a new pull request, #7595:
URL: https://github.com/apache/paimon/pull/7595

   ### Purpose
   ## Problem
   
     Sort Compact reads data from snapshot S_read, sorts it, then commits the result as
     OVERWRITE. However, `tryOverwritePartition` builds the DELETE list from
     `latestSnapshot` (which may be S_read+N). If concurrent writers add new files between
     S_read and latestSnapshot, those files are deleted even though their data is not in
     the sorted output, causing **silent data loss**.
   
     ## Root Cause
   
     Timeline:
     1. SortCompact reads from snapshot S0 (files A, B, C)
     2. Concurrent writer adds file D -> S1
     3. SortCompact sorts A+B+C -> produces X, Y
     4. SortCompact calls overwritePartition(new files X, Y)
     5. Commit builds DELETE list from latestSnapshot = S1 (contains A, B, C, D)
     6. Commit issues: DELETE A, B, C, D + ADD X, Y
     7. File D is deleted but X, Y don't contain D's data -> DATA LOSS
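
     The timeline above can be modelled with plain sets. This is only a toy sketch: file
     names stand in for manifest entries, and `OverwriteRaceSketch` and `lostFiles` are
     hypothetical names for illustration, not Paimon APIs.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Toy model of the race: none of these names exist in Paimon.
class OverwriteRaceSketch {

    /** Files that the overwrite deletes although the sort never read them. */
    static Set<String> lostFiles(Set<String> readByCompaction, Set<String> latestSnapshotFiles) {
        // The buggy commit deletes every file visible in the latest snapshot.
        Set<String> lost = new LinkedHashSet<>(latestSnapshotFiles);
        // Only files the sort actually read have their rows in the new output.
        lost.removeAll(readByCompaction);
        return lost;
    }

    public static void main(String[] args) {
        Set<String> s0 = Set.of("A", "B", "C");      // snapshot the sort read from
        Set<String> s1 = Set.of("A", "B", "C", "D"); // latest snapshot at commit time
        System.out.println("Deleted without being rewritten: " + lostFiles(s0, s1));
    }
}
```

     Any non-empty result means the overwrite removed data that is absent from X and Y.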
   
     ## Solution
   
     ### Part 1: Pin DELETE list to base snapshot
   
     Build the DELETE list from the snapshot the sort compact actually read from (base
     snapshot) instead of latestSnapshot. This ensures we only delete files that were
     included in the sorted output.
   
     ### Part 2: Concurrent write detection
   
     When the base snapshot differs from the latest snapshot, use
     `readIncrementalChanges(baseSnapshot, latestSnapshot)` to detect new files added to
     the overwritten partitions. If new files exist, fail the commit with a clear conflict
     error instead of silently losing data.
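
     A minimal sketch of how the two parts combine on the commit side, assuming files can
     be represented by name. `PinnedOverwriteSketch`, `planDeletes` and `ConflictException`
     are hypothetical names; the set difference stands in for what the PR obtains from
     `readIncrementalChanges`, and only added files are modelled.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Illustration only: not the actual FileStoreCommitImpl logic.
class PinnedOverwriteSketch {

    /** Hypothetical exception type used to signal a concurrent write. */
    static class ConflictException extends RuntimeException {
        ConflictException(String message) {
            super(message);
        }
    }

    /**
     * Returns the DELETE list for an overwrite pinned to the base snapshot,
     * or throws if files were added after that snapshot was read.
     */
    static Set<String> planDeletes(Set<String> baseFiles, Set<String> latestFiles) {
        // Part 2: anything in latest but not in base was written concurrently.
        Set<String> addedSinceBase = new LinkedHashSet<>(latestFiles);
        addedSinceBase.removeAll(baseFiles);
        if (!addedSinceBase.isEmpty()) {
            throw new ConflictException("concurrent write detected: " + addedSinceBase);
        }
        // Part 1: delete only what the sort compact actually read.
        return new LinkedHashSet<>(baseFiles);
    }
}
```

     Failing before the change set is applied keeps the table untouched, so the caller
     can simply rerun the compaction against a newer snapshot.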
   
     ## Changes
   
     ### Core
   
     - **`FileStoreCommit.java`**: Add `@Nullable Long baseSnapshotId` parameter to `overwritePartition()`
     - **`FileStoreCommitImpl.java`**:
       - Modify `tryOverwritePartition()` to build DELETE list from `baseSnapshot`
       - Add concurrent write detection that fails fast if new files were added
     - **`InnerTableCommit.java`**: Add `withOverwriteBaseSnapshot(@Nullable Long snapshotId)` method
     - **`TableCommitImpl.java`**: Implement `withOverwriteBaseSnapshot()` and pass `baseSnapshotId` to commit
   
     ### Flink
   
     - **`SortCompactAction.java`**: Capture read snapshot ID before building source, pass to sink builder
     - **`FlinkSinkBuilder.java`**: Add `overwriteBaseSnapshotId` field and setter
     - **`FlinkWriteSink.java`**: Add `overwriteBaseSnapshotId` field and setter, pass to committer
   
     ### Spark
   
     - **`CompactProcedure.java`**: Capture read snapshot ID for sort compact operations
     - **`PaimonSparkWriter.scala`**: Add `withOverwriteBaseSnapshot()` method
   
     ## Backward Compatibility
   
     When `baseSnapshotId == null`, `tryOverwritePartition` falls back to the current
     behavior (`latestSnapshot`), so existing code paths (SQL INSERT OVERWRITE, DROP
     PARTITION, TRUNCATE, etc.) are **unaffected**.
   
     The concurrent write detection only activates when:
     1. `baseSnapshotId != null` (explicitly set by sort compact)
     2. There are actual new files added between base and latest snapshot
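
     The two conditions above amount to a small decision rule, sketched here with
     hypothetical helper names (`deleteSourceSnapshot`, `shouldFail`) that are not part of
     the PR.

```java
// Illustration only: these helpers are assumptions, not Paimon methods.
class OverwriteBaseSelectionSketch {

    /** Snapshot whose files form the DELETE list. */
    static long deleteSourceSnapshot(Long baseSnapshotId, long latestSnapshotId) {
        // null means the caller did not pin a base: keep the pre-existing behavior.
        return baseSnapshotId == null ? latestSnapshotId : baseSnapshotId;
    }

    /** Whether the commit should be rejected as a conflict. */
    static boolean shouldFail(Long baseSnapshotId, int filesAddedSinceBase) {
        // Detection is active only for pinned overwrites that raced with a writer.
        return baseSnapshotId != null && filesAddedSinceBase > 0;
    }
}
```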
   
   ## Data Flow After Fix
   
     1. SortCompact reads snapshot S0 (files A,B,C), records S0.id
     2. Concurrent writer adds file D -> S1
     3. SortCompact sorts A+B+C -> produces X,Y
     4. SortCompact calls overwritePartition(files X,Y, baseSnapshotId=S0)
     5. Commit reads S0 (base snapshot), prepares: DELETE A,B,C + ADD X,Y
     6. Commit reads S1 (latest snapshot), detects D was added between S0 and S1
     7. Commit FAILS with clear error: "concurrent write detected"
     8. User retries sort compact safely (D will be included in next read)
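
     The fail-then-retry flow above can be exercised against a small in-memory model.
     This is a sketch under simplifying assumptions (one partition, snapshots as file-name
     sets); `SortCompactRetrySketch` and its methods are hypothetical and do not mirror
     Paimon's classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// In-memory toy table: snapshots.get(i) holds the file set of snapshot i.
class SortCompactRetrySketch {
    final List<Set<String>> snapshots = new ArrayList<>();

    SortCompactRetrySketch(Set<String> initialFiles) {
        snapshots.add(new LinkedHashSet<>(initialFiles));
    }

    int latestId() {
        return snapshots.size() - 1;
    }

    /** A concurrent writer appends one file, creating a new snapshot. */
    void append(String file) {
        Set<String> next = new LinkedHashSet<>(snapshots.get(latestId()));
        next.add(file);
        snapshots.add(next);
    }

    /** Overwrite pinned to baseId; returns false (no change) on a concurrent write. */
    boolean tryOverwrite(int baseId, Set<String> newFiles) {
        Set<String> added = new LinkedHashSet<>(snapshots.get(latestId()));
        added.removeAll(snapshots.get(baseId));
        if (!added.isEmpty()) {
            return false; // conflict: caller must re-read and retry
        }
        snapshots.add(new LinkedHashSet<>(newFiles));
        return true;
    }

    public static void main(String[] args) {
        SortCompactRetrySketch table = new SortCompactRetrySketch(Set.of("A", "B", "C"));
        int base = table.latestId();   // sort compact records S0
        table.append("D");             // concurrent writer -> S1
        System.out.println("first attempt committed: " + table.tryOverwrite(base, Set.of("X", "Y")));
        int retryBase = table.latestId(); // retry reads S1, which includes D
        System.out.println("retry committed: " + table.tryOverwrite(retryBase, Set.of("Z")));
    }
}
```

     In the rejected attempt the latest snapshot still contains D, so the retry reads it
     and the rewritten output covers its data.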
   
   ### Tests
   

