This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b0809bc35d [python][ray] Add e2e test and docs for ray merge_into
feature backfill (#8172)
b0809bc35d is described below
commit b0809bc35db399f74063b6e98b59917811ecfad6
Author: XiaoHongbo <[email protected]>
AuthorDate: Tue Jun 9 12:13:12 2026 +0800
[python][ray] Add e2e test and docs for ray merge_into feature backfill
(#8172)
---
docs/docs/learn-paimon/scenario-guide.mdx | 59 ++++++++++++++++++
docs/docs/pypaimon/ray-data.md | 3 +
.../tests/ray_data_evolution_merge_into_test.py | 72 +++++++++++++++++++++-
3 files changed, 133 insertions(+), 1 deletion(-)
diff --git a/docs/docs/learn-paimon/scenario-guide.mdx
b/docs/docs/learn-paimon/scenario-guide.mdx
index 333c91689a..341ceddae9 100644
--- a/docs/docs/learn-paimon/scenario-guide.mdx
+++ b/docs/docs/learn-paimon/scenario-guide.mdx
@@ -525,6 +525,65 @@ and merges them at read time. This is ideal for:
- Iterative ML feature engineering — add, update, or refine features as your
model evolves.
- Reducing I/O cost and storage overhead for frequent partial column updates.
+#### Distributed Feature Backfill with Ray
+
+For pipelines that derive features from large payloads, keep the payloads in Blob columns and update only the derived
+feature columns. Ray reads the records to process, runs the distributed feature computation, and calls PyPaimon's Ray
+`merge_into` to write the feature values back to the data-evolution table. The example assumes the target table has a
+key column `id`, a Blob column `payload`, and a non-Blob feature column `feature` to update.
+
+```python
+import ray
+from pypaimon.ray import read_paimon, merge_into, WhenMatched, source_col
+
+catalog_options = {"warehouse": "/path/to/warehouse"}
+target = "db.item_features"
+num_partitions = 1024
+
+# Keys selected by an upstream job. This can also come from another Paimon
table,
+# Parquet files, or any Ray Dataset.
+records_to_process = ray.data.read_parquet("/path/to/records-to-process/")
+
+# Read only the columns needed for this job.
+target_rows = read_paimon(
+ target,
+ catalog_options=catalog_options,
+ projection=["id", "payload"],
+)
+
+selected = records_to_process.join(
+ target_rows,
+ join_type="inner",
+ num_partitions=num_partitions,
+ on=["id"],
+)
+
+def compute_feature(batch):
+ # Call your model service here. This example only keeps the shape simple.
+ payloads = batch["payload"].to_pylist()
+ return {
+ "id": batch["id"].to_pylist(),
+ "new_feature": [len(v) if v is not None else 0 for v in payloads],
+ }
+
+updates = selected.map_batches(compute_feature, batch_format="pyarrow")
+
+merge_into(
+ target=target,
+ source=updates,
+ catalog_options=catalog_options,
+ on=["id"],
+ when_matched=[
+ WhenMatched(update={"feature": source_col("new_feature")})
+ ],
+ num_partitions=num_partitions,
+)
+```
+
+For matched rows, `merge_into` rewrites only the non-Blob columns; existing Blob files stay unchanged, so the source
+dataset does not need to carry Blob columns when it only updates feature fields. For very large key lists, tune Ray
+resources and `num_partitions`: the example reads the whole target table (projected to `id` and `payload`) and selects
+the rows to process with a distributed join.
+
### Scenario 11: Python AI Pipeline (PyPaimon)
**When:** You are building ML training or inference pipelines in Python and
need to read/write Paimon tables natively
diff --git a/docs/docs/pypaimon/ray-data.md b/docs/docs/pypaimon/ray-data.md
index 03364aa743..08a88a60d5 100644
--- a/docs/docs/pypaimon/ray-data.md
+++ b/docs/docs/pypaimon/ray-data.md
@@ -412,6 +412,9 @@ columns (`s.*`). Requires the `datafusion` package: `pip
install pypaimon[sql]`.
counts the rows actually updated (after condition filtering). `num_unchanged`
is `0` in the current implementation.
+For an end-to-end feature update workflow on Blob tables, see
+[Distributed Feature Backfill with Ray](../learn-paimon/scenario-guide#distributed-feature-backfill-with-ray).
+
**Notes:**
- Partition key columns cannot be updated by matched clauses. If the target
table is partitioned, `merge_into` raises an error when `when_matched` is
diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
index cd2745dc0f..b70ef40a58 100644
--- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
@@ -28,7 +28,7 @@ import ray
from pypaimon import CatalogFactory, Schema
from pypaimon.ray import (
- WhenMatched, WhenNotMatched, merge_into,
+ WhenMatched, WhenNotMatched, merge_into, read_paimon,
source_col, target_col, lit,
)
@@ -516,6 +516,76 @@ class RayDataEvolutionMergeIntoTest(unittest.TestCase):
)
self.assertEqual({'payload'}, _blob_col_names(fake_table))
+ def test_blob_table_feature_update(self):
+ blob_schema = pa.schema([
+ ('id', pa.int32()),
+ ('payload', pa.large_binary()),
+ ('feature', pa.int32()),
+ ])
+ name = f'default.tbl_{uuid.uuid4().hex[:8]}'
+ schema = Schema.from_pyarrow_schema(blob_schema,
options=self.de_options)
+ self.catalog.create_table(name, schema, False)
+ self._write(
+ name,
+ pa.Table.from_pydict(
+ {
+ 'id': pa.array([1, 2, 3], type=pa.int32()),
+ 'payload': [b'aa', b'bbb', b'cccc'],
+ 'feature': pa.array([10, 20, 30], type=pa.int32()),
+ },
+ schema=blob_schema,
+ ),
+ )
+
+ num_partitions = _TEST_NUM_PARTITIONS
+ records_to_process = ray.data.from_arrow(pa.Table.from_pydict({
+ 'id': pa.array([1, 3], type=pa.int32()),
+ }))
+ target_rows = read_paimon(
+ name,
+ self.catalog_options,
+ projection=['id', 'payload'],
+ )
+ selected = records_to_process.join(
+ target_rows,
+ join_type='inner',
+ num_partitions=num_partitions,
+ on=['id'],
+ )
+
+ def compute_feature(batch):
+ payloads = batch['payload'].to_pylist()
+ return pa.Table.from_pydict({
+ 'id': batch['id'],
+ 'new_feature': pa.array(
+ [len(v) if v is not None else 0 for v in payloads],
+ type=pa.int32(),
+ ),
+ })
+
+ updates = selected.map_batches(compute_feature, batch_format='pyarrow')
+ metrics = merge_into(
+ target=name,
+ source=updates,
+ catalog_options=self.catalog_options,
+ on=['id'],
+ when_matched=[
+ WhenMatched(update={'feature': source_col('new_feature')})
+ ],
+ num_partitions=num_partitions,
+ )
+
+ table = self.catalog.get_table(name)
+ rb = table.new_read_builder()
+ splits = rb.new_scan().plan().splits()
+ out = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+ self.assertEqual(out['id'], [1, 2, 3])
+ self.assertEqual(out['feature'], [2, 20, 4])
+ self.assertEqual(out['payload'], [b'aa', b'bbb', b'cccc'])
+ self.assertEqual(metrics, {
+ 'num_matched': 2, 'num_inserted': 0, 'num_unchanged': 0,
+ })
+
def test_combined_writes_single_snapshot(self):
target = self._create_table()
self._write(