This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ff3b281216 [python] [daft] Inject REST catalog OSS credentials into
Daft IOConfig (#7959)
ff3b281216 is described below
commit ff3b281216189e9f73660df43548be60f44eb3a6
Author: XiaoHongbo <[email protected]>
AuthorDate: Tue May 26 09:03:39 2026 +0800
[python] [daft] Inject REST catalog OSS credentials into Daft IOConfig
(#7959)
Daft reads against an Apache Paimon REST catalog (DLF) currently fail
with a 401 error on OSS: `_convert_paimon_catalog_options_to_io_config`
builds Daft's `IOConfig` from static `fs.oss.*` keys, but DLF delivers
per-table STS tokens through `table.file_io` rather than
`catalog_options`.
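`_read_table` now folds `table.file_io.token.token` into the options
before building `IOConfig`, so Daft's native parquet reader gets the
same credentials pypaimon uses. When a token is present, the enriched
options also replace `warehouse` with the `scheme://bucket` root parsed
from `table.table_path` (for example `oss://my-bucket`). It also stops
trimming `catalog_options` to `{warehouse: ...}` so the enrichment can
read the `metastore` field.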
---
paimon-python/pypaimon/daft/daft_paimon.py | 30 ++++++--
.../pypaimon/tests/daft/daft_catalog_rest_test.py | 82 +++++++++++++++++++++-
2 files changed, 105 insertions(+), 7 deletions(-)
diff --git a/paimon-python/pypaimon/daft/daft_paimon.py
b/paimon-python/pypaimon/daft/daft_paimon.py
index b14050f884..cde1e23d80 100644
--- a/paimon-python/pypaimon/daft/daft_paimon.py
+++ b/paimon-python/pypaimon/daft/daft_paimon.py
@@ -29,6 +29,7 @@ Usage::
from __future__ import annotations
from typing import TYPE_CHECKING, Dict, Optional
+from urllib.parse import urlparse
if TYPE_CHECKING:
import daft
@@ -36,6 +37,26 @@ if TYPE_CHECKING:
from pypaimon.table.file_store_table import FileStoreTable
+def _enrich_options_with_rest_token(
+ catalog_options: Dict[str, str], table: "FileStoreTable"
+) -> Dict[str, str]:
+ # REST catalogs (DLF) keep OSS STS tokens on table.file_io and the bucket
on table.table_path,
+ # not in catalog_options; fold both in so Daft's IOConfig routes to OSS
with valid credentials.
+ if catalog_options.get("metastore") != "rest":
+ return catalog_options
+ file_io = getattr(table, "file_io", None)
+ if file_io is None or not hasattr(file_io, "try_to_refresh_token"):
+ return catalog_options
+ file_io.try_to_refresh_token()
+ if file_io.token is None:
+ return catalog_options
+ enriched = {**catalog_options, **file_io.token.token}
+ parsed = urlparse(getattr(table, "table_path", "") or "")
+ if parsed.scheme and parsed.netloc:
+ enriched["warehouse"] = f"{parsed.scheme}://{parsed.netloc}"
+ return enriched
+
+
def _read_table(
table: FileStoreTable,
catalog_options: Dict[str, str] | None = None,
@@ -68,17 +89,16 @@ def _read_table(
if catalog_options is None:
catalog_options = {}
- io_config = io_config or
_convert_paimon_catalog_options_to_io_config(catalog_options)
+ io_config = io_config or _convert_paimon_catalog_options_to_io_config(
+ _enrich_options_with_rest_token(catalog_options, table)
+ )
io_config = io_config or
context.get_context().daft_planning_config.default_io_config
multithreaded_io = runners.get_or_create_runner().name != "ray"
storage_config = StorageConfig(multithreaded_io, io_config)
- warehouse = catalog_options.get("warehouse", "")
- scan_catalog_options = {"warehouse": warehouse} if warehouse else {}
-
source = PaimonDataSource(
- table, storage_config=storage_config,
catalog_options=scan_catalog_options
+ table, storage_config=storage_config, catalog_options=catalog_options
)
return source.read()
diff --git a/paimon-python/pypaimon/tests/daft/daft_catalog_rest_test.py
b/paimon-python/pypaimon/tests/daft/daft_catalog_rest_test.py
index 63ba577531..2936fded1c 100644
--- a/paimon-python/pypaimon/tests/daft/daft_catalog_rest_test.py
+++ b/paimon-python/pypaimon/tests/daft/daft_catalog_rest_test.py
@@ -16,11 +16,11 @@
# limitations under the License.
################################################################################
-"""Unit tests for PaimonCatalog REST catalog path (using mocks, no real server
needed)."""
+"""Tests for the daft + REST catalog code path."""
from __future__ import annotations
-from unittest.mock import MagicMock
+from unittest.mock import MagicMock, patch
import pytest
@@ -34,6 +34,7 @@ from pypaimon.catalog.catalog_exception import (
)
from pypaimon.daft.daft_catalog import PaimonCatalog
+from pypaimon.tests.rest.rest_base_test import RESTBaseTest
# ---------------------------------------------------------------------------
# Helpers: build a mock inner catalog that mimics RESTCatalog's interface
@@ -231,3 +232,80 @@ def test_create_namespace_single_part():
cat.create_namespace("new_db")
inner.create_database.assert_called_once_with("new_db",
ignore_if_exists=False)
+
+
+class DaftRestReadTest(RESTBaseTest):
+
+ def test_read_table_forwards_full_catalog_options_to_datasource(self):
+ from pypaimon.daft.daft_datasource import PaimonDataSource
+ from pypaimon.daft.daft_paimon import _read_table
+
+ captured = {}
+ original_init = PaimonDataSource.__init__
+
+ def spy_init(_self, table, storage_config, catalog_options):
+ captured["catalog_options"] = dict(catalog_options)
+ return original_init(
+ _self, table,
+ storage_config=storage_config,
+ catalog_options=catalog_options,
+ )
+
+ with patch.object(PaimonDataSource, "__init__", spy_init):
+ _read_table(self.table, catalog_options=self.options)
+
+ received = captured["catalog_options"]
+ self.assertEqual(received.get("metastore"), "rest", received)
+ self.assertIn("uri", received, received)
+ self.assertIn("token", received, received)
+
+ def test_read_table_enriches_io_config_with_rest_token(self):
+ from pypaimon.daft import daft_io_config
+ from pypaimon.daft.daft_paimon import _read_table
+
+ token_payload = {
+ "fs.oss.accessKeyId": "ak-from-dlf",
+ "fs.oss.accessKeySecret": "sk-from-dlf",
+ "fs.oss.securityToken": "sts-from-dlf",
+ }
+ fake_token = MagicMock()
+ fake_token.token = token_payload
+ fake_file_io = MagicMock()
+ fake_file_io.token = fake_token
+
+ captured = {}
+ original_builder =
daft_io_config._convert_paimon_catalog_options_to_io_config
+
+ def spy_builder(opts):
+ captured["opts"] = dict(opts)
+ return original_builder(opts)
+
+ oss_options = {**self.options, "warehouse": "morax_test"}
+ oss_table_path = "oss://my-bucket/db.db/tbl-abc"
+
+ with patch.object(self.table, "file_io", fake_file_io), \
+ patch.object(self.table, "table_path", oss_table_path), \
+ patch.object(daft_io_config,
"_convert_paimon_catalog_options_to_io_config", spy_builder):
+ _read_table(self.table, catalog_options=oss_options)
+
+ for k, v in token_payload.items():
+ self.assertEqual(captured["opts"].get(k), v, captured["opts"])
+ self.assertEqual(captured["opts"].get("warehouse"), "oss://my-bucket",
captured["opts"])
+ fake_file_io.try_to_refresh_token.assert_called()
+
+ def test_enrich_is_noop_when_not_rest_metastore(self):
+ from pypaimon.daft.daft_paimon import _enrich_options_with_rest_token
+ opts = {"warehouse": "/tmp/x", "metastore": "filesystem"}
+ self.assertIs(_enrich_options_with_rest_token(opts, self.table), opts)
+
+ def test_enrich_is_noop_when_file_io_has_no_refresh(self):
+ from pypaimon.daft.daft_paimon import _enrich_options_with_rest_token
+ with patch.object(self.table, "file_io", MagicMock(spec=[])):
+ self.assertIs(_enrich_options_with_rest_token(self.options,
self.table), self.options)
+
+ def test_enrich_is_noop_when_token_is_none(self):
+ from pypaimon.daft.daft_paimon import _enrich_options_with_rest_token
+ fake_file_io = MagicMock()
+ fake_file_io.token = None
+ with patch.object(self.table, "file_io", fake_file_io):
+ self.assertIs(_enrich_options_with_rest_token(self.options,
self.table), self.options)