This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bfc6c8e03a1 Add catalog introspection to IcebergHook using pyiceberg (#62634)
bfc6c8e03a1 is described below

commit bfc6c8e03a18f5655dc3b3e35e0ae7411be7a00a
Author: Kaxil Naik <[email protected]>
AuthorDate: Sun Mar 1 02:32:33 2026 +0000

    Add catalog introspection to IcebergHook using pyiceberg (#62634)
    
    - Transform IcebergHook from a simple OAuth2 token fetcher into a full
      catalog client using pyiceberg's load_catalog.
    - Add introspection methods: list_namespaces, list_tables, load_table,
      table_exists, get_table_schema, get_partition_spec, get_table_properties,
      get_snapshots.
    - Preserve backward compatibility via get_token() and get_token_macro().
    - Only set credential field for REST catalogs; non-REST catalogs (Glue,
      BigQuery) use their own auth fields through extra JSON.
    - Validate table names contain a dot to catch unqualified names early.
    - Add pyiceberg>=0.8.0 as runtime dependency, bump to version 2.0.0.
    - Add pyiceberg to docs spelling wordlist.
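The field-to-property mapping described in the bullets above (extras as the base, connection fields taking precedence, credential only for REST catalogs) can be sketched as a small standalone function. This is an illustrative sketch of the precedence rules, not the hook's exact internals; the function name and dict shapes are assumptions:

```python
# Sketch of how load_catalog() kwargs are assembled from an Airflow connection:
# extras form the base, then connection fields override, so "host" always wins
# over a "uri" key in extra. Hypothetical helper; mirrors the rules only.

def build_catalog_properties(host, login, password, extra):
    props = {**(extra or {})}                        # extras are the base layer
    props["uri"] = host.rstrip("/") if host else ""  # host overrides extra "uri"
    props.setdefault("type", "rest")                 # REST unless extra says otherwise
    if props["type"] == "rest" and login and password:
        props["credential"] = f"{login}:{password}"  # REST-only OAuth2 credential
    return props

props = build_catalog_properties(
    "https://catalog.example.com/ws/v1/",
    "my_client",
    "my_secret",
    {"warehouse": "s3://my-warehouse/", "uri": "https://wrong.example.com"},
)
```

Non-REST catalogs (e.g. `{"type": "glue"}` in extra) skip the `credential` key entirely and rely on auth properties passed through extra.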
---
 docs/spelling_wordlist.txt                         |   1 +
 providers/apache/iceberg/docs/connections.rst      |  61 ++-
 providers/apache/iceberg/docs/index.rst            |   7 +-
 providers/apache/iceberg/provider.yaml             |   1 +
 providers/apache/iceberg/pyproject.toml            |   9 +-
 .../airflow/providers/apache/iceberg/__init__.py   |   2 +-
 .../providers/apache/iceberg/hooks/iceberg.py      | 209 +++++++-
 .../tests/system/apache/iceberg/example_iceberg.py |   6 +-
 .../unit/apache/iceberg/hooks/test_iceberg.py      | 535 ++++++++++++++++++++-
 9 files changed, 770 insertions(+), 61 deletions(-)

diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 12f7e3ac427..50d5df9dea2 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1465,6 +1465,7 @@ pydruid
 pyexasol
 pyhive
 pyhs
+pyiceberg
 pylint
 pyMongo
 pymongo
diff --git a/providers/apache/iceberg/docs/connections.rst b/providers/apache/iceberg/docs/connections.rst
index 198296cfcb8..e48872fc01a 100644
--- a/providers/apache/iceberg/docs/connections.rst
+++ b/providers/apache/iceberg/docs/connections.rst
@@ -20,23 +20,66 @@
 Connecting to Iceberg
 =====================
 
-The Iceberg connection type enables connecting to an Iceberg REST catalog to request a short-lived token to access the Apache Iceberg tables. This token can be injected as an environment variable, to be used with Trino, Spark, Flink, or your favorite query engine that supports Apache Iceberg.
+The Iceberg connection type connects to an Iceberg REST catalog using ``pyiceberg``.
+The hook provides catalog introspection (list namespaces, list tables, read schemas,
+inspect partitions and snapshots) and OAuth2 token generation for external engines
+like Spark, Trino, and Flink.
 
-After installing the Iceberg provider in your Airflow environment, the corresponding connection type of ``iceberg`` will be made available.
+After installing the Iceberg provider in your Airflow environment, the corresponding
+connection type of ``iceberg`` will be available.
 
 Default Connection IDs
 ----------------------
 
-Iceberg Hook uses the parameter ``iceberg_conn_id`` for Connection IDs and the value of the parameter as ``iceberg_default`` by default. You can create multiple connections in case you want to switch between environments.
+The Iceberg Hook uses the ``iceberg_conn_id`` parameter for its Connection ID, with
+``iceberg_default`` as the default value. You can create multiple connections if you
+need to switch between environments.
 
 Configuring the Connection
 --------------------------
 
-Client ID
-    The OAuth2 Client ID
+Catalog URI (Host)
+    The URL of the Iceberg REST catalog endpoint.
+    Example: ``https://your-catalog.example.com/ws/v1``
 
-Client Secret
-    The OAuth2 Client Secret
+Client ID (Login)
+    The OAuth2 Client ID for authenticating with the catalog.
+    Leave empty for catalogs that don't require OAuth2 credentials (e.g., local catalogs).
 
-Host
-    Sets the URL to the Tabular environment. By default `https://api.tabulardata.io/ws/v1`
+Client Secret (Password)
+    The OAuth2 Client Secret for authenticating with the catalog.
+
+Extra (Optional)
+    A JSON object with additional catalog properties passed to ``pyiceberg.catalog.load_catalog()``.
+    Common properties:
+
+    .. code-block:: json
+
+        {
+            "warehouse": "s3://my-warehouse/",
+            "s3.endpoint": "https://s3.us-east-1.amazonaws.com",
+            "s3.region": "us-east-1",
+            "s3.access-key-id": "AKIA...",
+            "s3.secret-access-key": "..."
+        }
+
+    For AWS/GCP/Azure deployments, prefer using IAM roles or environment-based
+    credentials and pass only the ``warehouse`` path in extra.
+
+Migration from 1.x
+-------------------
+
+In version 2.0.0, ``get_conn()`` now returns a ``pyiceberg.catalog.Catalog`` instance
+instead of a token string. If you were using ``get_conn()`` to obtain OAuth2 tokens,
+switch to ``get_token()``:
+
+.. code-block:: python
+
+    # Before (1.x)
+    token = IcebergHook().get_conn()
+
+    # After (2.0)
+    token = IcebergHook().get_token()
+
+The ``get_token_macro()`` method has been updated to use ``get_token()`` automatically,
+so Jinja2 templates continue to work without changes.
diff --git a/providers/apache/iceberg/docs/index.rst b/providers/apache/iceberg/docs/index.rst
index 178b7c18b70..99318a31852 100644
--- a/providers/apache/iceberg/docs/index.rst
+++ b/providers/apache/iceberg/docs/index.rst
@@ -73,7 +73,7 @@ apache-airflow-providers-apache-iceberg package
 `Iceberg <https://iceberg.apache.org/>`__
 
 
-Release: 1.4.1
+Release: 2.0.0
 
 Provider package
 ----------------
@@ -98,6 +98,7 @@ PIP package                                 Version required
 ==========================================  ==================
 ``apache-airflow``                          ``>=2.11.0``
 ``apache-airflow-providers-common-compat``  ``>=1.8.0``
+``pyiceberg``                               ``>=0.8.0``
 ==========================================  ==================
 
 Cross provider package dependencies
@@ -125,5 +126,5 @@ Downloading official packages
 You can download officially released packages and verify their checksums and signatures from the
 `Official Apache Download site <https://downloads.apache.org/airflow/providers/>`_
 
-* `The apache-airflow-providers-apache-iceberg 1.4.1 sdist package <https://downloads.apache.org/airflow/providers/apache_airflow_providers_apache_iceberg-1.4.1.tar.gz>`_ (`asc <https://downloads.apache.org/airflow/providers/apache_airflow_providers_apache_iceberg-1.4.1.tar.gz.asc>`__, `sha512 <https://downloads.apache.org/airflow/providers/apache_airflow_providers_apache_iceberg-1.4.1.tar.gz.sha512>`__)
-* `The apache-airflow-providers-apache-iceberg 1.4.1 wheel package <https://downloads.apache.org/airflow/providers/apache_airflow_providers_apache_iceberg-1.4.1-py3-none-any.whl>`_ (`asc <https://downloads.apache.org/airflow/providers/apache_airflow_providers_apache_iceberg-1.4.1-py3-none-any.whl.asc>`__, `sha512 <https://downloads.apache.org/airflow/providers/apache_airflow_providers_apache_iceberg-1.4.1-py3-none-any.whl.sha512>`__)
+* `The apache-airflow-providers-apache-iceberg 2.0.0 sdist package <https://downloads.apache.org/airflow/providers/apache_airflow_providers_apache_iceberg-2.0.0.tar.gz>`_ (`asc <https://downloads.apache.org/airflow/providers/apache_airflow_providers_apache_iceberg-2.0.0.tar.gz.asc>`__, `sha512 <https://downloads.apache.org/airflow/providers/apache_airflow_providers_apache_iceberg-2.0.0.tar.gz.sha512>`__)
+* `The apache-airflow-providers-apache-iceberg 2.0.0 wheel package <https://downloads.apache.org/airflow/providers/apache_airflow_providers_apache_iceberg-2.0.0-py3-none-any.whl>`_ (`asc <https://downloads.apache.org/airflow/providers/apache_airflow_providers_apache_iceberg-2.0.0-py3-none-any.whl.asc>`__, `sha512 <https://downloads.apache.org/airflow/providers/apache_airflow_providers_apache_iceberg-2.0.0-py3-none-any.whl.sha512>`__)
diff --git a/providers/apache/iceberg/provider.yaml b/providers/apache/iceberg/provider.yaml
index b9932b7c910..c4da4f50483 100644
--- a/providers/apache/iceberg/provider.yaml
+++ b/providers/apache/iceberg/provider.yaml
@@ -29,6 +29,7 @@ source-date-epoch: 1768334254
 # In such case adding >= NEW_VERSION and bumping to NEW_VERSION in a provider have
 # to be done in the same PR
 versions:
+  - 2.0.0
   - 1.4.1
   - 1.4.0
   - 1.3.4
diff --git a/providers/apache/iceberg/pyproject.toml b/providers/apache/iceberg/pyproject.toml
index f6366af59ef..9535cebdcbe 100644
--- a/providers/apache/iceberg/pyproject.toml
+++ b/providers/apache/iceberg/pyproject.toml
@@ -25,7 +25,7 @@ build-backend = "flit_core.buildapi"
 
 [project]
 name = "apache-airflow-providers-apache-iceberg"
-version = "1.4.1"
+version = "2.0.0"
 description = "Provider package apache-airflow-providers-apache-iceberg for Apache Airflow"
 readme = "README.rst"
 license = "Apache-2.0"
@@ -60,6 +60,7 @@ requires-python = ">=3.10"
 dependencies = [
     "apache-airflow>=2.11.0",
     "apache-airflow-providers-common-compat>=1.8.0",
+    "pyiceberg>=0.8.0",
 ]
 
 [dependency-groups]
@@ -69,7 +70,7 @@ dev = [
     "apache-airflow-devel-common",
     "apache-airflow-providers-common-compat",
     # Additional devel dependencies (do not remove this line and add extra development dependencies)
-    "pyiceberg>=0.5.0",
+    "pyiceberg>=0.8.0",
 ]
 
 # To build docs:
@@ -98,8 +99,8 @@ apache-airflow-providers-common-sql = {workspace = true}
 apache-airflow-providers-standard = {workspace = true}
 
 [project.urls]
-"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-apache-iceberg/1.4.1"
-"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-apache-iceberg/1.4.1/changelog.html"
+"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-apache-iceberg/2.0.0"
+"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-apache-iceberg/2.0.0/changelog.html"
 "Bug Tracker" = "https://github.com/apache/airflow/issues"
 "Source Code" = "https://github.com/apache/airflow"
 "Slack Chat" = "https://s.apache.org/airflow-slack"
diff --git a/providers/apache/iceberg/src/airflow/providers/apache/iceberg/__init__.py b/providers/apache/iceberg/src/airflow/providers/apache/iceberg/__init__.py
index ecc6b5bb96c..8ee4b4cc6d5 100644
--- a/providers/apache/iceberg/src/airflow/providers/apache/iceberg/__init__.py
+++ b/providers/apache/iceberg/src/airflow/providers/apache/iceberg/__init__.py
@@ -29,7 +29,7 @@ from airflow import __version__ as airflow_version
 
 __all__ = ["__version__"]
 
-__version__ = "1.4.1"
+__version__ = "2.0.0"
 
 if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
     "2.11.0"
diff --git a/providers/apache/iceberg/src/airflow/providers/apache/iceberg/hooks/iceberg.py b/providers/apache/iceberg/src/airflow/providers/apache/iceberg/hooks/iceberg.py
index 38a0dbcc6ac..bad7f3e44ee 100644
--- a/providers/apache/iceberg/src/airflow/providers/apache/iceberg/hooks/iceberg.py
+++ b/providers/apache/iceberg/src/airflow/providers/apache/iceberg/hooks/iceberg.py
@@ -16,25 +16,30 @@
 # under the License.
 from __future__ import annotations
 
-from typing import Any, cast
+from functools import cached_property
+from typing import TYPE_CHECKING, Any
 
 import requests
-from requests import HTTPError
+from pyiceberg.catalog import load_catalog
 
 from airflow.providers.common.compat.sdk import BaseHook
 
+if TYPE_CHECKING:
+    from pyiceberg.catalog import Catalog
+    from pyiceberg.table import Table
+
 TOKENS_ENDPOINT = "oauth/tokens"
 
 
 class IcebergHook(BaseHook):
     """
-    This hook acts as a base hook for iceberg services.
+    Hook for Apache Iceberg REST catalogs.
 
-    It offers the ability to generate temporary, short-lived
-    session tokens to use within Airflow submitted jobs.
+    Provides catalog-level operations (list namespaces, list tables, load schemas)
+    using pyiceberg, plus OAuth2 token generation for external query engines.
 
     :param iceberg_conn_id: The :ref:`Iceberg connection id<howto/connection:iceberg>`
-        which refers to the information to connect to the Iceberg.
+        which refers to the information to connect to the Iceberg catalog.
     """
 
     conn_name_attr = "iceberg_conn_id"
@@ -48,13 +53,15 @@ class IcebergHook(BaseHook):
         return {
             "hidden_fields": ["schema", "port"],
             "relabeling": {
-                "host": "Base URL",
+                "host": "Catalog URI",
                 "login": "Client ID",
                 "password": "Client Secret",
             },
             "placeholders": {
-                "login": "client_id (token credentials auth)",
-                "password": "secret (token credentials auth)",
+                "host": "https://your-catalog.example.com/ws/v1",
+                "login": "client_id (OAuth2 credentials)",
+                "password": "client_secret (OAuth2 credentials)",
+                "extra": '{"warehouse": "s3://my-warehouse/", "s3.region": "us-east-1"}',
             },
         }
 
@@ -62,29 +69,179 @@ class IcebergHook(BaseHook):
         super().__init__()
         self.conn_id = iceberg_conn_id
 
+    @cached_property
+    def catalog(self) -> Catalog:
+        """Return a pyiceberg Catalog instance for the configured connection."""
+        conn = self.get_connection(self.conn_id)
+
+        # Start with extra so connection fields take precedence
+        extra = conn.extra_dejson or {}
+        catalog_properties: dict[str, str] = {**extra}
+        catalog_properties["uri"] = conn.host.rstrip("/") if conn.host else ""
+        if "type" not in catalog_properties:
+            catalog_properties["type"] = "rest"
+
+        # credential is REST-catalog-specific; other catalogs (Glue, BigQuery)
+        # use their own auth fields passed through extra.
+        if catalog_properties["type"] == "rest":
+            if conn.login and conn.password:
+                catalog_properties["credential"] = f"{conn.login}:{conn.password}"
+            elif conn.login or conn.password:
+                self.log.warning(
+                    "Only one of Client ID / Client Secret is set. "
+                    "Both are required for OAuth2 credential authentication."
+                )
+
+        return load_catalog(self.conn_id, **catalog_properties)
+
+    def get_conn(self) -> Catalog:
+        """Return the pyiceberg Catalog."""
+        return self.catalog
+
     def test_connection(self) -> tuple[bool, str]:
-        """Test the Iceberg connection."""
+        """Test the Iceberg connection by listing namespaces."""
         try:
-            self.get_conn()
-            return True, "Successfully fetched token from Iceberg"
-        except HTTPError as e:
-            return False, f"HTTP Error: {e}: {e.response.text}"
+            namespaces = self.catalog.list_namespaces()
+            return True, f"Connected. Found {len(namespaces)} namespace(s)."
         except Exception as e:
             return False, str(e)
 
-    def get_conn(self) -> str:
-        """Obtain a short-lived access token via a client_id and client_secret."""
-        conn = self.get_connection(self.conn_id)
-        base_url = cast("str", conn.host)
-        base_url = base_url.rstrip("/")
-        client_id = conn.login
-        client_secret = conn.password
-        data = {"client_id": client_id, "client_secret": client_secret, "grant_type": "client_credentials"}
+    # ---- Token methods (backward compatibility) ----
 
-        response = requests.post(f"{base_url}/{TOKENS_ENDPOINT}", data=data)
-        response.raise_for_status()
+    def get_token(self) -> str:
+        """
+        Obtain a short-lived OAuth2 access token.
 
+        This preserves the legacy behavior of the pre-2.0 ``get_conn()`` method.
+        Use this when you need a raw token for external engines (Spark, Trino, Flink).
+        """
+        conn = self.get_connection(self.conn_id)
+        base_url = conn.host.rstrip("/") if conn.host else ""
+        data = {
+            "client_id": conn.login,
+            "client_secret": conn.password,
+            "grant_type": "client_credentials",
+        }
+        response = requests.post(f"{base_url}/{TOKENS_ENDPOINT}", data=data, timeout=30)
+        response.raise_for_status()
         return response.json()["access_token"]
 
-    def get_token_macro(self):
-        return f"{{{{ conn.{self.conn_id}.get_hook().get_conn() }}}}"
+    def get_token_macro(self) -> str:
+        """Return a Jinja2 macro that resolves to a fresh token at render time."""
+        return f"{{{{ conn.{self.conn_id}.get_hook().get_token() }}}}"
+
+    # ---- Namespace operations ----
+
+    def list_namespaces(self) -> list[str]:
+        """Return all namespace names in the catalog."""
+        return [".".join(ns) for ns in self.catalog.list_namespaces()]
+
+    # ---- Table operations ----
+
+    def list_tables(self, namespace: str) -> list[str]:
+        """
+        Return all table names in the given namespace.
+
+        :param namespace: Namespace (database/schema) to list tables from.
+        :return: List of fully-qualified table names ("namespace.table").
+        """
+        return [".".join(ident) for ident in self.catalog.list_tables(namespace)]
+
+    def load_table(self, table_name: str) -> Table:
+        """
+        Load an Iceberg table object.
+
+        :param table_name: Fully-qualified table name ("namespace.table").
+        :return: pyiceberg Table instance.
+        """
+        if "." not in table_name:
+            raise ValueError(f"Expected fully-qualified table name (namespace.table), got: {table_name!r}")
+        return self.catalog.load_table(table_name)
+
+    def table_exists(self, table_name: str) -> bool:
+        """Check whether a table exists in the catalog."""
+        return self.catalog.table_exists(table_name)
+
+    # ---- Schema introspection ----
+
+    def get_table_schema(self, table_name: str, **kwargs: Any) -> list[dict[str, str]]:
+        """
+        Return column names and types for an Iceberg table.
+
+        Compatible with the ``DbApiHook.get_table_schema()`` contract so that
+        LLM operators can use this hook interchangeably for schema context.
+
+        :param table_name: Fully-qualified table name ("namespace.table").
+        :return: List of dicts with ``name`` and ``type`` keys.
+
+        Example return value::
+
+            [
+                {"name": "id", "type": "long"},
+                {"name": "name", "type": "string"},
+                {"name": "created_at", "type": "timestamptz"},
+            ]
+        """
+        table = self.load_table(table_name)
+        return [
+            {
+                "name": field.name,
+                "type": str(field.field_type),
+            }
+            for field in table.schema().fields
+        ]
+
+    def get_partition_spec(self, table_name: str) -> list[dict[str, str]]:
+        """
+        Return the partition spec for an Iceberg table.
+
+        :param table_name: Fully-qualified table name.
+        :return: List of dicts with ``field`` and ``transform`` keys.
+
+        Example::
+
+            [
+                {"field": "event_date", "transform": "day"},
+                {"field": "region", "transform": "identity"},
+            ]
+        """
+        table = self.load_table(table_name)
+        spec = table.spec()
+        schema = table.schema()
+        result = []
+        for partition_field in spec.fields:
+            source_field = schema.find_field(partition_field.source_id)
+            result.append(
+                {
+                    "field": source_field.name,
+                    "transform": str(partition_field.transform),
+                }
+            )
+        return result
+
+    def get_table_properties(self, table_name: str) -> dict[str, str]:
+        """
+        Return table properties (format version, write config, etc.).
+
+        :param table_name: Fully-qualified table name.
+        """
+        table = self.load_table(table_name)
+        return dict(table.properties)
+
+    def get_snapshots(self, table_name: str, limit: int = 10) -> list[dict[str, Any]]:
+        """
+        Return recent snapshots for an Iceberg table.
+
+        :param table_name: Fully-qualified table name.
+        :param limit: Maximum number of snapshots to return (most recent first).
+        :return: List of dicts with snapshot metadata.
+        """
+        table = self.load_table(table_name)
+        arrow_table = table.inspect.snapshots()
+        num_rows = len(arrow_table)
+        if num_rows <= limit:
+            rows = arrow_table.to_pylist()
+        else:
+            rows = arrow_table.slice(offset=num_rows - limit, length=limit).to_pylist()
+        rows.reverse()
+        return rows
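The windowing in `get_snapshots` above (keep the last `limit` rows of an oldest-to-newest table, then reverse so the newest comes first) can be illustrated with a plain list standing in for the Arrow table. The snapshot dicts here are made-up placeholders; the real method operates on `table.inspect.snapshots()`:

```python
# Plain-list sketch of get_snapshots' windowing. The snapshot history is
# ordered oldest-to-newest, so taking the last `limit` entries and reversing
# them yields the most recent snapshots first.

def last_snapshots(rows, limit=10):
    window = rows if len(rows) <= limit else rows[len(rows) - limit:]
    return list(reversed(window))

snapshots = [{"snapshot_id": i} for i in range(1, 6)]  # 1 = oldest, 5 = newest
recent = last_snapshots(snapshots, limit=3)
# → [{"snapshot_id": 5}, {"snapshot_id": 4}, {"snapshot_id": 3}]
```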
diff --git a/providers/apache/iceberg/tests/system/apache/iceberg/example_iceberg.py b/providers/apache/iceberg/tests/system/apache/iceberg/example_iceberg.py
index a13e0164262..e87fe74f956 100644
--- a/providers/apache/iceberg/tests/system/apache/iceberg/example_iceberg.py
+++ b/providers/apache/iceberg/tests/system/apache/iceberg/example_iceberg.py
@@ -22,8 +22,10 @@ from airflow import DAG
 from airflow.providers.apache.iceberg.hooks.iceberg import IcebergHook
 from airflow.providers.standard.operators.bash import BashOperator
 
+iceberg_hook = IcebergHook()
+
 bash_command = f"""
-echo "Our token: {IcebergHook().get_token_macro()}"
+echo "Our token: {iceberg_hook.get_token_macro()}"
 echo "Also as an environment variable:"
 env | grep TOKEN
 """
@@ -42,7 +44,7 @@ with DAG(
     BashOperator(
         task_id="with_iceberg_environment_variable",
         bash_command=bash_command,
-        env={"TOKEN": IcebergHook().get_token_macro()},
+        env={"TOKEN": iceberg_hook.get_token_macro()},
     )
 
 
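The macro string that `get_token_macro()` injects into the Bash command above is just an f-string over the connection id; a quick sketch of what the operator's template actually receives (assuming the default conn id `iceberg_default`):

```python
# What get_token_macro() returns: a Jinja2 expression that Airflow resolves at
# render time by calling get_token() on the connection's hook.
conn_id = "iceberg_default"
macro = f"{{{{ conn.{conn_id}.get_hook().get_token() }}}}"
print(macro)  # → {{ conn.iceberg_default.get_hook().get_token() }}
```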
diff --git a/providers/apache/iceberg/tests/unit/apache/iceberg/hooks/test_iceberg.py b/providers/apache/iceberg/tests/unit/apache/iceberg/hooks/test_iceberg.py
index 7021aa5a776..1dbe4036476 100644
--- a/providers/apache/iceberg/tests/unit/apache/iceberg/hooks/test_iceberg.py
+++ b/providers/apache/iceberg/tests/unit/apache/iceberg/hooks/test_iceberg.py
@@ -14,35 +14,538 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 from __future__ import annotations
 
+from unittest.mock import MagicMock, patch
+
+import pytest
 import requests_mock
 
 from airflow.models import Connection
 from airflow.providers.apache.iceberg.hooks.iceberg import IcebergHook
 
+LOAD_CATALOG = "airflow.providers.apache.iceberg.hooks.iceberg.load_catalog"
 
-def test_iceberg_hook(create_connection_without_db):
-    access_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJSU"
+
[email protected]
+def iceberg_connection(create_connection_without_db):
+    """Create a standard Iceberg connection for tests."""
     create_connection_without_db(
         Connection(
             conn_id="iceberg_default",
             conn_type="iceberg",
            host="https://api.iceberg.io/ws/v1",
-            extra='{"region": "us-west-2", "catalog_url": "warehouses/fadc4c31-e81f-48cd-9ce8-64cd5ce3fa5d"}',
+            login="my_client_id",
+            password="my_client_secret",
+            extra='{"warehouse": "s3://my-warehouse/"}',
+        )
+    )
+
+
[email protected]
+def iceberg_connection_no_creds(create_connection_without_db):
+    """Create an Iceberg connection without OAuth2 credentials."""
+    create_connection_without_db(
+        Connection(
+            conn_id="iceberg_default",
+            conn_type="iceberg",
+            host="https://local-catalog.example.com",
         )
     )
-    with requests_mock.Mocker() as m:
-        m.post(
-            "https://api.iceberg.io/ws/v1/oauth/tokens",
-            json={
-                "access_token": access_token,
-                "token_type": "Bearer",
-                "expires_in": 86400,
-                "warehouse_id": "fadc4c31-e81f-48cd-9ce8-64cd5ce3fa5d",
-                "region": "us-west-2",
-                "catalog_url": "warehouses/fadc4c31-e81f-48cd-9ce8-64cd5ce3fa5d",
-            },
+
+
+def _make_mock_catalog():
+    """Create a mock catalog with spec matching the real Catalog interface."""
+    from pyiceberg.catalog.rest import RestCatalog
+
+    return MagicMock(spec=RestCatalog)
+
+
+def _make_mock_table():
+    """Create a mock table with spec matching the real Table interface."""
+    from pyiceberg.table import Table
+
+    return MagicMock(spec=Table)
+
+
+def _make_hook_with_mock_catalog(mock_catalog):
+    """Create a hook with a pre-populated cached_property for catalog."""
+    hook = IcebergHook()
+    # Bypass cached_property by writing directly to instance __dict__
+    hook.__dict__["catalog"] = mock_catalog
+    return hook
+
+
+class TestIcebergHookCatalogConfig:
+    """Test catalog configuration from Airflow connection."""
+
+    def test_catalog_from_connection_fields(self, iceberg_connection):
+        """Catalog is configured from host, login, password."""
+        hook = IcebergHook()
+        with patch(LOAD_CATALOG) as mock_load:
+            mock_load.return_value = MagicMock()
+            catalog = hook.get_conn()
+
+            mock_load.assert_called_once_with(
+                "iceberg_default",
+                warehouse="s3://my-warehouse/",
+                uri="https://api.iceberg.io/ws/v1",
+                type="rest",
+                credential="my_client_id:my_client_secret",
+            )
+            assert catalog is mock_load.return_value
+
+    def test_catalog_merges_extra_properties(self, create_connection_without_db):
+        """Extra JSON properties (warehouse, S3 config) are merged into catalog config."""
+        create_connection_without_db(
+            Connection(
+                conn_id="iceberg_default",
+                conn_type="iceberg",
+                host="https://api.iceberg.io/ws/v1",
+                login="client",
+                password="secret",
+                extra='{"warehouse": "s3://bucket/", "s3.region": "us-east-1"}',
+            )
+        )
+        hook = IcebergHook()
+        with patch(LOAD_CATALOG) as mock_load:
+            mock_load.return_value = MagicMock()
+            hook.get_conn()
+
+            call_kwargs = mock_load.call_args[1]
+            assert call_kwargs["warehouse"] == "s3://bucket/"
+            assert call_kwargs["s3.region"] == "us-east-1"
+
+    def test_catalog_without_credentials(self, iceberg_connection_no_creds):
+        """Catalog works without OAuth2 credentials (e.g., local/unsigned catalogs)."""
+        hook = IcebergHook()
+        with patch(LOAD_CATALOG) as mock_load:
+            mock_load.return_value = MagicMock()
+            hook.get_conn()
+
+            call_kwargs = mock_load.call_args[1]
+            assert "credential" not in call_kwargs
+            assert call_kwargs["uri"] == "https://local-catalog.example.com"
+
+    def test_extra_cannot_override_uri(self, create_connection_without_db):
+        """Connection host always wins over uri in extra."""
+        create_connection_without_db(
+            Connection(
+                conn_id="iceberg_default",
+                conn_type="iceberg",
+                host="https://correct-host.example.com",
+                extra='{"uri": "https://wrong-host.example.com"}',
+            )
+        )
+        hook = IcebergHook()
+        with patch(LOAD_CATALOG) as mock_load:
+            mock_load.return_value = MagicMock()
+            hook.get_conn()
+
+            call_kwargs = mock_load.call_args[1]
+            assert call_kwargs["uri"] == "https://correct-host.example.com"
+
+    def test_extra_can_override_catalog_type(self, create_connection_without_db):
+        """Extra can set catalog type to non-REST (e.g., glue)."""
+        create_connection_without_db(
+            Connection(
+                conn_id="iceberg_default",
+                conn_type="iceberg",
+                host="https://glue.example.com",
+                extra='{"type": "glue"}',
+            )
         )
-        assert IcebergHook().get_conn() == access_token
+        hook = IcebergHook()
+        with patch(LOAD_CATALOG) as mock_load:
+            mock_load.return_value = MagicMock()
+            hook.get_conn()
+
+            call_kwargs = mock_load.call_args[1]
+            assert call_kwargs["type"] == "glue"
+
+    def test_non_rest_catalog_skips_credential(self, create_connection_without_db):
+        """Non-REST catalogs (glue, bigquery) don't get the credential field."""
+        create_connection_without_db(
+            Connection(
+                conn_id="iceberg_default",
+                conn_type="iceberg",
+                host="https://glue.example.com",
+                login="some_user",
+                password="some_pass",
+                extra='{"type": "glue"}',
+            )
+        )
+        hook = IcebergHook()
+        with patch(LOAD_CATALOG) as mock_load:
+            mock_load.return_value = MagicMock()
+            hook.get_conn()
+
+            call_kwargs = mock_load.call_args[1]
+            assert "credential" not in call_kwargs
+
+    def test_partial_credentials_warns(self, create_connection_without_db, caplog):
+        """Only login or only password logs a warning."""
+        create_connection_without_db(
+            Connection(
+                conn_id="iceberg_default",
+                conn_type="iceberg",
+                host="https://api.iceberg.io/ws/v1",
+                login="client_id_only",
+            )
+        )
+        hook = IcebergHook()
+        with patch(LOAD_CATALOG) as mock_load:
+            mock_load.return_value = MagicMock()
+            hook.get_conn()
+
+            call_kwargs = mock_load.call_args[1]
+            assert "credential" not in call_kwargs
+        assert "Both are required" in caplog.text
+
+
+class TestIcebergHookCatalog:
+    """Test catalog introspection methods."""
+
+    def test_list_namespaces(self):
+        """list_namespaces returns dotted namespace strings."""
+        mock_cat = _make_mock_catalog()
+        mock_cat.list_namespaces.return_value = [("default",), ("analytics", "raw")]
+        hook = _make_hook_with_mock_catalog(mock_cat)
+
+        result = hook.list_namespaces()
+
+        assert result == ["default", "analytics.raw"]
+
+    def test_list_tables(self):
+        """list_tables returns fully-qualified table names."""
+        mock_cat = _make_mock_catalog()
+        mock_cat.list_tables.return_value = [
+            ("analytics", "events"),
+            ("analytics", "users"),
+        ]
+        hook = _make_hook_with_mock_catalog(mock_cat)
+
+        result = hook.list_tables("analytics")
+
+        assert result == ["analytics.events", "analytics.users"]
+        mock_cat.list_tables.assert_called_once_with("analytics")
+
+    def test_table_exists_true(self):
+        """table_exists returns True for existing table."""
+        mock_cat = _make_mock_catalog()
+        mock_cat.table_exists.return_value = True
+        hook = _make_hook_with_mock_catalog(mock_cat)
+
+        assert hook.table_exists("analytics.events") is True
+
+    def test_table_exists_false(self):
+        """table_exists returns False for missing table."""
+        mock_cat = _make_mock_catalog()
+        mock_cat.table_exists.return_value = False
+        hook = _make_hook_with_mock_catalog(mock_cat)
+
+        assert hook.table_exists("analytics.missing") is False
+
+    def test_load_table(self):
+        """load_table delegates to catalog.load_table."""
+        mock_cat = _make_mock_catalog()
+        mock_table = _make_mock_table()
+        mock_cat.load_table.return_value = mock_table
+        hook = _make_hook_with_mock_catalog(mock_cat)
+
+        result = hook.load_table("db.my_table")
+
+        assert result is mock_table
+        mock_cat.load_table.assert_called_once_with("db.my_table")
+
+    def test_load_table_rejects_unqualified_name(self):
+        """load_table raises ValueError for names without a namespace."""
+        hook = _make_hook_with_mock_catalog(_make_mock_catalog())
+
+        with pytest.raises(ValueError, match="namespace.table"):
+            hook.load_table("bare_table")
+
+    def test_get_table_schema(self):
+        """get_table_schema returns list of {name, type} dicts matching DbApiHook contract."""
+        mock_field1 = MagicMock()
+        mock_field1.name = "id"
+        mock_field1.field_type = MagicMock(__str__=lambda s: "long")
+
+        mock_field2 = MagicMock()
+        mock_field2.name = "name"
+        mock_field2.field_type = MagicMock(__str__=lambda s: "string")
+
+        mock_field3 = MagicMock()
+        mock_field3.name = "created_at"
+        mock_field3.field_type = MagicMock(__str__=lambda s: "timestamptz")
+
+        mock_schema = MagicMock()
+        mock_schema.fields = [mock_field1, mock_field2, mock_field3]
+
+        mock_table = _make_mock_table()
+        mock_table.schema.return_value = mock_schema
+
+        mock_cat = _make_mock_catalog()
+        mock_cat.load_table.return_value = mock_table
+        hook = _make_hook_with_mock_catalog(mock_cat)
+
+        result = hook.get_table_schema("db.my_table")
+
+        assert result == [
+            {"name": "id", "type": "long"},
+            {"name": "name", "type": "string"},
+            {"name": "created_at", "type": "timestamptz"},
+        ]
+
+    def test_get_table_schema_nested_types(self):
+        """Nested types (struct, list, map) are stringified correctly."""
+        mock_field1 = MagicMock()
+        mock_field1.name = "tags"
+        mock_field1.field_type = MagicMock(__str__=lambda s: "list<string>")
+
+        mock_field2 = MagicMock()
+        mock_field2.name = "metadata"
+        mock_field2.field_type = MagicMock(__str__=lambda s: "map<string, string>")
+
+        mock_field3 = MagicMock()
+        mock_field3.name = "address"
+        mock_field3.field_type = MagicMock(__str__=lambda s: "struct<city: string, zip: string>")
+
+        mock_schema = MagicMock()
+        mock_schema.fields = [mock_field1, mock_field2, mock_field3]
+
+        mock_table = _make_mock_table()
+        mock_table.schema.return_value = mock_schema
+
+        mock_cat = _make_mock_catalog()
+        mock_cat.load_table.return_value = mock_table
+        hook = _make_hook_with_mock_catalog(mock_cat)
+
+        result = hook.get_table_schema("db.nested_table")
+
+        assert result == [
+            {"name": "tags", "type": "list<string>"},
+            {"name": "metadata", "type": "map<string, string>"},
+            {"name": "address", "type": "struct<city: string, zip: string>"},
+        ]
+
+    def test_get_partition_spec(self):
+        """get_partition_spec returns field names and transforms."""
+        mock_source_field1 = MagicMock()
+        mock_source_field1.name = "event_date"
+        mock_source_field2 = MagicMock()
+        mock_source_field2.name = "region"
+
+        mock_partition_field1 = MagicMock()
+        mock_partition_field1.source_id = 1
+        mock_partition_field1.transform = MagicMock(__str__=lambda s: "day")
+        mock_partition_field2 = MagicMock()
+        mock_partition_field2.source_id = 2
+        mock_partition_field2.transform = MagicMock(__str__=lambda s: "identity")
+
+        mock_spec = MagicMock()
+        mock_spec.fields = [mock_partition_field1, mock_partition_field2]
+
+        mock_schema = MagicMock()
+        mock_schema.find_field.side_effect = lambda sid: {
+            1: mock_source_field1,
+            2: mock_source_field2,
+        }[sid]
+
+        mock_table = _make_mock_table()
+        mock_table.spec.return_value = mock_spec
+        mock_table.schema.return_value = mock_schema
+
+        mock_cat = _make_mock_catalog()
+        mock_cat.load_table.return_value = mock_table
+        hook = _make_hook_with_mock_catalog(mock_cat)
+
+        result = hook.get_partition_spec("db.partitioned_table")
+
+        assert result == [
+            {"field": "event_date", "transform": "day"},
+            {"field": "region", "transform": "identity"},
+        ]
+
+    def test_get_partition_spec_no_partitions(self):
+        """Unpartitioned table returns empty list."""
+        mock_spec = MagicMock()
+        mock_spec.fields = []
+
+        mock_table = _make_mock_table()
+        mock_table.spec.return_value = mock_spec
+        mock_table.schema.return_value = MagicMock()
+
+        mock_cat = _make_mock_catalog()
+        mock_cat.load_table.return_value = mock_table
+        hook = _make_hook_with_mock_catalog(mock_cat)
+
+        result = hook.get_partition_spec("db.unpartitioned_table")
+
+        assert result == []
+
+    def test_get_table_properties(self):
+        """get_table_properties returns dict of string key-value pairs."""
+        mock_table = _make_mock_table()
+        mock_table.properties = {
+            "format-version": "2",
+            "write.format.default": "parquet",
+        }
+
+        mock_cat = _make_mock_catalog()
+        mock_cat.load_table.return_value = mock_table
+        hook = _make_hook_with_mock_catalog(mock_cat)
+
+        result = hook.get_table_properties("db.my_table")
+
+        assert result == {
+            "format-version": "2",
+            "write.format.default": "parquet",
+        }
+
+    def test_get_snapshots_returns_most_recent_first(self):
+        """get_snapshots returns snapshots in reverse chronological order."""
+        mock_arrow = MagicMock()
+        mock_arrow.__len__ = lambda s: 3
+        mock_arrow.to_pylist.return_value = [
+            {"snapshot_id": 1, "committed_at": "2024-01-01"},
+            {"snapshot_id": 2, "committed_at": "2024-06-01"},
+            {"snapshot_id": 3, "committed_at": "2024-12-01"},
+        ]
+
+        mock_table = _make_mock_table()
+        mock_table.inspect.snapshots.return_value = mock_arrow
+
+        mock_cat = _make_mock_catalog()
+        mock_cat.load_table.return_value = mock_table
+        hook = _make_hook_with_mock_catalog(mock_cat)
+
+        result = hook.get_snapshots("db.my_table")
+
+        assert len(result) == 3
+        # Most recent first
+        assert result[0]["snapshot_id"] == 3
+        assert result[2]["snapshot_id"] == 1
+
+    def test_get_snapshots_respects_limit(self):
+        """Limit parameter caps the number of returned snapshots, taking the most recent."""
+        mock_arrow = MagicMock()
+        mock_arrow.__len__ = lambda s: 20
+        # When sliced to last 5 items (offset=15, length=5), return those
+        mock_arrow.slice.return_value = MagicMock()
+        mock_arrow.slice.return_value.to_pylist.return_value = [{"snapshot_id": i} for i in range(15, 20)]
+
+        mock_table = _make_mock_table()
+        mock_table.inspect.snapshots.return_value = mock_arrow
+
+        mock_cat = _make_mock_catalog()
+        mock_cat.load_table.return_value = mock_table
+        hook = _make_hook_with_mock_catalog(mock_cat)
+
+        result = hook.get_snapshots("db.my_table", limit=5)
+
+        assert len(result) == 5
+        # Most recent first (reversed from chronological)
+        assert result[0]["snapshot_id"] == 19
+        assert result[4]["snapshot_id"] == 15
+        mock_arrow.slice.assert_called_once_with(offset=15, length=5)
+
+
+class TestPyicebergAPICompatibility:
+    """Verify the pyiceberg APIs that IcebergHook depends on still exist.
+
+    These tests use real pyiceberg classes (no mocks) so they fail if
+    pyiceberg removes or renames an API we rely on. This catches breaking
+    changes from upstream that mocked unit tests would miss.
+    """
+
+    def test_catalog_has_required_methods(self):
+        """Catalog ABC exposes the methods IcebergHook calls."""
+        from pyiceberg.catalog import Catalog
+
+        for method in ("list_namespaces", "list_tables", "load_table", "table_exists"):
+            assert hasattr(Catalog, method), f"pyiceberg Catalog missing {method}"
+
+    def test_load_catalog_importable(self):
+        """load_catalog is importable from pyiceberg.catalog."""
+        from pyiceberg.catalog import load_catalog
+
+        assert callable(load_catalog)
+
+    def test_table_has_required_attributes(self):
+        """Table exposes schema, spec, properties, and inspect."""
+        from pyiceberg.table import Table
+
+        for attr in ("schema", "spec", "properties", "inspect"):
+            assert hasattr(Table, attr), f"pyiceberg Table missing {attr}"
+
+    def test_inspect_table_has_snapshots(self):
+        """InspectTable has a snapshots method."""
+        from pyiceberg.table.inspect import InspectTable
+
+        assert hasattr(InspectTable, "snapshots")
+
+    def test_schema_has_fields_and_find_field(self):
+        """Schema instance exposes fields and find_field."""
+        from pyiceberg.schema import Schema
+        from pyiceberg.types import LongType, NestedField
+
+        schema = Schema(NestedField(1, "id", LongType(), required=True))
+        assert hasattr(schema, "fields")
+        assert hasattr(schema, "find_field")
+
+    def test_partition_spec_has_fields(self):
+        """PartitionSpec instance exposes fields."""
+        from pyiceberg.partitioning import PartitionSpec
+
+        spec = PartitionSpec()
+        assert hasattr(spec, "fields")
+
+
+class TestIcebergHookToken:
+    """Test backward-compatible token methods."""
+
+    def test_get_token(self, iceberg_connection):
+        """get_token fetches OAuth2 access token."""
+        access_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJSU"
+        hook = IcebergHook()
+        with requests_mock.Mocker() as m:
+            m.post(
+                "https://api.iceberg.io/ws/v1/oauth/tokens",
+                json={
+                    "access_token": access_token,
+                    "token_type": "Bearer",
+                    "expires_in": 86400,
+                },
+            )
+            result = hook.get_token()
+
+        assert result == access_token
+
+    def test_get_token_macro(self):
+        """get_token_macro returns correct Jinja2 template string."""
+        hook = IcebergHook()
+        result = hook.get_token_macro()
+        assert result == "{{ conn.iceberg_default.get_hook().get_token() }}"
+
+    def test_test_connection_success(self):
+        """test_connection returns True when catalog is reachable."""
+        mock_cat = _make_mock_catalog()
+        mock_cat.list_namespaces.return_value = [("default",), ("analytics",)]
+        hook = _make_hook_with_mock_catalog(mock_cat)
+
+        success, message = hook.test_connection()
+
+        assert success is True
+        assert "2 namespace(s)" in message
+
+    def test_test_connection_failure(self):
+        """test_connection returns False with error message on failure."""
+        mock_cat = _make_mock_catalog()
+        mock_cat.list_namespaces.side_effect = ConnectionError("Connection refused")
+        hook = _make_hook_with_mock_catalog(mock_cat)
+
+        success, message = hook.test_connection()
+
+        assert success is False
+        assert "Connection refused" in message

