t3hw opened a new pull request, #15651:
URL: https://github.com/apache/iceberg/pull/15651

   ## Fix: CommitState buffer accumulation causes duplicate records after failed commits
   
   ### Problem
   
   When `Coordinator.doCommit()` throws, `clearResponses()` is never called. 
Stale `DataWritten` events persist in `commitBuffer` and get committed 
alongside new events in a single `RowDelta`. Both the stale and new data files 
are added to the table, producing duplicate rows for the same key.
   
   The bug is workload-agnostic: it affects CDC, upsert, and append-only modes alike, because it lives in the Coordinator's `CommitState`, not in the writer layer.
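
   The failure path can be reproduced with a small, self-contained model. This is a sketch, not Iceberg code: `BufferedEvent`, `PreFixCommitState`, `PreFixCoordinator`, and `PreFixDemo` are hypothetical stand-ins for `DataWritten` envelopes, `CommitState`, and `Coordinator.doCommit()`, and the "table" is just a list of committed file names.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.UUID;

   // Hypothetical stand-in for a DataWritten envelope: the commit cycle that
   // produced it and the data file it references.
   record BufferedEvent(UUID commitId, String dataFile) {}

   // Hypothetical, simplified CommitState *before* the fix.
   class PreFixCommitState {
     final List<BufferedEvent> commitBuffer = new ArrayList<>();
     UUID currentCommitId;

     void startNewCommit() {
       // The bug being modelled: commitBuffer is NOT cleared here.
       currentCommitId = UUID.randomUUID();
     }

     void addResponse(String dataFile) {
       commitBuffer.add(new BufferedEvent(currentCommitId, dataFile));
     }

     void clearResponses() {
       commitBuffer.clear();
     }
   }

   // Hypothetical, simplified Coordinator: the "table" is the list of committed files.
   class PreFixCoordinator {
     final PreFixCommitState state = new PreFixCommitState();
     final List<String> tableFiles = new ArrayList<>();

     void doCommit(boolean simulateFailure) {
       // Every buffered event goes into one RowDelta, whatever cycle produced it.
       List<String> rowDelta = new ArrayList<>();
       for (BufferedEvent event : state.commitBuffer) {
         rowDelta.add(event.dataFile());
       }
       if (simulateFailure) {
         // Throwing here skips clearResponses(), as in the failure path above.
         throw new IllegalStateException("simulated commit failure");
       }
       tableFiles.addAll(rowDelta);
       state.clearResponses();
     }
   }

   class PreFixDemo {
     static List<String> run() {
       PreFixCoordinator coordinator = new PreFixCoordinator();

       // Cycle 1: a worker reports file-1, then the commit fails.
       coordinator.state.startNewCommit();
       coordinator.state.addResponse("file-1");
       try {
         coordinator.doCommit(true);
       } catch (IllegalStateException e) {
         // file-1 is still in commitBuffer.
       }

       // Cycle 2: the worker re-processes the same records into file-2.
       coordinator.state.startNewCommit();
       coordinator.state.addResponse("file-2");
       coordinator.doCommit(false);
       return coordinator.tableFiles;
     }
   }
   ```

   `PreFixDemo.run()` returns both `file-1` and `file-2`. If, as described above, both files hold rows for the same keys, every one of those rows is now in the table twice.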
   
   ### Fix (CommitState.java)
   
   A one-line change that clears `commitBuffer` at the start of each new commit cycle:
   
   ```java
   void startNewCommit() {
       commitBuffer.clear();  // Discard stale events from any prior failed commit cycle.
       currentCommitId = UUID.randomUUID();
       startTime = System.currentTimeMillis();
   }
   ```
   
   **Why this is safe:** Workers always re-produce `DataWritten` events for each new `StartCommit`, because the source records are still in Kafka (consumer offsets were not committed for the failed cycle), so workers re-read and re-process them. Old events in the buffer reference orphaned data files written by the previous cycle's writers, and there is no mechanism to "adopt" those files into a new commit cycle.
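
   Under the assumption stated above (workers always re-produce `DataWritten` events for each new `StartCommit`), the fixed behaviour can be sketched the same way. `WrittenEvent`, `FixedCommitState`, and `FixedDemo` are hypothetical simplifications; only the placement of `commitBuffer.clear()` mirrors the actual change.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.UUID;

   // Hypothetical stand-in for a DataWritten envelope.
   record WrittenEvent(UUID commitId, String dataFile) {}

   // Hypothetical, simplified CommitState *with* the fix applied.
   class FixedCommitState {
     private final List<WrittenEvent> commitBuffer = new ArrayList<>();
     private UUID currentCommitId;

     void startNewCommit() {
       commitBuffer.clear(); // the fix: every cycle starts from an empty buffer
       currentCommitId = UUID.randomUUID();
     }

     UUID currentCommitId() {
       return currentCommitId;
     }

     // Responses are buffered as they arrive, including ones tagged with an
     // older commit ID (for example during restart/rebalance recovery).
     void addResponse(UUID commitId, String dataFile) {
       commitBuffer.add(new WrittenEvent(commitId, dataFile));
     }

     List<String> filesToCommit() {
       List<String> files = new ArrayList<>();
       for (WrittenEvent event : commitBuffer) {
         files.add(event.dataFile());
       }
       return files;
     }
   }

   class FixedDemo {
     static List<String> run() {
       FixedCommitState state = new FixedCommitState();

       // Cycle 1: a worker reports a file, then doCommit() throws, so nothing clears it.
       state.startNewCommit();
       state.addResponse(state.currentCommitId(), "orphan-1");

       // A late event from an even older cycle, e.g. replayed during recovery.
       state.addResponse(UUID.randomUUID(), "orphan-recovered");

       // Cycle 2: startNewCommit() discards both stale events before workers respond.
       state.startNewCommit();
       state.addResponse(state.currentCommitId(), "reprocessed-1");
       return state.filesToCommit();
     }
   }
   ```

   Both the event left behind by the failed cycle and the late recovery event are dropped, so `filesToCommit()` contains only the re-processed file.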
   
   **Why this approach over the alternatives:**
   
   | Option | Where | Issue |
   |--------|-------|-------|
   | A: Filter by commitId in `tableCommitMap()` | Read path | Memory leak: stale events accumulate indefinitely under sustained failures |
   | B: Clear in `endCurrentCommit()` | End of cycle | Conflates success/failure cleanup; less explicit about "clean slate" intent |
   | **C: Clear in `startNewCommit()`** | **Start of cycle** | **Chosen: simplest, no memory leak, handles both same-JVM failure and restart/rebalance** |
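
   The memory argument against option A can be illustrated by simulating sustained failures. The sketch below is hypothetical: `FilterOnReadBuffer` models option A (keep everything, filter by commit ID when building the commit) and `ClearOnStartBuffer` models option C; neither is Iceberg code.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.UUID;
   import java.util.stream.Collectors;

   // Hypothetical stand-in for a buffered DataWritten response.
   record Response(UUID commitId, String dataFile) {}

   // Option A (hypothetical model): never clear on failure; filter by commit ID on read.
   class FilterOnReadBuffer {
     final List<Response> buffer = new ArrayList<>();
     private UUID current;

     void startNewCommit() {
       current = UUID.randomUUID();
     }

     void add(String dataFile) {
       buffer.add(new Response(current, dataFile));
     }

     // What tableCommitMap() would see under option A: only current-cycle events.
     List<Response> currentCycleView() {
       return buffer.stream()
           .filter(response -> response.commitId().equals(current))
           .collect(Collectors.toList());
     }
   }

   // Option C (hypothetical model): clear at the start of every cycle.
   class ClearOnStartBuffer {
     final List<Response> buffer = new ArrayList<>();
     private UUID current;

     void startNewCommit() {
       buffer.clear();
       current = UUID.randomUUID();
     }

     void add(String dataFile) {
       buffer.add(new Response(current, dataFile));
     }
   }

   class AlternativesDemo {
     // Runs `failedCycles` consecutive failed cycles with `eventsPerCycle` responses each.
     // Returns {option A retained, option A committed, option C retained}.
     static int[] run(int failedCycles, int eventsPerCycle) {
       FilterOnReadBuffer optionA = new FilterOnReadBuffer();
       ClearOnStartBuffer optionC = new ClearOnStartBuffer();
       for (int cycle = 0; cycle < failedCycles; cycle++) {
         optionA.startNewCommit();
         optionC.startNewCommit();
         for (int i = 0; i < eventsPerCycle; i++) {
           String file = "cycle-" + cycle + "-file-" + i;
           optionA.add(file);
           optionC.add(file);
         }
         // The commit fails here, so doCommit() clears neither buffer.
       }
       return new int[] {
         optionA.buffer.size(), optionA.currentCycleView().size(), optionC.buffer.size()
       };
     }
   }
   ```

   With `run(100, 4)`, both options hand the same 4 current-cycle events to the commit, but option A retains 400 buffered events versus 4 for option C.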
   
   The existing `clearResponses()` call in `doCommit()` (line 173) is now redundant on the happy path, but it is retained as defense-in-depth.
   
   ### Test coverage
   
   **CommitState unit tests** (`TestCommitState.java`, 2 new): *regression tests for the fix.*
   
   - `testStartNewCommitClearsStaleResponses` — failed commit cycle leaves 
stale events; `startNewCommit()` must clear them
   - `testRecoveryEventsDiscardedOnNewCommit` — old events from 
restart/rebalance recovery; `startNewCommit()` must discard them
   
   **Recovery scenario tests** (`TestRecoveryScenario.java`, 2 tests): *these document the consequence, not the fix.*
   
   Table-level integration tests that use append-only writers to show that combined commits produce duplicate rows. Each test writes via `RecordUtils.createTableWriter()`, commits via `table.newRowDelta()`, reads back via `IcebergGenerics.read(table)`, and asserts with `StructLikeSet`. The tests are parameterized across format versions (2, 3), file formats (Parquet, ORC), and partition modes (unpartitioned, partitioned).
   
   - `testRecoveryScenarioInsertThenUpdateCombinedCommit` — two batches for the 
same key committed in a single RowDelta → both rows survive (duplicate)
   - `testRecoveryScenarioInsertThenReInsertCombinedCommit` — two INSERTs for 
the same key combined → both survive
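
   The recovery tests use append-only writers. A toy read path suggests why the duplicates would also appear with upsert or CDC writers, assuming the Iceberg table spec rule that an equality delete applies only to data files whose data sequence number is strictly lower than the delete's. All types here (`Row`, `DataFileModel`, `EqDeleteModel`, `ToyTable`, `ToyTableDemo`) are hypothetical and model only that rule; partitions and position deletes are ignored.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;

   // Hypothetical toy types; none of these are Iceberg classes.
   record Row(int id, String data) {}

   record DataFileModel(long dataSequenceNumber, List<Row> rows) {}

   // An equality delete on `id`, standing in for what an upsert/CDC writer emits.
   record EqDeleteModel(long dataSequenceNumber, List<Integer> ids) {}

   class ToyTable {
     private final List<DataFileModel> dataFiles = new ArrayList<>();
     private final List<EqDeleteModel> eqDeletes = new ArrayList<>();
     private long lastSequenceNumber = 0;

     // All files added by one RowDelta share a single data sequence number.
     void commitRowDelta(List<List<Row>> newDataFiles, List<List<Integer>> newEqDeletes) {
       long seq = ++lastSequenceNumber;
       for (List<Row> rows : newDataFiles) {
         dataFiles.add(new DataFileModel(seq, rows));
       }
       for (List<Integer> ids : newEqDeletes) {
         eqDeletes.add(new EqDeleteModel(seq, ids));
       }
     }

     // Counts live rows per id. An equality delete applies only to data files
     // with a strictly lower data sequence number.
     Map<Integer, Integer> rowCountPerId() {
       Map<Integer, Integer> counts = new TreeMap<>();
       for (DataFileModel file : dataFiles) {
         for (Row row : file.rows()) {
           boolean deleted = eqDeletes.stream()
               .anyMatch(d -> d.dataSequenceNumber() > file.dataSequenceNumber()
                   && d.ids().contains(row.id()));
           if (!deleted) {
             counts.merge(row.id(), 1, Integer::sum);
           }
         }
       }
       return counts;
     }
   }

   class ToyTableDemo {
     // Insert id=1, then update it (equality delete + new row), in two separate commits.
     static Map<Integer, Integer> separateCommits() {
       ToyTable table = new ToyTable();
       table.commitRowDelta(List.of(List.of(new Row(1, "a"))), List.of());
       table.commitRowDelta(List.of(List.of(new Row(1, "b"))), List.of(List.of(1)));
       return table.rowCountPerId();
     }

     // The same two batches combined into a single RowDelta, as after a failed cycle.
     static Map<Integer, Integer> combinedCommit() {
       ToyTable table = new ToyTable();
       table.commitRowDelta(
           List.of(List.of(new Row(1, "a")), List.of(new Row(1, "b"))), List.of(List.of(1)));
       return table.rowCountPerId();
     }
   }
   ```

   Committed separately, the update's equality delete hides the original insert (`{1=1}`). Combined into one `RowDelta`, both data files share a sequence number, the delete does not apply, and key 1 is read twice (`{1=2}`).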
   
   **Test infrastructure** (`TableLevelTestBase.java`):
   
   Abstract base class providing parameterized test configuration, writer 
creation via `RecordUtils.createTableWriter()`, commit helpers 
(`commitTransaction`, `commitCombined`), and read-back utilities 
(`actualRowSet`). Schema-agnostic — subclasses implement `schema()` and 
`sinkConfig()` to define their table schema and writer mode.
   
   **Build** (`build.gradle`):
   
   Added `testArtifacts` dependencies for `iceberg-api` and `iceberg-core` to 
support `TestBase`, `StructLikeSet`, and `IcebergGenerics` in integration tests.
   
   ### Red-green verification
   
   Commenting out `commitBuffer.clear()` in `startNewCommit()` causes 2 of 20 
tests to fail:
   
   - `testStartNewCommitClearsStaleResponses` — `tableCommitMap()` not empty 
after `startNewCommit()`
   - `testRecoveryEventsDiscardedOnNewCommit` — same assertion failure
   
   Restoring the fix brings all 20 tests back to green. The recovery scenario tests pass in both cases because they document the consequence (duplicates from combined commits), not the fix; the CommitState unit tests are the regression safety net.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

