TheR1sing3un opened a new pull request, #7759:
URL: https://github.com/apache/paimon/pull/7759

   ## Purpose
   
   Fix a silent data-quality bug on master: a primary-key table that gets 
multiple `write_arrow` calls inside a single `prepare_commit` returns multiple 
rows per PK on read -- regardless of merge engine, including the default 
`deduplicate`. Verified empirically on `origin/master` (b4e54ada3) and on the 
fix branch:
   
   ```python
   w.write_arrow([{'id': 1, 'a': 'first',  'b': 'old'}])
   w.write_arrow([{'id': 1, 'a': 'second', 'b': 'new'}])
   c.commit(w.prepare_commit())
   
   # master:    2 rows (PK uniqueness violated, both rows returned)
   # this PR:   1 row {'id': 1, 'a': 'second', 'b': 'new'} (dedupe to latest)
   ```
   
   This is the writer-side gap #7745's `expectedFailure` cases exposed.
   
   ## Root cause
   
   * `KeyValueDataWriter._merge_data` did `concat + sort` only -- never applied 
the table's merge function -- so a flushed file held multiple rows per PK, 
violating the Java LSM invariant "PK unique within a file".
   * The read-side `raw_convertible` fast path (`split_generator.py:99-100`) 
treats any single-file PK split as merge-free and skips `SortMergeReader`. That 
assumption holds on Java because the writer enforces the invariant; on Python 
it didn't.
   
   ## Fix
   
   Mirror Java `MergeTreeWriter.flushWriteBuffer` + 
`SortBufferWriteBuffer.MergeIterator.advanceIfNeeded` 
(`paimon-core/.../mergetree/SortBufferWriteBuffer.java:163-293`): before each 
flush, fold each run of equal-PK rows through the table's merge function 
(`reset` + `add` + `get_result`). The flushed file therefore satisfies the LSM 
invariant the read side relies on.
   
   The same dispatch is now shared between the read path 
(`MergeFileSplitRead._build_merge_function`) and the writer 
(`FileStoreWrite._build_pk_merge_function`) -- single source of truth, no 
implementation drift possible.
   
   ## In scope
   
   - `pypaimon/common/merge_engine_dispatch.py` (new): single 
`build_merge_function` entry point + `partial_update_unsupported_options` 
helper, lifted verbatim from `MergeFileSplitRead`.
   - `pypaimon/read/reader/deduplicate_merge_function.py` (new): extracted from 
the inline class at the end of `sort_merge_reader.py` so the writer can reuse 
it.
   - `pypaimon/write/writer/key_value_data_writer.py`: constructor takes 
``merge_function``; new ``_merge_pending_by_pk`` runs the per-PK fold; 
``prepare_commit`` and ``_check_and_roll_if_needed`` invoke it before flushing.
   - `pypaimon/write/file_store_write.py::_build_pk_merge_function`: picks the 
merge function via the shared dispatch. Wholly unsupported engines 
(``aggregation`` / ``first-row``) fall back to ``DeduplicateMergeFunction`` so 
the file still maintains the LSM invariant; the read side still raises 
explicitly. ``partial-update`` with out-of-scope options keeps the explicit 
raise from #7745. ``with_write_type`` (column-subset writes) on PK tables is 
rejected with a clear ``NotImplementedError`` rather than crashing on a hidden 
arity mismatch.
   - `pypaimon/tests/test_write_merge_buffer.py` (new): 9 unit cases driving 
``_merge_pending_by_pk`` directly with synthetic ``pa.Table`` inputs.
   - `pypaimon/tests/test_partial_update_e2e.py`: drops the two 
``expectedFailure`` decorators (now passing); adds 
``test_deduplicate_two_write_arrows_single_commit`` regression for the master 
silent bug; adjusts unsupported-option cases to expect the error inside the 
first ``write_arrow`` call.
   - 
`pypaimon/tests/reader_primary_key_test.py::test_pk_multi_write_once_commit`: 
drops a ``# TODO support pk merge`` comment and tightens the expected table -- 
``user_id=2``'s two writes now correctly dedupe to one row.
   
   ## Out of scope
   
   - Spillable / disk-backed write buffer (Java's 
``BinaryExternalSortBuffer``). Python keeps ``pa.Table`` as the buffer; 
large-buffer OOM mitigation is a separate effort.
   - ``aggregation`` / ``first-row`` merge engine on the write side (need 
``AggregateMergeFunction`` / ``FirstRowMergeFunction`` ports first, tracked 
separately). Those engines fall back to dedupe on flush so files stay valid; 
reads still raise explicitly.
   - Partial-update sequence-group, per-field aggregator overrides, 
``ignore-delete`` and ``partial-update.remove-record-on-*`` -- still explicitly 
rejected by the shared dispatch from #7745.
   - ``DELETE`` / ``UPDATE_BEFORE`` row kinds -- Python writers always emit 
``_VALUE_KIND = 0`` (INSERT) today; merge functions raise on non-INSERT input 
as before.
   - ``with_write_type`` (column-subset writes) on PK tables -- explicitly 
rejected. The buffer layout would carry only the subset on the value side, 
while the merge function is built for the full table arity. Supporting this 
requires either filling absent columns with nulls before flush or adapting the 
merge function's arity, both larger than this PR.
   - ``AppendOnlyDataWriter`` / ``DataBlobWriter`` -- no PK, no merge needed.
   
   ## Tests
   
   From ``paimon-python/``:
   
   ```
   pytest pypaimon/tests/test_write_merge_buffer.py \
          pypaimon/tests/test_partial_update_e2e.py \
          pypaimon/tests/test_partial_update_merge_function.py -v
   # All passing (9 new unit + 18 e2e + 16 merge-function unit)
   
   pytest pypaimon/tests/reader_primary_key_test.py \
          pypaimon/tests/reader_split_generator_test.py -q
   # All passing except 2 lance tests that fail on master too (no module 
'lance')
   
   flake8 --config=dev/cfg.ini <touched files>     # clean
   ```
   
   Master vs fix verification (both engines):
   
   ```
   # master:        dedupe → 2 rows; partial-update → 2 rows (raw)
   # this branch:   dedupe → 1 row latest; partial-update → 1 row per-field 
merged
   ```
   
   ## Anti-divergence checklist
   
   - ``KeyValueDataWriter._merge_pending_by_pk`` runs ``reset`` / ``add`` / 
``get_result`` once per equal-PK run, equivalent to Java 
``SortBufferWriteBuffer.MergeIterator.advanceIfNeeded``.
   - Writer-side and reader-side dispatch share a single 
``build_merge_function`` -- impossible to drift.
   - Files flushed are PK-unique internally (LSM invariant), so the read-side 
``raw_convertible`` fast path's assumption holds.
   - ``get_result()`` returning ``None`` drops that PK group (mirrors Java ``do 
{ ... } while (result == null)``).
   - ``_check_and_roll_if_needed`` folds *before* slicing for size, so each 
sliced file individually maintains PK uniqueness.
   
   ## Known trade-off
   
   ``_check_and_roll_if_needed`` runs the per-PK fold on every ``write`` call 
(Python's buffer is a single ``pa.Table`` and we re-fold it before any 
size-based split). Java's ``SortBufferWriteBuffer`` only folds on flush. For 
workloads with many small batches this is O(n²) on buffer size. The fold is 
idempotent so correctness is fine; if it shows up in profiles, the buffer can 
be moved to an append-only batch list with a lazy fold at flush in a follow-up.
   
   ## Generative AI disclosure
   
   Drafted with assistance from a generative AI tool. All code, tests, and Java 
alignment were reviewed and validated by the contributor.
   
   ---
   
   Sister PR to #7745. Built on top of #7745's branch; once #7745 merges, this 
rebases cleanly onto master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to