This is an automated email from the ASF dual-hosted git repository.
gopidesu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 483d84c6c95 Add iceberg support to AnalyticsOperator (#62754)
483d84c6c95 is described below
commit 483d84c6c95536b8679520fdfdaf30e21320850d
Author: GPK <[email protected]>
AuthorDate: Tue Mar 3 09:56:28 2026 +0000
Add iceberg support to AnalyticsOperator (#62754)
* Add iceberg support to analytics operator
* Add iceberg support to analytics operator
* Resolve comments
* Fixup tests
---
dev/breeze/tests/test_selective_checks.py | 2 +-
docs/spelling_wordlist.txt | 1 +
.../providers/apache/iceberg/hooks/iceberg.py | 5 +-
.../unit/apache/iceberg/hooks/test_iceberg.py | 18 +++
providers/common/sql/docs/index.rst | 15 +-
providers/common/sql/docs/operators.rst | 10 ++
providers/common/sql/pyproject.toml | 15 +-
.../sql/src/airflow/providers/common/sql/config.py | 43 +++--
.../providers/common/sql/datafusion/base.py | 13 +-
.../providers/common/sql/datafusion/engine.py | 19 +--
.../providers/common/sql/datafusion/exceptions.py | 4 +
.../common/sql/datafusion/format_handlers.py | 123 ++++++++++-----
.../common/sql/example_dags/example_analytics.py | 34 +++-
.../common/sql/datafusion/test_format_handlers.py | 173 ++++++++++++++++-----
.../sql/tests/unit/common/sql/test_config.py | 23 ++-
15 files changed, 381 insertions(+), 117 deletions(-)
diff --git a/dev/breeze/tests/test_selective_checks.py
b/dev/breeze/tests/test_selective_checks.py
index 18e972c6ba1..5ec37351cad 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -2248,7 +2248,7 @@ def test_upgrade_to_newer_dependencies(
pytest.param(
("providers/common/sql/src/airflow/providers/common/sql/common_sql_python.py",),
{
- "docs-list-as-string": "amazon apache.drill apache.druid
apache.hive "
+ "docs-list-as-string": "amazon apache.drill apache.druid
apache.hive apache.iceberg "
"apache.impala apache.pinot common.ai common.compat common.sql
databricks elasticsearch "
"exasol google jdbc microsoft.mssql mysql odbc openlineage "
"oracle pgvector postgres presto slack snowflake sqlite
teradata trino vertica ydb",
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 50d5df9dea2..fbfbeda2b00 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1765,6 +1765,7 @@ stderr
stdout
stmts
StorageClass
+storages
StoredInfoType
storedInfoType
str
diff --git
a/providers/apache/iceberg/src/airflow/providers/apache/iceberg/hooks/iceberg.py
b/providers/apache/iceberg/src/airflow/providers/apache/iceberg/hooks/iceberg.py
index bad7f3e44ee..16311d14e53 100644
---
a/providers/apache/iceberg/src/airflow/providers/apache/iceberg/hooks/iceberg.py
+++
b/providers/apache/iceberg/src/airflow/providers/apache/iceberg/hooks/iceberg.py
@@ -77,7 +77,10 @@ class IcebergHook(BaseHook):
# Start with extra so connection fields take precedence
extra = conn.extra_dejson or {}
catalog_properties: dict[str, str] = {**extra}
- catalog_properties["uri"] = conn.host.rstrip("/") if conn.host else ""
+ host = conn.host.rstrip("/") if conn.host else None
+ if host:
+ catalog_properties["uri"] = host
+
if "type" not in catalog_properties:
catalog_properties["type"] = "rest"
diff --git
a/providers/apache/iceberg/tests/unit/apache/iceberg/hooks/test_iceberg.py
b/providers/apache/iceberg/tests/unit/apache/iceberg/hooks/test_iceberg.py
index 1dbe4036476..322f60d007a 100644
--- a/providers/apache/iceberg/tests/unit/apache/iceberg/hooks/test_iceberg.py
+++ b/providers/apache/iceberg/tests/unit/apache/iceberg/hooks/test_iceberg.py
@@ -145,6 +145,24 @@ class TestIcebergHookCatalogConfig:
call_kwargs = mock_load.call_args[1]
assert call_kwargs["uri"] == "https://correct-host.example.com"
+ def test_empty_host_does_not_override_extra_uri(self,
create_connection_without_db):
+ """An empty connection host should not override the uri provided in extra."""
+ create_connection_without_db(
+ Connection(
+ conn_id="iceberg_default",
+ conn_type="iceberg",
+ host="",
+ extra='{"uri": "https://wrong-host.example.com"}',
+ )
+ )
+ hook = IcebergHook()
+ with patch(LOAD_CATALOG) as mock_load:
+ mock_load.return_value = MagicMock()
+ hook.get_conn()
+
+ call_kwargs = mock_load.call_args[1]
+ assert call_kwargs["uri"] == "https://wrong-host.example.com"
+
def test_extra_can_override_catalog_type(self,
create_connection_without_db):
"""Extra can set catalog type to non-REST (e.g., glue)."""
create_connection_without_db(
diff --git a/providers/common/sql/docs/index.rst
b/providers/common/sql/docs/index.rst
index 28cbd000dd5..d78f310a2f0 100644
--- a/providers/common/sql/docs/index.rst
+++ b/providers/common/sql/docs/index.rst
@@ -122,13 +122,14 @@ You can install such cross-provider dependencies when
installing from PyPI. For
pip install apache-airflow-providers-common-sql[amazon]
-==================================================================================================================
=================
-Dependent package
Extra
-==================================================================================================================
=================
-`apache-airflow-providers-amazon
<https://airflow.apache.org/docs/apache-airflow-providers-amazon>`_
``amazon``
-`apache-airflow-providers-common-compat
<https://airflow.apache.org/docs/apache-airflow-providers-common-compat>`_
``common.compat``
-`apache-airflow-providers-openlineage
<https://airflow.apache.org/docs/apache-airflow-providers-openlineage>`_
``openlineage``
-==================================================================================================================
=================
+====================================================================================================================
==================
+Dependent package
Extra
+====================================================================================================================
==================
+`apache-airflow-providers-amazon
<https://airflow.apache.org/docs/apache-airflow-providers-amazon>`_
``amazon``
+`apache-airflow-providers-apache-iceberg
<https://airflow.apache.org/docs/apache-airflow-providers-apache-iceberg>`_
``apache.iceberg``
+`apache-airflow-providers-common-compat
<https://airflow.apache.org/docs/apache-airflow-providers-common-compat>`_
``common.compat``
+`apache-airflow-providers-openlineage
<https://airflow.apache.org/docs/apache-airflow-providers-openlineage>`_
``openlineage``
+====================================================================================================================
==================
Downloading official packages
-----------------------------
diff --git a/providers/common/sql/docs/operators.rst
b/providers/common/sql/docs/operators.rst
index 0354c491559..773feefb4b5 100644
--- a/providers/common/sql/docs/operators.rst
+++ b/providers/common/sql/docs/operators.rst
@@ -299,8 +299,10 @@ DataSourceConfig Parameters
* ``uri`` (str, required): URI of the datasource.
* ``format`` (str, required): Format of the data.
* ``table_name`` (str, required): Name of the table. Note: This name can be
any identifier and should match the table name used in the SQL queries.
+* ``db_name`` (str, optional): Name of the database. Default is None. For
catalog-based storages such as Iceberg, provide the database name here; it
will be used to fetch the table metadata.
* ``storage_type`` (StorageType, optional): Type of storage. Default is None.
If not provided, it will be inferred from the URI.
* ``options`` (dict, optional): Additional options for the datasource. eg: if
the datasource is partitioned, you can provide partitioning information in the
options, e.g: ``{"table_partition_cols": [("year", "integer")]}``
+ See `SessionContext
<https://datafusion.apache.org/python/autoapi/datafusion/context/index.html#datafusion.context.SessionContext>`__
for more options.
S3 Storage
----------
@@ -328,3 +330,11 @@ analytics sql queries:
:language: python
:start-after: [START howto_analytics_decorator]
:end-before: [END howto_analytics_decorator]
+
+Analytics with Iceberg
+-------------------------
+.. exampleinclude::
/../../sql/src/airflow/providers/common/sql/example_dags/example_analytics.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_analytics_iceberg]
+ :end-before: [END howto_analytics_iceberg]
diff --git a/providers/common/sql/pyproject.toml
b/providers/common/sql/pyproject.toml
index cd9484e49ea..7780a7fe945 100644
--- a/providers/common/sql/pyproject.toml
+++ b/providers/common/sql/pyproject.toml
@@ -88,8 +88,15 @@ dependencies = [
"amazon" = [
"apache-airflow-providers-amazon"
]
-"datafusion" =[
- "datafusion>=50.0.0",
+# DataFusion 52.0.0 crate is not supported at the moment with iceberg-core
+"datafusion" = [
+ "datafusion>=50.0.0,<52.0.0",
+]
+"pyiceberg-core" = [
+ "pyiceberg-core>=0.8.0"
+]
+"apache.iceberg" = [
+ "apache-airflow-providers-apache-iceberg"
]
[dependency-groups]
@@ -98,6 +105,7 @@ dev = [
"apache-airflow-task-sdk",
"apache-airflow-devel-common",
"apache-airflow-providers-amazon",
+ "apache-airflow-providers-apache-iceberg",
"apache-airflow-providers-common-compat",
"apache-airflow-providers-openlineage",
# Additional devel dependencies (do not remove this line and add extra
development dependencies)
@@ -108,7 +116,8 @@ dev = [
"apache-airflow-providers-odbc",
"apache-airflow-providers-sqlite",
"apache-airflow-providers-common-sql[sqlalchemy]",
- "datafusion>=50.0.0"
+ "datafusion>=50.0.0,<52.0.0",
+ "pyiceberg-core>=0.8.0",
]
# To build docs:
diff --git a/providers/common/sql/src/airflow/providers/common/sql/config.py
b/providers/common/sql/src/airflow/providers/common/sql/config.py
index f16ddc5e50d..75cc15efedc 100644
--- a/providers/common/sql/src/airflow/providers/common/sql/config.py
+++ b/providers/common/sql/src/airflow/providers/common/sql/config.py
@@ -36,6 +36,11 @@ class FormatType(str, Enum):
PARQUET = "parquet"
CSV = "csv"
AVRO = "avro"
+ ICEBERG = "iceberg"
+
+
+# TODO: Add delta format support
+TABLE_PROVIDERS: frozenset[str] = frozenset({FormatType.ICEBERG.value})
class StorageType(str, Enum):
@@ -50,30 +55,48 @@ class DataSourceConfig:
"""
Configuration for an input data source.
+ **File-based formats** (parquet, csv, avro) require ``uri`` and infer
+ ``storage_type`` automatically.
+
+ **Catalog-managed formats** (iceberg, and in the future delta, etc.) do not
+ require ``uri`` or ``storage_type``; they use ``conn_id`` and
format-specific
+ keys in ``options`` (e.g. ``catalog_table_name`` for Iceberg).
+
:param conn_id: The connection ID to use for accessing the data source.
:param uri: The URI of the data source (e.g., file path, S3 bucket, etc.).
- :param format: The format of the data (e.g., 'parquet', 'csv').
- :param table_name: The name of the table if applicable.
- :param schema: A dictionary mapping column names to their types.
- :param db_name: The database name if applicable.
+ Not required for catalog-managed formats.
+ :param format: The format of the data (e.g., 'parquet', 'csv', 'iceberg').
+ :param table_name: The name to register the table under in DataFusion.
+ :param db_name: The namespace for the table provider, e.g. Iceberg needs it
to look up the table in its catalog
:param storage_type: The type of storage (automatically inferred from URI).
- :param options: Additional options for the data source. eg: you can set
partition columns to any datasource
- that will be set in while registering the data
+ Not used for catalog-managed formats.
+ :param options: Additional options for the data source. e.g. you can set
partition columns
+ for any file-based datasource, or ``catalog_table_name`` for Iceberg.
"""
conn_id: str
- uri: str
- format: str | None = None
- table_name: str | None = None
+ table_name: str
+ uri: str = ""
+ format: str = ""
+ db_name: str | None = None
storage_type: StorageType | None = None
options: dict[str, Any] = field(default_factory=dict)
+ @property
+ def is_table_provider(self) -> bool:
+ """Whether this format is catalog-managed (no object store needed)."""
+ return bool(self.format and self.format.lower() in TABLE_PROVIDERS)
+
def __post_init__(self):
+ if self.is_table_provider:
+ if self.db_name is None:
+ raise ValueError(f"Database name must be provided for table
providers {TABLE_PROVIDERS}")
+ return
if self.storage_type is None:
self.storage_type = self._extract_storage_type
- if self.storage_type is not None and self.table_name is None:
+ if self.storage_type is not None and (not self.table_name or not
self.table_name.strip()):
raise ValueError("Table name must be provided for storage type")
@property
diff --git
a/providers/common/sql/src/airflow/providers/common/sql/datafusion/base.py
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/base.py
index 496d4139835..f09881e552a 100644
--- a/providers/common/sql/src/airflow/providers/common/sql/datafusion/base.py
+++ b/providers/common/sql/src/airflow/providers/common/sql/datafusion/base.py
@@ -24,7 +24,12 @@ from airflow.utils.log.logging_mixin import LoggingMixin
if TYPE_CHECKING:
from datafusion import SessionContext
- from airflow.providers.common.sql.config import ConnectionConfig,
FormatType, StorageType
+ from airflow.providers.common.sql.config import (
+ ConnectionConfig,
+ DataSourceConfig,
+ FormatType,
+ StorageType,
+ )
class ObjectStorageProvider(LoggingMixin, ABC):
@@ -56,12 +61,16 @@ class ObjectStorageProvider(LoggingMixin, ABC):
class FormatHandler(LoggingMixin, ABC):
"""Abstract base class for format handlers."""
+ def __init__(self, datasource_config: DataSourceConfig):
+ super().__init__()
+ self.datasource_config = datasource_config
+
@property
def get_format(self) -> FormatType:
"""Return file format type."""
raise NotImplementedError
@abstractmethod
- def register_data_source_format(self, ctx: SessionContext, table_name:
str, path: str) -> None:
+ def register_data_source_format(self, ctx: SessionContext) -> None:
"""Register data source format."""
raise NotImplementedError
diff --git
a/providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py
index e5c574600af..4786c1f779c 100644
--- a/providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py
+++ b/providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py
@@ -50,12 +50,14 @@ class DataFusionEngine(LoggingMixin):
if not isinstance(datasource_config, DataSourceConfig):
raise ValueError("datasource_config must be of type
DataSourceConfig")
- if datasource_config.storage_type == StorageType.LOCAL:
- connection_config = None
- else:
- connection_config =
self._get_connection_config(datasource_config.conn_id)
+ if not datasource_config.is_table_provider:
+ if datasource_config.storage_type == StorageType.LOCAL:
+ connection_config = None
+ else:
+ connection_config =
self._get_connection_config(datasource_config.conn_id)
+
+ self._register_object_store(datasource_config, connection_config)
- self._register_object_store(datasource_config, connection_config)
self._register_data_source_format(datasource_config)
def _register_object_store(
@@ -89,10 +91,9 @@ class DataFusionEngine(LoggingMixin):
f"Table {datasource_config.table_name} already registered for
{self.registered_tables[datasource_config.table_name]}, please choose different
name"
)
- format_cls = get_format_handler(datasource_config.format,
datasource_config.options)
- format_cls.register_data_source_format(
- self.session_context, datasource_config.table_name,
datasource_config.uri
- )
+ format_cls = get_format_handler(datasource_config)
+
+ format_cls.register_data_source_format(self.session_context)
self.registered_tables[datasource_config.table_name] =
datasource_config.uri
self.log.info(
"Registered data source format %s for table: %s",
diff --git
a/providers/common/sql/src/airflow/providers/common/sql/datafusion/exceptions.py
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/exceptions.py
index dea1e5c219b..1c45734781a 100644
---
a/providers/common/sql/src/airflow/providers/common/sql/datafusion/exceptions.py
+++
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/exceptions.py
@@ -29,3 +29,7 @@ class FileFormatRegistrationException(AirflowException):
class QueryExecutionException(AirflowException):
"""Error while executing query."""
+
+
+class IcebergRegistrationException(AirflowException):
+ """Error while registering Iceberg table with DataFusion."""
diff --git
a/providers/common/sql/src/airflow/providers/common/sql/datafusion/format_handlers.py
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/format_handlers.py
index 4dd0f461456..dea530e27a6 100644
---
a/providers/common/sql/src/airflow/providers/common/sql/datafusion/format_handlers.py
+++
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/format_handlers.py
@@ -16,98 +16,135 @@
# under the License.
from __future__ import annotations
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING
-from airflow.providers.common.sql.config import FormatType
+from airflow.providers.common.compat.sdk import
AirflowOptionalProviderFeatureException
+from airflow.providers.common.sql.config import DataSourceConfig, FormatType
from airflow.providers.common.sql.datafusion.base import FormatHandler
-from airflow.providers.common.sql.datafusion.exceptions import
FileFormatRegistrationException
+from airflow.providers.common.sql.datafusion.exceptions import (
+ FileFormatRegistrationException,
+ IcebergRegistrationException,
+)
if TYPE_CHECKING:
from datafusion import SessionContext
class ParquetFormatHandler(FormatHandler):
- """
- Parquet format handler.
-
- :param options: Additional options for the Parquet format.
-
https://datafusion.apache.org/python/autoapi/datafusion/context/index.html#datafusion.context.SessionContext.register_parquet
- """
-
- def __init__(self, options: dict[str, Any] | None = None):
- self.options = options or {}
+ """Parquet format handler."""
@property
def get_format(self) -> FormatType:
"""Return the format type."""
return FormatType.PARQUET
- def register_data_source_format(self, ctx: SessionContext, table_name:
str, path: str):
+ def register_data_source_format(self, ctx: SessionContext):
"""Register a data source format."""
try:
- ctx.register_parquet(table_name, path, **self.options)
+ ctx.register_parquet(
+ self.datasource_config.table_name,
+ self.datasource_config.uri,
+ **self.datasource_config.options,
+ )
except Exception as e:
- raise FileFormatRegistrationException(f"Failed to register Parquet
data source: {e}")
+ raise FileFormatRegistrationException(f"Failed to register
{self.get_format} data source: {e}")
class CsvFormatHandler(FormatHandler):
- """
- CSV format handler.
-
- :param options: Additional options for the CSV format.
-
https://datafusion.apache.org/python/autoapi/datafusion/context/index.html#datafusion.context.SessionContext.register_csv
- """
-
- def __init__(self, options: dict[str, Any] | None = None):
- self.options = options or {}
+ """CSV format handler."""
@property
def get_format(self) -> FormatType:
"""Return the format type."""
return FormatType.CSV
- def register_data_source_format(self, ctx: SessionContext, table_name:
str, path: str):
+ def register_data_source_format(self, ctx: SessionContext):
"""Register a data source format."""
try:
- ctx.register_csv(table_name, path, **self.options)
+ ctx.register_csv(
+ self.datasource_config.table_name,
+ self.datasource_config.uri,
+ **self.datasource_config.options,
+ )
except Exception as e:
- raise FileFormatRegistrationException(f"Failed to register csv
data source: {e}")
+ raise FileFormatRegistrationException(f"Failed to register
{self.get_format} data source: {e}")
class AvroFormatHandler(FormatHandler):
- """
- Avro format handler.
+ """Avro format handler."""
+
+ @property
+ def get_format(self) -> FormatType:
+ """Return the format type."""
+ return FormatType.AVRO
+
+ def register_data_source_format(self, ctx: SessionContext) -> None:
+ """Register a data source format."""
+ try:
+ ctx.register_avro(
+ self.datasource_config.table_name,
+ self.datasource_config.uri,
+ **self.datasource_config.options,
+ )
+ except Exception as e:
+ raise FileFormatRegistrationException(f"Failed to register
{self.get_format} data source: {e}")
- :param options: Additional options for the Avro format.
-
https://datafusion.apache.org/python/autoapi/datafusion/context/index.html#datafusion.context.SessionContext.register_avro
+
+class IcebergFormatHandler(FormatHandler):
"""
+ Iceberg format handler for DataFusion.
- def __init__(self, options: dict[str, Any] | None = None):
- self.options = options or {}
+ Loads an Iceberg table from a catalog using ``IcebergHook`` and registers
+ it with a DataFusion ``SessionContext`` via ``register_table_provider``.
+ """
@property
def get_format(self) -> FormatType:
"""Return the format type."""
- return FormatType.AVRO
+ return FormatType.ICEBERG
+
+ def register_data_source_format(self, ctx: SessionContext) -> None:
+ """Register an Iceberg table with the DataFusion session context."""
+ try:
+ from airflow.providers.apache.iceberg.hooks.iceberg import
IcebergHook
+ except ImportError:
+ raise AirflowOptionalProviderFeatureException(
+ "Iceberg format requires the
apache-airflow-providers-apache-iceberg package. "
+ "Install it with: pip install
'apache-airflow-providers-apache-iceberg'"
+ )
- def register_data_source_format(self, ctx: SessionContext, table_name:
str, path: str) -> None:
- """Register a data source format."""
try:
- ctx.register_avro(table_name, path, **self.options)
+ hook = IcebergHook(iceberg_conn_id=self.datasource_config.conn_id)
+ namespace_table =
f"{self.datasource_config.db_name}.{self.datasource_config.table_name}"
+ iceberg_table = hook.load_table(namespace_table)
+ io_properties = iceberg_table.io.properties
+
+ # TODO: Test for other catalog types
+ if "client.access-key-id" in io_properties:
+ # DataFusion requires the s3.* property names; without them it
errors when reading table metadata from S3
+ io_properties["s3.access-key-id"] =
io_properties.get("client.access-key-id")
+ io_properties["s3.secret-access-key"] =
io_properties.get("client.secret-access-key")
+ iceberg_table.io.properties = io_properties
+ ctx.register_table(self.datasource_config.table_name,
iceberg_table)
except Exception as e:
- raise FileFormatRegistrationException(f"Failed to register Avro
data source: {e}")
+ raise IcebergRegistrationException(
+ f"Failed to register Iceberg table
'{self.datasource_config.table_name}' "
+ f"from connection '{self.datasource_config.conn_id}': {e}"
+ )
-def get_format_handler(format_type: str, options: dict[str, Any] | None =
None) -> FormatHandler:
+def get_format_handler(datasource_config: DataSourceConfig) -> FormatHandler:
"""Get a format handler based on the format type."""
- format_type = format_type.lower()
+ format_type = datasource_config.format.lower()
match format_type:
case "parquet":
- return ParquetFormatHandler(options)
+ return ParquetFormatHandler(datasource_config)
case "csv":
- return CsvFormatHandler(options)
+ return CsvFormatHandler(datasource_config)
case "avro":
- return AvroFormatHandler(options)
+ return AvroFormatHandler(datasource_config)
+ case "iceberg":
+ return IcebergFormatHandler(datasource_config)
case _:
raise ValueError(f"Unsupported format: {format_type}")
diff --git
a/providers/common/sql/src/airflow/providers/common/sql/example_dags/example_analytics.py
b/providers/common/sql/src/airflow/providers/common/sql/example_dags/example_analytics.py
index bee7829df7e..89a08bc1c26 100644
---
a/providers/common/sql/src/airflow/providers/common/sql/example_dags/example_analytics.py
+++
b/providers/common/sql/src/airflow/providers/common/sql/example_dags/example_analytics.py
@@ -30,6 +30,30 @@ datasource_config_local = DataSourceConfig(
conn_id="", table_name="users_data", uri="file:///path/to/",
format="parquet"
)
+datasource_config_iceberg = DataSourceConfig(
+ conn_id="iceberg_default",
+ table_name="users_data",
+ db_name="demo", # will be used to load table via pyiceberg eg:
demo.users_data
+ format="iceberg",
+)
+
+"""
+
+For example, when using Iceberg with the Glue catalog, provide the following
format for the Iceberg connection extras:
+
+{
+
+ "client.access-key-id": "<>",
+ "client.secret-access-key": "<>",
+ "client.region": "<region>",
+ "type": "glue",
+ "uri": "https://glue.<region>.amazonaws.com/iceberg",
+
+}
+
+"""
+
+
# Please replace uri with appropriate value
with DAG(
@@ -64,4 +88,12 @@ with DAG(
# [END howto_analytics_decorator]
- analytics_with_local >> get_user_summary_queries()
+ # [START howto_analytics_iceberg]
+
+ @task.analytics(datasource_configs=[datasource_config_iceberg])
+ def get_users_product_queries_from_iceberg_catalog():
+ return ["SELECT * FROM users_data LIMIT 10", "SELECT count(*) FROM
users_data"]
+
+ # [END howto_analytics_iceberg]
+
+ analytics_with_local >> get_user_summary_queries() >>
get_users_product_queries_from_iceberg_catalog()
diff --git
a/providers/common/sql/tests/unit/common/sql/datafusion/test_format_handlers.py
b/providers/common/sql/tests/unit/common/sql/datafusion/test_format_handlers.py
index 2518864b17a..7c8c89eb759 100644
---
a/providers/common/sql/tests/unit/common/sql/datafusion/test_format_handlers.py
+++
b/providers/common/sql/tests/unit/common/sql/datafusion/test_format_handlers.py
@@ -16,15 +16,21 @@
# under the License.
from __future__ import annotations
-from unittest.mock import MagicMock
+from io import FileIO
+from unittest.mock import MagicMock, patch
import pytest
+from pyiceberg.table import Table
-from airflow.providers.common.sql.config import FormatType
-from airflow.providers.common.sql.datafusion.exceptions import
FileFormatRegistrationException
+from airflow.providers.common.sql.config import DataSourceConfig, FormatType
+from airflow.providers.common.sql.datafusion.exceptions import (
+ FileFormatRegistrationException,
+ IcebergRegistrationException,
+)
from airflow.providers.common.sql.datafusion.format_handlers import (
AvroFormatHandler,
CsvFormatHandler,
+ IcebergFormatHandler,
ParquetFormatHandler,
get_format_handler,
)
@@ -35,48 +41,137 @@ class TestFormatHandlers:
def session_context_mock(self):
return MagicMock()
- def test_parquet_handler_success(self, session_context_mock):
- handler = ParquetFormatHandler(options={"key": "value"})
- handler.register_data_source_format(session_context_mock,
"table_name", "path/to/file")
- session_context_mock.register_parquet.assert_called_once_with(
- "table_name", "path/to/file", key="value"
+ @pytest.mark.parametrize(
+ ("format", "handler_class", "options"),
+ [
+ ("parquet", ParquetFormatHandler, {"key": "value"}),
+ ("csv", CsvFormatHandler, {"delimiter": ","}),
+ ("avro", AvroFormatHandler, {"key": "value"}),
+ ],
+ )
+ def test_file_handler_success(self, session_context_mock, format,
handler_class, options):
+ datasource_config = DataSourceConfig(
+ table_name="table_name",
+ uri=f"file://path/to/file.{format}",
+ format=format,
+ conn_id="conn_id",
+ options=options,
)
- assert handler.get_format == FormatType.PARQUET
+ handler = handler_class(datasource_config)
+ handler.register_data_source_format(session_context_mock)
+ register_method = getattr(session_context_mock, f"register_{format}")
+ register_method.assert_called_once_with("table_name",
f"file://path/to/file.{format}", **options)
+ assert handler.get_format == format
- def test_parquet_handler_failure(self, session_context_mock):
- session_context_mock.register_parquet.side_effect = Exception("Error")
- handler = ParquetFormatHandler()
- with pytest.raises(FileFormatRegistrationException, match="Failed to
register Parquet data source"):
- handler.register_data_source_format(session_context_mock,
"table_name", "path/to/file")
+ @pytest.mark.parametrize(
+ ("format", "handler_class"),
+ [
+ (FormatType.PARQUET, ParquetFormatHandler),
+ (FormatType.CSV, CsvFormatHandler),
+ (FormatType.AVRO, AvroFormatHandler),
+ ],
+ )
+ def test_file_handler_failure(self, session_context_mock, format,
handler_class):
+ datasource_config = DataSourceConfig(
+ table_name="table_name",
+ uri=f"file://path/to/file.{format}",
+ format=format,
+ conn_id="conn_id",
+ )
+ handler = handler_class(datasource_config)
+ register_method = getattr(session_context_mock, f"register_{format}")
+ register_method.side_effect = Exception("Error")
+ with pytest.raises(
+ FileFormatRegistrationException, match=f"Failed to register
{format} data source:"
+ ):
+ handler.register_data_source_format(session_context_mock)
- def test_csv_handler_success(self, session_context_mock):
- handler = CsvFormatHandler(options={"delimiter": ","})
- handler.register_data_source_format(session_context_mock,
"table_name", "path/to/file")
-
session_context_mock.register_csv.assert_called_once_with("table_name",
"path/to/file", delimiter=",")
- assert handler.get_format == FormatType.CSV
+ @patch("airflow.providers.apache.iceberg.hooks.iceberg.IcebergHook")
+ def test_iceberg_handler_success(self, mock_iceberg_hook_cls,
session_context_mock):
+ mock_hook = MagicMock()
+ mock_iceberg_hook_cls.return_value = mock_hook
+ mock_iceberg_table = MagicMock(spec=Table)
+ mock_io = MagicMock(spec=FileIO)
+ mock_io.properties = {
+ "client.access-key-id": "test_key",
+ "client.secret-access-key": "test_secret",
+ }
+ mock_iceberg_table.io = mock_io
+ mock_hook.load_table.return_value = mock_iceberg_table
+ datasource_config = DataSourceConfig(
+ table_name="my_table",
+ format="iceberg",
+ conn_id="iceberg_default",
+ db_name="default",
+ )
+ handler = IcebergFormatHandler(datasource_config)
+ handler.register_data_source_format(session_context_mock)
- def test_csv_handler_failure(self, session_context_mock):
- session_context_mock.register_csv.side_effect = Exception("Error")
- handler = CsvFormatHandler()
- with pytest.raises(FileFormatRegistrationException, match="Failed to
register csv data source"):
- handler.register_data_source_format(session_context_mock,
"table_name", "path/to/file")
+
mock_iceberg_hook_cls.assert_called_once_with(iceberg_conn_id="iceberg_default")
+ mock_hook.load_table.assert_called_once_with("default.my_table")
+
session_context_mock.register_table.assert_called_once_with("my_table",
mock_iceberg_table)
+ assert handler.get_format == FormatType.ICEBERG
- def test_avro_handler_success(self, session_context_mock):
- handler = AvroFormatHandler(options={"key": "value"})
- handler.register_data_source_format(session_context_mock,
"table_name", "path/to/file")
-
session_context_mock.register_avro.assert_called_once_with("table_name",
"path/to/file", key="value")
- assert handler.get_format == FormatType.AVRO
+ # Verify that s3 properties are correctly set
+ assert mock_iceberg_table.io.properties["s3.access-key-id"] ==
"test_key"
+ assert mock_iceberg_table.io.properties["s3.secret-access-key"] ==
"test_secret"
+
+ @patch("airflow.providers.apache.iceberg.hooks.iceberg.IcebergHook")
+ def test_iceberg_handler_failure(self, mock_iceberg_hook_cls,
session_context_mock):
+ mock_hook = MagicMock()
+ mock_iceberg_hook_cls.return_value = mock_hook
+ mock_hook.load_table.side_effect = Exception("catalog error")
+ datasource_config = DataSourceConfig(
+ table_name="my_table", format="iceberg",
conn_id="iceberg_default", db_name="default"
+ )
+ handler = IcebergFormatHandler(datasource_config)
+ with pytest.raises(IcebergRegistrationException, match="Failed to
register Iceberg table"):
+ handler.register_data_source_format(session_context_mock)
- def test_avro_handler_failure(self, session_context_mock):
- session_context_mock.register_avro.side_effect = Exception("Error")
- handler = AvroFormatHandler()
- with pytest.raises(FileFormatRegistrationException, match="Failed to
register Avro data source"):
- handler.register_data_source_format(session_context_mock,
"table_name", "path/to/file")
+ @pytest.mark.parametrize(
+ ("config_params", "error_match"),
+ [
+ (
+ {"table_name": "t", "format": "iceberg", "conn_id": "c"},
+ "Database name must be provided for table providers",
+ ),
+ (
+ {"table_name": "", "format": "parquet", "conn_id": "c", "uri":
"file://u"},
+ "Table name must be provided for storage type",
+ ),
+ (
+ {"table_name": "t", "format": "parquet", "conn_id": "c"},
+ "Unsupported storage type for URI:",
+ ),
+ ],
+ )
+ def test_missing_mandatory_fields(self, config_params, error_match):
+ with pytest.raises(ValueError, match=error_match):
+ DataSourceConfig(**config_params)
def test_get_format_handler(self):
- assert isinstance(get_format_handler("parquet"), ParquetFormatHandler)
- assert isinstance(get_format_handler("csv"), CsvFormatHandler)
- assert isinstance(get_format_handler("avro"), AvroFormatHandler)
+ assert isinstance(
+ get_format_handler(
+ DataSourceConfig(table_name="t", format="parquet",
conn_id="c", uri="file://u")
+ ),
+ ParquetFormatHandler,
+ )
+ assert isinstance(
+ get_format_handler(DataSourceConfig(table_name="t", format="csv",
conn_id="c", uri="file://u")),
+ CsvFormatHandler,
+ )
+ assert isinstance(
+ get_format_handler(DataSourceConfig(table_name="t", format="avro",
conn_id="c", uri="file://u")),
+ AvroFormatHandler,
+ )
+ assert isinstance(
+ get_format_handler(
+ DataSourceConfig(table_name="t", format="iceberg",
conn_id="iceberg_default", db_name="d")
+ ),
+ IcebergFormatHandler,
+ )
- with pytest.raises(ValueError, match="Unsupported format"):
- get_format_handler("invalid")
+ with pytest.raises(ValueError, match="Unsupported format: invalid"):
+ get_format_handler(
+ DataSourceConfig(table_name="t", format="invalid",
conn_id="c", uri="file://u")
+ )
diff --git a/providers/common/sql/tests/unit/common/sql/test_config.py
b/providers/common/sql/tests/unit/common/sql/test_config.py
index abe020b6ad1..0e90c873443 100644
--- a/providers/common/sql/tests/unit/common/sql/test_config.py
+++ b/providers/common/sql/tests/unit/common/sql/test_config.py
@@ -46,7 +46,7 @@ class TestDataSourceConfig:
def test_missing_table_name_raises_error(self):
with pytest.raises(ValueError, match="Table name must be provided for
storage type"):
- DataSourceConfig(conn_id="test", uri="s3://bucket/path")
+ DataSourceConfig(conn_id="test", uri="s3://bucket/path",
table_name="")
def test_parquet_with_partition_cols(self):
config = DataSourceConfig(
@@ -63,6 +63,27 @@ class TestDataSourceConfig:
assert config.options == {"table_partition_cols": [("year",
"integer"), ("month", "integer")]}
assert config.storage_type == StorageType.S3
+ def test_iceberg_creation(self):
+ config = DataSourceConfig(
+ conn_id="iceberg_conn", table_name="default.my_iceberg_table",
format="iceberg", db_name="default"
+ )
+ assert config.conn_id == "iceberg_conn"
+ assert config.table_name == "default.my_iceberg_table"
+ assert config.format == "iceberg"
+ assert config.storage_type is None
+
+ def test_iceberg_is_table_provider(self):
+ config = DataSourceConfig(
+ conn_id="iceberg_conn", table_name="my_table", format="iceberg",
db_name="default"
+ )
+ assert config.is_table_provider is True
+
+ def test_non_iceberg_is_not_table_provider(self):
+ config = DataSourceConfig(
+ conn_id="test", uri="s3://bucket/path", table_name="my_table",
format="parquet"
+ )
+ assert config.is_table_provider is False
+
class TestConnectionConfig:
def test_connection_config_creation(self):