This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ff3b281216 [python] [daft] Inject REST catalog OSS credentials into Daft IOConfig (#7959)
ff3b281216 is described below

commit ff3b281216189e9f73660df43548be60f44eb3a6
Author: XiaoHongbo <[email protected]>
AuthorDate: Tue May 26 09:03:39 2026 +0800

    [python] [daft] Inject REST catalog OSS credentials into Daft IOConfig (#7959)
    
    Daft reads against an Apache Paimon REST catalog (DLF) currently fail with
    HTTP 401 on OSS: `_convert_paimon_catalog_options_to_io_config` builds
    Daft's `IOConfig` from static `fs.oss.*` keys, but DLF delivers per-table
    STS tokens through `table.file_io` rather than `catalog_options`.
    
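    `_read_table` now refreshes the table's REST token and folds
    `table.file_io.token.token` into the options before building `IOConfig`,
    so Daft's native parquet reader gets the same credentials pypaimon uses.
    For REST tables it also points `warehouse` at the bucket root
    (`scheme://netloc`) parsed from `table.table_path`. Finally, it stops
    trimming `catalog_options` to `{warehouse: ...}` before passing them to
    `PaimonDataSource`, so the enrichment can read the `metastore` field.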
---
 paimon-python/pypaimon/daft/daft_paimon.py         | 30 ++++++--
 .../pypaimon/tests/daft/daft_catalog_rest_test.py  | 82 +++++++++++++++++++++-
 2 files changed, 105 insertions(+), 7 deletions(-)

diff --git a/paimon-python/pypaimon/daft/daft_paimon.py b/paimon-python/pypaimon/daft/daft_paimon.py
index b14050f884..cde1e23d80 100644
--- a/paimon-python/pypaimon/daft/daft_paimon.py
+++ b/paimon-python/pypaimon/daft/daft_paimon.py
@@ -29,6 +29,7 @@ Usage::
 from __future__ import annotations
 
 from typing import TYPE_CHECKING, Dict, Optional
+from urllib.parse import urlparse
 
 if TYPE_CHECKING:
     import daft
@@ -36,6 +37,26 @@ if TYPE_CHECKING:
     from pypaimon.table.file_store_table import FileStoreTable
 
 
+def _enrich_options_with_rest_token(
+    catalog_options: Dict[str, str], table: "FileStoreTable"
+) -> Dict[str, str]:
+    # REST catalogs (DLF) keep OSS STS tokens on table.file_io and the bucket 
on table.table_path,
+    # not in catalog_options; fold both in so Daft's IOConfig routes to OSS 
with valid credentials.
+    if catalog_options.get("metastore") != "rest":
+        return catalog_options
+    file_io = getattr(table, "file_io", None)
+    if file_io is None or not hasattr(file_io, "try_to_refresh_token"):
+        return catalog_options
+    file_io.try_to_refresh_token()
+    if file_io.token is None:
+        return catalog_options
+    enriched = {**catalog_options, **file_io.token.token}
+    parsed = urlparse(getattr(table, "table_path", "") or "")
+    if parsed.scheme and parsed.netloc:
+        enriched["warehouse"] = f"{parsed.scheme}://{parsed.netloc}"
+    return enriched
+
+
 def _read_table(
     table: FileStoreTable,
     catalog_options: Dict[str, str] | None = None,
@@ -68,17 +89,16 @@ def _read_table(
     if catalog_options is None:
         catalog_options = {}
 
-    io_config = io_config or 
_convert_paimon_catalog_options_to_io_config(catalog_options)
+    io_config = io_config or _convert_paimon_catalog_options_to_io_config(
+        _enrich_options_with_rest_token(catalog_options, table)
+    )
     io_config = io_config or 
context.get_context().daft_planning_config.default_io_config
 
     multithreaded_io = runners.get_or_create_runner().name != "ray"
     storage_config = StorageConfig(multithreaded_io, io_config)
 
-    warehouse = catalog_options.get("warehouse", "")
-    scan_catalog_options = {"warehouse": warehouse} if warehouse else {}
-
     source = PaimonDataSource(
-        table, storage_config=storage_config, 
catalog_options=scan_catalog_options
+        table, storage_config=storage_config, catalog_options=catalog_options
     )
     return source.read()
 
diff --git a/paimon-python/pypaimon/tests/daft/daft_catalog_rest_test.py b/paimon-python/pypaimon/tests/daft/daft_catalog_rest_test.py
index 63ba577531..2936fded1c 100644
--- a/paimon-python/pypaimon/tests/daft/daft_catalog_rest_test.py
+++ b/paimon-python/pypaimon/tests/daft/daft_catalog_rest_test.py
@@ -16,11 +16,11 @@
 # limitations under the License.
 
################################################################################
 
-"""Unit tests for PaimonCatalog REST catalog path (using mocks, no real server 
needed)."""
+"""Tests for the daft + REST catalog code path."""
 
 from __future__ import annotations
 
-from unittest.mock import MagicMock
+from unittest.mock import MagicMock, patch
 
 import pytest
 
@@ -34,6 +34,7 @@ from pypaimon.catalog.catalog_exception import (
 )
 
 from pypaimon.daft.daft_catalog import PaimonCatalog
+from pypaimon.tests.rest.rest_base_test import RESTBaseTest
 
 # ---------------------------------------------------------------------------
 # Helpers: build a mock inner catalog that mimics RESTCatalog's interface
@@ -231,3 +232,80 @@ def test_create_namespace_single_part():
     cat.create_namespace("new_db")
 
     inner.create_database.assert_called_once_with("new_db", 
ignore_if_exists=False)
+
+
+class DaftRestReadTest(RESTBaseTest):
+
+    def test_read_table_forwards_full_catalog_options_to_datasource(self):
+        from pypaimon.daft.daft_datasource import PaimonDataSource
+        from pypaimon.daft.daft_paimon import _read_table
+
+        captured = {}
+        original_init = PaimonDataSource.__init__
+
+        def spy_init(_self, table, storage_config, catalog_options):
+            captured["catalog_options"] = dict(catalog_options)
+            return original_init(
+                _self, table,
+                storage_config=storage_config,
+                catalog_options=catalog_options,
+            )
+
+        with patch.object(PaimonDataSource, "__init__", spy_init):
+            _read_table(self.table, catalog_options=self.options)
+
+        received = captured["catalog_options"]
+        self.assertEqual(received.get("metastore"), "rest", received)
+        self.assertIn("uri", received, received)
+        self.assertIn("token", received, received)
+
+    def test_read_table_enriches_io_config_with_rest_token(self):
+        from pypaimon.daft import daft_io_config
+        from pypaimon.daft.daft_paimon import _read_table
+
+        token_payload = {
+            "fs.oss.accessKeyId": "ak-from-dlf",
+            "fs.oss.accessKeySecret": "sk-from-dlf",
+            "fs.oss.securityToken": "sts-from-dlf",
+        }
+        fake_token = MagicMock()
+        fake_token.token = token_payload
+        fake_file_io = MagicMock()
+        fake_file_io.token = fake_token
+
+        captured = {}
+        original_builder = 
daft_io_config._convert_paimon_catalog_options_to_io_config
+
+        def spy_builder(opts):
+            captured["opts"] = dict(opts)
+            return original_builder(opts)
+
+        oss_options = {**self.options, "warehouse": "morax_test"}
+        oss_table_path = "oss://my-bucket/db.db/tbl-abc"
+
+        with patch.object(self.table, "file_io", fake_file_io), \
+             patch.object(self.table, "table_path", oss_table_path), \
+             patch.object(daft_io_config, 
"_convert_paimon_catalog_options_to_io_config", spy_builder):
+            _read_table(self.table, catalog_options=oss_options)
+
+        for k, v in token_payload.items():
+            self.assertEqual(captured["opts"].get(k), v, captured["opts"])
+        self.assertEqual(captured["opts"].get("warehouse"), "oss://my-bucket", 
captured["opts"])
+        fake_file_io.try_to_refresh_token.assert_called()
+
+    def test_enrich_is_noop_when_not_rest_metastore(self):
+        from pypaimon.daft.daft_paimon import _enrich_options_with_rest_token
+        opts = {"warehouse": "/tmp/x", "metastore": "filesystem"}
+        self.assertIs(_enrich_options_with_rest_token(opts, self.table), opts)
+
+    def test_enrich_is_noop_when_file_io_has_no_refresh(self):
+        from pypaimon.daft.daft_paimon import _enrich_options_with_rest_token
+        with patch.object(self.table, "file_io", MagicMock(spec=[])):
+            self.assertIs(_enrich_options_with_rest_token(self.options, 
self.table), self.options)
+
+    def test_enrich_is_noop_when_token_is_none(self):
+        from pypaimon.daft.daft_paimon import _enrich_options_with_rest_token
+        fake_file_io = MagicMock()
+        fake_file_io.token = None
+        with patch.object(self.table, "file_io", fake_file_io):
+            self.assertIs(_enrich_options_with_rest_token(self.options, 
self.table), self.options)

Reply via email to