This is an automated email from the ASF dual-hosted git repository.

gopidesu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 483d84c6c95 Add iceberg support to AnalyticsOperator (#62754)
483d84c6c95 is described below

commit 483d84c6c95536b8679520fdfdaf30e21320850d
Author: GPK <[email protected]>
AuthorDate: Tue Mar 3 09:56:28 2026 +0000

    Add iceberg support to AnalyticsOperator (#62754)
    
    * Add iceberg support to analytics operator
    
    * Add iceberg support to analytics operator
    
    * Resolve comments
    
    * Fixup tests
---
 dev/breeze/tests/test_selective_checks.py          |   2 +-
 docs/spelling_wordlist.txt                         |   1 +
 .../providers/apache/iceberg/hooks/iceberg.py      |   5 +-
 .../unit/apache/iceberg/hooks/test_iceberg.py      |  18 +++
 providers/common/sql/docs/index.rst                |  15 +-
 providers/common/sql/docs/operators.rst            |  10 ++
 providers/common/sql/pyproject.toml                |  15 +-
 .../sql/src/airflow/providers/common/sql/config.py |  43 +++--
 .../providers/common/sql/datafusion/base.py        |  13 +-
 .../providers/common/sql/datafusion/engine.py      |  19 +--
 .../providers/common/sql/datafusion/exceptions.py  |   4 +
 .../common/sql/datafusion/format_handlers.py       | 123 ++++++++++-----
 .../common/sql/example_dags/example_analytics.py   |  34 +++-
 .../common/sql/datafusion/test_format_handlers.py  | 173 ++++++++++++++++-----
 .../sql/tests/unit/common/sql/test_config.py       |  23 ++-
 15 files changed, 381 insertions(+), 117 deletions(-)

diff --git a/dev/breeze/tests/test_selective_checks.py 
b/dev/breeze/tests/test_selective_checks.py
index 18e972c6ba1..5ec37351cad 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -2248,7 +2248,7 @@ def test_upgrade_to_newer_dependencies(
         pytest.param(
             
("providers/common/sql/src/airflow/providers/common/sql/common_sql_python.py",),
             {
-                "docs-list-as-string": "amazon apache.drill apache.druid 
apache.hive "
+                "docs-list-as-string": "amazon apache.drill apache.druid 
apache.hive apache.iceberg "
                 "apache.impala apache.pinot common.ai common.compat common.sql 
databricks elasticsearch "
                 "exasol google jdbc microsoft.mssql mysql odbc openlineage "
                 "oracle pgvector postgres presto slack snowflake sqlite 
teradata trino vertica ydb",
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 50d5df9dea2..fbfbeda2b00 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1765,6 +1765,7 @@ stderr
 stdout
 stmts
 StorageClass
+storages
 StoredInfoType
 storedInfoType
 str
diff --git 
a/providers/apache/iceberg/src/airflow/providers/apache/iceberg/hooks/iceberg.py
 
b/providers/apache/iceberg/src/airflow/providers/apache/iceberg/hooks/iceberg.py
index bad7f3e44ee..16311d14e53 100644
--- 
a/providers/apache/iceberg/src/airflow/providers/apache/iceberg/hooks/iceberg.py
+++ 
b/providers/apache/iceberg/src/airflow/providers/apache/iceberg/hooks/iceberg.py
@@ -77,7 +77,10 @@ class IcebergHook(BaseHook):
         # Start with extra so connection fields take precedence
         extra = conn.extra_dejson or {}
         catalog_properties: dict[str, str] = {**extra}
-        catalog_properties["uri"] = conn.host.rstrip("/") if conn.host else ""
+        host = conn.host.rstrip("/") if conn.host else None
+        if host:
+            catalog_properties["uri"] = host
+
         if "type" not in catalog_properties:
             catalog_properties["type"] = "rest"
 
diff --git 
a/providers/apache/iceberg/tests/unit/apache/iceberg/hooks/test_iceberg.py 
b/providers/apache/iceberg/tests/unit/apache/iceberg/hooks/test_iceberg.py
index 1dbe4036476..322f60d007a 100644
--- a/providers/apache/iceberg/tests/unit/apache/iceberg/hooks/test_iceberg.py
+++ b/providers/apache/iceberg/tests/unit/apache/iceberg/hooks/test_iceberg.py
@@ -145,6 +145,24 @@ class TestIcebergHookCatalogConfig:
             call_kwargs = mock_load.call_args[1]
            assert call_kwargs["uri"] == "https://correct-host.example.com"
 
+    def test_extra_cannot_override_uri_when_empty(self, 
create_connection_without_db):
+        """Connection host should not override if it's empty"""
+        create_connection_without_db(
+            Connection(
+                conn_id="iceberg_default",
+                conn_type="iceberg",
+                host="",
+                extra='{"uri": "https://wrong-host.example.com"}',
+            )
+        )
+        hook = IcebergHook()
+        with patch(LOAD_CATALOG) as mock_load:
+            mock_load.return_value = MagicMock()
+            hook.get_conn()
+
+            call_kwargs = mock_load.call_args[1]
+            assert call_kwargs["uri"] == "https://wrong-host.example.com"
+
     def test_extra_can_override_catalog_type(self, 
create_connection_without_db):
         """Extra can set catalog type to non-REST (e.g., glue)."""
         create_connection_without_db(
diff --git a/providers/common/sql/docs/index.rst 
b/providers/common/sql/docs/index.rst
index 28cbd000dd5..d78f310a2f0 100644
--- a/providers/common/sql/docs/index.rst
+++ b/providers/common/sql/docs/index.rst
@@ -122,13 +122,14 @@ You can install such cross-provider dependencies when 
installing from PyPI. For
     pip install apache-airflow-providers-common-sql[amazon]
 
 
-==================================================================================================================
  =================
-Dependent package                                                              
                                     Extra
-==================================================================================================================
  =================
-`apache-airflow-providers-amazon 
<https://airflow.apache.org/docs/apache-airflow-providers-amazon>`_             
   ``amazon``
-`apache-airflow-providers-common-compat 
<https://airflow.apache.org/docs/apache-airflow-providers-common-compat>`_  
``common.compat``
-`apache-airflow-providers-openlineage 
<https://airflow.apache.org/docs/apache-airflow-providers-openlineage>`_      
``openlineage``
-==================================================================================================================
  =================
+====================================================================================================================
  ==================
+Dependent package                                                              
                                       Extra
+====================================================================================================================
  ==================
+`apache-airflow-providers-amazon 
<https://airflow.apache.org/docs/apache-airflow-providers-amazon>`_             
     ``amazon``
+`apache-airflow-providers-apache-iceberg 
<https://airflow.apache.org/docs/apache-airflow-providers-apache-iceberg>`_  
``apache.iceberg``
+`apache-airflow-providers-common-compat 
<https://airflow.apache.org/docs/apache-airflow-providers-common-compat>`_    
``common.compat``
+`apache-airflow-providers-openlineage 
<https://airflow.apache.org/docs/apache-airflow-providers-openlineage>`_        
``openlineage``
+====================================================================================================================
  ==================
 
 Downloading official packages
 -----------------------------
diff --git a/providers/common/sql/docs/operators.rst 
b/providers/common/sql/docs/operators.rst
index 0354c491559..773feefb4b5 100644
--- a/providers/common/sql/docs/operators.rst
+++ b/providers/common/sql/docs/operators.rst
@@ -299,8 +299,10 @@ DataSourceConfig Parameters
 * ``uri`` (str, required): URI of the datasource.
 * ``format`` (str, required): Format of the data.
 * ``table_name`` (str, required): Name of the table. Note: This name can be 
any identifier and should match the table name used in the SQL queries.
+* ``db_name`` (str, optional): Name of the database. Default is None. This is
intended for catalog-based storages such as Iceberg: provide the database name
here and it will be used to fetch the table metadata.
 * ``storage_type`` (StorageType, optional): Type of storage. Default is None. 
If not provided, it will be inferred from the URI.
 * ``options`` (dict, optional): Additional options for the datasource. eg: if 
the datasource is partitioned, you can provide partitioning information in the 
options, e.g: ``{"table_partition_cols": [("year", "integer")]}``
+    See `<https://datafusion.apache.org/python/autoapi/datafusion/context/index.html#datafusion.context.SessionContext>`_ for more options.
 
 S3 Storage
 ----------
@@ -328,3 +330,11 @@ analytics sql queries:
     :language: python
     :start-after: [START howto_analytics_decorator]
     :end-before: [END howto_analytics_decorator]
+
+Analytics with Iceberg
+-------------------------
+.. exampleinclude:: 
/../../sql/src/airflow/providers/common/sql/example_dags/example_analytics.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_analytics_iceberg]
+    :end-before: [END howto_analytics_iceberg]
diff --git a/providers/common/sql/pyproject.toml 
b/providers/common/sql/pyproject.toml
index cd9484e49ea..7780a7fe945 100644
--- a/providers/common/sql/pyproject.toml
+++ b/providers/common/sql/pyproject.toml
@@ -88,8 +88,15 @@ dependencies = [
 "amazon" = [
     "apache-airflow-providers-amazon"
 ]
-"datafusion" =[
-    "datafusion>=50.0.0",
+# The DataFusion 52.0.0 crate is not currently supported together with pyiceberg-core
+"datafusion" = [
+    "datafusion>=50.0.0,<52.0.0",
+]
+"pyiceberg-core" = [
+    "pyiceberg-core>=0.8.0"
+]
+"apache.iceberg" = [
+    "apache-airflow-providers-apache-iceberg"
 ]
 
 [dependency-groups]
@@ -98,6 +105,7 @@ dev = [
     "apache-airflow-task-sdk",
     "apache-airflow-devel-common",
     "apache-airflow-providers-amazon",
+    "apache-airflow-providers-apache-iceberg",
     "apache-airflow-providers-common-compat",
     "apache-airflow-providers-openlineage",
     # Additional devel dependencies (do not remove this line and add extra 
development dependencies)
@@ -108,7 +116,8 @@ dev = [
     "apache-airflow-providers-odbc",
     "apache-airflow-providers-sqlite",
     "apache-airflow-providers-common-sql[sqlalchemy]",
-    "datafusion>=50.0.0"
+    "datafusion>=50.0.0,<52.0.0",
+    "pyiceberg-core>=0.8.0",
 ]
 
 # To build docs:
diff --git a/providers/common/sql/src/airflow/providers/common/sql/config.py 
b/providers/common/sql/src/airflow/providers/common/sql/config.py
index f16ddc5e50d..75cc15efedc 100644
--- a/providers/common/sql/src/airflow/providers/common/sql/config.py
+++ b/providers/common/sql/src/airflow/providers/common/sql/config.py
@@ -36,6 +36,11 @@ class FormatType(str, Enum):
     PARQUET = "parquet"
     CSV = "csv"
     AVRO = "avro"
+    ICEBERG = "iceberg"
+
+
+# TODO: Add delta format support
+TABLE_PROVIDERS: frozenset[str] = frozenset({FormatType.ICEBERG.value})
 
 
 class StorageType(str, Enum):
@@ -50,30 +55,48 @@ class DataSourceConfig:
     """
     Configuration for an input data source.
 
+    **File-based formats** (parquet, csv, avro) require ``uri`` and infer
+    ``storage_type`` automatically.
+
+    **Catalog-managed formats** (iceberg, and in the future delta, etc.) do not
+    require ``uri`` or ``storage_type``; they use ``conn_id`` and 
format-specific
+    keys in ``options`` (e.g. ``catalog_table_name`` for Iceberg).
+
     :param conn_id: The connection ID to use for accessing the data source.
     :param uri: The URI of the data source (e.g., file path, S3 bucket, etc.).
-    :param format: The format of the data (e.g., 'parquet', 'csv').
-    :param table_name: The name of the table if applicable.
-    :param schema: A dictionary mapping column names to their types.
-    :param db_name: The database name if applicable.
+        Not required for catalog-managed formats.
+    :param format: The format of the data (e.g., 'parquet', 'csv', 'iceberg').
+    :param table_name: The name to register the table under in DataFusion.
+    :param db_name: The catalog namespace for table providers, e.g. Iceberg
needs it to look up the table in the catalog.
     :param storage_type: The type of storage (automatically inferred from URI).
-    :param options: Additional options for the data source. eg: you can set 
partition columns to any datasource
-        that will be set in while registering the data
+        Not used for catalog-managed formats.
+    :param options: Additional options for the data source. e.g. you can set 
partition columns
+        for any file-based datasource, or ``catalog_table_name`` for Iceberg.
     """
 
     conn_id: str
-    uri: str
-    format: str | None = None
-    table_name: str | None = None
+    table_name: str
+    uri: str = ""
+    format: str = ""
+    db_name: str | None = None
     storage_type: StorageType | None = None
     options: dict[str, Any] = field(default_factory=dict)
 
+    @property
+    def is_table_provider(self) -> bool:
+        """Whether this format is catalog-managed (no object store needed)."""
+        return bool(self.format and self.format.lower() in TABLE_PROVIDERS)
+
     def __post_init__(self):
+        if self.is_table_provider:
+            if self.db_name is None:
+                raise ValueError(f"Database name must be provided for table 
providers {TABLE_PROVIDERS}")
+            return
 
         if self.storage_type is None:
             self.storage_type = self._extract_storage_type
 
-        if self.storage_type is not None and self.table_name is None:
+        if self.storage_type is not None and (not self.table_name or not 
self.table_name.strip()):
             raise ValueError("Table name must be provided for storage type")
 
     @property
diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/datafusion/base.py 
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/base.py
index 496d4139835..f09881e552a 100644
--- a/providers/common/sql/src/airflow/providers/common/sql/datafusion/base.py
+++ b/providers/common/sql/src/airflow/providers/common/sql/datafusion/base.py
@@ -24,7 +24,12 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 if TYPE_CHECKING:
     from datafusion import SessionContext
 
-    from airflow.providers.common.sql.config import ConnectionConfig, 
FormatType, StorageType
+    from airflow.providers.common.sql.config import (
+        ConnectionConfig,
+        DataSourceConfig,
+        FormatType,
+        StorageType,
+    )
 
 
 class ObjectStorageProvider(LoggingMixin, ABC):
@@ -56,12 +61,16 @@ class ObjectStorageProvider(LoggingMixin, ABC):
 class FormatHandler(LoggingMixin, ABC):
     """Abstract base class for format handlers."""
 
+    def __init__(self, datasource_config: DataSourceConfig):
+        super().__init__()
+        self.datasource_config = datasource_config
+
     @property
     def get_format(self) -> FormatType:
         """Return file format type."""
         raise NotImplementedError
 
     @abstractmethod
-    def register_data_source_format(self, ctx: SessionContext, table_name: 
str, path: str) -> None:
+    def register_data_source_format(self, ctx: SessionContext) -> None:
         """Register data source format."""
         raise NotImplementedError
diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py 
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py
index e5c574600af..4786c1f779c 100644
--- a/providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py
+++ b/providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py
@@ -50,12 +50,14 @@ class DataFusionEngine(LoggingMixin):
         if not isinstance(datasource_config, DataSourceConfig):
             raise ValueError("datasource_config must be of type 
DataSourceConfig")
 
-        if datasource_config.storage_type == StorageType.LOCAL:
-            connection_config = None
-        else:
-            connection_config = 
self._get_connection_config(datasource_config.conn_id)
+        if not datasource_config.is_table_provider:
+            if datasource_config.storage_type == StorageType.LOCAL:
+                connection_config = None
+            else:
+                connection_config = 
self._get_connection_config(datasource_config.conn_id)
+
+            self._register_object_store(datasource_config, connection_config)
 
-        self._register_object_store(datasource_config, connection_config)
         self._register_data_source_format(datasource_config)
 
     def _register_object_store(
@@ -89,10 +91,9 @@ class DataFusionEngine(LoggingMixin):
                 f"Table {datasource_config.table_name} already registered for 
{self.registered_tables[datasource_config.table_name]}, please choose different 
name"
             )
 
-        format_cls = get_format_handler(datasource_config.format, 
datasource_config.options)
-        format_cls.register_data_source_format(
-            self.session_context, datasource_config.table_name, 
datasource_config.uri
-        )
+        format_cls = get_format_handler(datasource_config)
+
+        format_cls.register_data_source_format(self.session_context)
         self.registered_tables[datasource_config.table_name] = 
datasource_config.uri
         self.log.info(
             "Registered data source format %s for table: %s",
diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/datafusion/exceptions.py
 
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/exceptions.py
index dea1e5c219b..1c45734781a 100644
--- 
a/providers/common/sql/src/airflow/providers/common/sql/datafusion/exceptions.py
+++ 
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/exceptions.py
@@ -29,3 +29,7 @@ class FileFormatRegistrationException(AirflowException):
 
 class QueryExecutionException(AirflowException):
     """Error while executing query."""
+
+
+class IcebergRegistrationException(AirflowException):
+    """Error while registering Iceberg table with DataFusion."""
diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/datafusion/format_handlers.py
 
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/format_handlers.py
index 4dd0f461456..dea530e27a6 100644
--- 
a/providers/common/sql/src/airflow/providers/common/sql/datafusion/format_handlers.py
+++ 
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/format_handlers.py
@@ -16,98 +16,135 @@
 # under the License.
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING
 
-from airflow.providers.common.sql.config import FormatType
+from airflow.providers.common.compat.sdk import 
AirflowOptionalProviderFeatureException
+from airflow.providers.common.sql.config import DataSourceConfig, FormatType
 from airflow.providers.common.sql.datafusion.base import FormatHandler
-from airflow.providers.common.sql.datafusion.exceptions import 
FileFormatRegistrationException
+from airflow.providers.common.sql.datafusion.exceptions import (
+    FileFormatRegistrationException,
+    IcebergRegistrationException,
+)
 
 if TYPE_CHECKING:
     from datafusion import SessionContext
 
 
 class ParquetFormatHandler(FormatHandler):
-    """
-    Parquet format handler.
-
-    :param options: Additional options for the Parquet format.
-        
https://datafusion.apache.org/python/autoapi/datafusion/context/index.html#datafusion.context.SessionContext.register_parquet
-    """
-
-    def __init__(self, options: dict[str, Any] | None = None):
-        self.options = options or {}
+    """Parquet format handler."""
 
     @property
     def get_format(self) -> FormatType:
         """Return the format type."""
         return FormatType.PARQUET
 
-    def register_data_source_format(self, ctx: SessionContext, table_name: 
str, path: str):
+    def register_data_source_format(self, ctx: SessionContext):
         """Register a data source format."""
         try:
-            ctx.register_parquet(table_name, path, **self.options)
+            ctx.register_parquet(
+                self.datasource_config.table_name,
+                self.datasource_config.uri,
+                **self.datasource_config.options,
+            )
         except Exception as e:
-            raise FileFormatRegistrationException(f"Failed to register Parquet 
data source: {e}")
+            raise FileFormatRegistrationException(f"Failed to register 
{self.get_format} data source: {e}")
 
 
 class CsvFormatHandler(FormatHandler):
-    """
-    CSV format handler.
-
-    :param options: Additional options for the CSV format.
-        
https://datafusion.apache.org/python/autoapi/datafusion/context/index.html#datafusion.context.SessionContext.register_csv
-    """
-
-    def __init__(self, options: dict[str, Any] | None = None):
-        self.options = options or {}
+    """CSV format handler."""
 
     @property
     def get_format(self) -> FormatType:
         """Return the format type."""
         return FormatType.CSV
 
-    def register_data_source_format(self, ctx: SessionContext, table_name: 
str, path: str):
+    def register_data_source_format(self, ctx: SessionContext):
         """Register a data source format."""
         try:
-            ctx.register_csv(table_name, path, **self.options)
+            ctx.register_csv(
+                self.datasource_config.table_name,
+                self.datasource_config.uri,
+                **self.datasource_config.options,
+            )
         except Exception as e:
-            raise FileFormatRegistrationException(f"Failed to register csv 
data source: {e}")
+            raise FileFormatRegistrationException(f"Failed to register 
{self.get_format} data source: {e}")
 
 
 class AvroFormatHandler(FormatHandler):
-    """
-    Avro format handler.
+    """Avro format handler."""
+
+    @property
+    def get_format(self) -> FormatType:
+        """Return the format type."""
+        return FormatType.AVRO
+
+    def register_data_source_format(self, ctx: SessionContext) -> None:
+        """Register a data source format."""
+        try:
+            ctx.register_avro(
+                self.datasource_config.table_name,
+                self.datasource_config.uri,
+                **self.datasource_config.options,
+            )
+        except Exception as e:
+            raise FileFormatRegistrationException(f"Failed to register 
{self.get_format} data source: {e}")
 
-    :param options: Additional options for the Avro format.
-        
https://datafusion.apache.org/python/autoapi/datafusion/context/index.html#datafusion.context.SessionContext.register_avro
+
+class IcebergFormatHandler(FormatHandler):
     """
+    Iceberg format handler for DataFusion.
 
-    def __init__(self, options: dict[str, Any] | None = None):
-        self.options = options or {}
+    Loads an Iceberg table from a catalog using ``IcebergHook`` and registers
+    it with a DataFusion ``SessionContext`` via ``register_table_provider``.
+    """
 
     @property
     def get_format(self) -> FormatType:
         """Return the format type."""
-        return FormatType.AVRO
+        return FormatType.ICEBERG
+
+    def register_data_source_format(self, ctx: SessionContext) -> None:
+        """Register an Iceberg table with the DataFusion session context."""
+        try:
+            from airflow.providers.apache.iceberg.hooks.iceberg import 
IcebergHook
+        except ImportError:
+            raise AirflowOptionalProviderFeatureException(
+                "Iceberg format requires the 
apache-airflow-providers-apache-iceberg package. "
+                "Install it with: pip install 
'apache-airflow-providers-apache-iceberg'"
+            )
 
-    def register_data_source_format(self, ctx: SessionContext, table_name: 
str, path: str) -> None:
-        """Register a data source format."""
         try:
-            ctx.register_avro(table_name, path, **self.options)
+            hook = IcebergHook(iceberg_conn_id=self.datasource_config.conn_id)
+            namespace_table = 
f"{self.datasource_config.db_name}.{self.datasource_config.table_name}"
+            iceberg_table = hook.load_table(namespace_table)
+            io_properties = iceberg_table.io.properties
+
+            # TODO: Test for other catalog types
+            if "client.access-key-id" in io_properties:
+                # DataFusion requires the ``s3.*`` property names; without
them it errors when reading table metadata from S3.
+                io_properties["s3.access-key-id"] = 
io_properties.get("client.access-key-id")
+                io_properties["s3.secret-access-key"] = 
io_properties.get("client.secret-access-key")
+            iceberg_table.io.properties = io_properties
+            ctx.register_table(self.datasource_config.table_name, 
iceberg_table)
         except Exception as e:
-            raise FileFormatRegistrationException(f"Failed to register Avro 
data source: {e}")
+            raise IcebergRegistrationException(
+                f"Failed to register Iceberg table 
'{self.datasource_config.table_name}' "
+                f"from connection '{self.datasource_config.conn_id}': {e}"
+            )
 
 
-def get_format_handler(format_type: str, options: dict[str, Any] | None = 
None) -> FormatHandler:
+def get_format_handler(datasource_config: DataSourceConfig) -> FormatHandler:
     """Get a format handler based on the format type."""
-    format_type = format_type.lower()
+    format_type = datasource_config.format.lower()
 
     match format_type:
         case "parquet":
-            return ParquetFormatHandler(options)
+            return ParquetFormatHandler(datasource_config)
         case "csv":
-            return CsvFormatHandler(options)
+            return CsvFormatHandler(datasource_config)
         case "avro":
-            return AvroFormatHandler(options)
+            return AvroFormatHandler(datasource_config)
+        case "iceberg":
+            return IcebergFormatHandler(datasource_config)
         case _:
             raise ValueError(f"Unsupported format: {format_type}")
diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/example_dags/example_analytics.py
 
b/providers/common/sql/src/airflow/providers/common/sql/example_dags/example_analytics.py
index bee7829df7e..89a08bc1c26 100644
--- 
a/providers/common/sql/src/airflow/providers/common/sql/example_dags/example_analytics.py
+++ 
b/providers/common/sql/src/airflow/providers/common/sql/example_dags/example_analytics.py
@@ -30,6 +30,30 @@ datasource_config_local = DataSourceConfig(
     conn_id="", table_name="users_data", uri="file:///path/to/", 
format="parquet"
 )
 
+datasource_config_iceberg = DataSourceConfig(
+    conn_id="iceberg_default",
+    table_name="users_data",
+    db_name="demo",  # will be used to load table via pyiceberg eg: 
demo.users_data
+    format="iceberg",
+)
+
+"""
+
For example, when using Iceberg with a Glue catalog, provide the following
format for the Iceberg connection extras:
+
+{
+
+    "client.access-key-id": "<>",
+    "client.secret-access-key": "<>",
    "client.region": "<region>",
+    "type": "glue",
+    "uri": "https://glue.<region>.amazonaws.com/iceberg",
+
+}
+
+"""
+
+
 # Please replace uri with appropriate value
 
 with DAG(
@@ -64,4 +88,12 @@ with DAG(
 
     # [END howto_analytics_decorator]
 
-    analytics_with_local >> get_user_summary_queries()
+    # [START howto_analytics_iceberg]
+
+    @task.analytics(datasource_configs=[datasource_config_iceberg])
+    def get_users_product_queries_from_iceberg_catalog():
+        return ["SELECT * FROM users_data LIMIT 10", "SELECT count(*) FROM 
users_data"]
+
+    # [END howto_analytics_iceberg]
+
+    analytics_with_local >> get_user_summary_queries() >> 
get_users_product_queries_from_iceberg_catalog()
diff --git 
a/providers/common/sql/tests/unit/common/sql/datafusion/test_format_handlers.py 
b/providers/common/sql/tests/unit/common/sql/datafusion/test_format_handlers.py
index 2518864b17a..7c8c89eb759 100644
--- 
a/providers/common/sql/tests/unit/common/sql/datafusion/test_format_handlers.py
+++ 
b/providers/common/sql/tests/unit/common/sql/datafusion/test_format_handlers.py
@@ -16,15 +16,21 @@
 # under the License.
 from __future__ import annotations
 
-from unittest.mock import MagicMock
+from io import FileIO
+from unittest.mock import MagicMock, patch
 
 import pytest
+from pyiceberg.table import Table
 
-from airflow.providers.common.sql.config import FormatType
-from airflow.providers.common.sql.datafusion.exceptions import 
FileFormatRegistrationException
+from airflow.providers.common.sql.config import DataSourceConfig, FormatType
+from airflow.providers.common.sql.datafusion.exceptions import (
+    FileFormatRegistrationException,
+    IcebergRegistrationException,
+)
 from airflow.providers.common.sql.datafusion.format_handlers import (
     AvroFormatHandler,
     CsvFormatHandler,
+    IcebergFormatHandler,
     ParquetFormatHandler,
     get_format_handler,
 )
@@ -35,48 +41,137 @@ class TestFormatHandlers:
     def session_context_mock(self):
         return MagicMock()
 
-    def test_parquet_handler_success(self, session_context_mock):
-        handler = ParquetFormatHandler(options={"key": "value"})
-        handler.register_data_source_format(session_context_mock, 
"table_name", "path/to/file")
-        session_context_mock.register_parquet.assert_called_once_with(
-            "table_name", "path/to/file", key="value"
+    @pytest.mark.parametrize(
+        ("format", "handler_class", "options"),
+        [
+            ("parquet", ParquetFormatHandler, {"key": "value"}),
+            ("csv", CsvFormatHandler, {"delimiter": ","}),
+            ("avro", AvroFormatHandler, {"key": "value"}),
+        ],
+    )
+    def test_file_handler_success(self, session_context_mock, format, 
handler_class, options):
+        datasource_config = DataSourceConfig(
+            table_name="table_name",
+            uri=f"file://path/to/file.{format}",
+            format=format,
+            conn_id="conn_id",
+            options=options,
         )
-        assert handler.get_format == FormatType.PARQUET
+        handler = handler_class(datasource_config)
+        handler.register_data_source_format(session_context_mock)
+        register_method = getattr(session_context_mock, f"register_{format}")
+        register_method.assert_called_once_with("table_name", 
f"file://path/to/file.{format}", **options)
+        assert handler.get_format == format
 
-    def test_parquet_handler_failure(self, session_context_mock):
-        session_context_mock.register_parquet.side_effect = Exception("Error")
-        handler = ParquetFormatHandler()
-        with pytest.raises(FileFormatRegistrationException, match="Failed to 
register Parquet data source"):
-            handler.register_data_source_format(session_context_mock, 
"table_name", "path/to/file")
+    @pytest.mark.parametrize(
+        ("format", "handler_class"),
+        [
+            (FormatType.PARQUET, ParquetFormatHandler),
+            (FormatType.CSV, CsvFormatHandler),
+            (FormatType.AVRO, AvroFormatHandler),
+        ],
+    )
+    def test_file_handler_failure(self, session_context_mock, format, 
handler_class):
+        datasource_config = DataSourceConfig(
+            table_name="table_name",
+            uri=f"file://path/to/file.{format}",
+            format=format,
+            conn_id="conn_id",
+        )
+        handler = handler_class(datasource_config)
+        register_method = getattr(session_context_mock, f"register_{format}")
+        register_method.side_effect = Exception("Error")
+        with pytest.raises(
+            FileFormatRegistrationException, match=f"Failed to register 
{format} data source:"
+        ):
+            handler.register_data_source_format(session_context_mock)
 
-    def test_csv_handler_success(self, session_context_mock):
-        handler = CsvFormatHandler(options={"delimiter": ","})
-        handler.register_data_source_format(session_context_mock, 
"table_name", "path/to/file")
-        
session_context_mock.register_csv.assert_called_once_with("table_name", 
"path/to/file", delimiter=",")
-        assert handler.get_format == FormatType.CSV
+    @patch("airflow.providers.apache.iceberg.hooks.iceberg.IcebergHook")
+    def test_iceberg_handler_success(self, mock_iceberg_hook_cls, 
session_context_mock):
+        mock_hook = MagicMock()
+        mock_iceberg_hook_cls.return_value = mock_hook
+        mock_iceberg_table = MagicMock(spec=Table)
+        mock_io = MagicMock(spec=FileIO)
+        mock_io.properties = {
+            "client.access-key-id": "test_key",
+            "client.secret-access-key": "test_secret",
+        }
+        mock_iceberg_table.io = mock_io
+        mock_hook.load_table.return_value = mock_iceberg_table
+        datasource_config = DataSourceConfig(
+            table_name="my_table",
+            format="iceberg",
+            conn_id="iceberg_default",
+            db_name="default",
+        )
+        handler = IcebergFormatHandler(datasource_config)
+        handler.register_data_source_format(session_context_mock)
 
-    def test_csv_handler_failure(self, session_context_mock):
-        session_context_mock.register_csv.side_effect = Exception("Error")
-        handler = CsvFormatHandler()
-        with pytest.raises(FileFormatRegistrationException, match="Failed to 
register csv data source"):
-            handler.register_data_source_format(session_context_mock, 
"table_name", "path/to/file")
+        
mock_iceberg_hook_cls.assert_called_once_with(iceberg_conn_id="iceberg_default")
+        mock_hook.load_table.assert_called_once_with("default.my_table")
+        
session_context_mock.register_table.assert_called_once_with("my_table", 
mock_iceberg_table)
+        assert handler.get_format == FormatType.ICEBERG
 
-    def test_avro_handler_success(self, session_context_mock):
-        handler = AvroFormatHandler(options={"key": "value"})
-        handler.register_data_source_format(session_context_mock, 
"table_name", "path/to/file")
-        
session_context_mock.register_avro.assert_called_once_with("table_name", 
"path/to/file", key="value")
-        assert handler.get_format == FormatType.AVRO
+        # Verify that s3 properties are correctly set
+        assert mock_iceberg_table.io.properties["s3.access-key-id"] == 
"test_key"
+        assert mock_iceberg_table.io.properties["s3.secret-access-key"] == 
"test_secret"
+
+    @patch("airflow.providers.apache.iceberg.hooks.iceberg.IcebergHook")
+    def test_iceberg_handler_failure(self, mock_iceberg_hook_cls, 
session_context_mock):
+        mock_hook = MagicMock()
+        mock_iceberg_hook_cls.return_value = mock_hook
+        mock_hook.load_table.side_effect = Exception("catalog error")
+        datasource_config = DataSourceConfig(
+            table_name="my_table", format="iceberg", 
conn_id="iceberg_default", db_name="default"
+        )
+        handler = IcebergFormatHandler(datasource_config)
+        with pytest.raises(IcebergRegistrationException, match="Failed to 
register Iceberg table"):
+            handler.register_data_source_format(session_context_mock)
 
-    def test_avro_handler_failure(self, session_context_mock):
-        session_context_mock.register_avro.side_effect = Exception("Error")
-        handler = AvroFormatHandler()
-        with pytest.raises(FileFormatRegistrationException, match="Failed to 
register Avro data source"):
-            handler.register_data_source_format(session_context_mock, 
"table_name", "path/to/file")
+    @pytest.mark.parametrize(
+        ("config_params", "error_match"),
+        [
+            (
+                {"table_name": "t", "format": "iceberg", "conn_id": "c"},
+                "Database name must be provided for table providers",
+            ),
+            (
+                {"table_name": "", "format": "parquet", "conn_id": "c", "uri": 
"file://u"},
+                "Table name must be provided for storage type",
+            ),
+            (
+                {"table_name": "t", "format": "parquet", "conn_id": "c"},
+                "Unsupported storage type for URI:",
+            ),
+        ],
+    )
+    def test_missing_mandatory_fields(self, config_params, error_match):
+        with pytest.raises(ValueError, match=error_match):
+            DataSourceConfig(**config_params)
 
     def test_get_format_handler(self):
-        assert isinstance(get_format_handler("parquet"), ParquetFormatHandler)
-        assert isinstance(get_format_handler("csv"), CsvFormatHandler)
-        assert isinstance(get_format_handler("avro"), AvroFormatHandler)
+        assert isinstance(
+            get_format_handler(
+                DataSourceConfig(table_name="t", format="parquet", 
conn_id="c", uri="file://u")
+            ),
+            ParquetFormatHandler,
+        )
+        assert isinstance(
+            get_format_handler(DataSourceConfig(table_name="t", format="csv", 
conn_id="c", uri="file://u")),
+            CsvFormatHandler,
+        )
+        assert isinstance(
+            get_format_handler(DataSourceConfig(table_name="t", format="avro", 
conn_id="c", uri="file://u")),
+            AvroFormatHandler,
+        )
+        assert isinstance(
+            get_format_handler(
+                DataSourceConfig(table_name="t", format="iceberg", 
conn_id="iceberg_default", db_name="d")
+            ),
+            IcebergFormatHandler,
+        )
 
-        with pytest.raises(ValueError, match="Unsupported format"):
-            get_format_handler("invalid")
+        with pytest.raises(ValueError, match="Unsupported format: invalid"):
+            get_format_handler(
+                DataSourceConfig(table_name="t", format="invalid", 
conn_id="c", uri="file://u")
+            )
diff --git a/providers/common/sql/tests/unit/common/sql/test_config.py 
b/providers/common/sql/tests/unit/common/sql/test_config.py
index abe020b6ad1..0e90c873443 100644
--- a/providers/common/sql/tests/unit/common/sql/test_config.py
+++ b/providers/common/sql/tests/unit/common/sql/test_config.py
@@ -46,7 +46,7 @@ class TestDataSourceConfig:
 
     def test_missing_table_name_raises_error(self):
         with pytest.raises(ValueError, match="Table name must be provided for 
storage type"):
-            DataSourceConfig(conn_id="test", uri="s3://bucket/path")
+            DataSourceConfig(conn_id="test", uri="s3://bucket/path", 
table_name="")
 
     def test_parquet_with_partition_cols(self):
         config = DataSourceConfig(
@@ -63,6 +63,27 @@ class TestDataSourceConfig:
         assert config.options == {"table_partition_cols": [("year", 
"integer"), ("month", "integer")]}
         assert config.storage_type == StorageType.S3
 
+    def test_iceberg_creation(self):
+        config = DataSourceConfig(
+            conn_id="iceberg_conn", table_name="default.my_iceberg_table", 
format="iceberg", db_name="default"
+        )
+        assert config.conn_id == "iceberg_conn"
+        assert config.table_name == "default.my_iceberg_table"
+        assert config.format == "iceberg"
+        assert config.storage_type is None
+
+    def test_iceberg_is_table_provider(self):
+        config = DataSourceConfig(
+            conn_id="iceberg_conn", table_name="my_table", format="iceberg", 
db_name="default"
+        )
+        assert config.is_table_provider is True
+
+    def test_non_iceberg_is_not_table_provider(self):
+        config = DataSourceConfig(
+            conn_id="test", uri="s3://bucket/path", table_name="my_table", 
format="parquet"
+        )
+        assert config.is_table_provider is False
+
 
 class TestConnectionConfig:
     def test_connection_config_creation(self):


Reply via email to