koodin9 opened a new pull request, #15710:
URL: https://github.com/apache/iceberg/pull/15710

   ## Summary
   
     After a partial commit (timeout), late-arriving `DataWritten` events from the previous
     commit cycle are added to the next cycle's `commitBuffer`. The current implementation
     groups all envelopes by table only (`tableCommitMap()`), so stale and current data files
     end up in the same `RowDelta`. Since all files in a single `RowDelta` receive the same
     sequence number, the current cycle's equality deletes do not apply to the stale data
     files (`data_seq < delete_seq` is false), resulting in **duplicate rows**.
   
     This PR fixes the issue by separating envelopes by `commitId` and committing each group
     in a distinct `RowDelta`, ensuring stale data files get a lower sequence number than
     the current cycle's equality deletes.
   
     ## Root Cause
   
     1. Coordinator times out waiting for all workers → partial commit with available files
     2. `commitConsumerOffsets()` advances control topic offsets
     3. Late `DataWritten` from the timed-out worker arrives in the next `process()` call
     4. `tableCommitMap()` merges stale + current envelopes into one `RowDelta`
     5. Same sequence number → equality deletes don't apply → duplicate rows
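
Steps 4 and 5 can be illustrated with a minimal sketch. It assumes only the simplified rule stated in the summary (an equality delete applies to a data file when `data_seq < delete_seq`); the class and method names are hypothetical and are not part of the connector.

```java
public class EqualityDeleteSketch {
    // Simplified rule from the summary: an equality delete removes matching rows
    // only from data files whose sequence number is strictly lower than its own.
    public static boolean deleteApplies(long dataSeq, long deleteSeq) {
        return dataSeq < deleteSeq;
    }

    public static void main(String[] args) {
        // Merged RowDelta: the stale data file and the current equality delete
        // share one sequence number, so the delete is skipped.
        System.out.println(deleteApplies(5, 5)); // false

        // Separate RowDeltas: the stale data file is committed first and gets
        // a lower sequence number, so the delete applies.
        System.out.println(deleteApplies(5, 6)); // true
    }
}
```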
   
     ## Changes
   
     - **`CommitState.java`**: Replace `tableCommitMap()` with `tableCommitMaps()`, which returns
       `List<Map<TableIdentifier, List<Envelope>>>`, separated by `commitId`.
       Stale commitIds are ordered first (via `LinkedHashMap` insertion order);
       the current commitId is always last.
     - **`Coordinator.java`**: `doCommit()` iterates over the list and commits each batch
       in a separate `RowDelta`. `offsetsJson` and `vtts` are stored only on the last batch
       to prevent `lastCommittedOffsetsForTable()` from filtering out subsequent batches.
       Null guards are added to `commitToTable()`.
     - **`TestCoordinatorPartialCommit.java`**: New regression test that simulates the
       partial commit scenario and verifies that the stale snapshot's sequence number is
       strictly less than the current snapshot's sequence number.
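
The grouping described for `CommitState.java` might look roughly like the sketch below. `Env`, `groupByCommit`, and the plain `String` table name are hypothetical simplifications, not the PR's actual code. The explicit step that moves the current commit's batch to the end is likewise an assumption about how "current commitId is always last" could be enforced.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class CommitGroupingSketch {
    // Hypothetical stand-in for the connector's Envelope: only the fields needed here.
    public record Env(UUID commitId, String table, String file) {}

    // Returns one table->envelopes map per commitId: stale commits first
    // (in first-seen order), the current commit last.
    public static List<Map<String, List<Env>>> groupByCommit(
            List<Env> buffer, UUID currentCommitId) {
        // LinkedHashMap preserves the order in which commitIds were first seen.
        Map<UUID, Map<String, List<Env>>> byCommit = new LinkedHashMap<>();
        for (Env e : buffer) {
            byCommit.computeIfAbsent(e.commitId(), k -> new LinkedHashMap<>())
                    .computeIfAbsent(e.table(), k -> new ArrayList<>())
                    .add(e);
        }
        // Assumption: pull the current commit out and append it after all stale ones.
        Map<String, List<Env>> current = byCommit.remove(currentCommitId);
        List<Map<String, List<Env>>> batches = new ArrayList<>(byCommit.values());
        if (current != null) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        UUID stale = UUID.randomUUID();
        UUID current = UUID.randomUUID();
        List<Env> buffer = List.of(
                new Env(current, "db.t", "cur-1"),
                new Env(stale, "db.t", "stale-1"), // late arrival from the previous cycle
                new Env(current, "db.t", "cur-2"));
        // Each batch would be committed in its own RowDelta, in list order.
        System.out.println(groupByCommit(buffer, current).size()); // 2
    }
}
```

Committing the batches in list order is what would give the stale files a lower sequence number than the current cycle's deletes.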
   
     ## Why not clear the buffer? (re: #15651)
   
     An alternative approach is to discard stale events in `startNewCommit()`. However, this
     causes **data loss** in the partial commit scenario: after `doCommit(true)` succeeds,
     consumer offsets are committed, so the timed-out worker will not re-send its
     `DataWritten`. Clearing the buffer discards data that cannot be recovered.
   
     ## Test Plan
   
     - [x] New unit test `TestCoordinatorPartialCommit`: verifies separate RowDeltas
           produce strictly increasing sequence numbers
     - [x] Existing Coordinator and integration tests pass
     - [x] Production-like stress test: 4,398,000 rows with CDC (INSERT/UPDATE) under
           aggressive partial commit settings (`commit.interval-ms=10000`,
           `commit.timeout-ms=1000`). Verified zero data loss and no duplicate rows
           via Trino row count comparison against source MySQL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

