t3hw commented on PR #14797: URL: https://github.com/apache/iceberg/pull/14797#issuecomment-4063078117
## Fix: cross-batch duplicate records in CDC/upsert mode

### Problem

In upsert mode, when an INSERT (op=C/R) and an UPDATE (op=U) for the **same key** arrive in the same commit batch, the writer fails to produce an equality delete for the prior batch's row. This causes rare but reproducible duplicate records that always appear in adjacent batches.

### Root cause

`BaseDeltaWriter.write()` handles INSERT by calling `writer.write(row)` without first calling `writer.deleteKey()`. This populates `insertedRowMap` (in core's `BaseEqualityDeltaWriter`), which "shields" a subsequent UPDATE's `deleteKey()`: it finds the key in the map, takes the position-delete path, and **never emits the equality delete** needed to remove the row from the prior snapshot. Flink's `BaseDeltaTaskWriter` does not have this bug; it calls `deleteKey()` before `write()` for both INSERT and UPDATE_AFTER in upsert mode ([PR #2863](https://github.com/apache/iceberg/pull/2863), [PR #4364](https://github.com/apache/iceberg/pull/4364)).

### Fix

Three-line change in `BaseDeltaWriter.java`:

```java
case INSERT:
  if (upsert) {
    writer.deleteKey(keyProjection.wrap(row));
  }
  writer.write(row);
  break;
```

**Trade-off:** Every INSERT in upsert mode now emits an equality delete, even for genuinely new keys. These no-op deletes are harmless at read time and are resolved by compaction, the same trade-off Flink has made since 2021.
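The shielding behavior and the effect of the fix can be illustrated with a toy model in plain Java. This is a deliberately simplified stand-in, not the real Iceberg classes; the class and field names here are illustrative only:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the equality-delta writer's bookkeeping (simplified;
// not the actual BaseEqualityDeltaWriter).
class DeltaWriterModel {
  // stands in for insertedRowMap: key -> row position in the current batch
  private final Map<String, Integer> insertedRowMap = new HashMap<>();
  final List<Integer> positionDeletes = new ArrayList<>();
  final List<String> equalityDeletes = new ArrayList<>();
  private int nextPos = 0;

  void write(String key) {
    insertedRowMap.put(key, nextPos++);
  }

  void deleteKey(String key) {
    Integer pos = insertedRowMap.remove(key);
    if (pos != null) {
      // Key was written earlier in this batch: only a position delete is
      // emitted, which "shields" any copy of the key in a prior snapshot.
      positionDeletes.add(pos);
    } else {
      equalityDeletes.add(key);
    }
  }
}

public class ShieldingDemo {
  public static void main(String[] args) {
    // Buggy path: INSERT writes without deleting first.
    DeltaWriterModel buggy = new DeltaWriterModel();
    buggy.write("k");     // INSERT (op=C)
    buggy.deleteKey("k"); // UPDATE (op=U): shielded -> position delete only
    buggy.write("k");
    System.out.println("buggy equality deletes: " + buggy.equalityDeletes.size());  // 0

    // Fixed path: deleteKey() before write() for INSERT in upsert mode.
    DeltaWriterModel fixed = new DeltaWriterModel();
    fixed.deleteKey("k"); // key not in map -> equality delete for prior snapshots
    fixed.write("k");
    fixed.deleteKey("k"); // UPDATE: position delete for the in-batch row
    fixed.write("k");
    System.out.println("fixed equality deletes: " + fixed.equalityDeletes.size());  // 1
  }
}
```

With the buggy ordering no equality delete is ever produced for key `k`, so the prior batch's row survives; with the fixed ordering the INSERT's `deleteKey()` emits the equality delete that removes it.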
### Test coverage

**Unit tests** (mock-based, `WriteResult` metadata assertions):

- `testUpsertInsertProducesDeleteFile` - single INSERT produces an equality delete
- `testInsertThenUpdateSameKeyProducesEqualityDelete` - core regression test for the shielding bug
- `testCrossBatchInsertIdempotency` - re-INSERT across batches produces an equality delete
- `testCrossBatchUpdateAfterInsert` - UPDATE across batches (already worked, now explicitly tested)
- `testUpsertInsertProducesDeleteFilePartitioned` - partitioned writer coverage
- Updated 12 existing test assertions to reflect the new equality delete files from INSERTs

**Table-level integration tests** (`TestCDCDeltaWriterTableLevel`):

The existing mock-based tests verify `WriteResult` file counts but never commit to a real table or read back actual rows. To close this gap, added integration tests following the Flink `TestDeltaTaskWriter` pattern: write via `RecordUtils.createTableWriter()`, commit via `table.newRowDelta()`, read back via `IcebergGenerics.read(table)`, assert with `StructLikeSet`.

- `testCrossBatchInsertIdempotency` - INSERT(key=1) in batch 1, INSERT(key=1) in batch 2; read-back confirms only batch 2's row survives
- `testCrossBatchUpdateAfterInsert` - INSERT then UPDATE across batches; read-back confirms only the updated row
- `testCdcMixedOperationsEndToEnd` - INSERT/UPDATE/DELETE in a single batch; read-back confirms the correct final state
- `testCrossBatchMultipleKeys` - multiple keys across batches with mixed operations
- `testInsertOnlyNoDuplicates` - sanity check that distinct-key INSERTs all survive
- `testCompositeKeyDistinctValues` - exercises a composite key (id, id2) with distinct id2 values

Parameterized across format versions (2, 3), file formats (Parquet, ORC), and partition modes (unpartitioned, partitioned).

Also fixed a silently broken mock in the test config: `idColumns()` was returning null (the Mockito default), and the test only passed because the schema already declared identifier field IDs.
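The cross-batch scenario that the integration tests read back can be modeled with a toy commit loop in plain Java. This is an illustrative sketch, not Iceberg code; it assumes the simplified semantics that a commit first applies the batch's equality deletes to previously committed rows and then appends the batch's data rows:

```java
import java.util.ArrayList;
import java.util.List;

// Toy table model (not Iceberg): rows are {key, value} pairs, and a
// commit applies equality deletes to earlier rows before appending
// the new batch's data rows.
class TableModel {
  final List<String[]> rows = new ArrayList<>();

  void commit(List<String> equalityDeletes, List<String[]> dataRows) {
    rows.removeIf(r -> equalityDeletes.contains(r[0]));
    rows.addAll(dataRows);
  }
}

public class CrossBatchDemo {
  public static void main(String[] args) {
    // Without the fix: batch 2's re-INSERT carries no equality delete,
    // so the read-back sees both rows for key "1".
    TableModel broken = new TableModel();
    broken.commit(List.of(), List.of(new String[] {"1", "v1"}));
    broken.commit(List.of(), List.of(new String[] {"1", "v2"}));
    System.out.println("without fix: " + broken.rows.size() + " rows"); // 2 (duplicate)

    // With the fix: every upsert INSERT also emits an equality delete,
    // so only batch 2's row survives the read-back.
    TableModel fixed = new TableModel();
    fixed.commit(List.of("1"), List.of(new String[] {"1", "v1"}));
    fixed.commit(List.of("1"), List.of(new String[] {"1", "v2"}));
    System.out.println("with fix: " + fixed.rows.size() + " rows"); // 1
  }
}
```

This mirrors what `testCrossBatchInsertIdempotency` asserts against a real table: the re-INSERT's equality delete is what makes only the latest row survive.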
All tests verified with a red-green cycle: the cross-batch tests fail without the fix (reading back duplicate rows) and pass with it.

### Documentation

Added to `kafka-connect.md`:

- Delete behavior matrix by table format version (V1/V2/V3+)
- Exactly-once semantics section (three-layer guarantee: transactional offsets, control topic filtering, equality deletes)
- Operational guidance (commit interval tuning, partition routing, non-null identifier requirement)

### Future optimization: double-buffered `insertedRowMap`

The fix above produces an equality delete for every INSERT, even when the key was just written in the previous batch and its file path + row position are theoretically known. A future optimization could retain the previous batch's `insertedRowMap` using a double buffer (`currentMap` + `retainedMap`, swapped on each commit). When `deleteKey()` finds a key in the retained map, it can emit a position delete / DV targeting the exact prior-batch file position, avoiding the equality delete entirely for adjacent-batch scenarios. This only covers a one-batch lookback (non-adjacent batches still need equality deletes as a fallback), and it requires changes to core's `BaseTaskWriter.java`, since `insertedRowMap` is private and is destroyed when `SinkWriter.completeWrite()` clears the writers.

Since this would benefit all engines (Flink, Spark, Kafka Connect), it should be a **separate core PR** with its own design discussion. The fix in this PR is a prerequisite: it establishes the correctness that the double buffer optimizes on top of.
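As a plain-Java sketch of what that future double buffer might look like (NOT implemented in this PR; the class, field, and method names here are hypothetical and simplified):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical double-buffer sketch: retain the previous batch's
// key->position map so an adjacent-batch re-INSERT can emit a position
// delete instead of an equality delete. Illustrative model only.
class DoubleBufferedWriterModel {
  private Map<String, Integer> currentMap = new HashMap<>();
  private Map<String, Integer> retainedMap = new HashMap<>();
  final List<Integer> currentBatchPosDeletes = new ArrayList<>();
  final List<Integer> priorBatchPosDeletes = new ArrayList<>();
  final List<String> equalityDeletes = new ArrayList<>();
  private int nextPos = 0;

  void write(String key) {
    currentMap.put(key, nextPos++);
  }

  void deleteKey(String key) {
    Integer pos = currentMap.remove(key);
    if (pos != null) {              // same batch: position delete, as today
      currentBatchPosDeletes.add(pos);
      return;
    }
    pos = retainedMap.remove(key);
    if (pos != null) {              // adjacent batch: exact position known
      priorBatchPosDeletes.add(pos);
      return;
    }
    equalityDeletes.add(key);       // older batches: equality delete fallback
  }

  void completeWrite() {
    // Swap on commit: the batch just written becomes the retained batch.
    retainedMap = currentMap;
    currentMap = new HashMap<>();
    nextPos = 0; // positions are per-batch in this toy model
  }
}

public class DoubleBufferDemo {
  public static void main(String[] args) {
    DoubleBufferedWriterModel w = new DoubleBufferedWriterModel();
    w.deleteKey("k"); // batch 1 INSERT: key unknown -> equality delete
    w.write("k");
    w.completeWrite();
    w.deleteKey("k"); // batch 2 re-INSERT: prior-batch position delete instead
    w.write("k");
    System.out.println("equality deletes: " + w.equalityDeletes.size());          // 1
    System.out.println("prior-batch pos deletes: " + w.priorBatchPosDeletes.size()); // 1
  }
}
```

The adjacent-batch re-INSERT downgrades from an equality delete to a targeted position delete; only keys absent from both maps still fall back to equality deletes.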
