This is an automated email from the ASF dual-hosted git repository.

YannByron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 86a3af047c [python][daft] Make Daft Paimon read source serializable (#8029)
86a3af047c is described below

commit 86a3af047c42c7c84bdef0ca6ae96bf4085e21c9
Author: Kerwin Zhang <[email protected]>
AuthorDate: Mon Jun 1 11:28:55 2026 +0800

    [python][daft] Make Daft Paimon read source serializable (#8029)
---
 paimon-python/pypaimon/daft/daft_datasource.py     | 198 ++++++++++++++++++---
 .../pypaimon/filesystem/caching_file_io.py         |   4 +
 paimon-python/pypaimon/table/file_store_table.py   |   4 +
 .../pypaimon/tests/daft/daft_data_test.py          | 110 ++++++++++++
 4 files changed, 287 insertions(+), 29 deletions(-)

diff --git a/paimon-python/pypaimon/daft/daft_datasource.py b/paimon-python/pypaimon/daft/daft_datasource.py
index 7e4419d2ab..e7c8c547d1 100644
--- a/paimon-python/pypaimon/daft/daft_datasource.py
+++ b/paimon-python/pypaimon/daft/daft_datasource.py
@@ -46,7 +46,6 @@ if TYPE_CHECKING:
     from pypaimon.common.predicate import Predicate
     from pypaimon.manifest.schema.data_file_meta import DataFileMeta
     from pypaimon.read.explain import ExplainSplitInfo
-    from pypaimon.read.table_read import TableRead
     from pypaimon.read.split import Split
     from pypaimon.table.file_store_table import FileStoreTable
 
@@ -59,6 +58,8 @@ PAIMON_FILE_FORMAT_PARQUET = "parquet"
 PAIMON_FILE_FORMAT_ORC = "orc"
 PAIMON_FILE_FORMAT_AVRO = "avro"
 
+_PaimonIdentifier = tuple[str, str, str | None]
+
 
 @dataclass(frozen=True, slots=True)
 class _ReadPushdownState:
@@ -80,6 +81,84 @@ class _ReaderRouting:
         return self.reader_mode == READER_MODE_NATIVE_PARQUET
 
 
+def _options_to_dict(options: Any) -> dict[str, Any]:
+    if options is None:
+        return {}
+    if isinstance(options, dict):
+        return dict(options)
+    return dict(options.to_map())
+
+
+def _extract_catalog_options(table: FileStoreTable) -> dict[str, Any]:
+    # Every FileIO exposes catalog properties via ``properties`` (CachingFileIO
+    # delegates to its wrapped FileIO), so no per-implementation handling needed.
+    return _options_to_dict(table.file_io.properties)
+
+
+def _extract_identifier(table: FileStoreTable) -> _PaimonIdentifier | None:
+    identifier = table.identifier
+    if identifier is None:
+        return None
+
+    database_name = identifier.get_database_name()
+    table_name = identifier.get_table_name()
+    if database_name is None or table_name is None:
+        return None
+    return database_name, table_name, identifier.get_branch_name()
+
+
+def _extract_table_options(table: FileStoreTable) -> dict[str, Any]:
+    return _options_to_dict(table.schema().options)
+
+
+def _to_paimon_identifier(identifier: _PaimonIdentifier) -> Any:
+    database_name, table_name, branch_name = identifier
+    if branch_name:
+        from pypaimon.common.identifier import Identifier
+
+        return Identifier(database_name, table_name, branch_name)
+    return f"{database_name}.{table_name}"
+
+
+def _load_table(
+    catalog_options: dict[str, Any],
+    table_identifier: _PaimonIdentifier | None,
+    table_path: str | None,
+    table_options: dict[str, Any],
+) -> FileStoreTable:
+    if catalog_options and table_identifier is not None:
+        from pypaimon.catalog.catalog_factory import CatalogFactory
+
+        catalog = CatalogFactory.create(catalog_options)
+        table = catalog.get_table(_to_paimon_identifier(table_identifier))
+    elif table_path:
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        table = FileStoreTable.from_path(table_path)
+    else:
+        raise RuntimeError(
+            "Unable to reconstruct Paimon table while deserializing PaimonDataSource."
+        )
+
+    if table_options:
+        table = table.copy(table_options)
+    return table
+
+
+def _build_storage_config(
+    catalog_options: dict[str, Any],
+    multithreaded_io: bool,
+) -> StorageConfig:
+    from daft import context
+    from daft.daft import StorageConfig
+
+    from pypaimon.daft.daft_io_config import _convert_paimon_catalog_options_to_io_config
+
+    io_config = _convert_paimon_catalog_options_to_io_config(catalog_options)
+    io_config = io_config or context.get_context().daft_planning_config.default_io_config
+    return StorageConfig(multithreaded_io, io_config)
+
+
 class _PaimonPKSplitTask(DataSourceTask):
     """DataSourceTask for PK-table splits that require LSM-tree merge.
 
@@ -90,15 +169,27 @@ class _PaimonPKSplitTask(DataSourceTask):
 
     def __init__(
         self,
-        table_read: TableRead,
+        table_catalog_options: dict[str, Any],
+        table_identifier: _PaimonIdentifier | None,
+        table_path: str | None,
+        table_options: dict[str, Any],
         split: Split,
         schema: Schema,
+        read_columns: list[str] | None = None,
+        limit: int | None = None,
+        predicate: Predicate | None = None,
         output_columns: list[str] | None = None,
         blob_column_names: set[str] | None = None,
     ) -> None:
-        self._table_read = table_read
+        self._table_catalog_options = table_catalog_options
+        self._table_identifier = table_identifier
+        self._table_path = table_path
+        self._table_options = table_options
         self._split = split
         self._schema = schema
+        self._read_columns = read_columns
+        self._limit = limit
+        self._predicate = predicate
         self._output_columns = output_columns
         self._blob_column_names = blob_column_names or set()
 
@@ -107,7 +198,21 @@ class _PaimonPKSplitTask(DataSourceTask):
         return self._schema
 
     async def read(self) -> AsyncIterator[RecordBatch]:
-        reader = self._table_read.to_arrow_batch_reader([self._split])
+        table = _load_table(
+            self._table_catalog_options,
+            self._table_identifier,
+            self._table_path,
+            self._table_options,
+        )
+        read_builder = table.new_read_builder()
+        if self._read_columns is not None:
+            read_builder = read_builder.with_projection(self._read_columns)
+        if self._limit is not None:
+            read_builder = read_builder.with_limit(self._limit)
+        if self._predicate is not None:
+            read_builder = read_builder.with_filter(self._predicate)
+
+        reader = read_builder.new_read().to_arrow_batch_reader([self._split])
         for batch in iter(reader.read_next_batch, None):
             if self._output_columns is not None:
                 batch = batch.select(self._output_columns)
@@ -168,9 +273,55 @@ class PaimonDataSource(DataSource):
         storage_config: StorageConfig,
         catalog_options: dict[str, str],
     ) -> None:
-        self._table = table
         self._storage_config = storage_config
-        self._catalog_options = catalog_options
+        self._catalog_options = dict(catalog_options or {})
+        self._table_catalog_options = {
+            **_extract_catalog_options(table),
+            **self._catalog_options,
+        }
+        self._table_identifier = _extract_identifier(table)
+        table_path = getattr(table, "table_path", None)
+        self._table_path = str(table_path) if table_path is not None else None
+        self._table_options = _extract_table_options(table)
+        self._paimon_predicate: Predicate | None = None
+        self._remaining_filters: list[PyExpr] | None = None
+        self._init_table(table)
+
+    def __getstate__(self) -> dict[str, Any]:
+        return {
+            "_multithreaded_io": self._storage_config.multithreaded_io,
+            "_catalog_options": self._catalog_options,
+            "_table_catalog_options": self._table_catalog_options,
+            "_table_identifier": self._table_identifier,
+            "_table_path": self._table_path,
+            "_table_options": self._table_options,
+            "_paimon_predicate": self._paimon_predicate,
+            "_remaining_filters": self._remaining_filters,
+        }
+
+    def __setstate__(self, state: dict[str, Any]) -> None:
+        self._catalog_options = state["_catalog_options"]
+        self._table_catalog_options = state["_table_catalog_options"]
+        self._table_identifier = state["_table_identifier"]
+        self._table_path = state["_table_path"]
+        self._table_options = state["_table_options"]
+        self._paimon_predicate = state["_paimon_predicate"]
+        self._remaining_filters = state["_remaining_filters"]
+        self._storage_config = _build_storage_config(
+            self._table_catalog_options,
+            state["_multithreaded_io"],
+        )
+
+        table = _load_table(
+            self._table_catalog_options,
+            self._table_identifier,
+            self._table_path,
+            self._table_options,
+        )
+        self._init_table(table)
+
+    def _init_table(self, table: FileStoreTable) -> None:
+        self._table = table
 
         from pypaimon.schema.data_types import PyarrowFieldParser
 
@@ -194,7 +345,11 @@ class PaimonDataSource(DataSource):
         else:
             self._schema = Schema.from_pyarrow_schema(pa_schema)
 
-        warehouse = catalog_options.get("warehouse", "")
+        warehouse = (
+            self._catalog_options.get("warehouse")
+            or self._table_catalog_options.get("warehouse")
+            or ""
+        )
         self._warehouse_scheme = urlparse(warehouse).scheme
 
         self._file_format = table.options.file_format().lower()
@@ -320,14 +475,15 @@ class PaimonDataSource(DataSource):
                     routing.fallback_reason,
                 )
                 yield _PaimonPKSplitTask(
-                    self._fallback_read_builder(
-                        read_table,
-                        read_pushdowns.read_columns,
-                        read_pushdowns.source_limit,
-                        read_pushdowns.reader_predicate,
-                    ).new_read(),
+                    self._table_catalog_options,
+                    self._table_identifier,
+                    self._table_path,
+                    _extract_table_options(read_table),
                     split,
                     self._project_schema(read_pushdowns.task_columns),
+                    read_pushdowns.read_columns,
+                    read_pushdowns.source_limit,
+                    read_pushdowns.reader_predicate,
                     read_pushdowns.task_columns,
                     self._blob_column_names,
                 )
@@ -602,22 +758,6 @@ class PaimonDataSource(DataSource):
             [(name, field_map[name].dtype) for name in columns if name in field_map]
         )
 
-    def _fallback_read_builder(
-        self,
-        table: FileStoreTable,
-        read_columns: list[str] | None,
-        limit: int | None,
-        predicate: Predicate | None,
-    ) -> Any:
-        read_builder = table.new_read_builder()
-        if read_columns is not None:
-            read_builder = read_builder.with_projection(read_columns)
-        if limit is not None:
-            read_builder = read_builder.with_limit(limit)
-        if predicate is not None:
-            read_builder = read_builder.with_filter(predicate)
-        return read_builder
-
     def _read_pushdown_state(
         self,
         table: FileStoreTable,
diff --git a/paimon-python/pypaimon/filesystem/caching_file_io.py b/paimon-python/pypaimon/filesystem/caching_file_io.py
index 38afc2eb73..2384ed0ca8 100644
--- a/paimon-python/pypaimon/filesystem/caching_file_io.py
+++ b/paimon-python/pypaimon/filesystem/caching_file_io.py
@@ -387,6 +387,10 @@ class CachingFileIO(FileIO):
             return file_io
         return CachingFileIO(file_io, cache, whitelist)
 
+    @property
+    def properties(self):
+        return self._delegate.properties
+
     def new_input_stream(self, path: str):
         file_type = FileType.classify(path)
         if self._cache is None or file_type not in self._whitelist or FileType.is_mutable(path):
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 67be2587b7..9d418cbb26 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -81,6 +81,10 @@ class FileStoreTable(Table):
 
         return cls(file_io, identifier, table_path, table_schema)
 
+    def schema(self) -> TableSchema:
+        """Get the table schema."""
+        return self.table_schema
+
     def current_branch(self) -> str:
         """Get the current branch name from the identifier."""
         return self.identifier.get_branch_name_or_default()
diff --git a/paimon-python/pypaimon/tests/daft/daft_data_test.py b/paimon-python/pypaimon/tests/daft/daft_data_test.py
index e587aa6d1d..2a959e290a 100644
--- a/paimon-python/pypaimon/tests/daft/daft_data_test.py
+++ b/paimon-python/pypaimon/tests/daft/daft_data_test.py
@@ -62,6 +62,18 @@ def _predicate_leaves(predicate):
 # ---------------------------------------------------------------------------
 
 
+class _UnserializableFileIoMarker:
+    def __reduce__(self):
+        raise TypeError("file io marker should not be serialized")
+
+
+class _UnserializableStorageConfig:
+    multithreaded_io = False
+
+    def __reduce__(self):
+        raise TypeError("storage config marker should not be serialized")
+
+
 def _write_to_paimon(table, arrow_table, mode="append", overwrite_partition=None):
     write_builder = table.new_batch_write_builder()
     if mode == "overwrite":
@@ -209,6 +221,104 @@ def test_read_paimon_schema_matches(append_only_table):
     assert "dt" in schema.column_names()
 
 
+def test_read_paimon_source_is_serializable(append_only_table):
+    """The Daft source must not serialize live table/file_io/storage objects."""
+    from daft.pickle import dumps, loads
+
+    from pypaimon.daft.daft_datasource import PaimonDataSource
+
+    table, _ = append_only_table
+    table.file_io._unserializable_marker = _UnserializableFileIoMarker()
+
+    source = PaimonDataSource(
+        table,
+        storage_config=_UnserializableStorageConfig(),
+        catalog_options={},
+    )
+
+    restored = loads(dumps(source))
+
+    assert restored is not source
+    assert restored.schema.column_names() == source.schema.column_names()
+    assert restored._table is not table
+    assert restored._table.identifier.get_full_name() == table.identifier.get_full_name()
+    assert restored._storage_config.multithreaded_io is False
+
+
+def test_read_paimon_remote_ray_task_is_serializable(pk_table, monkeypatch):
+    """A fallback PK split task must reopen the table from metadata on Ray workers.
+
+    Splits that need an LSM merge (here, overlapping primary-key writes) are read
+    by the pypaimon reader task. Under the Ray runner that task is pickled to
+    remote workers, so it must serialize only rebuildable metadata -- never the
+    live table / file_io / storage objects.
+    """
+    from daft import runners
+    from daft.io.pushdowns import Pushdowns
+    from daft.pickle import dumps, loads
+
+    from pypaimon.daft.daft_datasource import PaimonDataSource
+
+    class _RayRunner:
+        name = "ray"
+
+    table, _ = pk_table
+    # Two overlapping writes on id=1 create non-raw-convertible splits that
+    # require the pypaimon merge reader (the fallback _PaimonPKSplitTask).
+    _write_to_paimon(
+        table,
+        pa.table(
+            {
+                "id": pa.array([1, 2], pa.int64()),
+                "name": pa.array(["old_a", "old_b"], pa.string()),
+                "dt": pa.array(["2024-01-01", "2024-01-01"], pa.string()),
+            }
+        ),
+    )
+    _write_to_paimon(
+        table,
+        pa.table(
+            {
+                "id": pa.array([1], pa.int64()),
+                "name": pa.array(["new_a"], pa.string()),
+                "dt": pa.array(["2024-01-01"], pa.string()),
+            }
+        ),
+    )
+    table.file_io._unserializable_marker = _UnserializableFileIoMarker()
+
+    source = PaimonDataSource(
+        table,
+        storage_config=_UnserializableStorageConfig(),
+        catalog_options={},
+    )
+    monkeypatch.setattr(runners, "get_or_create_runner", lambda: _RayRunner())
+
+    async def first_task():
+        async for task in source.get_tasks(Pushdowns()):
+            return task
+        raise AssertionError("Expected at least one task")
+
+    async def read_task(task):
+        rows = []
+        async for batch in task.read():
+            rows.append(batch.to_pydict())
+        return rows
+
+    task = asyncio.run(first_task())
+    assert type(task).__name__ == "_PaimonPKSplitTask"
+
+    restored_task = loads(dumps(task))
+    batches = asyncio.run(read_task(restored_task))
+
+    merged = {
+        _id: name
+        for batch in batches
+        for _id, name in zip(batch["id"], batch["name"])
+    }
+    assert merged == {1: "new_a", 2: "old_b"}
+
+
 # ---------------------------------------------------------------------------
 # Multi-partition reads
 # ---------------------------------------------------------------------------

Reply via email to