This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 59722abe5f [python][daft] Make Daft Paimon write sink serializable 
(#8022)
59722abe5f is described below

commit 59722abe5fe49f12f6395f3467e38f1682618a90
Author: Kerwin Zhang <[email protected]>
AuthorDate: Fri May 29 11:30:41 2026 +0800

    [python][daft] Make Daft Paimon write sink serializable (#8022)
    
    `PaimonDataSink` currently keeps the `FileStoreTable` and `WriteBuilder`
    directly in the sink object. When Daft runs with the Ray runner, the
    sink needs to be serialized and sent to workers. For OSS/Jindo tables,
    the table can indirectly hold PyArrow/Jindo filesystem objects that are
    not picklable, causing Ray serialization failures.
---
 paimon-python/pypaimon/daft/daft_datasink.py       | 128 ++++++++++++++++++++-
 .../pypaimon/tests/daft/daft_sink_test.py          |  22 ++++
 2 files changed, 148 insertions(+), 2 deletions(-)

diff --git a/paimon-python/pypaimon/daft/daft_datasink.py 
b/paimon-python/pypaimon/daft/daft_datasink.py
index c019b16a31..7e6b871f06 100644
--- a/paimon-python/pypaimon/daft/daft_datasink.py
+++ b/paimon-python/pypaimon/daft/daft_datasink.py
@@ -32,6 +32,93 @@ if TYPE_CHECKING:
     from pypaimon.table.file_store_table import FileStoreTable
 
 
+_PaimonIdentifier = tuple[str, str, str | None]
+
+
+def _options_to_dict(options: Any) -> dict[str, Any]:
+    if options is None:
+        return {}
+    if isinstance(options, dict):
+        return dict(options)
+
+    to_map = getattr(options, "to_map", None)
+    if callable(to_map):
+        return dict(to_map())
+
+    data = getattr(options, "data", None)
+    if isinstance(data, dict):
+        return dict(data)
+
+    return {}
+
+
+def _extract_catalog_options(table: FileStoreTable) -> dict[str, Any]:
+    file_io = getattr(table, "file_io", None)
+    properties = getattr(file_io, "properties", None)
+    if properties is None:
+        properties = getattr(file_io, "catalog_options", None)
+    return _options_to_dict(properties)
+
+
+def _extract_identifier(table: FileStoreTable) -> _PaimonIdentifier | None:
+    identifier = getattr(table, "identifier", None)
+    if identifier is None:
+        return None
+
+    get_database_name = getattr(identifier, "get_database_name", None)
+    get_table_name = getattr(identifier, "get_table_name", None)
+    get_branch_name = getattr(identifier, "get_branch_name", None)
+
+    database_name = (
+        get_database_name()
+        if callable(get_database_name)
+        else getattr(identifier, "database", None)
+    )
+    table_name = (
+        get_table_name()
+        if callable(get_table_name)
+        else getattr(identifier, "object", None)
+    )
+    branch_name = (
+        get_branch_name()
+        if callable(get_branch_name)
+        else getattr(identifier, "branch", None)
+    )
+    if database_name is None or table_name is None:
+        return None
+    return database_name, table_name, branch_name
+
+
+def _to_paimon_identifier(identifier: _PaimonIdentifier) -> Any:
+    database_name, table_name, branch_name = identifier
+    if branch_name:
+        from pypaimon.common.identifier import Identifier
+
+        return Identifier(database_name, table_name, branch_name)
+    return f"{database_name}.{table_name}"
+
+
+def _load_table(
+    catalog_options: dict[str, Any],
+    table_identifier: _PaimonIdentifier | None,
+    table_path: str | None,
+) -> FileStoreTable:
+    if catalog_options and table_identifier is not None:
+        from pypaimon.catalog.catalog_factory import CatalogFactory
+
+        catalog = CatalogFactory.create(catalog_options)
+        return catalog.get_table(_to_paimon_identifier(table_identifier))
+
+    if table_path:
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        return FileStoreTable.from_path(table_path)
+
+    raise RuntimeError(
+        "Unable to reconstruct Paimon table while deserializing 
PaimonDataSink."
+    )
+
+
 class PaimonDataSink(DataSink[list[Any]]):
     """DataSink for writing data to an Apache Paimon table.
 
@@ -45,14 +132,51 @@ class PaimonDataSink(DataSink[list[Any]]):
     def __init__(self, table: FileStoreTable, mode: str = "append") -> None:
         if mode not in ("append", "overwrite"):
             raise ValueError(f"Only 'append' or 'overwrite' mode is supported, 
got: {mode!r}")
-        self._table = table
         self._mode = mode
+        self._catalog_options = _extract_catalog_options(table)
+        self._table_identifier = _extract_identifier(table)
+        table_path = getattr(table, "table_path", None)
+        self._table_path = str(table_path) if table_path is not None else None
+        self._commit_user: str | None = None
+        self._init_table(table)
+
+    def __getstate__(self) -> dict[str, Any]:
+        return {
+            "_mode": self._mode,
+            "_catalog_options": self._catalog_options,
+            "_table_identifier": self._table_identifier,
+            "_table_path": self._table_path,
+            "_commit_user": self._commit_user,
+        }
+
+    def __setstate__(self, state: dict[str, Any]) -> None:
+        self._mode = state["_mode"]
+        self._catalog_options = state["_catalog_options"]
+        self._table_identifier = state["_table_identifier"]
+        self._table_path = state["_table_path"]
+        self._commit_user = state["_commit_user"]
+        table = _load_table(
+            self._catalog_options,
+            self._table_identifier,
+            self._table_path,
+        )
+        self._init_table(table)
+
+    def _init_table(self, table: FileStoreTable) -> None:
+        self._table = table
 
         from pypaimon.schema.data_types import PyarrowFieldParser
 
         self._target_schema: pa.Schema = 
PyarrowFieldParser.from_paimon_schema(table.fields)
         self._write_builder = table.new_batch_write_builder()
-        if mode == "overwrite":
+        if (
+            self._commit_user is not None
+            and hasattr(self._write_builder, "commit_user")
+        ):
+            self._write_builder.commit_user = self._commit_user
+        else:
+            self._commit_user = getattr(self._write_builder, "commit_user", 
None)
+        if self._mode == "overwrite":
             self._write_builder.overwrite({})
 
     def name(self) -> str:
diff --git a/paimon-python/pypaimon/tests/daft/daft_sink_test.py 
b/paimon-python/pypaimon/tests/daft/daft_sink_test.py
index 3cca5487d9..ad963de444 100644
--- a/paimon-python/pypaimon/tests/daft/daft_sink_test.py
+++ b/paimon-python/pypaimon/tests/daft/daft_sink_test.py
@@ -303,6 +303,28 @@ def test_write_paimon_invalid_mode(append_only_table):
         _write_table(df, table, mode="upsert")
 
 
+def test_write_paimon_sink_serializes_without_file_io(append_only_table):
+    """PaimonDataSink should not pickle table FileIO objects."""
+    from daft.pickle import dumps, loads
+
+    class Unpicklable:
+        def __reduce__(self):
+            raise TypeError("file io marker should not be serialized")
+
+    table, _ = append_only_table
+    table.file_io._unpicklable_marker = Unpicklable()
+    sink = PaimonDataSink(table, mode="overwrite")
+    commit_user = sink._write_builder.commit_user
+
+    restored = loads(dumps(sink))
+
+    assert restored.name() == sink.name()
+    assert restored._mode == "overwrite"
+    assert restored._write_builder.commit_user == commit_user
+    assert restored._write_builder.static_partition == {}
+    assert restored._table.identifier.get_full_name() == 
table.identifier.get_full_name()
+
+
 def test_write_paimon_rejects_extra_columns(local_paimon_catalog):
     """Extra input columns should fail instead of being silently dropped."""
     catalog, _ = local_paimon_catalog

Reply via email to