JingsongLi commented on PR #8028:
URL: https://github.com/apache/paimon/pull/8028#issuecomment-4570899119
API Design Recommendations for Paimon Ray merge_into from **AI**:
Based on the design patterns from Delta Lake, Lance, Iceberg/Ray, and
Paimon Spark's MergeIntoPaimonDataEvolutionTable:
```
---
1. Use string expressions for predicates, not Python callables
Delta/Lance both use SQL-like strings that can be analyzed, optimized, and
pushed down:
# Delta (delta-rs)
(dt.merge(source, predicate="target.id = source.id",
          source_alias="source", target_alias="target")
    .when_matched_update(updates={"name": "source.name"},
                         predicate="source.ts > target.ts")
    .execute())
# Lance
(dataset.merge_insert("id")
    .when_matched_update_all(condition="source.ts > target.ts")
    .execute(source))
PR #8028 uses Python lambdas, which cannot be inspected for optimization,
cannot participate in predicate pushdown, and are fragile to serialize
across Ray workers:
# PR #8028 (problematic)
when_matched_update_condition=lambda r: r['s.age'] > r['t.age']
Recommendation: Use string expressions or a simple expression DSL. If full
SQL parsing is too heavy, at minimum support column-reference strings like
"s.col" for SET values (which the PR already does) and simple comparison
expressions for conditions.
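# Sketch (hypothetical helpers, not part of pypaimon): because a condition is
# a plain string, Python's standard ast module can parse it, list the columns
# it references (what projection or predicate pushdown needs), and evaluate
# it per row; a lambda offers none of this. Assumes a Python-syntax subset
# (lowercase and/or, comparisons, literals) and "s."/"t." column aliases.
import ast
import operator

_COMPARATORS = {
    ast.Gt: operator.gt, ast.GtE: operator.ge, ast.Lt: operator.lt,
    ast.LtE: operator.le, ast.Eq: operator.eq, ast.NotEq: operator.ne,
}


def referenced_columns(expr):
    """Return the distinct 'alias.column' references in a condition string."""
    tree = ast.parse(expr, mode="eval")
    return sorted({f"{n.value.id}.{n.attr}" for n in ast.walk(tree)
                   if isinstance(n, ast.Attribute) and isinstance(n.value, ast.Name)})


def evaluate(expr, row):
    """Evaluate the condition against a flat row such as {"s.age": 30}."""

    def ev(node):
        if isinstance(node, ast.Attribute) and isinstance(node.value, ast.Name):
            return row[f"{node.value.id}.{node.attr}"]
        if isinstance(node, ast.Constant):
            return node.value
        if isinstance(node, ast.BoolOp):
            values = [ev(v) for v in node.values]
            return all(values) if isinstance(node.op, ast.And) else any(values)
        if isinstance(node, ast.Compare):
            left = ev(node.left)
            for op, right_node in zip(node.ops, node.comparators):
                compare = _COMPARATORS.get(type(op))
                if compare is None:
                    raise ValueError(f"unsupported operator: {type(op).__name__}")
                right = ev(right_node)
                if not compare(left, right):
                    return False
                left = right
            return True
        raise ValueError(f"unsupported syntax: {type(node).__name__}")

    return ev(ast.parse(expr, mode="eval").body)


# Usage: the same string can be inspected, pushed down, or shipped as text.
cond = "s.age > t.age and s.age > 18"
print(referenced_columns(cond))  # ['s.age', 't.age']
print(evaluate(cond, {"s.age": 30, "t.age": 25}))  # True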
---
2. Align with Paimon Spark's execution model
Paimon Spark's MergeIntoPaimonDataEvolutionTable uses:
- UPDATE path: Target LEFT_OUTER JOIN Source → MergeRows → repartition(_FIRST_ROW_ID) → writePartialFields
- INSERT path: Source LEFT_ANTI JOIN Target → MergeRows → write (full rows)
The Ray implementation should mirror this:
# UPDATE: inner join → extract (_ROW_ID, changed_cols) → partition by _FIRST_ROW_ID → partial write
# INSERT: left_anti join → apply insert expressions → append write
# COMMIT: atomic (update_msgs + insert_msgs), with snapshot conflict detection
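# In-memory sketch of the two paths, with lists of dicts standing in for Ray
# Datasets and a dict-based hash join standing in for Ray Data's join. The
# helper name split_merge, the "id" key, and reading "changed_cols" as "source
# columns whose value differs from the target" are assumptions.


def split_merge(target, source, key="id"):
    """Return (update_rows, insert_rows) for a MERGE keyed on `key`."""
    target_by_key = {row[key]: row for row in target}  # build side of the join
    updates, inserts, seen = [], [], set()
    for src in source:
        tgt = target_by_key.get(src[key])
        if tgt is None:
            # LEFT ANTI: no target row has this key -> INSERT the full row.
            inserts.append(dict(src))
            continue
        if src[key] in seen:
            # SQL MERGE rejects a target row matched by several source rows.
            raise ValueError(f"multiple source rows match key {src[key]!r}")
        seen.add(src[key])
        # INNER: keep only _ROW_ID plus the columns that actually change.
        changed = {c: v for c, v in src.items() if c != key and tgt.get(c) != v}
        if changed:
            updates.append({"_ROW_ID": tgt["_ROW_ID"], **changed})
    return updates, inserts


# Usage
target = [{"_ROW_ID": 0, "id": 1, "name": "a"}, {"_ROW_ID": 1, "id": 2, "name": "b"}]
source = [{"id": 2, "name": "B"}, {"id": 3, "name": "c"}]
updates, inserts = split_merge(target, source)
print(updates)  # [{'_ROW_ID': 1, 'name': 'B'}]
print(inserts)  # [{'id': 3, 'name': 'c'}]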
---
3. Recommended API shape
from pypaimon.ray import merge_into, WhenMatched, WhenNotMatched
merge_into(
    target="db.table",
    source=ray_dataset,  # or pa.Table, pd.DataFrame, str (table identifier)
    catalog_options={...},
    on=["id"],  # or {"target_col": "source_col"} for renamed keys
    # As in SQL MERGE, clauses are tried in order and the first whose condition
    # holds wins, so an unconditional clause must come last.
    when_matched=[
        WhenMatched(update={"name": "s.name"}, condition="s.ts > t.ts"),  # conditional
        WhenMatched(update="*"),  # update all cols from source
    ],
    when_not_matched=[
        WhenNotMatched(insert={"id": "s.id", "status": "'new'"},
                       condition="s.age > 18"),  # conditional
        WhenNotMatched(insert="*"),  # insert all cols
    ],
)
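# Sketch of how the proposed clause objects might be modelled. Everything here
# is hypothetical (the API above is a proposal, not an existing pypaimon
# module). Assumed conventions: "*" means every source column, "s.<col>"
# references a source column, and any other SET value is a Python literal
# string such as "'new'".
import ast
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Union


@dataclass(frozen=True)
class WhenMatched:
    update: Union[str, Dict[str, str]] = "*"
    condition: Optional[str] = None  # a string expression, never a callable


@dataclass(frozen=True)
class WhenNotMatched:
    insert: Union[str, Dict[str, str]] = "*"
    condition: Optional[str] = None


def normalize_on(on) -> List[Tuple[str, str]]:
    """on=["id"] or on={"target_col": "source_col"} -> [(target, source), ...]."""
    if isinstance(on, dict):
        return list(on.items())
    return [(col, col) for col in on]


def resolve_set(assignments, source_row) -> dict:
    """Compute the output column values of one UPDATE/INSERT for a source row."""
    if assignments == "*":
        return dict(source_row)
    out = {}
    for col, expr in assignments.items():
        if expr.startswith("s."):
            out[col] = source_row[expr[2:]]  # column reference
        else:
            out[col] = ast.literal_eval(expr)  # literal, e.g. "'new'" -> 'new'
    return out


def validate_clauses(clauses) -> None:
    """Mirrors the Spark SQL/Delta MERGE rule: only the last clause may omit
    its condition."""
    for clause in clauses[:-1]:
        if clause.condition is None:
            raise ValueError("an unconditional clause must be the last one")


# Usage
print(normalize_on({"tid": "sid"}))  # [('tid', 'sid')]
print(resolve_set({"id": "s.id", "status": "'new'"}, {"id": 7, "age": 20}))
# {'id': 7, 'status': 'new'}
validate_clauses([WhenMatched(update={"name": "s.name"}, condition="s.ts > t.ts"),
                  WhenMatched(update="*")])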
Key differences from PR #8028:
- condition is a string expression, not a callable
- Drop merge_condition (non-standard semantics that route unmatched rows to INSERT, which is confusing and diverges from SQL MERGE)
- Return a metrics dict: {"num_matched": ..., "num_inserted": ..., "num_unchanged": ...}
---
4. Do not invent custom commit conflict detection
PR #8028 adds commit.strict-mode.last-safe-snapshot to
file_store_commit.py, a custom mechanism that modifies the core commit path
for all table operations.
Paimon already has conflict detection via
writer.rowIdCheckConflict(snapshotId) (used in the Spark implementation). The
Ray connector should reuse the same mechanism:
# Capture snapshot before read
plan = table.newSnapshotReader().read()
# ... do merge work ...
# Conflict check at commit time (existing Paimon mechanism)
writer.rowIdCheckConflict(plan.snapshotId())
writer.commit(update_msgs + insert_msgs)
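# Toy model of optimistic, row-ID-based conflict detection, only to illustrate
# the idea of checking against the snapshot captured before the read. It is
# not Paimon's implementation: SnapshotLog and CommitConflict are hypothetical,
# and here each snapshot simply records the set of row IDs it rewrote.
from dataclasses import dataclass, field
from typing import List, Set


class CommitConflict(Exception):
    """Raised when a newer snapshot touched rows this commit also updates."""


@dataclass
class SnapshotLog:
    # snapshots[i] holds the row IDs rewritten by snapshot id i + 1.
    snapshots: List[Set[int]] = field(default_factory=list)

    def latest_id(self) -> int:
        return len(self.snapshots)

    def commit(self, base_snapshot_id: int, updated_row_ids: Set[int]) -> int:
        # Only snapshots committed after the base snapshot can conflict.
        for newer in self.snapshots[base_snapshot_id:]:
            overlap = newer & updated_row_ids
            if overlap:
                raise CommitConflict(f"rows {sorted(overlap)} changed concurrently")
        self.snapshots.append(set(updated_row_ids))
        return self.latest_id()


# Usage
log = SnapshotLog()
base = log.latest_id()        # captured before reading (0 = empty table)
log.commit(base, {1, 2})      # a concurrent writer commits snapshot 1
print(log.commit(base, {5}))  # 2: disjoint rows, so this commit succeeds
try:
    log.commit(base, {2, 3})  # row 2 was rewritten after `base`
except CommitConflict as err:
    print(err)                # rows [2] changed concurrently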
---
5. Follow Iceberg/Ray's two-phase separation
Iceberg's Ray connector cleanly separates:
- Phase 1 (distributed workers): write files, return metadata
- Phase 2 (driver): collect metadata, resolve conflicts, atomic commit
For Paimon:
- Phase 1 (distributed):
  - UPDATE: inner_join → map_batches(apply_clauses) → groupby(_FIRST_ROW_ID).map_groups(partial_write) → return commit messages
  - INSERT: left_anti_join → map_batches(apply_clauses) → write_datasink → return commit messages
- Phase 2 (driver): collect all commit messages → single atomic commit with conflict check
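# Sketch of the two-phase split, with a thread pool standing in for Ray tasks
# so it runs without a cluster. Hypothetical assumptions: each data file owns
# a contiguous _ROW_ID range starting at its first row ID, a commit message is
# a plain dict, and "committing" appends one list to `commits`.
import bisect
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def group_by_first_row_id(updates, file_first_row_ids):
    """Route each update row to the file whose row-ID range contains it."""
    starts = sorted(file_first_row_ids)
    groups = defaultdict(list)
    for row in updates:
        idx = bisect.bisect_right(starts, row["_ROW_ID"]) - 1
        if idx < 0:
            raise ValueError(f"_ROW_ID {row['_ROW_ID']} is before the first file")
        groups[starts[idx]].append(row)
    return dict(groups)


def partial_write(first_row_id, rows):
    """Phase 1 worker: write one file's changed columns (elided here) and
    return only metadata, never the rows themselves."""
    return {"first_row_id": first_row_id, "row_ids": sorted(r["_ROW_ID"] for r in rows)}


def two_phase_merge(updates, file_first_row_ids, insert_messages, commits):
    groups = group_by_first_row_id(updates, file_first_row_ids)
    # Phase 1: one task per file group; results come back as commit messages.
    with ThreadPoolExecutor() as pool:
        update_messages = list(pool.map(partial_write, groups.keys(), groups.values()))
    # If any task raised, list() above re-raised and nothing is committed.
    # Phase 2 (driver): update and insert messages go into one commit.
    commit = update_messages + list(insert_messages)
    commits.append(commit)
    return commit


# Usage
commits = []
commit = two_phase_merge([{"_ROW_ID": 5}, {"_ROW_ID": 120}, {"_ROW_ID": 7}],
                         [0, 100], [{"inserted_rows": 1}], commits)
print(commit)
# [{'first_row_id': 0, 'row_ids': [5, 7]}, {'first_row_id': 100, 'row_ids': [120]}, {'inserted_rows': 1}]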
---
6. Avoid driver-side materialization of large datasets
PR #8028 collects target keys or matched indices into driver-side Python
dicts/sets, which will OOM on large tables.
Better alternatives (from Iceberg/Ray):
- Use ray.put(keys_table) to broadcast key sets via the object store (which supports spilling to disk)
- Use Ray Data join() for distributed matching instead of collecting to the driver
- Use coarse range filters to prune unrelated target files before reading
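# Sketch of a coarse range filter. Only two numbers (the source key min and
# max) are needed on the driver, computed here by streaming over batches;
# target files whose min/max key statistics do not overlap that range are
# skipped before reading. FileStats, key_bounds and prune_files are
# hypothetical; with Ray Data the bounds could instead come from aggregations
# such as Dataset.min/Dataset.max.
from dataclasses import dataclass
from typing import Iterable, List, Optional, Tuple


@dataclass(frozen=True)
class FileStats:
    path: str
    min_key: int
    max_key: int


def key_bounds(batches: Iterable[Iterable[int]]) -> Tuple[Optional[int], Optional[int]]:
    """Track min/max keys batch by batch without keeping the keys around."""
    lo = hi = None
    for batch in batches:
        for key in batch:
            lo = key if lo is None or key < lo else lo
            hi = key if hi is None or key > hi else hi
    return lo, hi


def prune_files(files: Iterable[FileStats], lo: Optional[int],
                hi: Optional[int]) -> List[FileStats]:
    """Keep only files whose [min_key, max_key] overlaps [lo, hi]."""
    if lo is None:  # empty source: nothing can match
        return []
    return [f for f in files if f.max_key >= lo and f.min_key <= hi]


# Usage
files = [FileStats("f0", 0, 99), FileStats("f1", 100, 199), FileStats("f2", 200, 299)]
lo, hi = key_bounds([[150, 120], [180]])
print(lo, hi)  # 120 180
print([f.path for f in prune_files(files, lo, hi)])  # ['f1']
# Design note: this pays off only when source keys are clustered; a source
# whose keys span the whole key range prunes nothing.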
---
7. Minimum viable scope for a first PR
Given the complexity, a reasonable first PR should:
1. Support only when_matched_update("*") + when_not_matched_insert("*") (no conditions, no partial SET)
2. Use Ray Data join() for both paths (inner + left_anti)
3. Reuse existing TableUpdateByRowId for the update path
4. Reuse existing write_paimon for the insert path
5. Atomic commit with existing Paimon snapshot conflict detection
6. No merge_condition, no self-merge optimization, no vectorized fast paths
Then iterate with follow-up PRs for conditions, partial SET,
optimizations, etc.
```
--
This is an automated message from the Apache Git Service.