This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a8699b382a [python][ray] Optimize merge into self-merge updates on data evolution table (#8141)
a8699b382a is described below

commit a8699b382a496fc456cb302836def78078eafb98
Author: XiaoHongbo <[email protected]>
AuthorDate: Sun Jun 7 19:51:41 2026 +0800

    [python][ray] Optimize merge into self-merge updates on data evolution table (#8141)
    
    When source and target are the same table in `merge_into` and the
    merge is keyed on `_ROW_ID` (self-merge), skip the join step and read
    the target table directly with a `_ROW_ID` projection. This avoids a
    full table join and significantly improves performance for self-merge
    update scenarios on data-evolution tables. `WHEN NOT MATCHED` clauses
    are rejected in this mode.
---
 .../pypaimon/ray/data_evolution_merge_into.py      |  75 +++--
 .../pypaimon/ray/data_evolution_merge_join.py      | 192 ++++++++---
 .../tests/ray_data_evolution_merge_into_test.py    | 357 +++++++++++++++++++++
 3 files changed, 554 insertions(+), 70 deletions(-)

diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_into.py b/paimon-python/pypaimon/ray/data_evolution_merge_into.py
index 31627f9c16..ab20a56dc7 100644
--- a/paimon-python/pypaimon/ray/data_evolution_merge_into.py
+++ b/paimon-python/pypaimon/ray/data_evolution_merge_into.py
@@ -26,6 +26,7 @@ import pyarrow as pa
 from pypaimon.ray.data_evolution_merge_join import (
     build_matched_update_ds,
     build_not_matched_insert_ds,
+    build_self_merge_update_ds,
     distributed_update_apply,
     distributed_write_collect_msgs,
 )
@@ -53,6 +54,7 @@ class _PrepareCtx:
     update_pa_schema: pa.Schema
     full_pa_schema: pa.Schema
     catalog_options: Dict[str, str]
+    is_self_merge: bool = False
 
 
 def merge_into(
@@ -183,33 +185,45 @@ def _prepare(target, source, catalog_options, when_matched, when_not_matched, on
             _NormalizedClause(spec=spec, condition=c.condition)
         )
 
-    source_snapshot_id = None
-    if isinstance(source, str):
-        source_snapshot = (
-            catalog.get_table(source)
-            .snapshot_manager()
-            .get_latest_snapshot()
+    is_self_merge = _is_self_merge(target, source, target_on_cols, source_on_cols)
+    if is_self_merge and not_matched_specs:
+        raise ValueError(
+            "Self-merge (source == target with ON _ROW_ID) does not "
+            "support WHEN NOT MATCHED clauses."
         )
-        if source_snapshot is not None:
-            source_snapshot_id = source_snapshot.id
 
-    source_ds = _normalize_source(
-        source, catalog_options, source_snapshot_id=source_snapshot_id,
-    )
-    _validate_source_on_cols(source_ds, source_on_cols)
+    if is_self_merge:
+        source_ds = None
+        source_col_names = set(full_target_field_names) | set(source_on_cols)
+    else:
+        source_snapshot_id = None
+        if isinstance(source, str):
+            source_snapshot = (
+                catalog.get_table(source)
+                .snapshot_manager()
+                .get_latest_snapshot()
+            )
+            if source_snapshot is not None:
+                source_snapshot_id = source_snapshot.id
+        source_ds = _normalize_source(
+            source, catalog_options, source_snapshot_id=source_snapshot_id,
+        )
+        _validate_source_on_cols(source_ds, source_on_cols)
+        source_col_names = set(_source_schema_or_raise(source_ds).names)
     _validate_source_has_target_cols(
-        source_ds, matched_specs + not_matched_specs,
+        source_col_names, matched_specs + not_matched_specs,
     )
 
     if has_condition:
         from pypaimon.ray.merge_condition import extract_columns
-        source_names = set(_source_schema_or_raise(source_ds).names)
         target_names = set(full_target_field_names)
+        if is_self_merge:
+            target_names |= set(target_on_cols)
         for c in list(when_matched) + list(when_not_matched):
             if c.condition is not None:
                 for ref in extract_columns(c.condition):
                     prefix, col = ref.split(".", 1)
-                    if prefix == "s" and col not in source_names:
+                    if prefix == "s" and col not in source_col_names:
                         raise ValueError(
                             f"condition references unknown source "
                             f"column '{col}'"
@@ -238,10 +252,20 @@ def _prepare(target, source, catalog_options, when_matched, when_not_matched, on
         update_pa_schema=update_pa_schema,
         full_pa_schema=full_pa_schema,
         catalog_options=catalog_options,
+        is_self_merge=is_self_merge,
     )
     return table, source_ds, matched_specs, not_matched_specs, ctx
 
 
+def _is_self_merge(target, source, target_on_cols, source_on_cols) -> bool:
+    from pypaimon.table.special_fields import SpecialFields
+    row_id_name = SpecialFields.ROW_ID.name
+    return (isinstance(source, str)
+            and source == target
+            and target_on_cols == [row_id_name]
+            and source_on_cols == [row_id_name])
+
+
 def _build_datasets(
     target, source_ds, matched_specs, not_matched_specs,
     ctx: "_PrepareCtx", base_snapshot, num_partitions, ray_remote_args,
@@ -255,6 +279,22 @@ def _build_datasets(
     insert_ds = None
     update_cols_union: List[str] = []
 
+    if ctx.is_self_merge:
+        if matched_specs and base_snapshot is not None:
+            update_cols_union = _union_update_cols(matched_specs)
+            update_ds = build_self_merge_update_ds(
+                target_identifier=target,
+                clauses=matched_specs,
+                target_field_names=ctx.full_target_field_names,
+                target_pa_schema=ctx.update_pa_schema,
+                update_cols=update_cols_union,
+                catalog_options=ctx.catalog_options,
+                resolve_target_projection=_resolve_target_projection,
+                snapshot_id=base_snapshot_id,
+                ray_remote_args=ray_remote_args,
+            )
+        return update_ds, insert_ds, update_cols_union
+
     # Mirror Spark: matched/not-matched run as two independent joins
     # (inner / left_anti). One unified left_outer join would force
     # joined.materialize() to feed both branches, which can OOM on large merges.
@@ -566,16 +606,15 @@ def _validate_source_on_cols(source_ds, on: Sequence[str]) -> None:
 
 
 def _validate_source_has_target_cols(
-    source_ds,
+    source_col_names: set,
     specs: List[_NormalizedClause],
 ) -> None:
-    names = set(_source_schema_or_raise(source_ds).names)
     needed = set()
     for clause in specs:
         for val in clause.spec.values():
             if isinstance(val, SourceColumnRef):
                 needed.add(val.column)
-    missing = sorted(needed - names)
+    missing = sorted(needed - source_col_names)
     if missing:
         raise ValueError(
             f"source is missing columns {missing} referenced by SET spec"
diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_join.py b/paimon-python/pypaimon/ray/data_evolution_merge_join.py
index 14088979f8..4ad91e7da1 100644
--- a/paimon-python/pypaimon/ray/data_evolution_merge_join.py
+++ b/paimon-python/pypaimon/ray/data_evolution_merge_join.py
@@ -21,6 +21,7 @@ from typing import Any, Dict, List, Optional, Sequence, Tuple
 import pyarrow as pa
 
 from pypaimon.ray.data_evolution_merge_transform import (
+    SourceColumnRef,
     _NormalizedClause,
     build_update_schema,
     vectorized_insert_transform,
@@ -40,6 +41,137 @@ def _map_kwargs(
     return kwargs
 
 
+def _build_matched_transform(
+    clauses: List[_NormalizedClause],
+    on_map: Dict[str, str],
+    on_pairs: List[Tuple[str, str]],
+    update_cols: List[str],
+    row_id_name: str,
+    update_schema: pa.Schema,
+):
+    prepared_clauses = []
+    for clause in clauses:
+        rewritten = None
+        if clause.condition is not None:
+            from pypaimon.ray.merge_condition import (
+                remap_source_on_keys, rewrite_condition,
+            )
+            rewritten = remap_source_on_keys(
+                rewrite_condition(clause.condition), on_map,
+            )
+        prepared_clauses.append((clause.spec, rewritten))
+
+    _filter_batch = None
+    if any(r is not None for _, r in prepared_clauses):
+        from pypaimon.ray.merge_condition import filter_batch as _filter_batch
+
+    def _transform(batch: pa.Table) -> pa.Table:
+        remaining = batch
+        parts = []
+        for spec, rewritten in prepared_clauses:
+            if remaining.num_rows == 0:
+                break
+            if rewritten is not None:
+                matched = _filter_batch(
+                    remaining, rewritten, _pre_rewritten=True,
+                )
+            else:
+                matched = remaining
+            if matched.num_rows == 0:
+                continue
+            parts.append(vectorized_matched_transform(
+                matched, spec, on_pairs,
+                update_cols, row_id_name,
+                update_schema,
+            ))
+            if rewritten is not None and matched.num_rows < remaining.num_rows:
+                not_cond = f"COALESCE(NOT ({rewritten}), TRUE)"
+                remaining = _filter_batch(
+                    remaining, not_cond, _pre_rewritten=True,
+                )
+            else:
+                remaining = remaining.slice(0, 0)
+        if not parts:
+            return update_schema.empty_table()
+        return pa.concat_tables(parts)
+
+    return _transform
+
+
+def build_self_merge_update_ds(
+    *,
+    target_identifier: str,
+    clauses: List[_NormalizedClause],
+    target_field_names: Sequence[str],
+    target_pa_schema: pa.Schema,
+    update_cols: Sequence[str],
+    catalog_options: Dict[str, str],
+    resolve_target_projection,
+    snapshot_id: Optional[int] = None,
+    ray_remote_args: Optional[Dict[str, Any]] = None,
+) -> Tuple:
+    from pypaimon.ray.ray_paimon import read_paimon
+    from pypaimon.table.special_fields import SpecialFields
+
+    row_id_name = SpecialFields.ROW_ID.name
+    needed_cols = set(resolve_target_projection(
+        clauses, [row_id_name], update_cols, target_field_names,
+    ))
+    for clause in clauses:
+        for value in clause.spec.values():
+            if isinstance(value, SourceColumnRef):
+                needed_cols.add(value.column)
+    target_set = set(target_field_names)
+    for clause in clauses:
+        if clause.condition is not None:
+            from pypaimon.ray.merge_condition import extract_columns
+            for ref in extract_columns(clause.condition):
+                prefix, col = ref.split(".", 1)
+                if prefix == "s" and col in target_set:
+                    needed_cols.add(col)
+    projection = [row_id_name] + [
+        c for c in target_field_names if c in needed_cols
+    ]
+
+    target_ds = read_paimon(
+        target_identifier, catalog_options,
+        projection=projection, snapshot_id=snapshot_id,
+    )
+    update_schema = build_update_schema(target_pa_schema, update_cols, row_id_name)
+
+    orig_names = target_ds.schema().names
+    target_renamed = target_ds.rename_columns(
+        {c: f"t.{c}" for c in orig_names}
+    )
+
+    def _add_source_aliases(batch: pa.Table) -> pa.Table:
+        columns = list(batch.columns)
+        names = list(batch.schema.names)
+        for orig in orig_names:
+            if orig == row_id_name:
+                continue
+            t_col_name = f"t.{orig}"
+            if t_col_name in names:
+                idx = names.index(t_col_name)
+                columns.append(columns[idx])
+                names.append(f"s.{orig}")
+        return pa.table(columns, names=names)
+
+    aliased = target_renamed.map_batches(
+        _add_source_aliases, **_map_kwargs(ray_remote_args),
+    )
+
+    _transform = _build_matched_transform(
+        clauses,
+        on_map={row_id_name: row_id_name},
+        on_pairs=[(row_id_name, row_id_name)],
+        update_cols=list(update_cols),
+        row_id_name=row_id_name,
+        update_schema=update_schema,
+    )
+    return aliased.map_batches(_transform, **_map_kwargs(ray_remote_args))
+
+
 def build_matched_update_ds(
     *,
     target_identifier: str,
@@ -87,58 +219,14 @@ def build_matched_update_ds(
         right_on=tuple(f"s.{c}" for c in source_on),
     )
 
-    captured_update_cols = list(update_cols)
-    captured_row_id_name = row_id_name
-    captured_on_pairs = list(zip(source_on, target_on))
-    captured_schema = update_schema
-
-    on_map = dict(zip(source_on, target_on))
-    prepared_clauses = []
-    for clause in clauses:
-        rewritten = None
-        if clause.condition is not None:
-            from pypaimon.ray.merge_condition import (
-                remap_source_on_keys, rewrite_condition,
-            )
-            rewritten = remap_source_on_keys(
-                rewrite_condition(clause.condition), on_map,
-            )
-        prepared_clauses.append((clause.spec, rewritten))
-
-    _filter_batch = None
-    if any(r is not None for _, r in prepared_clauses):
-        from pypaimon.ray.merge_condition import filter_batch as _filter_batch
-
-    def _transform(batch: pa.Table) -> pa.Table:
-        remaining = batch
-        parts = []
-        for spec, rewritten in prepared_clauses:
-            if remaining.num_rows == 0:
-                break
-            if rewritten is not None:
-                matched = _filter_batch(
-                    remaining, rewritten, _pre_rewritten=True,
-                )
-            else:
-                matched = remaining
-            if matched.num_rows == 0:
-                continue
-            parts.append(vectorized_matched_transform(
-                matched, spec, captured_on_pairs,
-                captured_update_cols, captured_row_id_name,
-                captured_schema,
-            ))
-            if rewritten is not None and matched.num_rows < remaining.num_rows:
-                not_cond = f"COALESCE(NOT ({rewritten}), TRUE)"
-                remaining = _filter_batch(
-                    remaining, not_cond, _pre_rewritten=True,
-                )
-            else:
-                remaining = remaining.slice(0, 0)
-        if not parts:
-            return captured_schema.empty_table()
-        return pa.concat_tables(parts)
-
+    _transform = _build_matched_transform(
+        clauses,
+        on_map=dict(zip(source_on, target_on)),
+        on_pairs=list(zip(source_on, target_on)),
+        update_cols=list(update_cols),
+        row_id_name=row_id_name,
+        update_schema=update_schema,
+    )
     return joined.map_batches(_transform, **_map_kwargs(ray_remote_args))
 
 
diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
index c13e80818d..7e365009db 100644
--- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
@@ -1679,6 +1679,363 @@ class RayDataEvolutionMergeIntoTest(unittest.TestCase):
             )
         self.assertIn('multiple source rows', str(ctx.exception))
 
+    def test_self_merge_update_literal(self):
+        target = self._create_table()
+        self._write(
+            target,
+            pa.Table.from_pydict(
+                {
+                    'id': pa.array([1, 2, 3], type=pa.int32()),
+                    'name': ['a', 'b', 'c'],
+                    'age': pa.array([10, 20, 30], type=pa.int32()),
+                },
+                schema=self.pa_schema,
+            ),
+        )
+
+        result = merge_into(
+            target=target,
+            source=target,
+            catalog_options=self.catalog_options,
+            on=['_ROW_ID'],
+            when_matched=[WhenMatched(update={'age': lit(99)})],
+        )
+
+        self.assertEqual(result['num_matched'], 3)
+        out = self._read_sorted(target)
+        self.assertEqual(out['age'], [99, 99, 99])
+        self.assertEqual(out['name'], ['a', 'b', 'c'])
+
+    def test_self_merge_update_star(self):
+        target = self._create_table()
+        self._write(
+            target,
+            pa.Table.from_pydict(
+                {
+                    'id': pa.array([1, 2, 3], type=pa.int32()),
+                    'name': ['a', 'b', 'c'],
+                    'age': pa.array([10, 20, 30], type=pa.int32()),
+                },
+                schema=self.pa_schema,
+            ),
+        )
+
+        result = merge_into(
+            target=target,
+            source=target,
+            catalog_options=self.catalog_options,
+            on=['_ROW_ID'],
+            when_matched=[WhenMatched(update='*')],
+        )
+
+        self.assertEqual(result['num_matched'], 3)
+        out = self._read_sorted(target)
+        self.assertEqual(out['id'], [1, 2, 3])
+        self.assertEqual(out['name'], ['a', 'b', 'c'])
+        self.assertEqual(out['age'], [10, 20, 30])
+
+    @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON)
+    def test_self_merge_with_condition(self):
+        target = self._create_table()
+        self._write(
+            target,
+            pa.Table.from_pydict(
+                {
+                    'id': pa.array([1, 2, 3], type=pa.int32()),
+                    'name': ['a', 'b', 'c'],
+                    'age': pa.array([10, 20, 30], type=pa.int32()),
+                },
+                schema=self.pa_schema,
+            ),
+        )
+
+        result = merge_into(
+            target=target,
+            source=target,
+            catalog_options=self.catalog_options,
+            on=['_ROW_ID'],
+            when_matched=[WhenMatched(update={'age': lit(99)}, condition='t.age > 15')],
+        )
+
+        self.assertEqual(result['num_matched'], 2)
+        out = self._read_sorted(target)
+        self.assertEqual(out['age'], [10, 99, 99])
+
+    @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON)
+    def test_self_merge_with_source_condition(self):
+        target = self._create_table()
+        self._write(
+            target,
+            pa.Table.from_pydict(
+                {
+                    'id': pa.array([1, 2, 3], type=pa.int32()),
+                    'name': ['a', 'b', 'c'],
+                    'age': pa.array([10, 20, 30], type=pa.int32()),
+                },
+                schema=self.pa_schema,
+            ),
+        )
+
+        result = merge_into(
+            target=target,
+            source=target,
+            catalog_options=self.catalog_options,
+            on=['_ROW_ID'],
+            when_matched=[WhenMatched(
+                update={'name': lit('updated')},
+                condition='s.age > 15',
+            )],
+        )
+
+        self.assertEqual(result['num_matched'], 2)
+        out = self._read_sorted(target)
+        self.assertEqual(out['name'], ['a', 'updated', 'updated'])
+        self.assertEqual(out['age'], [10, 20, 30])
+
+    def test_self_merge_rejects_not_matched(self):
+        target = self._create_table()
+        self._write(target, self._source(ids=(1,)))
+
+        with self.assertRaises(ValueError) as ctx:
+            merge_into(
+                target=target,
+                source=target,
+                catalog_options=self.catalog_options,
+                on=['_ROW_ID'],
+                when_matched=[WhenMatched(update='*')],
+                when_not_matched=[WhenNotMatched(insert='*')],
+            )
+        self.assertIn('Self-merge', str(ctx.exception))
+
+    def test_self_merge_partial_set(self):
+        target = self._create_table()
+        self._write(
+            target,
+            pa.Table.from_pydict(
+                {
+                    'id': pa.array([1, 2], type=pa.int32()),
+                    'name': ['old_a', 'old_b'],
+                    'age': pa.array([10, 20], type=pa.int32()),
+                },
+                schema=self.pa_schema,
+            ),
+        )
+
+        result = merge_into(
+            target=target,
+            source=target,
+            catalog_options=self.catalog_options,
+            on=['_ROW_ID'],
+            when_matched=[WhenMatched(update={'name': lit('updated')})],
+        )
+
+        self.assertEqual(result['num_matched'], 2)
+        out = self._read_sorted(target)
+        self.assertEqual(out['name'], ['updated', 'updated'])
+        self.assertEqual(out['age'], [10, 20])
+
+    def test_self_merge_source_col_row_id(self):
+        target = self._create_table()
+        self._write(
+            target,
+            pa.Table.from_pydict(
+                {
+                    'id': pa.array([1, 2], type=pa.int32()),
+                    'name': ['a', 'b'],
+                    'age': pa.array([10, 20], type=pa.int32()),
+                },
+                schema=self.pa_schema,
+            ),
+        )
+
+        result = merge_into(
+            target=target,
+            source=target,
+            catalog_options=self.catalog_options,
+            on=['_ROW_ID'],
+            when_matched=[WhenMatched(update={'name': source_col('_ROW_ID')})],
+        )
+
+        self.assertEqual(result['num_matched'], 2)
+        out = self._read_sorted(target)
+        for v in out['name']:
+            self.assertTrue(int(v) >= 0)
+
+    @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON)
+    def test_self_merge_condition_on_row_id(self):
+        target = self._create_table()
+        self._write(
+            target,
+            pa.Table.from_pydict(
+                {
+                    'id': pa.array([1, 2, 3], type=pa.int32()),
+                    'name': ['a', 'b', 'c'],
+                    'age': pa.array([10, 20, 30], type=pa.int32()),
+                },
+                schema=self.pa_schema,
+            ),
+        )
+
+        result = merge_into(
+            target=target,
+            source=target,
+            catalog_options=self.catalog_options,
+            on=['_ROW_ID'],
+            when_matched=[
+                WhenMatched(
+                    update={'age': lit(99)},
+                    condition='s._ROW_ID >= 0',
+                ),
+            ],
+        )
+
+        self.assertEqual(result['num_matched'], 3)
+        out = self._read_sorted(target)
+        self.assertEqual(out['age'], [99, 99, 99])
+
+    @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON)
+    def test_self_merge_condition_on_target_row_id(self):
+        target = self._create_table()
+        self._write(
+            target,
+            pa.Table.from_pydict(
+                {
+                    'id': pa.array([1, 2, 3], type=pa.int32()),
+                    'name': ['a', 'b', 'c'],
+                    'age': pa.array([10, 20, 30], type=pa.int32()),
+                },
+                schema=self.pa_schema,
+            ),
+        )
+
+        result = merge_into(
+            target=target,
+            source=target,
+            catalog_options=self.catalog_options,
+            on=['_ROW_ID'],
+            when_matched=[
+                WhenMatched(
+                    update={'age': lit(99)},
+                    condition='t._ROW_ID >= 0',
+                ),
+            ],
+        )
+
+        self.assertEqual(result['num_matched'], 3)
+        out = self._read_sorted(target)
+        self.assertEqual(out['age'], [99, 99, 99])
+
+    @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON)
+    def test_self_merge_multi_clause_fall_through(self):
+        target = self._create_table()
+        self._write(
+            target,
+            pa.Table.from_pydict(
+                {
+                    'id': pa.array([1, 2, 3], type=pa.int32()),
+                    'name': ['a', 'b', 'c'],
+                    'age': pa.array([10, 20, 30], type=pa.int32()),
+                },
+                schema=self.pa_schema,
+            ),
+        )
+
+        result = merge_into(
+            target=target,
+            source=target,
+            catalog_options=self.catalog_options,
+            on=['_ROW_ID'],
+            when_matched=[
+                WhenMatched(update={'name': lit('old')}, condition='s.age <= 10'),
+                WhenMatched(update={'name': lit('young')}, condition='s.age <= 20'),
+                WhenMatched(update={'name': lit('senior')}),
+            ],
+        )
+
+        self.assertEqual(result['num_matched'], 3)
+        out = self._read_sorted(target)
+        self.assertEqual(out['name'], ['old', 'young', 'senior'])
+        self.assertEqual(out['age'], [10, 20, 30])
+
+    @unittest.skip("blocked by blob DE sequence bug fix, see PR #8147")
+    @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON)
+    def test_self_merge_blob_source_condition(self):
+        blob_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('picture', pa.large_binary()),
+        ])
+        tbl_name = f'default.tbl_{uuid.uuid4().hex[:8]}'
+        s = Schema.from_pyarrow_schema(blob_schema, options=self.de_options)
+        self.catalog.create_table(tbl_name, s, False)
+
+        self._write(
+            tbl_name,
+            pa.Table.from_pydict(
+                {
+                    'id': pa.array([1, 2], type=pa.int32()),
+                    'name': ['a', 'b'],
+                    'picture': [None, None],
+                },
+                schema=blob_schema,
+            ),
+        )
+
+        result = merge_into(
+            target=tbl_name,
+            source=tbl_name,
+            catalog_options=self.catalog_options,
+            on=['_ROW_ID'],
+            when_matched=[
+                WhenMatched(
+                    update={'name': lit('updated')},
+                    condition='s.picture IS NULL',
+                ),
+            ],
+        )
+
+        self.assertEqual(result['num_matched'], 2)
+        out = self._read_sorted(tbl_name)
+        self.assertEqual(out['name'], ['updated', 'updated'])
+
+    @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON)
+    def test_self_merge_blob_target_condition_rejected(self):
+        blob_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('picture', pa.large_binary()),
+        ])
+        tbl_name = f'default.tbl_{uuid.uuid4().hex[:8]}'
+        s = Schema.from_pyarrow_schema(blob_schema, options=self.de_options)
+        self.catalog.create_table(tbl_name, s, False)
+
+        self._write(
+            tbl_name,
+            pa.Table.from_pydict(
+                {
+                    'id': pa.array([1], type=pa.int32()),
+                    'name': ['a'],
+                    'picture': [None],
+                },
+                schema=blob_schema,
+            ),
+        )
+
+        with self.assertRaises(ValueError) as ctx:
+            merge_into(
+                target=tbl_name,
+                source=tbl_name,
+                catalog_options=self.catalog_options,
+                on=['_ROW_ID'],
+                when_matched=[
+                    WhenMatched(
+                        update={'name': lit('x')},
+                        condition='t.picture IS NOT NULL',
+                    ),
+                ],
+            )
+        self.assertIn('blob', str(ctx.exception).lower())
+
 
 class TargetProjectionTest(unittest.TestCase):
 

Reply via email to