JingsongLi commented on PR #8028:
URL: https://github.com/apache/paimon/pull/8028#issuecomment-4589819938

   ## MERGE INTO Topology
   
   ```
                             ┌─────────────────────────┐
                             │     merge_into() entry    │
                             │  _prepare(): validate &   │
                             │  normalize parameters     │
                             └────────────┬────────────┘
                                          │
                             ┌────────────▼────────────┐
                             │   Pin base_snapshot_id   │
                             │  All branches read the   │
                             │  same snapshot for        │
                             │  consistency              │
                             └────────────┬────────────┘
                                          │
                        ┌─────────────────┴─────────────────┐
                        │                                   │
            ┌───────────▼───────────┐           ┌───────────▼───────────┐
            │   MATCHED branch      │           │ NOT MATCHED branch    │
            │   (update path)       │           │ (insert path)         │
            └───────────┬───────────┘           └───────────┬───────────┘
                        │                                   │
       ┌────────────────▼────────────────┐     ┌────────────▼────────────────┐
       │  read_paimon(target, projection)│     │  read_paimon(target, on_cols)│
       │  Read only _ROW_ID + needed cols│     │  Read only join-key cols     │
       │  rename cols → t.<col>          │     │  rename cols → t.<col>       │
       └────────────────┬────────────────┘     └────────────┬────────────────┘
                        │                                   │
       ┌────────────────▼────────────────┐     ┌────────────▼────────────────┐
       │  source_ds rename → s.<col>     │     │  source_ds rename → s.<col>  │
       └────────────────┬────────────────┘     └────────────┬────────────────┘
                        │                                   │
       ┌────────────────▼────────────────┐     ┌────────────▼────────────────┐
       │        INNER JOIN               │     │       LEFT ANTI JOIN        │
       │  on: t.<key> = s.<key>          │     │  on: s.<key> = t.<key>      │
       │  num_partitions = N             │     │  num_partitions = N         │
       │  → rows that matched            │     │  → source rows with no match│
       └────────────────┬────────────────┘     └────────────┬────────────────┘
                        │                                   │
       ┌────────────────▼────────────────┐     ┌────────────▼────────────────┐
       │  map_batches(matched_transform) │     │ map_batches(insert_transform)│
       │  Build output per SET spec      │     │  Build output per SET spec   │
       │  out: [_ROW_ID, update_cols...] │     │  out: [all target columns]   │
       │  → update_ds                    │     │  blob cols filled with null  │
       └────────────────┬────────────────┘     │  → insert_ds                │
                        │                      └────────────┬────────────────┘
       ┌────────────────▼────────────────┐                   │
       │  distributed_update_apply()     │                   │
       │                                 │                   │
       │  1. Build planner, get FilesInfo │                   │
       │     (first_row_ids index)        │                   │
       │  2. ray.put(FilesInfo) broadcast │                   │
       │  3. map_batches(_assign_frid)    │                   │
       │     searchsorted to assign each  │                   │
       │     row to its owning data file  │                   │
       │  4. groupby(_FIRST_ROW_ID)       │                   │
       │     .map_groups(_apply_group)     │                   │
       │     One TableUpdateByRowId per    │                   │
       │     group → produces CommitMsgs  │                   │
       │  5. iter_batches to collect       │                   │
       │     pickled CommitMessages        │                   │
       │  → update_msgs, num_updated      │                   │
       └────────────────┬────────────────┘                   │
                        │                      ┌────────────▼────────────────┐
                        │                      │ distributed_write_collect   │
                        │                      │  _CollectingDatasink        │
                         │                      │  (PaimonDatasink subclass)  │
                        │                      │  write_datasink() to write  │
                        │                      │  → insert_msgs              │
                        │                      └────────────┬────────────────┘
                        │                                   │
                        └─────────────┬─────────────────────┘
                                      │
                        ┌─────────────▼─────────────────────┐
                        │  Merge update_msgs + insert_msgs   │
                        │  Single atomic commit              │
                        │  → produces exactly 1 snapshot     │
                        └─────────────┬─────────────────────┘
                                      │
                        ┌─────────────▼─────────────────────┐
                        │  Return metrics:                   │
                        │  {num_matched, num_inserted,       │
                        │   num_unchanged}                   │
                        └───────────────────────────────────┘
   ```
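The two `map_batches` transforms can be sketched on plain Python rows. This is a minimal, single-process illustration under stated assumptions, not the PR's code. The following are all hypothetical:

- the `set_spec` shape, which maps each target column to an `s.`-prefixed source column;
- the one-dict-per-row representation;
- the signatures of `matched_transform` and `insert_transform`.

The real transforms operate on Ray batches.

```python
from typing import Dict, List, Optional, Set

Row = Dict[str, object]


def matched_transform(joined: List[Row], set_spec: Dict[str, str]) -> List[Row]:
    """Matched path: emit [_ROW_ID, update_cols...] for every joined row.

    Assumes _ROW_ID was renamed to "t._ROW_ID" like the other target columns.
    """
    out: List[Row] = []
    for row in joined:
        rec: Row = {"_ROW_ID": row["t._ROW_ID"]}
        for target_col, source_col in set_spec.items():
            rec[target_col] = row[source_col]  # SET target_col = source_col
        out.append(rec)
    return out


def insert_transform(
    unmatched: List[Row],
    set_spec: Dict[str, str],
    target_columns: List[str],
    blob_columns: Optional[Set[str]] = None,
) -> List[Row]:
    """Not-matched path: emit every target column, with blob columns as None."""
    blobs = blob_columns or set()
    out: List[Row] = []
    for row in unmatched:
        rec: Row = {}
        for col in target_columns:
            if col in blobs:
                rec[col] = None  # blob columns are filled with null
            elif col in set_spec:
                rec[col] = row[set_spec[col]]
            else:
                rec[col] = None  # assumption: unspecified columns default to null
        out.append(rec)
    return out


joined = [{"t._ROW_ID": 7, "t.id": 1, "s.id": 1, "s.name": "b"}]
print(matched_transform(joined, {"name": "s.name"}))
# [{'_ROW_ID': 7, 'name': 'b'}]

unmatched = [{"s.id": 3, "s.name": "c"}]
print(insert_transform(unmatched, {"id": "s.id", "name": "s.name"},
                       ["id", "name", "payload"], {"payload"}))
# [{'id': 3, 'name': 'c', 'payload': None}]
```

The matched output carries only `_ROW_ID` and the updated columns. The insert output must carry every target column because it becomes new rows.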
   
   ### Key Design Decisions
   
   1. **Two independent JOINs instead of one LEFT OUTER JOIN**: this follows Spark's approach. A unified `left_outer` join would need its result to be `materialize()`d so that it could feed both branches, which can OOM on large merges. Instead, the matched path uses an `inner join` and the not-matched path uses a `left_anti join`, and each runs independently.
   
   2. **Distributed grouping for the update path**: this is the most complex part of the pipeline.
      - `_assign_frid`: uses `np.searchsorted` to map each row's `_ROW_ID` to the `first_row_id` of its owning data file
      - `groupby(_FIRST_ROW_ID).map_groups`: groups rows by data file, and each group runs its own `TableUpdateByRowId` worker to apply its updates locally
      - `FilesInfo` is broadcast via `ray.put()` to all workers, avoiding per-task manifest re-scans
   
   3. **Snapshot pinning**: all reads are pinned to `base_snapshot_id`, so a concurrent commit cannot cause the matched and not-matched branches to see different versions of the data.
   
   4. **Atomic commit**: the `CommitMessage`s from both the update and the insert paths are merged into a single `commit()` call, guaranteeing that exactly one snapshot is produced.
   
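   5. **Duplicate match detection**: `_apply_group` checks whether `count_distinct(_ROW_ID) != num_rows`. If multiple source rows match the same target row, it raises an error rather than silently picking a winner. A per-group check is sufficient because every `_ROW_ID` belongs to exactly one data file, so all duplicates of a target row land in the same group.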
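For decision 1 (two independent joins), a toy hash-join version of the two branches shows why no shared intermediate is needed. Each branch reads the renamed target and source on its own. The helpers `rename`, `inner_join`, and `left_anti_join` are hypothetical single-process stand-ins for the distributed joins in the pipeline. They are not Ray Data APIs.

```python
from typing import Dict, Iterable, List

Row = Dict[str, object]


def rename(rows: Iterable[Row], prefix: str) -> List[Row]:
    """Rename every column to '<prefix>.<col>' (the t./s. renames in the diagram)."""
    return [{f"{prefix}.{k}": v for k, v in r.items()} for r in rows]


def inner_join(target: List[Row], source: List[Row], key: str) -> List[Row]:
    """MATCHED branch: one output row per (target, source) pair with equal keys."""
    by_key: Dict[object, List[Row]] = {}
    for t in target:
        by_key.setdefault(t[f"t.{key}"], []).append(t)
    return [{**t, **s} for s in source for t in by_key.get(s[f"s.{key}"], [])]


def left_anti_join(source: List[Row], target: List[Row], key: str) -> List[Row]:
    """NOT MATCHED branch: source rows whose key matches no target row."""
    target_keys = {t[f"t.{key}"] for t in target}
    return [s for s in source if s[f"s.{key}"] not in target_keys]


# Each branch consumes the inputs independently; nothing has to be
# materialized once and shared between the two.
target = rename([{"_ROW_ID": 0, "id": 1}, {"_ROW_ID": 1, "id": 2}], "t")
source = rename([{"id": 2, "v": "x"}, {"id": 3, "v": "y"}], "s")
print(inner_join(target, source, "id"))
# [{'t._ROW_ID': 1, 't.id': 2, 's.id': 2, 's.v': 'x'}]
print(left_anti_join(source, target, "id"))
# [{'s.id': 3, 's.v': 'y'}]
```

Every source row ends up in exactly one branch: it either has at least one target match or it has none.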
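For decision 2 (distributed grouping), the row-to-file assignment in `_assign_frid` and the grouping that follows can be sketched with the standard library. The sketch makes these assumptions:

- `first_row_ids` is sorted ascending.
- Data file *i* owns the contiguous range `[first_row_ids[i], first_row_ids[i + 1])`.

It uses `bisect.bisect_right(a, x) - 1` in place of NumPy. That expression gives the same index as `np.searchsorted(a, x, side="right") - 1`. The function names are hypothetical. The sketch does not check the upper bound of the last file because it has no per-file row counts.

```python
import bisect
from collections import defaultdict
from typing import Dict, List


def assign_first_row_id(row_ids: List[int], first_row_ids: List[int]) -> List[int]:
    """Map each _ROW_ID to the first_row_id of the data file assumed to own it.

    first_row_ids must be sorted ascending. bisect_right(a, x) - 1 is the
    index np.searchsorted(a, x, side="right") - 1 would return.
    """
    owners: List[int] = []
    for rid in row_ids:
        idx = bisect.bisect_right(first_row_ids, rid) - 1
        if idx < 0:
            raise ValueError(f"_ROW_ID {rid} is below the first file's first_row_id")
        owners.append(first_row_ids[idx])
    return owners


def group_by_file(row_ids: List[int], first_row_ids: List[int]) -> Dict[int, List[int]]:
    """Stand-in for groupby(_FIRST_ROW_ID): one bucket of row ids per data file."""
    groups: Dict[int, List[int]] = defaultdict(list)
    for rid, frid in zip(row_ids, assign_first_row_id(row_ids, first_row_ids)):
        groups[frid].append(rid)
    return dict(groups)


first_row_ids = [0, 100, 250]  # e.g. three files starting at rows 0, 100, 250
row_ids = [5, 120, 99, 300, 100]
print(assign_first_row_id(row_ids, first_row_ids))
# [0, 100, 0, 250, 100]
print(group_by_file(row_ids, first_row_ids))
# {0: [5, 99], 100: [120, 100], 250: [300]}
```

In the distributed version, each group would then be handed to one `TableUpdateByRowId`. All updates to a given data file are therefore handled by a single group.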
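For decision 3 (snapshot pinning), a toy versioned table illustrates what the pin protects against. `ToyTable` and `branch_reads` are hypothetical and are not pypaimon APIs. The example reads the target once per branch, with a concurrent commit in between. It does this once with a pinned snapshot id and once without one.

```python
from typing import Dict, List, Optional, Tuple


class ToyTable:
    """Hypothetical in-memory table that keeps the rows of every snapshot."""

    def __init__(self, rows: List[int]) -> None:
        self.snapshots: Dict[int, List[int]] = {1: list(rows)}

    def latest_snapshot_id(self) -> int:
        return max(self.snapshots)

    def read(self, snapshot_id: Optional[int] = None) -> List[int]:
        """Read a pinned snapshot, or the latest one if snapshot_id is None."""
        sid = self.latest_snapshot_id() if snapshot_id is None else snapshot_id
        return list(self.snapshots[sid])

    def commit(self, rows: List[int]) -> int:
        sid = self.latest_snapshot_id() + 1
        self.snapshots[sid] = list(rows)
        return sid


def branch_reads(table: ToyTable, pin: bool) -> Tuple[List[int], List[int]]:
    """Read the target once per branch with a concurrent commit in between."""
    base_snapshot_id = table.latest_snapshot_id() if pin else None
    matched_view = table.read(base_snapshot_id)
    table.commit(table.read() + [99])  # a concurrent writer appends a row
    not_matched_view = table.read(base_snapshot_id)
    return matched_view, not_matched_view


print(branch_reads(ToyTable([1, 2, 3]), pin=True))
# ([1, 2, 3], [1, 2, 3])
print(branch_reads(ToyTable([1, 2, 3]), pin=False))
# ([1, 2, 3], [1, 2, 3, 99])
```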
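For decision 4 (atomic commit), the final step reduces to concatenating both branches' messages before a single `commit()` call. Both types in the sketch are hypothetical:

- `CommitMessage` is a stand-in dataclass, not pypaimon's class.
- `ToyCommitter` models the assumption that each `commit()` call yields exactly one snapshot.

Committing each branch separately would instead create two snapshots. A reader could then observe the updates without the inserts.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass(frozen=True)
class CommitMessage:
    """Hypothetical stand-in for one file-level change produced by a writer."""
    kind: str  # "update" or "insert"
    file: str


@dataclass
class ToyCommitter:
    """Hypothetical committer: each commit() call creates exactly one snapshot."""
    snapshots: List[List[CommitMessage]] = field(default_factory=list)

    def commit(self, messages: List[CommitMessage]) -> int:
        self.snapshots.append(list(messages))
        return len(self.snapshots)  # id of the new snapshot


def finish_merge(
    committer: ToyCommitter,
    update_msgs: List[CommitMessage],
    insert_msgs: List[CommitMessage],
) -> int:
    # Neither branch has committed anything yet; both results become
    # visible together in a single snapshot.
    return committer.commit(update_msgs + insert_msgs)


committer = ToyCommitter()
snapshot_id = finish_merge(
    committer,
    [CommitMessage("update", "data-0.parquet")],  # hypothetical file names
    [CommitMessage("insert", "data-1.parquet")],
)
print(snapshot_id, len(committer.snapshots), len(committer.snapshots[0]))
# 1 1 2
```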
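For decision 5 (duplicate match detection), the per-group check can be sketched as follows. `DuplicateMatchError` and `check_group` are hypothetical names. The condition mirrors `count_distinct(_ROW_ID) != num_rows`, and the error message lists the offending row ids.

```python
from collections import Counter
from typing import Dict, List


class DuplicateMatchError(ValueError):
    """Hypothetical error: one target row matched by several source rows."""


def check_group(rows: List[Dict[str, object]]) -> None:
    """Raise if any _ROW_ID appears more than once in this group."""
    counts = Counter(r["_ROW_ID"] for r in rows)
    if len(counts) != len(rows):  # count_distinct(_ROW_ID) != num_rows
        dups = sorted(rid for rid, n in counts.items() if n > 1)
        raise DuplicateMatchError(
            f"target rows {dups} are matched by multiple source rows"
        )


check_group([{"_ROW_ID": 1}, {"_ROW_ID": 2}])  # passes silently
try:
    check_group([{"_ROW_ID": 1}, {"_ROW_ID": 1, "name": "x"}])
except DuplicateMatchError as e:
    print(e)
# target rows [1] are matched by multiple source rows
```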
