TheR1sing3un opened a new pull request, #7759:
URL: https://github.com/apache/paimon/pull/7759
## Purpose
Fix a silent data-quality bug on master: a primary-key table that receives
multiple `write_arrow` calls before a single `prepare_commit` returns multiple
rows per PK on read -- regardless of merge engine, including the default
`deduplicate`. Verified empirically on `origin/master` (b4e54ada3) and on the
fix branch:
```python
# w / c: the table write and table commit handles (setup omitted)
w.write_arrow([{'id': 1, 'a': 'first', 'b': 'old'}])
w.write_arrow([{'id': 1, 'a': 'second', 'b': 'new'}])
c.commit(w.prepare_commit())
# master: 2 rows (PK uniqueness violated, both rows returned)
# this PR: 1 row {'id': 1, 'a': 'second', 'b': 'new'} (dedupe to latest)
```
This is the writer-side gap that the `expectedFailure` cases in #7745 exposed.
## Root cause
* `KeyValueDataWriter._merge_data` did `concat + sort` only -- never applied
the table's merge function -- so a flushed file held multiple rows per PK,
violating the Java LSM invariant "PK unique within a file".
* The read-side `raw_convertible` fast path (`split_generator.py:99-100`)
treats any single-file PK split as merge-free and skips `SortMergeReader`. That
assumption holds on Java because the writer enforces the invariant; on Python
it didn't.
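To make the interaction of the two bullets concrete, here is a minimal sketch in plain Python. Tuples stand in for Arrow rows, and both function names are hypothetical stand-ins for the old `_merge_data` behavior and the `raw_convertible` fast path; neither is the actual pypaimon code.

```python
# Rows are (pk, sequence_number, value); plain tuples stand in for Arrow rows.
buffered = [(1, 0, "first")]
incoming = [(1, 1, "second")]


def merge_data_concat_sort(existing, new):
    """Hypothetical stand-in for the old behavior: concat + sort, no merge."""
    return sorted(existing + new, key=lambda r: (r[0], r[1]))


def read_single_file_raw(file_rows):
    """Hypothetical fast path: trusts the file to be PK-unique, so no merge."""
    return list(file_rows)


flushed = merge_data_concat_sort(buffered, incoming)
result = read_single_file_raw(flushed)
# Both rows for pk=1 survive: the writer never merged and the reader
# skipped the merge because the split has a single file.
```

Either side applying the merge would hide the problem; this PR fixes the writer so the read-side assumption becomes valid.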
## Fix
Mirror Java `MergeTreeWriter.flushWriteBuffer` +
`SortBufferWriteBuffer.MergeIterator.advanceIfNeeded`
(`paimon-core/.../mergetree/SortBufferWriteBuffer.java:163-293`): before each
flush, fold each run of equal-PK rows through the table's merge function
(`reset` + `add` + `get_result`). The flushed file therefore satisfies the LSM
invariant the read side relies on.
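A minimal sketch of that fold, assuming rows are already sorted by PK and then by arrival order. It works on plain dicts rather than `pa.Table`, and `DeduplicateSketch` and `fold_by_pk` are illustrative names, not the classes added by this PR; only the `reset` / `add` / `get_result` protocol is taken from the description above.

```python
from itertools import groupby


class DeduplicateSketch:
    """Keeps the last row added for a key (illustrative stand-in)."""

    def reset(self):
        self._latest = None

    def add(self, row):
        self._latest = row

    def get_result(self):
        return self._latest


def fold_by_pk(sorted_rows, merge_function, key=lambda r: r["id"]):
    """Collapse each run of equal-PK rows into at most one row."""
    out = []
    for _, run in groupby(sorted_rows, key=key):
        merge_function.reset()
        for row in run:
            merge_function.add(row)
        result = merge_function.get_result()
        if result is not None:  # a None result drops the whole PK group
            out.append(result)
    return out


rows = [
    {"id": 1, "a": "first", "b": "old"},
    {"id": 1, "a": "second", "b": "new"},
    {"id": 2, "a": "only", "b": "x"},
]
folded = fold_by_pk(rows, DeduplicateSketch())
```

With dedupe semantics, `folded` holds one row per `id`, and the later write wins for `id=1`.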
The same dispatch is now shared between the read path
(`MergeFileSplitRead._build_merge_function`) and the writer
(`FileStoreWrite._build_pk_merge_function`) -- single source of truth, no
implementation drift possible.
## In scope
- `pypaimon/common/merge_engine_dispatch.py` (new): single
`build_merge_function` entry point + `partial_update_unsupported_options`
helper, lifted verbatim from `MergeFileSplitRead`.
- `pypaimon/read/reader/deduplicate_merge_function.py` (new): extracted from
the inline class at the end of `sort_merge_reader.py` so the writer can reuse
it.
- `pypaimon/write/writer/key_value_data_writer.py`: constructor takes
``merge_function``; new ``_merge_pending_by_pk`` runs the per-PK fold;
``prepare_commit`` and ``_check_and_roll_if_needed`` invoke it before flushing.
- `pypaimon/write/file_store_write.py::_build_pk_merge_function`: picks the
merge function via the shared dispatch. Wholly unsupported engines
(``aggregation`` / ``first-row``) fall back to ``DeduplicateMergeFunction`` so
the file still maintains the LSM invariant; the read side still raises
explicitly. ``partial-update`` with out-of-scope options keeps the explicit
raise from #7745. ``with_write_type`` (column-subset writes) on PK tables is
rejected with a clear ``NotImplementedError`` rather than crashing on a hidden
arity mismatch.
- `pypaimon/tests/test_write_merge_buffer.py` (new): 9 unit cases driving
``_merge_pending_by_pk`` directly with synthetic ``pa.Table`` inputs.
- `pypaimon/tests/test_partial_update_e2e.py`: drops the two
``expectedFailure`` decorators (now passing); adds
``test_deduplicate_two_write_arrows_single_commit`` regression for the master
silent bug; adjusts unsupported-option cases to expect the error inside the
first ``write_arrow`` call.
- `pypaimon/tests/reader_primary_key_test.py::test_pk_multi_write_once_commit`:
drops a ``# TODO support pk merge`` comment and tightens the expected table --
``user_id=2``'s two writes now correctly dedupe to one row.
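The write-side selection rules listed above can be sketched roughly as follows. This is an illustration of the decision table only: `choose_write_merge_engine` and its parameters are hypothetical, it returns an engine label instead of a merge function object, and the exception type used for the partial-update case is an assumption (the PR only says the raise is explicit).

```python
# Engines with no Python merge function yet; the writer falls back to dedupe.
FALLBACK_TO_DEDUPE = {"aggregation", "first-row"}


def choose_write_merge_engine(options, write_type_is_subset=False):
    """Return the label of the merge behavior the writer would apply."""
    if write_type_is_subset:
        # Column-subset writes would not match the full-table arity.
        raise NotImplementedError(
            "with_write_type on primary-key tables is not supported")
    engine = options.get("merge-engine", "deduplicate")
    if engine == "partial-update":
        bad = sorted(
            k for k in options
            if k == "ignore-delete"
            or k.startswith("partial-update.remove-record-on-"))
        if bad:
            # Exception type is an assumption made for this sketch.
            raise NotImplementedError(f"unsupported options: {bad}")
        return "partial-update"
    if engine in FALLBACK_TO_DEDUPE:
        return "deduplicate"  # keeps files PK-unique; reads still raise
    if engine == "deduplicate":
        return "deduplicate"
    raise ValueError(f"unknown merge engine: {engine}")
```

For example, `choose_write_merge_engine({"merge-engine": "first-row"})` yields `"deduplicate"`, which matches the fallback described for wholly unsupported engines.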
## Out of scope
- Spillable / disk-backed write buffer (Java's
``BinaryExternalSortBuffer``). Python keeps ``pa.Table`` as the buffer;
large-buffer OOM mitigation is a separate effort.
- ``aggregation`` / ``first-row`` merge engine on the write side (need
``AggregateMergeFunction`` / ``FirstRowMergeFunction`` ports first, tracked
separately). Those engines fall back to dedupe on flush so files stay valid;
reads still raise explicitly.
- Partial-update sequence-group, per-field aggregator overrides,
``ignore-delete`` and ``partial-update.remove-record-on-*`` -- still explicitly
rejected by the shared dispatch from #7745.
- ``DELETE`` / ``UPDATE_BEFORE`` row kinds -- Python writers always emit
``_VALUE_KIND = 0`` (INSERT) today; merge functions raise on non-INSERT input
as before.
- ``with_write_type`` (column-subset writes) on PK tables -- explicitly
rejected. The buffer layout would carry only the subset on the value side,
while the merge function is built for the full table arity. Supporting this
requires either filling absent columns with nulls before flush or adapting the
merge function's arity, both larger than this PR.
- ``AppendOnlyDataWriter`` / ``DataBlobWriter`` -- no PK, no merge needed.
## Tests
From ``paimon-python/``:
```
pytest pypaimon/tests/test_write_merge_buffer.py \
pypaimon/tests/test_partial_update_e2e.py \
pypaimon/tests/test_partial_update_merge_function.py -v
# All passing (9 new unit + 18 e2e + 16 merge-function unit)
pytest pypaimon/tests/reader_primary_key_test.py \
pypaimon/tests/reader_split_generator_test.py -q
# All passing except 2 lance tests that fail on master too (no module 'lance')
flake8 --config=dev/cfg.ini <touched files> # clean
```
Master vs fix verification (both engines):
```
# master: dedupe → 2 rows; partial-update → 2 rows (raw)
# this branch: dedupe → 1 row latest; partial-update → 1 row per-field merged
```
## Anti-divergence checklist
- ``KeyValueDataWriter._merge_pending_by_pk`` runs ``reset`` / ``add`` /
``get_result`` once per equal-PK run, equivalent to Java
``SortBufferWriteBuffer.MergeIterator.advanceIfNeeded``.
- Writer-side and reader-side dispatch share a single
``build_merge_function`` -- impossible to drift.
- Files flushed are PK-unique internally (LSM invariant), so the read-side
``raw_convertible`` fast path's assumption holds.
- ``get_result()`` returning ``None`` drops that PK group (mirrors Java ``do
{ ... } while (result == null)``).
- ``_check_and_roll_if_needed`` folds *before* slicing for size, so each
sliced file individually maintains PK uniqueness.
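The ordering in the last item matters, and a small sketch shows why. It assumes dedupe semantics and slices by row count for simplicity (the real writer splits by size); all helper names are hypothetical.

```python
def fold_latest(sorted_rows):
    """Keep the last row of each equal-PK run (dedupe semantics)."""
    out = []
    for row in sorted_rows:
        if out and out[-1][0] == row[0]:
            out[-1] = row  # later row for the same PK replaces the earlier one
        else:
            out.append(row)
    return out


def slice_rows(rows, max_rows):
    """Split rows into consecutive chunks, one chunk per output file."""
    return [rows[i:i + max_rows] for i in range(0, len(rows), max_rows)]


def is_pk_unique(rows):
    keys = [r[0] for r in rows]
    return len(keys) == len(set(keys))


# Rows are (pk, value), already sorted by pk and arrival order.
buffer = [(1, "a"), (1, "b"), (2, "c"), (2, "d"), (3, "e")]

slice_first = slice_rows(buffer, 2)              # slicing without folding
fold_first = slice_rows(fold_latest(buffer), 2)  # fold, then slice
```

Slicing the unfolded buffer leaves files that repeat a PK, while folding first gives chunks that each pass `is_pk_unique`.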
## Known trade-off
``_check_and_roll_if_needed`` runs the per-PK fold on every ``write`` call
(Python's buffer is a single ``pa.Table``, and we re-fold it before any
size-based split). Java's ``SortBufferWriteBuffer`` only folds on flush. For
workloads with many small batches the cumulative fold cost is O(n²) in the
buffer size. The fold is idempotent, so correctness is unaffected; if it shows
up in profiles, a follow-up can move the buffer to an append-only batch list
with a lazy fold at flush.
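One possible shape for that follow-up, sketched in plain Python under the assumption of dedupe semantics. `LazyFoldBuffer` is a hypothetical name and uses tuples instead of `pa.Table`; it is not part of this PR.

```python
class LazyFoldBuffer:
    """Append-only batch list; sorting and folding happen once, at flush."""

    def __init__(self):
        self._batches = []
        self._next_seq = 0
        self.fold_calls = 0  # instrumentation for the example only

    def write(self, rows):
        # O(len(rows)): tag each (pk, value) with an arrival sequence number.
        tagged = []
        for pk, value in rows:
            tagged.append((pk, self._next_seq, value))
            self._next_seq += 1
        self._batches.append(tagged)

    def flush(self):
        self.fold_calls += 1
        rows = sorted(
            (r for batch in self._batches for r in batch),
            key=lambda r: (r[0], r[1]))
        self._batches = []
        out = []
        for row in rows:
            if out and out[-1][0] == row[0]:
                out[-1] = row  # later sequence number wins
            else:
                out.append(row)
        return [(pk, value) for pk, _, value in out]


buf = LazyFoldBuffer()
buf.write([(1, "first")])
buf.write([(1, "second"), (2, "x")])
flushed = buf.flush()
```

Each `write` only appends, so the sort-and-fold cost is paid once per flush instead of once per batch. A size-based roll would still need a fold before slicing.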
## Generative AI disclosure
Drafted with assistance from a generative AI tool. All code, tests, and Java
alignment were reviewed and validated by the contributor.
---
Sister PR to #7745. Built on top of #7745's branch; once #7745 merges, this
rebases cleanly onto master.