This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 22a1e06418 [python] support ray data sink to paimon (#6883)
22a1e06418 is described below
commit 22a1e06418ccbadd917449ad1147090b7c060701
Author: FreemanDane <[email protected]>
AuthorDate: Mon Jan 5 19:29:54 2026 +0800
[python] support ray data sink to paimon (#6883)
---
paimon-python/pypaimon/tests/ray_data_test.py | 53 ++++++++++++++++
paimon-python/pypaimon/write/ray_datasink.py | 91 +++++++++++++++++++++++++++
paimon-python/pypaimon/write/table_write.py | 5 ++
3 files changed, 149 insertions(+)
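For context, a minimal end-to-end sketch of the new sink, pieced together from the test and the added API in the diff below; the catalog object, table name, and the Schema import are assumptions for illustration, not part of this commit:

    import pyarrow as pa
    from ray.data.read_api import from_arrow

    # Assumes an existing pypaimon catalog object and an importable Schema class.
    pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string())])
    catalog.create_table('default.demo', Schema.from_pyarrow_schema(pa_schema), False)
    table = catalog.get_table('default.demo')

    # Build a Ray Dataset and sink it into the Paimon table.
    ds = from_arrow(pa.Table.from_pydict(
        {'id': [1, 2, 3], 'name': ['a', 'b', 'c']}, schema=pa_schema))
    writer = table.new_batch_write_builder().new_write()
    writer.write_raydata(ds, parallelism=2)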
diff --git a/paimon-python/pypaimon/tests/ray_data_test.py b/paimon-python/pypaimon/tests/ray_data_test.py
index 42bca3d60b..e931a4c7dc 100644
--- a/paimon-python/pypaimon/tests/ray_data_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_test.py
@@ -131,6 +131,59 @@ class RayDataTest(unittest.TestCase):
"Name column should match"
)
+ def test_basic_ray_data_write(self):
+ """Test basic Ray Data write from PyPaimon table."""
+ # Create schema
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('value', pa.int64()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ self.catalog.create_table('default.test_ray_write', schema, False)
+ table = self.catalog.get_table('default.test_ray_write')
+
+ # Write test data
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3, 4, 5],
+ 'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
+ 'value': [100, 200, 300, 400, 500],
+ }, schema=pa_schema)
+
+ from ray.data.read_api import from_arrow
+ ds = from_arrow(test_data)
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_raydata(ds, parallelism=2)
+ # Read using Ray Data
+ read_builder = table.new_read_builder()
+ table_read = read_builder.new_read()
+ table_scan = read_builder.new_scan()
+ splits = table_scan.plan().splits()
+
+ arrow_result = table_read.to_arrow(splits)
+
+ # Verify PyArrow table
+ self.assertIsNotNone(arrow_result, "Arrow table should not be None")
+ self.assertEqual(arrow_result.num_rows, 5, "Should have 5 rows")
+
+ # Test basic operations - get first 3 rows
+ sample_table = arrow_result.slice(0, 3)
+ self.assertEqual(sample_table.num_rows, 3, "Should have 3 sample rows")
+
+ # Convert to pandas for verification
+ df = arrow_result.to_pandas()
+ self.assertEqual(len(df), 5, "DataFrame should have 5 rows")
+ # Sort by id to ensure order-independent comparison
+ df_sorted = df.sort_values(by='id').reset_index(drop=True)
+ self.assertEqual(list(df_sorted['id']), [1, 2, 3, 4, 5], "ID column should match")
+ self.assertEqual(
+ list(df_sorted['name']),
+ ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
+ "Name column should match"
+ )
+
def test_ray_data_with_predicate(self):
"""Test Ray Data read with predicate filtering."""
# Create schema
diff --git a/paimon-python/pypaimon/write/ray_datasink.py b/paimon-python/pypaimon/write/ray_datasink.py
new file mode 100644
index 0000000000..709a010fa4
--- /dev/null
+++ b/paimon-python/pypaimon/write/ray_datasink.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""
+Module to write a Paimon table from a Ray Dataset, using the Ray Datasink API.
+"""
+
+from typing import Iterable
+from ray.data.datasource.datasink import Datasink, WriteResult, WriteReturnType
+from pypaimon.table.table import Table
+from pypaimon.write.write_builder import WriteBuilder
+from ray.data.block import BlockAccessor
+from ray.data.block import Block
+from ray.data._internal.execution.interfaces import TaskContext
+import pyarrow as pa
+
+
+class PaimonDatasink(Datasink):
+
+ def __init__(self, table: Table, overwrite=False):
+ self.table = table
+ self.overwrite = overwrite
+
+ def on_write_start(self, schema=None) -> None:
+ """Callback for when a write job starts.
+
+ Use this method to perform setup for write tasks. For example, creating a staging bucket in S3.
+
+ Args:
+ schema: Optional schema information passed by Ray Data.
+ """
+ self.writer_builder: WriteBuilder = self.table.new_batch_write_builder()
+ if self.overwrite:
+ self.writer_builder = self.writer_builder.overwrite()
+
+ def write(
+ self,
+ blocks: Iterable[Block],
+ ctx: TaskContext,
+ ) -> WriteReturnType:
+ """Write blocks. This is used by a single write task.
+
+ Args:
+ blocks: Generator of data blocks.
+ ctx: ``TaskContext`` for the write task.
+
+ Returns:
+ Result of this write task. When the entire write operator finishes,
+ all returned values will be passed as `WriteResult.write_returns`
+ to `Datasink.on_write_complete`.
+ """
+ table_write = self.writer_builder.new_write()
+ for block in blocks:
+ block_arrow: pa.Table = BlockAccessor.for_block(block).to_arrow()
+ table_write.write_arrow(block_arrow)
+ commit_messages = table_write.prepare_commit()
+ table_write.close()
+ return commit_messages
+
+ def on_write_complete(self, write_result: WriteResult[WriteReturnType]):
+ """Callback for when a write job completes.
+
+ This can be used to `commit` a write output. This method must
+ succeed prior to ``write_datasink()`` returning to the user. If this
+ method fails, then ``on_write_failed()`` is called.
+
+ Args:
+ write_result: Aggregated result of the
+ Write operator, containing write results and stats.
+ """
+ table_commit = self.writer_builder.new_commit()
+ table_commit.commit([
+ commit_message
+ for commit_messages in write_result.write_returns
+ for commit_message in commit_messages
+ ])
+ table_commit.close()
diff --git a/paimon-python/pypaimon/write/table_write.py b/paimon-python/pypaimon/write/table_write.py
index 0ac73356a3..9a4bdecf26 100644
--- a/paimon-python/pypaimon/write/table_write.py
+++ b/paimon-python/pypaimon/write/table_write.py
@@ -68,6 +68,11 @@ class TableWrite:
self.file_store_write.write_cols = write_cols
return self
+ def write_raydata(self, dataset, overwrite=False, parallelism=1):
+ from pypaimon.write.ray_datasink import PaimonDatasink
+ datasink = PaimonDatasink(self.table, overwrite=overwrite)
+ dataset.write_datasink(datasink, concurrency=parallelism)
+
def close(self):
self.file_store_write.close()
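For reference, write_raydata is a thin wrapper over Ray's Datasink API; a rough sketch of the equivalent direct call, assuming the table and dataset from the example above:

    from pypaimon.write.ray_datasink import PaimonDatasink

    sink = PaimonDatasink(table, overwrite=False)
    ds.write_datasink(sink, concurrency=2)
    # Ray invokes on_write_start() on the driver, write() in each parallel
    # write task (each returning Paimon commit messages), and finally
    # on_write_complete(), which flattens the messages and commits them once.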