JingsongLi commented on PR #8028:
URL: https://github.com/apache/paimon/pull/8028#issuecomment-4570899119

   API Design Recommendations for Paimon Ray merge_into from **AI**:
   
     Based on the design patterns from Delta Lake, Lance, Iceberg/Ray, and Paimon Spark's MergeIntoPaimonDataEvolutionTable:
   ```
     ---
     1. Use string expressions for predicates, not Python callables
   
     Delta/Lance both use SQL-like strings that can be analyzed, optimized, and pushed down:
   
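     # Delta (source/target aliases must be declared to use "source."/"target.")
     (
         dt.merge(source, predicate="target.id = source.id",
                  source_alias="source", target_alias="target")
         .when_matched_update(updates={"name": "source.name"},
                              predicate="source.ts > target.ts")
         .execute()
     )
   
     # Lance
     (
         dataset.merge_insert("id")
         .when_matched_update_all(condition="source.ts > target.ts")
         .execute(source)
     )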
   
     PR #8028 uses Python lambdas. These cannot be inspected for optimization, cannot participate in predicate pushdown, and are fragile to serialize across Ray workers:
   
     # PR #8028 (problematic)
     when_matched_update_condition=lambda r: r['s.age'] > r['t.age']
   
     Recommendation: Use string expressions or a simple expression DSL. If full SQL parsing is too heavy, at minimum support column-reference strings like "s.col" for SET values (which the PR already does) and simple comparison expressions for conditions.
   
     ---
     2. Align with Paimon Spark's execution model
   
     Paimon Spark's MergeIntoPaimonDataEvolutionTable uses:
   
     UPDATE path:  Target LEFT_OUTER JOIN Source → MergeRows → repartition(_FIRST_ROW_ID) → writePartialFields
     INSERT path:  Source LEFT_ANTI JOIN Target → MergeRows → write (full rows)
   
     The Ray implementation should mirror this:
   
     # UPDATE: inner join → extract (_ROW_ID, changed_cols) → partition by _FIRST_ROW_ID → partial write
     # INSERT: left_anti join → apply insert expressions → append write
     # COMMIT: atomic (update_msgs + insert_msgs), with snapshot conflict detection
   
     ---
     3. Recommended API shape
   
     from pypaimon.ray import merge_into, WhenMatched, WhenNotMatched
   
     merge_into(
         target="db.table",
         source=ray_dataset,  # or pa.Table, pd.DataFrame, str (table identifier)
         catalog_options={...},
         on=["id"],  # or {"target_col": "source_col"} for renamed keys
         when_matched=[
             # As in SQL MERGE, the first clause whose condition holds wins,
             # so an unconditional clause must come last.
             WhenMatched(update={"name": "s.name"}, condition="s.ts > t.ts"),  # conditional
             WhenMatched(update="*"),                                          # update all cols from source
         ],
         when_not_matched=[
             WhenNotMatched(insert={"id": "s.id", "status": "'new'"},
                            condition="s.age > 18"),                           # conditional
             WhenNotMatched(insert="*"),                                       # insert all cols
         ],
         ],
     )
   
     Key differences from PR #8028:
     - condition is a string expression, not a callable
     - Drop merge_condition (non-standard semantics that route unmatched rows to INSERT; confusing, and diverges from SQL MERGE)
     - Return a metrics dict: {"num_matched": ..., "num_inserted": ..., "num_unchanged": ...}
   
     ---
     4. Do not invent custom commit conflict detection
   
     PR #8028 adds commit.strict-mode.last-safe-snapshot to file_store_commit.py, a custom mechanism that modifies the core commit path for all table operations.
   
     Paimon already has conflict detection via writer.rowIdCheckConflict(snapshotId) (used in the Spark implementation). The Ray connector should reuse the same mechanism:
   
     # Pseudocode, using the Java API names from the Spark implementation
     # Capture snapshot before read
     plan = table.newSnapshotReader().read()
     # ... do merge work ...
     # Conflict check at commit time (existing Paimon mechanism)
     writer.rowIdCheckConflict(plan.snapshotId())
     writer.commit(update_msgs + insert_msgs)
   
     ---
     5. Follow Iceberg/Ray's two-phase separation
   
     Iceberg's Ray connector cleanly separates:
     - Phase 1 (distributed workers): write files, return metadata
     - Phase 2 (driver): collect metadata, resolve conflicts, atomic commit
   
     For Paimon:
     - Phase 1 (distributed):
       - UPDATE: inner_join → map_batches(apply_clauses) → groupby(_FIRST_ROW_ID).map_groups(partial_write) → return commit messages
       - INSERT: left_anti_join → map_batches(apply_clauses) → write_datasink → return commit messages
     - Phase 2 (driver): collect all commit messages → single atomic commit with conflict check
   
     ---
     6. Avoid driver-side materialization of large datasets
   
     PR #8028 collects target keys or matched indices into driver-side Python dicts/sets; this will OOM on large tables.
   
     Better alternatives (from Iceberg/Ray):
     - Use ray.put(keys_table) to broadcast key sets via the object store (which supports spill-to-disk)
     - Use Ray Data join() for distributed matching instead of collecting to the driver
     - Use coarse range filters to prune unrelated target files before reading
   
     ---
     7. Minimum viable scope for a first PR
   
     Given the complexity, a reasonable first PR should:
     1. Support only when_matched_update("*") + when_not_matched_insert("*") (no conditions, no partial SET)
     2. Use Ray Data join() for both paths (inner + left_anti)
     3. Reuse existing TableUpdateByRowId for the update path
     4. Reuse existing write_paimon for the insert path
     5. Atomic commit with existing Paimon snapshot conflict detection
     6. No merge_condition, no self-merge optimization, no vectorized fast paths
   
     Then iterate with follow-up PRs for conditions, partial SET, optimizations, etc.
   ```
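To make item 1 concrete, here is a minimal sketch of a string condition parsed into plain data instead of an opaque lambda. The grammar (a single `alias.col <op> column-or-literal` comparison), `parse_condition`, and `Comparison` are hypothetical and not part of pypaimon; the point is only that a parsed condition can report which columns it reads (useful for projection and pushdown) and travels to workers as ordinary data.

```python
import operator
import re
from dataclasses import dataclass

# Hypothetical mini-DSL: one comparison, "<alias>.<col> <op> <column|literal>".
_OPS = {
    ">": operator.gt, ">=": operator.ge,
    "<": operator.lt, "<=": operator.le,
    "=": operator.eq, "!=": operator.ne,
}
_COLUMN = r"[A-Za-z_]\w*\.[A-Za-z_]\w*"
# Two-character operators are listed first so ">=" is not read as ">".
_CONDITION = re.compile(rf"^\s*({_COLUMN})\s*(>=|<=|!=|=|>|<)\s*(\S+)\s*$")


def _is_column(token):
    return re.fullmatch(_COLUMN, token) is not None


def _resolve(token, row):
    """Look up a column reference, or decode a quoted/numeric literal."""
    if _is_column(token):
        return row[token]
    if len(token) >= 2 and token[0] == token[-1] == "'":
        return token[1:-1]
    return float(token)


@dataclass(frozen=True)
class Comparison:
    """A parsed condition: plain data that can be inspected and pickled."""
    left: str
    op: str
    right: str

    def referenced_columns(self):
        """Columns the condition reads, e.g. for projection or pushdown."""
        return [t for t in (self.left, self.right) if _is_column(t)]

    def evaluate(self, row):
        """Evaluate against a dict keyed by qualified names like "s.ts"."""
        return _OPS[self.op](_resolve(self.left, row), _resolve(self.right, row))


def parse_condition(text):
    match = _CONDITION.match(text)
    if match is None:
        raise ValueError(f"unsupported condition: {text!r}")
    return Comparison(*match.groups())


cond = parse_condition("s.ts > t.ts")
print(cond)                                   # Comparison(left='s.ts', op='>', right='t.ts')
print(cond.referenced_columns())              # ['s.ts', 't.ts']
print(cond.evaluate({"s.ts": 5, "t.ts": 3}))  # True
```

A real connector would more likely reuse an existing expression parser than a regex; the regex only keeps the sketch self-contained.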

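The UPDATE/INSERT split in items 2, 5 and 7 (inner join for matched rows, left anti join for unmatched rows, updates grouped by `_FIRST_ROW_ID`) can be sketched in plain Python under the MVP semantics: update "*" on match, insert "*" otherwise. This only models the join and grouping logic on in-memory dicts; in the Ray connector the same steps would run as distributed joins rather than through a driver-side dict (item 6). The `_ROW_ID`/`_FIRST_ROW_ID` names come from the comment, while `plan_merge`, the row-dict layout, and the meaning given to `num_unchanged` are assumptions.

```python
from collections import defaultdict


def plan_merge(target_rows, source_rows, on):
    """Hypothetical helper: split a MERGE into UPDATE and INSERT work.

    target_rows: dicts with data columns plus "_ROW_ID" and "_FIRST_ROW_ID"
                 (target keys are assumed unique)
    source_rows: dicts with data columns only
    on: key column names present on both sides
    """
    # In-memory stand-in for a distributed join: index target rows by key.
    target_by_key = {tuple(r[c] for c in on): r for r in target_rows}

    updates_by_first_row_id = defaultdict(list)
    inserts = []
    matched_keys = set()
    for src in source_rows:
        key = tuple(src[c] for c in on)
        tgt = target_by_key.get(key)
        if tgt is None:
            # Left anti join: no target row has this key, so INSERT the row.
            inserts.append(dict(src))
            continue
        if key in matched_keys:
            # As in SQL MERGE, one target row must not match two source rows.
            raise ValueError(f"multiple source rows match target key {key}")
        matched_keys.add(key)
        # Inner join: UPDATE the target row, addressed by _ROW_ID and grouped
        # by _FIRST_ROW_ID so each group can become one partial write.
        new_values = {c: v for c, v in src.items() if c not in on}
        updates_by_first_row_id[tgt["_FIRST_ROW_ID"]].append(
            {"_ROW_ID": tgt["_ROW_ID"], **new_values}
        )

    metrics = {
        "num_matched": len(matched_keys),
        "num_inserted": len(inserts),
        # Assumed meaning: target rows that no source row touched.
        "num_unchanged": len(target_rows) - len(matched_keys),
    }
    return dict(updates_by_first_row_id), inserts, metrics


target = [
    {"id": 1, "name": "a", "_ROW_ID": 0, "_FIRST_ROW_ID": 0},
    {"id": 2, "name": "b", "_ROW_ID": 1, "_FIRST_ROW_ID": 0},
    {"id": 3, "name": "c", "_ROW_ID": 2, "_FIRST_ROW_ID": 2},
]
source = [{"id": 2, "name": "B"}, {"id": 3, "name": "C"}, {"id": 4, "name": "d"}]
updates, inserts, metrics = plan_merge(target, source, on=["id"])
print(updates)  # {0: [{'_ROW_ID': 1, 'name': 'B'}], 2: [{'_ROW_ID': 2, 'name': 'C'}]}
print(inserts)  # [{'id': 4, 'name': 'd'}]
print(metrics)  # {'num_matched': 2, 'num_inserted': 1, 'num_unchanged': 1}
```

Each value in `updates` corresponds to one partial write per target file, and `inserts` feeds the append path; both sets of commit messages would then go into a single atomic commit.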

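Item 4's pattern (capture the snapshot id before reading, check for conflicts at commit time) is optimistic concurrency control. The toy model below only illustrates that idea: `ToyTable`, `Snapshot`, and the rule that two commits conflict when they rewrite files with the same first row id are assumptions made for the sketch, not Paimon's actual `rowIdCheckConflict` logic.

```python
from dataclasses import dataclass, field


class CommitConflictError(Exception):
    """Raised when a newer snapshot rewrote files this commit also rewrites."""


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    rewritten_first_row_ids: frozenset  # files rewritten by this snapshot


@dataclass
class ToyTable:
    """Hypothetical in-memory table that only tracks committed snapshots."""
    snapshots: list = field(default_factory=list)

    def latest_snapshot_id(self):
        return self.snapshots[-1].snapshot_id if self.snapshots else 0

    def commit(self, base_snapshot_id, rewritten_first_row_ids):
        """Optimistic commit: succeed only if no snapshot created after
        base_snapshot_id rewrote any of the same files."""
        mine = frozenset(rewritten_first_row_ids)
        for snap in self.snapshots:
            overlap = snap.rewritten_first_row_ids & mine
            if snap.snapshot_id > base_snapshot_id and overlap:
                raise CommitConflictError(
                    f"snapshot {snap.snapshot_id} also rewrote {sorted(overlap)}"
                )
        new_id = self.latest_snapshot_id() + 1
        self.snapshots.append(Snapshot(new_id, mine))
        return new_id


table = ToyTable()
base = table.latest_snapshot_id()  # captured before the merge reads (0)
print(table.commit(base, {0}))     # a concurrent writer commits first: 1
try:
    table.commit(base, {0, 2})     # our merge read at 0 and also rewrote file 0
except CommitConflictError as err:
    print("conflict:", err)        # conflict: snapshot 1 also rewrote [0]
print(table.commit(base, {2}))     # no overlap with snapshot 1: 2
```

The design choice this illustrates is that the check runs once, on the driver, against metadata only, so workers never need to coordinate with each other.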