JingsongLi commented on PR #8028:
URL: https://github.com/apache/paimon/pull/8028#issuecomment-4589819938
## MERGE INTO Topology
```
                      ┌─────────────────────────┐
                      │ merge_into() entry      │
                      │ _prepare(): validate &  │
                      │ normalize parameters    │
                      └────────────┬────────────┘
                                   │
                      ┌────────────▼────────────┐
                      │ Pin base_snapshot_id    │
                      │ All branches read the   │
                      │ same snapshot for       │
                      │ consistency             │
                      └────────────┬────────────┘
                                   │
                 ┌─────────────────┴─────────────────┐
                 │                                   │
     ┌───────────▼───────────┐           ┌───────────▼───────────┐
     │ MATCHED branch        │           │ NOT MATCHED branch    │
     │ (update path)         │           │ (insert path)         │
     └───────────┬───────────┘           └───────────┬───────────┘
                 │                                   │
┌────────────────▼────────────────┐  ┌───────────────▼───────────────┐
│ read_paimon(target, projection) │  │ read_paimon(target, on_cols)  │
│ Read only _ROW_ID + needed cols │  │ Read only join-key cols       │
│ rename cols → t.<col>           │  │ rename cols → t.<col>         │
└────────────────┬────────────────┘  └───────────────┬───────────────┘
                 │                                   │
┌────────────────▼────────────────┐  ┌───────────────▼───────────────┐
│ source_ds rename → s.<col>      │  │ source_ds rename → s.<col>    │
└────────────────┬────────────────┘  └───────────────┬───────────────┘
                 │                                   │
┌────────────────▼────────────────┐  ┌───────────────▼───────────────┐
│ INNER JOIN                      │  │ LEFT ANTI JOIN                │
│ on: t.<key> = s.<key>           │  │ on: s.<key> = t.<key>         │
│ num_partitions = N              │  │ num_partitions = N            │
│ → rows that matched             │  │ → source rows with no match   │
└────────────────┬────────────────┘  └───────────────┬───────────────┘
                 │                                   │
┌────────────────▼────────────────┐  ┌───────────────▼───────────────┐
│ map_batches(matched_transform)  │  │ map_batches(insert_transform) │
│ Build output per SET spec       │  │ Build output per SET spec     │
│ out: [_ROW_ID, update_cols...]  │  │ out: [all target columns]     │
│ → update_ds                     │  │ blob cols filled with null    │
└────────────────┬────────────────┘  │ → insert_ds                   │
                 │                   └───────────────┬───────────────┘
┌────────────────▼────────────────┐                  │
│ distributed_update_apply()      │                  │
│                                 │                  │
│ 1. Build planner, get FilesInfo │                  │
│    (first_row_ids index)        │                  │
│ 2. ray.put(FilesInfo) broadcast │                  │
│ 3. map_batches(_assign_frid)    │                  │
│    searchsorted to assign each  │                  │
│    row to its owning data file  │                  │
│ 4. groupby(_FIRST_ROW_ID)       │                  │
│    .map_groups(_apply_group)    │                  │
│    One TableUpdateByRowId per   │                  │
│    group → produces CommitMsgs  │                  │
│ 5. iter_batches to collect      │                  │
│    pickled CommitMessages       │                  │
│ → update_msgs, num_updated      │                  │
└────────────────┬────────────────┘                  │
                 │                   ┌───────────────▼───────────────┐
                 │                   │ distributed_write_collect     │
                 │                   │ _CollectingDatasink           │
                 │                   │ (subclass of PaimonDatasink)  │
                 │                   │ write_datasink() to write     │
                 │                   │ → insert_msgs                 │
                 │                   └───────────────┬───────────────┘
                 │                                   │
                 └─────────────────┬─────────────────┘
                                   │
                 ┌─────────────────▼─────────────────┐
                 │ Merge update_msgs + insert_msgs   │
                 │ Single atomic commit              │
                 │ → produces exactly 1 snapshot     │
                 └─────────────────┬─────────────────┘
                                   │
                 ┌─────────────────▼─────────────────┐
                 │ Return metrics:                   │
                 │ {num_matched, num_inserted,       │
                 │  num_unchanged}                   │
                 └───────────────────────────────────┘
```
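
To make the two branches concrete, here is a minimal pure-Python sketch of what the inner join and the left anti join compute on small in-memory rows. It is only an illustration and not the PR's Ray-based implementation: the helper names `prefix`, `matched_branch`, and `not_matched_branch` are hypothetical, and plain lists of dicts stand in for the datasets.

```python
def prefix(row, side):
    """Rename every column to '<side>.<col>', mirroring the t./s. renames in the diagram."""
    return {f"{side}.{name}": value for name, value in row.items()}


def matched_branch(target_rows, source_rows, key):
    """INNER JOIN: one output row per (target, source) pair that shares the join key."""
    index = {}
    for t in target_rows:
        index.setdefault(t[key], []).append(prefix(t, "t"))
    return [
        {**t, **prefix(s, "s")}
        for s in source_rows
        for t in index.get(s[key], [])
    ]


def not_matched_branch(target_keys, source_rows, key):
    """LEFT ANTI JOIN: source rows whose key has no counterpart in the target.

    Only the target's join-key values are needed, which is why this branch
    can read a narrower projection than the matched branch.
    """
    seen = set(target_keys)
    return [prefix(s, "s") for s in source_rows if s[key] not in seen]
```

Each function builds its own lookup from the target, so neither branch depends on a shared, materialized join result; that independence is the property the topology relies on.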
### Key Design Decisions
1. **Two independent JOINs instead of one LEFT OUTER JOIN** — Following
Spark's approach: a unified `left_outer` join would require `materialize()` to
feed both branches, which can run out of memory (OOM) on large merges. Instead,
the matched path uses an inner join and the not-matched path uses a left anti
join, each running independently.
2. **Distributed grouping for the update path** — The most complex part:
   - `_assign_frid`: uses `np.searchsorted` to map each row's `_ROW_ID` to
     the `first_row_id` of its owning data file
   - `groupby(_FIRST_ROW_ID).map_groups`: groups by data file, each group
     spawns a `TableUpdateByRowId` worker for local updates
   - `FilesInfo` is broadcast via `ray.put()` to all workers, avoiding
     per-task manifest re-scans
3. **Snapshot pinning** — All reads are pinned to `base_snapshot_id`,
preventing concurrent commits from causing the matched/not-matched branches to
see different versions of the data.
4. **Atomic commit** — `CommitMessage`s from both update and insert are
merged into a single `commit()` call, guaranteeing exactly one snapshot is
produced.
5. **Duplicate match detection** — `_apply_group` checks
`count_distinct(_ROW_ID) != num_rows`; if multiple source rows match the same
target row, it raises an error rather than silently picking a winner.
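
The row-to-file assignment (decision 2) and the duplicate-match check (decision 5) can be sketched as follows. This is a hedged illustration rather than the code in the PR: it uses the standard library's `bisect_right` as a stand-in for `np.searchsorted(..., side="right") - 1`, the function names `assign_first_row_id` and `group_and_check` are hypothetical, and it assumes each file owns the half-open range from its `first_row_id` up to the next file's `first_row_id` (the last file is treated as unbounded).

```python
from bisect import bisect_right
from collections import defaultdict


def assign_first_row_id(row_ids, first_row_ids):
    """Map each _ROW_ID to the first_row_id of the data file assumed to own it.

    first_row_ids must be sorted ascending.
    """
    assigned = []
    for rid in row_ids:
        # Index of the last first_row_id that is <= rid.
        idx = bisect_right(first_row_ids, rid) - 1
        if idx < 0:
            raise ValueError(f"_ROW_ID {rid} precedes every known data file")
        assigned.append(first_row_ids[idx])
    return assigned


def group_and_check(row_ids, first_row_ids):
    """Group row ids by owning file and reject duplicate matches within a group."""
    groups = defaultdict(list)
    for rid, frid in zip(row_ids, assign_first_row_id(row_ids, first_row_ids)):
        groups[frid].append(rid)
    for frid, rids in groups.items():
        # Same idea as count_distinct(_ROW_ID) != num_rows.
        if len(set(rids)) != len(rids):
            raise ValueError(
                f"multiple source rows matched the same target row in file {frid}"
            )
    return dict(groups)
```

Because a given `_ROW_ID` always maps to the same file, every duplicate lands in the same group, so a per-group distinct count is enough to catch it without a global pass.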