This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new a8033c1e feat: Allow servers to express supported endpoints with 
ConfigResponse (#2848)
a8033c1e is described below

commit a8033c1e09cda2eaf62a753c6a0a9d4b59ebb718
Author: geruh <[email protected]>
AuthorDate: Mon Dec 29 14:44:09 2025 -0800

    feat: Allow servers to express supported endpoints with ConfigResponse 
(#2848)
    
    closes to #2847
    
    # Rationale for this change
    
    This PR adds the server endpoint capabilities support, aligning with the
    Java
    
[implementation](https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java).
    While working on the REST scanning support, we need to know if a server
    supports specific capabilities before making any calls. So this PR also
    adds some extra support for the current implementation of PI iceberg
    REST catalog.
    
    The REST catalog will now parse the endpoints field from the config call
    to determine server capabilities. When a server doesn't respond, we have
    fallback logic that matches the behavior of Java's rest catalog. The
    View endpoints are conditionally added to the default with the config
    property as well.
    
    ## Are these changes tested?
    
    Added unit tests and tested with the iceberg rest fixture.
    
    ## Are there any user-facing changes?
    Yes added config and alignment with java impl.
    
    cc: @kevinjqliu @Fokko
---
 pyiceberg/catalog/rest/__init__.py | 163 ++++++++++++++++++++++++++++++++++++-
 tests/catalog/test_rest.py         | 142 ++++++++++++++++++++++++++++----
 2 files changed, 287 insertions(+), 18 deletions(-)

diff --git a/pyiceberg/catalog/rest/__init__.py 
b/pyiceberg/catalog/rest/__init__.py
index a28ff562..7866f5e8 100644
--- a/pyiceberg/catalog/rest/__init__.py
+++ b/pyiceberg/catalog/rest/__init__.py
@@ -21,7 +21,7 @@ from typing import (
     Union,
 )
 
-from pydantic import Field, field_validator
+from pydantic import ConfigDict, Field, field_validator
 from requests import HTTPError, Session
 from tenacity import RetryCallState, retry, retry_if_exception_type, 
stop_after_attempt
 
@@ -76,6 +76,39 @@ if TYPE_CHECKING:
     import pyarrow as pa
 
 
+class HttpMethod(str, Enum):
+    GET = "GET"
+    HEAD = "HEAD"
+    POST = "POST"
+    DELETE = "DELETE"
+
+
+class Endpoint(IcebergBaseModel):
+    model_config = ConfigDict(frozen=True)
+
+    http_method: HttpMethod = Field()
+    path: str = Field()
+
+    @field_validator("path", mode="before")
+    @classmethod
+    def _validate_path(cls, raw_path: str) -> str:
+        raw_path = raw_path.strip()
+        if not raw_path:
+            raise ValueError("Invalid path: empty")
+        return raw_path
+
+    def __str__(self) -> str:
+        """Return the string representation of the Endpoint class."""
+        return f"{self.http_method.value} {self.path}"
+
+    @classmethod
+    def from_string(cls, endpoint: str) -> "Endpoint":
+        elements = endpoint.strip().split(None, 1)
+        if len(elements) != 2:
+            raise ValueError(f"Invalid endpoint (must consist of two elements 
separated by a single space): {endpoint}")
+        return cls(http_method=HttpMethod(elements[0].upper()), 
path=elements[1])
+
+
 class Endpoints:
     get_config: str = "config"
     list_namespaces: str = "namespaces"
@@ -86,7 +119,7 @@ class Endpoints:
     namespace_exists: str = "namespaces/{namespace}"
     list_tables: str = "namespaces/{namespace}/tables"
     create_table: str = "namespaces/{namespace}/tables"
-    register_table = "namespaces/{namespace}/register"
+    register_table: str = "namespaces/{namespace}/register"
     load_table: str = "namespaces/{namespace}/tables/{table}"
     update_table: str = "namespaces/{namespace}/tables/{table}"
     drop_table: str = "namespaces/{namespace}/tables/{table}"
@@ -100,6 +133,61 @@ class Endpoints:
     fetch_scan_tasks: str = "namespaces/{namespace}/tables/{table}/tasks"
 
 
+API_PREFIX = "/v1/{prefix}"
+
+
+class Capability:
+    V1_LIST_NAMESPACES = Endpoint(http_method=HttpMethod.GET, 
path=f"{API_PREFIX}/{Endpoints.list_namespaces}")
+    V1_LOAD_NAMESPACE = Endpoint(http_method=HttpMethod.GET, 
path=f"{API_PREFIX}/{Endpoints.load_namespace_metadata}")
+    V1_NAMESPACE_EXISTS = Endpoint(http_method=HttpMethod.HEAD, 
path=f"{API_PREFIX}/{Endpoints.namespace_exists}")
+    V1_UPDATE_NAMESPACE = Endpoint(http_method=HttpMethod.POST, 
path=f"{API_PREFIX}/{Endpoints.update_namespace_properties}")
+    V1_CREATE_NAMESPACE = Endpoint(http_method=HttpMethod.POST, 
path=f"{API_PREFIX}/{Endpoints.create_namespace}")
+    V1_DELETE_NAMESPACE = Endpoint(http_method=HttpMethod.DELETE, 
path=f"{API_PREFIX}/{Endpoints.drop_namespace}")
+
+    V1_LIST_TABLES = Endpoint(http_method=HttpMethod.GET, 
path=f"{API_PREFIX}/{Endpoints.list_tables}")
+    V1_LOAD_TABLE = Endpoint(http_method=HttpMethod.GET, 
path=f"{API_PREFIX}/{Endpoints.load_table}")
+    V1_TABLE_EXISTS = Endpoint(http_method=HttpMethod.HEAD, 
path=f"{API_PREFIX}/{Endpoints.table_exists}")
+    V1_CREATE_TABLE = Endpoint(http_method=HttpMethod.POST, 
path=f"{API_PREFIX}/{Endpoints.create_table}")
+    V1_UPDATE_TABLE = Endpoint(http_method=HttpMethod.POST, 
path=f"{API_PREFIX}/{Endpoints.update_table}")
+    V1_DELETE_TABLE = Endpoint(http_method=HttpMethod.DELETE, 
path=f"{API_PREFIX}/{Endpoints.drop_table}")
+    V1_RENAME_TABLE = Endpoint(http_method=HttpMethod.POST, 
path=f"{API_PREFIX}/{Endpoints.rename_table}")
+    V1_REGISTER_TABLE = Endpoint(http_method=HttpMethod.POST, 
path=f"{API_PREFIX}/{Endpoints.register_table}")
+
+    V1_LIST_VIEWS = Endpoint(http_method=HttpMethod.GET, 
path=f"{API_PREFIX}/{Endpoints.list_views}")
+    V1_VIEW_EXISTS = Endpoint(http_method=HttpMethod.HEAD, 
path=f"{API_PREFIX}/{Endpoints.view_exists}")
+    V1_DELETE_VIEW = Endpoint(http_method=HttpMethod.DELETE, 
path=f"{API_PREFIX}/{Endpoints.drop_view}")
+    V1_SUBMIT_TABLE_SCAN_PLAN = Endpoint(http_method=HttpMethod.POST, 
path=f"{API_PREFIX}/{Endpoints.plan_table_scan}")
+    V1_TABLE_SCAN_PLAN_TASKS = Endpoint(http_method=HttpMethod.POST, 
path=f"{API_PREFIX}/{Endpoints.fetch_scan_tasks}")
+
+
+# Default endpoints for backwards compatibility with legacy servers that don't 
return endpoints
+# in ConfigResponse. Only includes namespace and table endpoints.
+DEFAULT_ENDPOINTS: frozenset[Endpoint] = frozenset(
+    (
+        Capability.V1_LIST_NAMESPACES,
+        Capability.V1_LOAD_NAMESPACE,
+        Capability.V1_CREATE_NAMESPACE,
+        Capability.V1_UPDATE_NAMESPACE,
+        Capability.V1_DELETE_NAMESPACE,
+        Capability.V1_LIST_TABLES,
+        Capability.V1_LOAD_TABLE,
+        Capability.V1_CREATE_TABLE,
+        Capability.V1_UPDATE_TABLE,
+        Capability.V1_DELETE_TABLE,
+        Capability.V1_RENAME_TABLE,
+        Capability.V1_REGISTER_TABLE,
+    )
+)
+
+# View endpoints conditionally added based on VIEW_ENDPOINTS_SUPPORTED 
property.
+VIEW_ENDPOINTS: frozenset[Endpoint] = frozenset(
+    (
+        Capability.V1_LIST_VIEWS,
+        Capability.V1_DELETE_VIEW,
+    )
+)
+
+
 class IdentifierKind(Enum):
     TABLE = "table"
     VIEW = "view"
@@ -134,6 +222,10 @@ AUTH = "auth"
 CUSTOM = "custom"
 REST_SCAN_PLANNING_ENABLED = "rest-scan-planning-enabled"
 REST_SCAN_PLANNING_ENABLED_DEFAULT = False
+# for backwards compatibility with older REST servers where it can be assumed 
that a particular
+# server supports view endpoints but doesn't send the "endpoints" field in the 
ConfigResponse
+VIEW_ENDPOINTS_SUPPORTED = "view-endpoints-supported"
+VIEW_ENDPOINTS_SUPPORTED_DEFAULT = False
 
 NAMESPACE_SEPARATOR = b"\x1f".decode(UTF8)
 
@@ -180,6 +272,14 @@ class RegisterTableRequest(IcebergBaseModel):
 class ConfigResponse(IcebergBaseModel):
     defaults: Properties | None = Field(default_factory=dict)
     overrides: Properties | None = Field(default_factory=dict)
+    endpoints: set[Endpoint] | None = Field(default=None)
+
+    @field_validator("endpoints", mode="before")
+    @classmethod
+    def _parse_endpoints(cls, v: list[str] | None) -> set[Endpoint] | None:
+        if v is None:
+            return None
+        return {Endpoint.from_string(s) for s in v}
 
 
 class ListNamespaceResponse(IcebergBaseModel):
@@ -218,6 +318,7 @@ class ListViewsResponse(IcebergBaseModel):
 class RestCatalog(Catalog):
     uri: str
     _session: Session
+    _supported_endpoints: set[Endpoint]
 
     def __init__(self, name: str, **properties: str):
         """Rest Catalog.
@@ -279,7 +380,9 @@ class RestCatalog(Catalog):
         Returns:
             True if enabled, False otherwise.
         """
-        return property_as_bool(self.properties, REST_SCAN_PLANNING_ENABLED, 
REST_SCAN_PLANNING_ENABLED_DEFAULT)
+        return Capability.V1_SUBMIT_TABLE_SCAN_PLAN in 
self._supported_endpoints and property_as_bool(
+            self.properties, REST_SCAN_PLANNING_ENABLED, 
REST_SCAN_PLANNING_ENABLED_DEFAULT
+        )
 
     def _create_legacy_oauth2_auth_manager(self, session: Session) -> 
AuthManager:
         """Create the LegacyOAuth2AuthManager by fetching required properties.
@@ -327,6 +430,18 @@ class RestCatalog(Catalog):
 
         return url + endpoint.format(**kwargs)
 
+    def _check_endpoint(self, endpoint: Endpoint) -> None:
+        """Check if an endpoint is supported by the server.
+
+        Args:
+            endpoint: The endpoint to check against the set of supported 
endpoints
+
+        Raises:
+            NotImplementedError: If the endpoint is not supported.
+        """
+        if endpoint not in self._supported_endpoints:
+            raise NotImplementedError(f"Server does not support endpoint: 
{endpoint}")
+
     @property
     def auth_url(self) -> str:
         self._warn_oauth_tokens_deprecation()
@@ -384,6 +499,17 @@ class RestCatalog(Catalog):
         # Update URI based on overrides
         self.uri = config[URI]
 
+        # Determine supported endpoints
+        endpoints = config_response.endpoints
+        if endpoints:
+            self._supported_endpoints = set(endpoints)
+        else:
+            # Use default endpoints for legacy servers that don't return 
endpoints
+            self._supported_endpoints = set(DEFAULT_ENDPOINTS)
+            # Conditionally add view endpoints based on config
+            if property_as_bool(self.properties, VIEW_ENDPOINTS_SUPPORTED, 
VIEW_ENDPOINTS_SUPPORTED_DEFAULT):
+                self._supported_endpoints.update(VIEW_ENDPOINTS)
+
     def _identifier_to_validated_tuple(self, identifier: str | Identifier) -> 
Identifier:
         identifier_tuple = self.identifier_to_tuple(identifier)
         if len(identifier_tuple) <= 1:
@@ -503,6 +629,7 @@ class RestCatalog(Catalog):
         properties: Properties = EMPTY_DICT,
         stage_create: bool = False,
     ) -> TableResponse:
+        self._check_endpoint(Capability.V1_CREATE_TABLE)
         iceberg_schema = self._convert_schema_if_needed(
             schema,
             int(properties.get(TableProperties.FORMAT_VERSION, 
TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
@@ -591,6 +718,7 @@ class RestCatalog(Catalog):
         Raises:
             TableAlreadyExistsError: If the table already exists
         """
+        self._check_endpoint(Capability.V1_REGISTER_TABLE)
         namespace_and_table = self._split_identifier_for_path(identifier)
         request = RegisterTableRequest(
             name=namespace_and_table["table"],
@@ -611,6 +739,7 @@ class RestCatalog(Catalog):
 
     @retry(**_RETRY_ARGS)
     def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
+        self._check_endpoint(Capability.V1_LIST_TABLES)
         namespace_tuple = self._check_valid_namespace_identifier(namespace)
         namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple)
         response = self._session.get(self.url(Endpoints.list_tables, 
namespace=namespace_concat))
@@ -622,6 +751,7 @@ class RestCatalog(Catalog):
 
     @retry(**_RETRY_ARGS)
     def load_table(self, identifier: str | Identifier) -> Table:
+        self._check_endpoint(Capability.V1_LOAD_TABLE)
         params = {}
         if mode := self.properties.get(SNAPSHOT_LOADING_MODE):
             if mode in {"all", "refs"}:
@@ -642,6 +772,7 @@ class RestCatalog(Catalog):
 
     @retry(**_RETRY_ARGS)
     def drop_table(self, identifier: str | Identifier, purge_requested: bool = 
False) -> None:
+        self._check_endpoint(Capability.V1_DELETE_TABLE)
         response = self._session.delete(
             self.url(Endpoints.drop_table, prefixed=True, 
**self._split_identifier_for_path(identifier)),
             params={"purgeRequested": purge_requested},
@@ -657,6 +788,7 @@ class RestCatalog(Catalog):
 
     @retry(**_RETRY_ARGS)
     def rename_table(self, from_identifier: str | Identifier, to_identifier: 
str | Identifier) -> Table:
+        self._check_endpoint(Capability.V1_RENAME_TABLE)
         payload = {
             "source": self._split_identifier_for_json(from_identifier),
             "destination": self._split_identifier_for_json(to_identifier),
@@ -692,6 +824,8 @@ class RestCatalog(Catalog):
 
     @retry(**_RETRY_ARGS)
     def list_views(self, namespace: str | Identifier) -> list[Identifier]:
+        if Capability.V1_LIST_VIEWS not in self._supported_endpoints:
+            return []
         namespace_tuple = self._check_valid_namespace_identifier(namespace)
         namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple)
         response = self._session.get(self.url(Endpoints.list_views, 
namespace=namespace_concat))
@@ -720,6 +854,7 @@ class RestCatalog(Catalog):
             CommitFailedException: Requirement not met, or a conflict with a 
concurrent commit.
             CommitStateUnknownException: Failed due to an internal exception 
on the side of the catalog.
         """
+        self._check_endpoint(Capability.V1_UPDATE_TABLE)
         identifier = table.name()
         table_identifier = TableIdentifier(namespace=identifier[:-1], 
name=identifier[-1])
         table_request = CommitTableRequest(identifier=table_identifier, 
requirements=requirements, updates=updates)
@@ -749,6 +884,7 @@ class RestCatalog(Catalog):
 
     @retry(**_RETRY_ARGS)
     def create_namespace(self, namespace: str | Identifier, properties: 
Properties = EMPTY_DICT) -> None:
+        self._check_endpoint(Capability.V1_CREATE_NAMESPACE)
         namespace_tuple = self._check_valid_namespace_identifier(namespace)
         payload = {"namespace": namespace_tuple, "properties": properties}
         response = self._session.post(self.url(Endpoints.create_namespace), 
json=payload)
@@ -759,6 +895,7 @@ class RestCatalog(Catalog):
 
     @retry(**_RETRY_ARGS)
     def drop_namespace(self, namespace: str | Identifier) -> None:
+        self._check_endpoint(Capability.V1_DELETE_NAMESPACE)
         namespace_tuple = self._check_valid_namespace_identifier(namespace)
         namespace = NAMESPACE_SEPARATOR.join(namespace_tuple)
         response = self._session.delete(self.url(Endpoints.drop_namespace, 
namespace=namespace))
@@ -769,6 +906,7 @@ class RestCatalog(Catalog):
 
     @retry(**_RETRY_ARGS)
     def list_namespaces(self, namespace: str | Identifier = ()) -> 
list[Identifier]:
+        self._check_endpoint(Capability.V1_LIST_NAMESPACES)
         namespace_tuple = self.identifier_to_tuple(namespace)
         response = self._session.get(
             self.url(
@@ -786,6 +924,7 @@ class RestCatalog(Catalog):
 
     @retry(**_RETRY_ARGS)
     def load_namespace_properties(self, namespace: str | Identifier) -> 
Properties:
+        self._check_endpoint(Capability.V1_LOAD_NAMESPACE)
         namespace_tuple = self._check_valid_namespace_identifier(namespace)
         namespace = NAMESPACE_SEPARATOR.join(namespace_tuple)
         response = 
self._session.get(self.url(Endpoints.load_namespace_metadata, 
namespace=namespace))
@@ -800,6 +939,7 @@ class RestCatalog(Catalog):
     def update_namespace_properties(
         self, namespace: str | Identifier, removals: set[str] | None = None, 
updates: Properties = EMPTY_DICT
     ) -> PropertiesUpdateSummary:
+        self._check_endpoint(Capability.V1_UPDATE_NAMESPACE)
         namespace_tuple = self._check_valid_namespace_identifier(namespace)
         namespace = NAMESPACE_SEPARATOR.join(namespace_tuple)
         payload = {"removals": list(removals or []), "updates": updates}
@@ -819,6 +959,14 @@ class RestCatalog(Catalog):
     def namespace_exists(self, namespace: str | Identifier) -> bool:
         namespace_tuple = self._check_valid_namespace_identifier(namespace)
         namespace = NAMESPACE_SEPARATOR.join(namespace_tuple)
+        # fallback in order to work with older rest catalog implementations
+        if Capability.V1_NAMESPACE_EXISTS not in self._supported_endpoints:
+            try:
+                self.load_namespace_properties(namespace_tuple)
+                return True
+            except NoSuchNamespaceError:
+                return False
+
         response = self._session.head(self.url(Endpoints.namespace_exists, 
namespace=namespace))
 
         if response.status_code == 404:
@@ -843,6 +991,14 @@ class RestCatalog(Catalog):
         Returns:
             bool: True if the table exists, False otherwise.
         """
+        # fallback in order to work with older rest catalog implementations
+        if Capability.V1_TABLE_EXISTS not in self._supported_endpoints:
+            try:
+                self.load_table(identifier)
+                return True
+            except NoSuchTableError:
+                return False
+
         response = self._session.head(
             self.url(Endpoints.load_table, prefixed=True, 
**self._split_identifier_for_path(identifier))
         )
@@ -886,6 +1042,7 @@ class RestCatalog(Catalog):
 
     @retry(**_RETRY_ARGS)
     def drop_view(self, identifier: str) -> None:
+        self._check_endpoint(Capability.V1_DELETE_VIEW)
         response = self._session.delete(
             self.url(Endpoints.drop_view, prefixed=True, 
**self._split_identifier_for_path(identifier, IdentifierKind.VIEW)),
         )
diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py
index 464314f3..5aae5949 100644
--- a/tests/catalog/test_rest.py
+++ b/tests/catalog/test_rest.py
@@ -27,7 +27,7 @@ from requests_mock import Mocker
 
 import pyiceberg
 from pyiceberg.catalog import PropertiesUpdateSummary, load_catalog
-from pyiceberg.catalog.rest import OAUTH2_SERVER_URI, SNAPSHOT_LOADING_MODE, 
RestCatalog
+from pyiceberg.catalog.rest import DEFAULT_ENDPOINTS, OAUTH2_SERVER_URI, 
SNAPSHOT_LOADING_MODE, Capability, RestCatalog
 from pyiceberg.exceptions import (
     AuthorizationExpiredError,
     NamespaceAlreadyExistsError,
@@ -68,6 +68,28 @@ OAUTH_TEST_HEADERS = {
     "Content-type": "application/x-www-form-urlencoded",
 }
 
+TEST_SUPPORTED_ENDPOINTS = [
+    Capability.V1_LIST_NAMESPACES,
+    Capability.V1_LOAD_NAMESPACE,
+    Capability.V1_NAMESPACE_EXISTS,
+    Capability.V1_UPDATE_NAMESPACE,
+    Capability.V1_CREATE_NAMESPACE,
+    Capability.V1_DELETE_NAMESPACE,
+    Capability.V1_LIST_TABLES,
+    Capability.V1_LOAD_TABLE,
+    Capability.V1_TABLE_EXISTS,
+    Capability.V1_CREATE_TABLE,
+    Capability.V1_UPDATE_TABLE,
+    Capability.V1_DELETE_TABLE,
+    Capability.V1_RENAME_TABLE,
+    Capability.V1_REGISTER_TABLE,
+    Capability.V1_LIST_VIEWS,
+    Capability.V1_VIEW_EXISTS,
+    Capability.V1_DELETE_VIEW,
+    Capability.V1_SUBMIT_TABLE_SCAN_PLAN,
+    Capability.V1_TABLE_SCAN_PLAN_TASKS,
+]
+
 
 @pytest.fixture
 def 
example_table_metadata_with_snapshot_v1_rest_json(example_table_metadata_with_snapshot_v1:
 dict[str, Any]) -> dict[str, Any]:
@@ -112,7 +134,7 @@ def rest_mock(requests_mock: Mocker) -> Mocker:
     """
     requests_mock.get(
         f"{TEST_URI}v1/config",
-        json={"defaults": {}, "overrides": {}},
+        json={"defaults": {}, "overrides": {}, "endpoints": [str(endpoint) for 
endpoint in TEST_SUPPORTED_ENDPOINTS]},
         status_code=200,
     )
     return requests_mock
@@ -1995,21 +2017,11 @@ class TestRestCatalogClose:
         assert len(catalog._session.adapters) == self.EXPECTED_ADAPTERS_SIGV4
 
     def test_rest_scan_planning_disabled_by_default(self, rest_mock: Mocker) 
-> None:
-        rest_mock.get(
-            f"{TEST_URI}v1/config",
-            json={"defaults": {}, "overrides": {}},
-            status_code=200,
-        )
         catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
 
         assert catalog.is_rest_scan_planning_enabled() is False
 
     def test_rest_scan_planning_enabled_by_property(self, rest_mock: Mocker) 
-> None:
-        rest_mock.get(
-            f"{TEST_URI}v1/config",
-            json={"defaults": {}, "overrides": {}},
-            status_code=200,
-        )
         catalog = RestCatalog(
             "rest",
             uri=TEST_URI,
@@ -2019,12 +2031,23 @@ class TestRestCatalogClose:
 
         assert catalog.is_rest_scan_planning_enabled() is True
 
-    def test_rest_scan_planning_explicitly_disabled(self, rest_mock: Mocker) 
-> None:
-        rest_mock.get(
+    def test_rest_scan_planning_disabled_when_endpoint_unsupported(self, 
requests_mock: Mocker) -> None:
+        # config endpoint does not populate endpoint falling back to default
+        requests_mock.get(
             f"{TEST_URI}v1/config",
             json={"defaults": {}, "overrides": {}},
             status_code=200,
         )
+        catalog = RestCatalog(
+            "rest",
+            uri=TEST_URI,
+            token=TEST_TOKEN,
+            **{"rest-scan-planning-enabled": "true"},
+        )
+
+        assert catalog.is_rest_scan_planning_enabled() is False
+
+    def test_rest_scan_planning_explicitly_disabled(self, rest_mock: Mocker) 
-> None:
         catalog = RestCatalog(
             "rest",
             uri=TEST_URI,
@@ -2037,9 +2060,98 @@ class TestRestCatalogClose:
     def test_rest_scan_planning_enabled_from_server_config(self, rest_mock: 
Mocker) -> None:
         rest_mock.get(
             f"{TEST_URI}v1/config",
-            json={"defaults": {"rest-scan-planning-enabled": "true"}, 
"overrides": {}},
+            json={
+                "defaults": {"rest-scan-planning-enabled": "true"},
+                "overrides": {},
+                "endpoints": ["POST 
/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"],
+            },
             status_code=200,
         )
         catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
 
         assert catalog.is_rest_scan_planning_enabled() is True
+
+    def test_supported_endpoint(self, requests_mock: Mocker) -> None:
+        requests_mock.get(
+            f"{TEST_URI}v1/config",
+            json={
+                "defaults": {},
+                "overrides": {},
+                "endpoints": ["GET /v1/{prefix}/namespaces", "GET 
/v1/{prefix}/namespaces/{namespace}/tables"],
+            },
+            status_code=200,
+        )
+        catalog = RestCatalog("rest", uri=TEST_URI, token="token")
+
+        # Should not raise since these endpoints are in the supported set
+        catalog._check_endpoint(Capability.V1_LIST_NAMESPACES)
+        catalog._check_endpoint(Capability.V1_LIST_TABLES)
+
+    def test_unsupported_endpoint(self, requests_mock: Mocker) -> None:
+        requests_mock.get(
+            f"{TEST_URI}v1/config",
+            json={
+                "defaults": {},
+                "overrides": {},
+                "endpoints": ["GET /v1/{prefix}/namespaces"],
+            },
+            status_code=200,
+        )
+        catalog = RestCatalog("rest", uri=TEST_URI, token="token")
+
+        with pytest.raises(NotImplementedError, match="Server does not support 
endpoint"):
+            catalog._check_endpoint(Capability.V1_LIST_TABLES)
+
+    def test_config_returns_invalid_endpoint(self, requests_mock: Mocker) -> 
None:
+        requests_mock.get(
+            f"{TEST_URI}v1/config",
+            json={
+                "defaults": {},
+                "overrides": {},
+                "endpoints": ["INVALID_ENDPOINT"],
+            },
+            status_code=200,
+        )
+
+        with pytest.raises(ValueError, match="Invalid endpoint"):
+            RestCatalog("rest", uri=TEST_URI, token="token")
+
+    def test_default_endpoints_used_when_none_returned(self, requests_mock: 
Mocker) -> None:
+        requests_mock.get(
+            f"{TEST_URI}v1/config",
+            json={"defaults": {}, "overrides": {}},
+            status_code=200,
+        )
+        catalog = RestCatalog("rest", uri=TEST_URI, token="token")
+
+        # Should not raise for default endpoints
+        for endpoint in DEFAULT_ENDPOINTS:
+            catalog._check_endpoint(endpoint)
+
+    def test_view_endpoints_not_included_by_default(self, requests_mock: 
Mocker) -> None:
+        requests_mock.get(
+            f"{TEST_URI}v1/config",
+            json={"defaults": {}, "overrides": {}},
+            status_code=200,
+        )
+        catalog = RestCatalog("rest", uri=TEST_URI, token="token")
+
+        with pytest.raises(NotImplementedError, match="Server does not support 
endpoint"):
+            catalog._check_endpoint(Capability.V1_LIST_VIEWS)
+
+    def test_view_endpoints_enabled_with_config(self, requests_mock: Mocker) 
-> None:
+        requests_mock.get(
+            f"{TEST_URI}v1/config",
+            json={"defaults": {}, "overrides": {}},
+            status_code=200,
+        )
+        catalog = RestCatalog(
+            "rest",
+            uri=TEST_URI,
+            token="token",
+            **{"view-endpoints-supported": "true"},
+        )
+
+        # View endpoints should be supported when enabled
+        catalog._check_endpoint(Capability.V1_LIST_VIEWS)
+        catalog._check_endpoint(Capability.V1_DELETE_VIEW)

Reply via email to