This is an automated email from the ASF dual-hosted git repository.
YannByron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 86a3af047c [python][daft] Make Daft Paimon read source serializable (#8029)
86a3af047c is described below
commit 86a3af047c42c7c84bdef0ca6ae96bf4085e21c9
Author: Kerwin Zhang <[email protected]>
AuthorDate: Mon Jun 1 11:28:55 2026 +0800
[python][daft] Make Daft Paimon read source serializable (#8029)
---
paimon-python/pypaimon/daft/daft_datasource.py | 198 ++++++++++++++++++---
.../pypaimon/filesystem/caching_file_io.py | 4 +
paimon-python/pypaimon/table/file_store_table.py | 4 +
.../pypaimon/tests/daft/daft_data_test.py | 110 ++++++++++++
4 files changed, 287 insertions(+), 29 deletions(-)
diff --git a/paimon-python/pypaimon/daft/daft_datasource.py b/paimon-python/pypaimon/daft/daft_datasource.py
index 7e4419d2ab..e7c8c547d1 100644
--- a/paimon-python/pypaimon/daft/daft_datasource.py
+++ b/paimon-python/pypaimon/daft/daft_datasource.py
@@ -46,7 +46,6 @@ if TYPE_CHECKING:
from pypaimon.common.predicate import Predicate
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.read.explain import ExplainSplitInfo
- from pypaimon.read.table_read import TableRead
from pypaimon.read.split import Split
from pypaimon.table.file_store_table import FileStoreTable
@@ -59,6 +58,8 @@ PAIMON_FILE_FORMAT_PARQUET = "parquet"
PAIMON_FILE_FORMAT_ORC = "orc"
PAIMON_FILE_FORMAT_AVRO = "avro"
+_PaimonIdentifier = tuple[str, str, str | None]
+
@dataclass(frozen=True, slots=True)
class _ReadPushdownState:
@@ -80,6 +81,84 @@ class _ReaderRouting:
return self.reader_mode == READER_MODE_NATIVE_PARQUET
+def _options_to_dict(options: Any) -> dict[str, Any]:
+ if options is None:
+ return {}
+ if isinstance(options, dict):
+ return dict(options)
+ return dict(options.to_map())
+
+
+def _extract_catalog_options(table: FileStoreTable) -> dict[str, Any]:
+ # Every FileIO exposes catalog properties via ``properties`` (CachingFileIO
+ # delegates to its wrapped FileIO), so no per-implementation handling needed.
+ return _options_to_dict(table.file_io.properties)
+
+
+def _extract_identifier(table: FileStoreTable) -> _PaimonIdentifier | None:
+ identifier = table.identifier
+ if identifier is None:
+ return None
+
+ database_name = identifier.get_database_name()
+ table_name = identifier.get_table_name()
+ if database_name is None or table_name is None:
+ return None
+ return database_name, table_name, identifier.get_branch_name()
+
+
+def _extract_table_options(table: FileStoreTable) -> dict[str, Any]:
+ return _options_to_dict(table.schema().options)
+
+
+def _to_paimon_identifier(identifier: _PaimonIdentifier) -> Any:
+ database_name, table_name, branch_name = identifier
+ if branch_name:
+ from pypaimon.common.identifier import Identifier
+
+ return Identifier(database_name, table_name, branch_name)
+ return f"{database_name}.{table_name}"
+
+
+def _load_table(
+ catalog_options: dict[str, Any],
+ table_identifier: _PaimonIdentifier | None,
+ table_path: str | None,
+ table_options: dict[str, Any],
+) -> FileStoreTable:
+ if catalog_options and table_identifier is not None:
+ from pypaimon.catalog.catalog_factory import CatalogFactory
+
+ catalog = CatalogFactory.create(catalog_options)
+ table = catalog.get_table(_to_paimon_identifier(table_identifier))
+ elif table_path:
+ from pypaimon.table.file_store_table import FileStoreTable
+
+ table = FileStoreTable.from_path(table_path)
+ else:
+ raise RuntimeError(
+ "Unable to reconstruct Paimon table while deserializing PaimonDataSource."
+ )
+
+ if table_options:
+ table = table.copy(table_options)
+ return table
+
+
+def _build_storage_config(
+ catalog_options: dict[str, Any],
+ multithreaded_io: bool,
+) -> StorageConfig:
+ from daft import context
+ from daft.daft import StorageConfig
+
+ from pypaimon.daft.daft_io_config import _convert_paimon_catalog_options_to_io_config
+
+ io_config = _convert_paimon_catalog_options_to_io_config(catalog_options)
+ io_config = io_config or context.get_context().daft_planning_config.default_io_config
+ return StorageConfig(multithreaded_io, io_config)
+
+
class _PaimonPKSplitTask(DataSourceTask):
"""DataSourceTask for PK-table splits that require LSM-tree merge.
@@ -90,15 +169,27 @@ class _PaimonPKSplitTask(DataSourceTask):
def __init__(
self,
- table_read: TableRead,
+ table_catalog_options: dict[str, Any],
+ table_identifier: _PaimonIdentifier | None,
+ table_path: str | None,
+ table_options: dict[str, Any],
split: Split,
schema: Schema,
+ read_columns: list[str] | None = None,
+ limit: int | None = None,
+ predicate: Predicate | None = None,
output_columns: list[str] | None = None,
blob_column_names: set[str] | None = None,
) -> None:
- self._table_read = table_read
+ self._table_catalog_options = table_catalog_options
+ self._table_identifier = table_identifier
+ self._table_path = table_path
+ self._table_options = table_options
self._split = split
self._schema = schema
+ self._read_columns = read_columns
+ self._limit = limit
+ self._predicate = predicate
self._output_columns = output_columns
self._blob_column_names = blob_column_names or set()
@@ -107,7 +198,21 @@ class _PaimonPKSplitTask(DataSourceTask):
return self._schema
async def read(self) -> AsyncIterator[RecordBatch]:
- reader = self._table_read.to_arrow_batch_reader([self._split])
+ table = _load_table(
+ self._table_catalog_options,
+ self._table_identifier,
+ self._table_path,
+ self._table_options,
+ )
+ read_builder = table.new_read_builder()
+ if self._read_columns is not None:
+ read_builder = read_builder.with_projection(self._read_columns)
+ if self._limit is not None:
+ read_builder = read_builder.with_limit(self._limit)
+ if self._predicate is not None:
+ read_builder = read_builder.with_filter(self._predicate)
+
+ reader = read_builder.new_read().to_arrow_batch_reader([self._split])
for batch in iter(reader.read_next_batch, None):
if self._output_columns is not None:
batch = batch.select(self._output_columns)
@@ -168,9 +273,55 @@ class PaimonDataSource(DataSource):
storage_config: StorageConfig,
catalog_options: dict[str, str],
) -> None:
- self._table = table
self._storage_config = storage_config
- self._catalog_options = catalog_options
+ self._catalog_options = dict(catalog_options or {})
+ self._table_catalog_options = {
+ **_extract_catalog_options(table),
+ **self._catalog_options,
+ }
+ self._table_identifier = _extract_identifier(table)
+ table_path = getattr(table, "table_path", None)
+ self._table_path = str(table_path) if table_path is not None else None
+ self._table_options = _extract_table_options(table)
+ self._paimon_predicate: Predicate | None = None
+ self._remaining_filters: list[PyExpr] | None = None
+ self._init_table(table)
+
+ def __getstate__(self) -> dict[str, Any]:
+ return {
+ "_multithreaded_io": self._storage_config.multithreaded_io,
+ "_catalog_options": self._catalog_options,
+ "_table_catalog_options": self._table_catalog_options,
+ "_table_identifier": self._table_identifier,
+ "_table_path": self._table_path,
+ "_table_options": self._table_options,
+ "_paimon_predicate": self._paimon_predicate,
+ "_remaining_filters": self._remaining_filters,
+ }
+
+ def __setstate__(self, state: dict[str, Any]) -> None:
+ self._catalog_options = state["_catalog_options"]
+ self._table_catalog_options = state["_table_catalog_options"]
+ self._table_identifier = state["_table_identifier"]
+ self._table_path = state["_table_path"]
+ self._table_options = state["_table_options"]
+ self._paimon_predicate = state["_paimon_predicate"]
+ self._remaining_filters = state["_remaining_filters"]
+ self._storage_config = _build_storage_config(
+ self._table_catalog_options,
+ state["_multithreaded_io"],
+ )
+
+ table = _load_table(
+ self._table_catalog_options,
+ self._table_identifier,
+ self._table_path,
+ self._table_options,
+ )
+ self._init_table(table)
+
+ def _init_table(self, table: FileStoreTable) -> None:
+ self._table = table
from pypaimon.schema.data_types import PyarrowFieldParser
@@ -194,7 +345,11 @@ class PaimonDataSource(DataSource):
else:
self._schema = Schema.from_pyarrow_schema(pa_schema)
- warehouse = catalog_options.get("warehouse", "")
+ warehouse = (
+ self._catalog_options.get("warehouse")
+ or self._table_catalog_options.get("warehouse")
+ or ""
+ )
self._warehouse_scheme = urlparse(warehouse).scheme
self._file_format = table.options.file_format().lower()
@@ -320,14 +475,15 @@ class PaimonDataSource(DataSource):
routing.fallback_reason,
)
yield _PaimonPKSplitTask(
- self._fallback_read_builder(
- read_table,
- read_pushdowns.read_columns,
- read_pushdowns.source_limit,
- read_pushdowns.reader_predicate,
- ).new_read(),
+ self._table_catalog_options,
+ self._table_identifier,
+ self._table_path,
+ _extract_table_options(read_table),
split,
self._project_schema(read_pushdowns.task_columns),
+ read_pushdowns.read_columns,
+ read_pushdowns.source_limit,
+ read_pushdowns.reader_predicate,
read_pushdowns.task_columns,
self._blob_column_names,
)
@@ -602,22 +758,6 @@ class PaimonDataSource(DataSource):
[(name, field_map[name].dtype) for name in columns if name in field_map]
)
- def _fallback_read_builder(
- self,
- table: FileStoreTable,
- read_columns: list[str] | None,
- limit: int | None,
- predicate: Predicate | None,
- ) -> Any:
- read_builder = table.new_read_builder()
- if read_columns is not None:
- read_builder = read_builder.with_projection(read_columns)
- if limit is not None:
- read_builder = read_builder.with_limit(limit)
- if predicate is not None:
- read_builder = read_builder.with_filter(predicate)
- return read_builder
-
def _read_pushdown_state(
self,
table: FileStoreTable,
diff --git a/paimon-python/pypaimon/filesystem/caching_file_io.py b/paimon-python/pypaimon/filesystem/caching_file_io.py
index 38afc2eb73..2384ed0ca8 100644
--- a/paimon-python/pypaimon/filesystem/caching_file_io.py
+++ b/paimon-python/pypaimon/filesystem/caching_file_io.py
@@ -387,6 +387,10 @@ class CachingFileIO(FileIO):
return file_io
return CachingFileIO(file_io, cache, whitelist)
+ @property
+ def properties(self):
+ return self._delegate.properties
+
def new_input_stream(self, path: str):
file_type = FileType.classify(path)
if self._cache is None or file_type not in self._whitelist or FileType.is_mutable(path):
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 67be2587b7..9d418cbb26 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -81,6 +81,10 @@ class FileStoreTable(Table):
return cls(file_io, identifier, table_path, table_schema)
+ def schema(self) -> TableSchema:
+ """Get the table schema."""
+ return self.table_schema
+
def current_branch(self) -> str:
"""Get the current branch name from the identifier."""
return self.identifier.get_branch_name_or_default()
diff --git a/paimon-python/pypaimon/tests/daft/daft_data_test.py b/paimon-python/pypaimon/tests/daft/daft_data_test.py
index e587aa6d1d..2a959e290a 100644
--- a/paimon-python/pypaimon/tests/daft/daft_data_test.py
+++ b/paimon-python/pypaimon/tests/daft/daft_data_test.py
@@ -62,6 +62,18 @@ def _predicate_leaves(predicate):
# ---------------------------------------------------------------------------
+class _UnserializableFileIoMarker:
+ def __reduce__(self):
+ raise TypeError("file io marker should not be serialized")
+
+
+class _UnserializableStorageConfig:
+ multithreaded_io = False
+
+ def __reduce__(self):
+ raise TypeError("storage config marker should not be serialized")
+
+
def _write_to_paimon(table, arrow_table, mode="append", overwrite_partition=None):
write_builder = table.new_batch_write_builder()
if mode == "overwrite":
@@ -209,6 +221,104 @@ def test_read_paimon_schema_matches(append_only_table):
assert "dt" in schema.column_names()
+def test_read_paimon_source_is_serializable(append_only_table):
+ """The Daft source must not serialize live table/file_io/storage objects."""
+ from daft.pickle import dumps, loads
+
+ from pypaimon.daft.daft_datasource import PaimonDataSource
+
+ table, _ = append_only_table
+ table.file_io._unserializable_marker = _UnserializableFileIoMarker()
+
+ source = PaimonDataSource(
+ table,
+ storage_config=_UnserializableStorageConfig(),
+ catalog_options={},
+ )
+
+ restored = loads(dumps(source))
+
+ assert restored is not source
+ assert restored.schema.column_names() == source.schema.column_names()
+ assert restored._table is not table
+ assert restored._table.identifier.get_full_name() == table.identifier.get_full_name()
+ assert restored._storage_config.multithreaded_io is False
+
+
+def test_read_paimon_remote_ray_task_is_serializable(pk_table, monkeypatch):
+ """A fallback PK split task must reopen the table from metadata on Ray workers.
+
+ Splits that need an LSM merge (here, overlapping primary-key writes) are read
+ by the pypaimon reader task. Under the Ray runner that task is pickled to
+ remote workers, so it must serialize only rebuildable metadata -- never the
+ live table / file_io / storage objects.
+ """
+ from daft import runners
+ from daft.io.pushdowns import Pushdowns
+ from daft.pickle import dumps, loads
+
+ from pypaimon.daft.daft_datasource import PaimonDataSource
+
+ class _RayRunner:
+ name = "ray"
+
+ table, _ = pk_table
+ # Two overlapping writes on id=1 create non-raw-convertible splits that
+ # require the pypaimon merge reader (the fallback _PaimonPKSplitTask).
+ _write_to_paimon(
+ table,
+ pa.table(
+ {
+ "id": pa.array([1, 2], pa.int64()),
+ "name": pa.array(["old_a", "old_b"], pa.string()),
+ "dt": pa.array(["2024-01-01", "2024-01-01"], pa.string()),
+ }
+ ),
+ )
+ _write_to_paimon(
+ table,
+ pa.table(
+ {
+ "id": pa.array([1], pa.int64()),
+ "name": pa.array(["new_a"], pa.string()),
+ "dt": pa.array(["2024-01-01"], pa.string()),
+ }
+ ),
+ )
+ table.file_io._unserializable_marker = _UnserializableFileIoMarker()
+
+ source = PaimonDataSource(
+ table,
+ storage_config=_UnserializableStorageConfig(),
+ catalog_options={},
+ )
+ monkeypatch.setattr(runners, "get_or_create_runner", lambda: _RayRunner())
+
+ async def first_task():
+ async for task in source.get_tasks(Pushdowns()):
+ return task
+ raise AssertionError("Expected at least one task")
+
+ async def read_task(task):
+ rows = []
+ async for batch in task.read():
+ rows.append(batch.to_pydict())
+ return rows
+
+ task = asyncio.run(first_task())
+ assert type(task).__name__ == "_PaimonPKSplitTask"
+
+ restored_task = loads(dumps(task))
+ batches = asyncio.run(read_task(restored_task))
+
+ merged = {
+ _id: name
+ for batch in batches
+ for _id, name in zip(batch["id"], batch["name"])
+ }
+ assert merged == {1: "new_a", 2: "old_b"}
+
+
# ---------------------------------------------------------------------------
# Multi-partition reads
# ---------------------------------------------------------------------------