t3hw commented on PR #14797:
URL: https://github.com/apache/iceberg/pull/14797#issuecomment-4063078117

   ## Fix: cross-batch duplicate records in CDC/upsert mode
   
   ### Problem
   
   In upsert mode, when an INSERT (op=C/R) and UPDATE (op=U) for the **same 
key** arrive in the same commit batch, the writer fails to produce an equality 
delete for the prior batch's row. This causes rare but reproducible duplicate 
records that always appear in adjacent batches.
   
   ### Root cause
   
   `BaseDeltaWriter.write()` handles INSERT by calling `writer.write(row)` without first calling `writer.deleteKey()`. The write adds the row to `insertedRowMap` (in core's `BaseEqualityDeltaWriter`), which "shields" a subsequent UPDATE's `deleteKey()` for the same key: the lookup finds the key in the map, takes the position-delete path, and **never emits the equality delete** needed to remove the row from the prior snapshot.
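   
   The shielding interaction is easiest to see in a condensed model. The sketch below is NOT the actual Iceberg code - it collapses `BaseEqualityDeltaWriter`'s map handling into a few lines - but it reproduces the decision that causes the bug: a key found in `insertedRowMap` takes the position-delete path and skips the equality delete.
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   // Condensed model of the shielding behavior (NOT the real Iceberg code):
   // insertedRowMap tracks keys written in the current batch so deleteKey()
   // can position-delete them cheaply; keys from prior batches are absent
   // from the map and need an equality delete instead.
   class ShieldingModel {
       final Map<String, Integer> insertedRowMap = new HashMap<>(); // key -> in-batch position
       final List<String> equalityDeletes = new ArrayList<>();
       final List<Integer> positionDeletes = new ArrayList<>();
       private int pos = 0;
   
       void write(String key) {
           insertedRowMap.put(key, pos++);
       }
   
       void deleteKey(String key) {
           Integer prior = insertedRowMap.remove(key);
           if (prior != null) {
               // Key seen in THIS batch: position delete only. If the INSERT
               // skipped deleteKey(), no equality delete was ever emitted, so
               // a copy of the key from a PRIOR batch survives - the bug.
               positionDeletes.add(prior);
           } else {
               equalityDeletes.add(key);
           }
       }
   }
   ```
   
   With the buggy sequence `write(k); deleteKey(k); write(k)` (INSERT, then UPDATE), no equality delete is produced; with the fixed sequence, the INSERT's own `deleteKey(k)` emits it before anything lands in the map.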
   
   Flink's `BaseDeltaTaskWriter` does not have this bug - it calls 
`deleteKey()` before `write()` for both INSERT and UPDATE_AFTER in upsert mode 
([PR #2863](https://github.com/apache/iceberg/pull/2863), [PR 
#4364](https://github.com/apache/iceberg/pull/4364)).
   
   ### Fix
   
   3-line change in `BaseDeltaWriter.java`:
   
   ```java
   case INSERT:
       if (upsert) {
           // Emit an equality delete first so any copy of this key from a
           // prior commit/snapshot is removed before the new row is written.
           writer.deleteKey(keyProjection.wrap(row));
       }
       writer.write(row);
       break;
   ```
   
   **Trade-off:** Every INSERT in upsert mode now emits an equality delete, 
even for genuinely new keys. These no-op deletes are harmless at read time and 
resolved by compaction - the same trade-off Flink has made since 2021.
   
   ### Test coverage
   
   **Unit tests** (mock-based, `WriteResult` metadata assertions):
   - `testUpsertInsertProducesDeleteFile` - single INSERT produces equality 
delete
   - `testInsertThenUpdateSameKeyProducesEqualityDelete` - core regression test 
for the shielding bug
   - `testCrossBatchInsertIdempotency` - re-INSERT across batches produces 
equality delete
   - `testCrossBatchUpdateAfterInsert` - UPDATE across batches (already worked, 
now explicitly tested)
   - `testUpsertInsertProducesDeleteFilePartitioned` - partitioned writer 
coverage
   - Updated 12 existing test assertions to reflect new equality delete files 
from INSERTs
   
   **Table-level integration tests** (`TestCDCDeltaWriterTableLevel`):
   
   The existing mock-based tests verify `WriteResult` file counts but never 
commit to a real table or read back actual rows. To close this gap, added 
integration tests following the Flink `TestDeltaTaskWriter` pattern - write via 
`RecordUtils.createTableWriter()`, commit via `table.newRowDelta()`, read back 
via `IcebergGenerics.read(table)`, assert with `StructLikeSet`.
   
   - `testCrossBatchInsertIdempotency` - INSERT(key=1) in batch 1, 
INSERT(key=1) in batch 2, read back confirms only batch 2's row survives
   - `testCrossBatchUpdateAfterInsert` - INSERT then UPDATE across batches, 
read back confirms updated row only
   - `testCdcMixedOperationsEndToEnd` - INSERT/UPDATE/DELETE in a single batch, 
read back confirms correct final state
   - `testCrossBatchMultipleKeys` - multiple keys across batches with mixed 
operations
   - `testInsertOnlyNoDuplicates` - sanity check that distinct-key INSERTs all 
survive
   - `testCompositeKeyDistinctValues` - exercises composite key (id, id2) with 
distinct id2 values
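   
   As a toy illustration of why the read-back assertions hold, the model below (plain Java, not Iceberg's read path or any of the test classes above) applies each batch's equality deletes to all earlier batches, mirroring Iceberg's rule that an equality delete only affects rows with a lower sequence number:
   
   ```java
   import java.util.ArrayList;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   
   // Toy read-path model (plain Java, not Iceberg): rows and equality deletes
   // are grouped per commit batch, and a delete in batch N removes matching
   // keys only from batches committed before N (lower sequence number).
   class EqualityDeleteModel {
       final List<List<String[]>> dataBatches = new ArrayList<>();  // rows: {key, value}
       final List<Set<String>> deleteBatches = new ArrayList<>();   // equality-deleted keys
   
       void commit(List<String[]> rows, Set<String> equalityDeletes) {
           dataBatches.add(rows);
           deleteBatches.add(equalityDeletes);
       }
   
       List<String[]> read() {
           List<String[]> live = new ArrayList<>();
           for (int b = 0; b < dataBatches.size(); b++) {
               for (String[] row : dataBatches.get(b)) {
                   boolean deleted = false;
                   // only deletes from LATER batches can remove this row
                   for (int d = b + 1; d < deleteBatches.size(); d++) {
                       if (deleteBatches.get(d).contains(row[0])) {
                           deleted = true;
                           break;
                       }
                   }
                   if (!deleted) {
                       live.add(row);
                   }
               }
           }
           return live;
       }
   }
   ```
   
   Re-inserting key 1 in batch 2 - where the fix makes the INSERT carry an equality delete for key 1 - leaves only batch 2's row visible, which is the condition the cross-batch idempotency test asserts against the real table.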
   
   Parameterized across format versions (2, 3), file formats (Parquet, ORC), 
and partition modes (unpartitioned, partitioned). Also fixed a silently broken 
mock in the test config - `idColumns()` was returning null (Mockito default) 
and the test only passed because the schema already declared identifier field 
IDs.
   
   All tests verified with red-green cycle: cross-batch tests fail without the 
fix (reading back duplicate rows), pass with it.
   
   ### Documentation
   
   Added to `kafka-connect.md`:
   - Delete behavior matrix by table format version (V1/V2/V3+)
   - Exactly-once semantics section (three-layer guarantee: transactional 
offsets, control topic filtering, equality deletes)
   - Operational guidance (commit interval tuning, partition routing, non-null 
identifier requirement)
   
   ### Future optimization: double-buffered `insertedRowMap`
   
   The fix above produces an equality delete for every INSERT, even when the 
key was just written in the previous batch and its file path + row position are 
theoretically known. A future optimization could retain the previous batch's 
`insertedRowMap` using a double-buffer (`currentMap` + `retainedMap`, swapped 
on each commit). When `deleteKey()` finds a key in the retained map, it can 
emit a position delete / DV targeting the exact prior-batch file position - 
avoiding the equality delete entirely for adjacent-batch scenarios.
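   
   A minimal sketch of the double-buffer idea - assuming a hypothetical writer class and a `PathOffset` stand-in for the real file path + row position; none of these names exist in core today:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Hypothetical sketch of a double-buffered insertedRowMap (design idea
   // only, NOT existing Iceberg code). PathOffset stands in for the real
   // file-path + row-position pair.
   class DoubleBufferedDeltaWriter {
       record PathOffset(String path, long rowPos) {}
   
       private Map<String, PathOffset> currentMap = new HashMap<>();
       private Map<String, PathOffset> retainedMap = new HashMap<>();
   
       void write(String key, PathOffset position) {
           currentMap.put(key, position);
       }
   
       // Returns which kind of delete this key would need.
       String deleteKey(String key) {
           if (currentMap.remove(key) != null) {
               return "position-delete (current batch)";
           }
           if (retainedMap.remove(key) != null) {
               // 1-batch lookback hit: the exact prior-batch file position is
               // known, so a position delete / DV replaces the equality delete.
               return "position-delete (previous batch)";
           }
           // Non-adjacent batches: fall back to an equality delete.
           return "equality-delete";
       }
   
       // On commit, the just-written batch becomes the retained one.
       void onCommit() {
           retainedMap = currentMap;
           currentMap = new HashMap<>();
       }
   }
   ```
   
   The swap in `onCommit()` is the whole trick: keys from the immediately preceding batch stay addressable by position for one more batch, and only older keys pay the equality-delete cost.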
   
   This only covers a 1-batch lookback (non-adjacent batches still need equality deletes as a fallback), and it requires changes to core's `BaseTaskWriter.java`, since `insertedRowMap` is private and is destroyed when `SinkWriter.completeWrite()` clears the writers. Because the optimization would benefit all engines (Flink, Spark, Kafka Connect), it should be a **separate core PR** with its own design discussion. The fix in this PR is a prerequisite: it establishes the correctness baseline that the double-buffer would then optimize.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

