This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b0809bc35d [python][ray] Add e2e test and docs for ray merge_into 
feature backfill (#8172)
b0809bc35d is described below

commit b0809bc35db399f74063b6e98b59917811ecfad6
Author: XiaoHongbo <[email protected]>
AuthorDate: Tue Jun 9 12:13:12 2026 +0800

    [python][ray] Add e2e test and docs for ray merge_into feature backfill 
(#8172)
---
 docs/docs/learn-paimon/scenario-guide.mdx          | 59 ++++++++++++++++++
 docs/docs/pypaimon/ray-data.md                     |  3 +
 .../tests/ray_data_evolution_merge_into_test.py    | 72 +++++++++++++++++++++-
 3 files changed, 133 insertions(+), 1 deletion(-)

diff --git a/docs/docs/learn-paimon/scenario-guide.mdx 
b/docs/docs/learn-paimon/scenario-guide.mdx
index 333c91689a..341ceddae9 100644
--- a/docs/docs/learn-paimon/scenario-guide.mdx
+++ b/docs/docs/learn-paimon/scenario-guide.mdx
@@ -525,6 +525,65 @@ and merges them at read time. This is ideal for:
 - Iterative ML feature engineering — add, update, or refine features as your 
model evolves.
 - Reducing I/O cost and storage overhead for frequent partial column updates.
 
+#### Distributed Feature Backfill with Ray
+
+For pipelines that derive features from large payloads, keep the payload in 
Blob columns and update only the derived
+feature columns. Ray can read the records to process, run distributed 
computation, and call PyPaimon's Ray `merge_into`
+to write feature values back to the data-evolution table. The example assumes 
the target table has a key column `id`,
+a Blob column `payload`, and a non-Blob feature column `feature` to update.
+
+```python
+import ray
+from pypaimon.ray import read_paimon, merge_into, WhenMatched, source_col
+
+catalog_options = {"warehouse": "/path/to/warehouse"}
+target = "db.item_features"
+num_partitions = 1024
+
+# Keys selected by an upstream job. This can also come from another Paimon 
table,
+# Parquet files, or any Ray Dataset.
+records_to_process = ray.data.read_parquet("/path/to/records-to-process/")
+
+# Read only the columns needed for this job.
+target_rows = read_paimon(
+    target,
+    catalog_options=catalog_options,
+    projection=["id", "payload"],
+)
+
+selected = records_to_process.join(
+    target_rows,
+    join_type="inner",
+    num_partitions=num_partitions,
+    on=["id"],
+)
+
+def compute_feature(batch):
+    # Call your model service here. This example only keeps the shape simple.
+    payloads = batch["payload"].to_pylist()
+    return {
+        "id": batch["id"].to_pylist(),
+        "new_feature": [len(v) if v is not None else 0 for v in payloads],
+    }
+
+updates = selected.map_batches(compute_feature, batch_format="pyarrow")
+
+merge_into(
+    target=target,
+    source=updates,
+    catalog_options=catalog_options,
+    on=["id"],
+    when_matched=[
+        WhenMatched(update={"feature": source_col("new_feature")})
+    ],
+    num_partitions=num_partitions,
+)
+```
+
+`merge_into` rewrites only the matched non-Blob columns. Existing Blob files 
stay unchanged, so the source dataset does
+not need to carry Blob columns when it only updates feature fields. For very 
large key lists, tune Ray resources and
+`num_partitions`; selecting target rows is still a distributed join.
+
 ### Scenario 11: Python AI Pipeline (PyPaimon)
 
 **When:** You are building ML training or inference pipelines in Python and 
need to read/write Paimon tables natively
diff --git a/docs/docs/pypaimon/ray-data.md b/docs/docs/pypaimon/ray-data.md
index 03364aa743..08a88a60d5 100644
--- a/docs/docs/pypaimon/ray-data.md
+++ b/docs/docs/pypaimon/ray-data.md
@@ -412,6 +412,9 @@ columns (`s.*`). Requires the `datafusion` package: `pip 
install pypaimon[sql]`.
 counts the rows actually updated (after condition filtering). `num_unchanged`
 is `0` in the current implementation.
 
+For an end-to-end feature update workflow on Blob tables, see
+[Distributed Feature Backfill with 
Ray](../learn-paimon/scenario-guide#distributed-feature-backfill-with-ray).
+
 **Notes:**
 - Partition key columns cannot be updated by matched clauses. If the target
   table is partitioned, `merge_into` raises an error when `when_matched` is
diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py 
b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
index cd2745dc0f..b70ef40a58 100644
--- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
@@ -28,7 +28,7 @@ import ray
 
 from pypaimon import CatalogFactory, Schema
 from pypaimon.ray import (
-    WhenMatched, WhenNotMatched, merge_into,
+    WhenMatched, WhenNotMatched, merge_into, read_paimon,
     source_col, target_col, lit,
 )
 
@@ -516,6 +516,76 @@ class RayDataEvolutionMergeIntoTest(unittest.TestCase):
         )
         self.assertEqual({'payload'}, _blob_col_names(fake_table))
 
+    def test_blob_table_feature_update(self):
+        blob_schema = pa.schema([
+            ('id', pa.int32()),
+            ('payload', pa.large_binary()),
+            ('feature', pa.int32()),
+        ])
+        name = f'default.tbl_{uuid.uuid4().hex[:8]}'
+        schema = Schema.from_pyarrow_schema(blob_schema, 
options=self.de_options)
+        self.catalog.create_table(name, schema, False)
+        self._write(
+            name,
+            pa.Table.from_pydict(
+                {
+                    'id': pa.array([1, 2, 3], type=pa.int32()),
+                    'payload': [b'aa', b'bbb', b'cccc'],
+                    'feature': pa.array([10, 20, 30], type=pa.int32()),
+                },
+                schema=blob_schema,
+            ),
+        )
+
+        num_partitions = _TEST_NUM_PARTITIONS
+        records_to_process = ray.data.from_arrow(pa.Table.from_pydict({
+            'id': pa.array([1, 3], type=pa.int32()),
+        }))
+        target_rows = read_paimon(
+            name,
+            self.catalog_options,
+            projection=['id', 'payload'],
+        )
+        selected = records_to_process.join(
+            target_rows,
+            join_type='inner',
+            num_partitions=num_partitions,
+            on=['id'],
+        )
+
+        def compute_feature(batch):
+            payloads = batch['payload'].to_pylist()
+            return pa.Table.from_pydict({
+                'id': batch['id'],
+                'new_feature': pa.array(
+                    [len(v) if v is not None else 0 for v in payloads],
+                    type=pa.int32(),
+                ),
+            })
+
+        updates = selected.map_batches(compute_feature, batch_format='pyarrow')
+        metrics = merge_into(
+            target=name,
+            source=updates,
+            catalog_options=self.catalog_options,
+            on=['id'],
+            when_matched=[
+                WhenMatched(update={'feature': source_col('new_feature')})
+            ],
+            num_partitions=num_partitions,
+        )
+
+        table = self.catalog.get_table(name)
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        out = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+        self.assertEqual(out['id'], [1, 2, 3])
+        self.assertEqual(out['feature'], [2, 20, 4])
+        self.assertEqual(out['payload'], [b'aa', b'bbb', b'cccc'])
+        self.assertEqual(metrics, {
+            'num_matched': 2, 'num_inserted': 0, 'num_unchanged': 0,
+        })
+
     def test_combined_writes_single_snapshot(self):
         target = self._create_table()
         self._write(

Reply via email to