kaxil commented on code in PR #62634:
URL: https://github.com/apache/airflow/pull/62634#discussion_r2868097386


##########
providers/apache/iceberg/src/airflow/providers/apache/iceberg/hooks/iceberg.py:
##########
@@ -48,43 +53,190 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]:
         return {
             "hidden_fields": ["schema", "port"],
             "relabeling": {
-                "host": "Base URL",
+                "host": "Catalog URI",
                 "login": "Client ID",
                 "password": "Client Secret",
             },
             "placeholders": {
-                "login": "client_id (token credentials auth)",
-                "password": "secret (token credentials auth)",
+                "host": "https://your-catalog.example.com/ws/v1",
+                "login": "client_id (OAuth2 credentials)",
+                "password": "client_secret (OAuth2 credentials)",
+                "extra": '{"warehouse": "s3://my-warehouse/", "s3.region": "us-east-1"}',
             },
         }
 
     def __init__(self, iceberg_conn_id: str = default_conn_name) -> None:
         super().__init__()
         self.conn_id = iceberg_conn_id
 
+    @cached_property
+    def catalog(self) -> Catalog:
+        """Return a pyiceberg Catalog instance for the configured connection."""
+        conn = self.get_connection(self.conn_id)
+
+        # Start with extra so connection fields take precedence
+        extra = conn.extra_dejson or {}
+        catalog_properties: dict[str, str] = {**extra}
+        catalog_properties["uri"] = conn.host.rstrip("/") if conn.host else ""
+        if "type" not in catalog_properties:
+            catalog_properties["type"] = "rest"
+
+        if conn.login and conn.password:
+            catalog_properties["credential"] = f"{conn.login}:{conn.password}"

Review Comment:
   Good catch! The `credential` field is REST-catalog-specific. Updated to 
only set it when `type == "rest"`. Non-REST catalogs (Glue, BigQuery) pass 
their auth through `extra` JSON. Added a test for this.



##########
providers/apache/iceberg/src/airflow/providers/apache/iceberg/hooks/iceberg.py:
##########
@@ -48,43 +53,190 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]:
         return {
             "hidden_fields": ["schema", "port"],
             "relabeling": {
-                "host": "Base URL",
+                "host": "Catalog URI",
                 "login": "Client ID",
                 "password": "Client Secret",
             },
             "placeholders": {
-                "login": "client_id (token credentials auth)",
-                "password": "secret (token credentials auth)",
+                "host": "https://your-catalog.example.com/ws/v1",
+                "login": "client_id (OAuth2 credentials)",
+                "password": "client_secret (OAuth2 credentials)",
+                "extra": '{"warehouse": "s3://my-warehouse/", "s3.region": "us-east-1"}',
             },
         }
 
     def __init__(self, iceberg_conn_id: str = default_conn_name) -> None:
         super().__init__()
         self.conn_id = iceberg_conn_id
 
+    @cached_property
+    def catalog(self) -> Catalog:
+        """Return a pyiceberg Catalog instance for the configured connection."""
+        conn = self.get_connection(self.conn_id)
+
+        # Start with extra so connection fields take precedence
+        extra = conn.extra_dejson or {}
+        catalog_properties: dict[str, str] = {**extra}
+        catalog_properties["uri"] = conn.host.rstrip("/") if conn.host else ""
+        if "type" not in catalog_properties:
+            catalog_properties["type"] = "rest"
+
+        if conn.login and conn.password:
+            catalog_properties["credential"] = f"{conn.login}:{conn.password}"
+        elif conn.login or conn.password:
+            self.log.warning(
+                "Only one of Client ID / Client Secret is set. "
+                "Both are required for OAuth2 credential authentication."
+            )
+
+        return load_catalog(self.conn_id, **catalog_properties)
+
+    def get_conn(self) -> Catalog:
+        """Return the pyiceberg Catalog."""
+        return self.catalog
+
     def test_connection(self) -> tuple[bool, str]:
-        """Test the Iceberg connection."""
+        """Test the Iceberg connection by listing namespaces."""
         try:
-            self.get_conn()
-            return True, "Successfully fetched token from Iceberg"
-        except HTTPError as e:
-            return False, f"HTTP Error: {e}: {e.response.text}"
+            namespaces = self.catalog.list_namespaces()
+            return True, f"Connected. Found {len(namespaces)} namespace(s)."
         except Exception as e:
             return False, str(e)
 
-    def get_conn(self) -> str:
-        """Obtain a short-lived access token via a client_id and client_secret."""
-        conn = self.get_connection(self.conn_id)
-        base_url = cast("str", conn.host)
-        base_url = base_url.rstrip("/")
-        client_id = conn.login
-        client_secret = conn.password
-        data = {"client_id": client_id, "client_secret": client_secret, "grant_type": "client_credentials"}
+    # ---- Token methods (backward compatibility) ----
 
-        response = requests.post(f"{base_url}/{TOKENS_ENDPOINT}", data=data)
-        response.raise_for_status()
+    def get_token(self) -> str:
+        """
+        Obtain a short-lived OAuth2 access token.
 
+        This preserves the legacy behavior of the pre-2.0 ``get_conn()`` method.
+        Use this when you need a raw token for external engines (Spark, Trino, Flink).
+        """
+        conn = self.get_connection(self.conn_id)
+        base_url = conn.host.rstrip("/") if conn.host else ""
+        data = {
+            "client_id": conn.login,
+            "client_secret": conn.password,
+            "grant_type": "client_credentials",
+        }
+        response = requests.post(f"{base_url}/{TOKENS_ENDPOINT}", data=data, timeout=30)
+        response.raise_for_status()
         return response.json()["access_token"]
 
-    def get_token_macro(self):
-        return f"{{{{ conn.{self.conn_id}.get_hook().get_conn() }}}}"
+    def get_token_macro(self) -> str:
+        """Return a Jinja2 macro that resolves to a fresh token at render time."""
+        return f"{{{{ conn.{self.conn_id}.get_hook().get_token() }}}}"
+
+    # ---- Namespace operations ----
+
+    def list_namespaces(self) -> list[str]:
+        """Return all namespace names in the catalog."""
+        return [".".join(ns) for ns in self.catalog.list_namespaces()]
+
+    # ---- Table operations ----
+
+    def list_tables(self, namespace: str) -> list[str]:
+        """
+        Return all table names in the given namespace.
+
+        :param namespace: Namespace (database/schema) to list tables from.
+        :return: List of fully-qualified table names ("namespace.table").
+        """
+        return [".".join(ident) for ident in self.catalog.list_tables(namespace)]
+
+    def load_table(self, table_name: str) -> Table:

Review Comment:
   Added validation — `load_table()` now raises `ValueError` with a clear 
message if `table_name` does not contain a dot. Added a test for this too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to