This is an automated email from the ASF dual-hosted git repository.

gopidesu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fe284c77837 AIP-99: Add AnalyticsOperator (#62232)
fe284c77837 is described below

commit fe284c7783751be6b4549598f402c18b3c27f745
Author: GPK <[email protected]>
AuthorDate: Sat Feb 28 03:01:11 2026 +0000

    AIP-99: Add AnalyticsOperator (#62232)
    
    * Add analytics operator
    
    * Add analytics operator
    
    * Fix tests
    
    * Add docs
    
    * update params
    
    * update datafusion version
    
    * Fix selective checks
    
    * Fix tests
    
    * Fix mypy checks
    
    * move examples to example_dag folder
    
    * Update docs
    
    * Update docs
    
    * Update datasource config to support options parameter
    
    * Update docstring
    
    * Resolve comments
    
    * Resolve comments
    
    * Resolve comments
    
    * fixup tests
    
    * Move analytics operator to common-sql
    
    * Fixup tests
    
    * Updated changes imports
    
    * Updated changes test paths
    
    * Resolve comments
    
    * Update endpoint in extras
    
    * Update dependency
    
    * Update dependency
---
 .../tests/unit/always/test_project_structure.py    |   2 +
 docs/spelling_wordlist.txt                         |   3 +
 providers/common/sql/docs/index.rst                |   3 +-
 providers/common/sql/docs/operators.rst            |  65 ++++++
 providers/common/sql/provider.yaml                 |   1 +
 providers/common/sql/pyproject.toml                |  10 +-
 .../sql/src/airflow/providers/common/sql/config.py |  86 +++++++
 .../providers/common/sql/datafusion/__init__.py    |  16 ++
 .../providers/common/sql/datafusion/base.py        |  67 ++++++
 .../providers/common/sql/datafusion/engine.py      | 167 ++++++++++++++
 .../providers/common/sql/datafusion/exceptions.py  |  31 +++
 .../common/sql/datafusion/format_handlers.py       | 113 +++++++++
 .../sql/datafusion/object_storage_provider.py      |  87 +++++++
 .../providers/common/sql/example_dags/__init__.py  |  16 ++
 .../common/sql/example_dags/example_analytics.py   |  58 +++++
 .../providers/common/sql/get_provider_info.py      |   1 +
 .../providers/common/sql/operators/analytics.py    | 161 +++++++++++++
 .../tests/unit/common/sql/datafusion/__init__.py   |  16 ++
 .../unit/common/sql/datafusion/test_engine.py      | 253 +++++++++++++++++++++
 .../common/sql/datafusion/test_format_handlers.py  |  82 +++++++
 .../sql/datafusion/test_object_storage_provider.py |  74 ++++++
 .../unit/common/sql/operators/test_analytics.py    | 159 +++++++++++++
 .../sql/tests/unit/common/sql/test_config.py       |  79 +++++++
 23 files changed, 1548 insertions(+), 2 deletions(-)

diff --git a/airflow-core/tests/unit/always/test_project_structure.py 
b/airflow-core/tests/unit/always/test_project_structure.py
index fb55ed7e9c3..ffd7aa53c5c 100644
--- a/airflow-core/tests/unit/always/test_project_structure.py
+++ b/airflow-core/tests/unit/always/test_project_structure.py
@@ -98,6 +98,8 @@ class TestProjectStructure:
             
"providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_delete_from.py",
             
"providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_k8s_hashlib_wrapper.py",
             
"providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_xcom_sidecar.py",
+            
"providers/common/sql/tests/unit/common/sql/datafusion/test_base.py",
+            
"providers/common/sql/tests/unit/common/sql/datafusion/test_exceptions.py",
             
"providers/common/compat/tests/unit/common/compat/lineage/test_entities.py",
             
"providers/common/compat/tests/unit/common/compat/standard/test_operators.py",
             
"providers/common/compat/tests/unit/common/compat/standard/test_triggers.py",
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 0cc5d0b6fd0..6b157586513 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -453,8 +453,10 @@ datasetId
 Datasets
 datasets
 datasource
+DataSourceConfig
 Datastore
 datastore
+datastores
 Datasync
 datasync
 datatransfer
@@ -2062,6 +2064,7 @@ wape
 warmup
 Wasb
 wasb
+wasn
 weaviate
 WebClient
 webhdfs
diff --git a/providers/common/sql/docs/index.rst 
b/providers/common/sql/docs/index.rst
index bd67381dd1b..28cbd000dd5 100644
--- a/providers/common/sql/docs/index.rst
+++ b/providers/common/sql/docs/index.rst
@@ -119,12 +119,13 @@ You can install such cross-provider dependencies when 
installing from PyPI. For
 
 .. code-block:: bash
 
-    pip install apache-airflow-providers-common-sql[common.compat]
+    pip install apache-airflow-providers-common-sql[amazon]
 
 
 
==================================================================================================================
  =================
 Dependent package                                                              
                                     Extra
 
==================================================================================================================
  =================
+`apache-airflow-providers-amazon 
<https://airflow.apache.org/docs/apache-airflow-providers-amazon>`_             
   ``amazon``
 `apache-airflow-providers-common-compat 
<https://airflow.apache.org/docs/apache-airflow-providers-common-compat>`_  
``common.compat``
 `apache-airflow-providers-openlineage 
<https://airflow.apache.org/docs/apache-airflow-providers-openlineage>`_      
``openlineage``
 
==================================================================================================================
  =================
diff --git a/providers/common/sql/docs/operators.rst 
b/providers/common/sql/docs/operators.rst
index 1766d78fbed..97832e74854 100644
--- a/providers/common/sql/docs/operators.rst
+++ b/providers/common/sql/docs/operators.rst
@@ -252,3 +252,68 @@ between two connections.
     :dedent: 4
     :start-after: [START howto_operator_generic_transfer]
     :end-before: [END howto_operator_generic_transfer]
+
+Analytics Operator
+~~~~~~~~~~~~~~~~~~
+
+The Analytics operator is designed to run analytic queries on data stored in 
various datastores. It is a generic operator intended to query data in S3, GCS, 
Azure, and the Local File System (see Supported Storage Systems below for the 
storage systems currently supported).
+
+When to Use Analytics Operator
+------------------------------
+
+The Analytics Operator is ideal for performing efficient, high-performance 
analytics on large volumes of data across various storage systems. Under the 
hood, it uses Apache DataFusion, a high-performance, extensible query engine 
for Apache Arrow, which enables fast SQL queries on various data formats and 
storage systems. DataFusion is chosen for its ability to handle large-scale 
data processing on a single node, providing low-latency analytics without the 
need for a full database setup a [...]
+
+
+Supported Storage Systems
+-------------------------
+- S3
+- Local File System
+
+.. note::
+   GCS, Azure, HTTP, Delta, Iceberg are not yet supported but will be added in 
the future.
+
+
+
+Supported File Formats
+----------------------
+- Parquet
+- CSV
+- Avro
+
+.. _howto/operator:AnalyticsOperator:
+
+Use the 
:class:`~airflow.providers.common.sql.operators.analytics.AnalyticsOperator` to 
run analytic queries.
+
+Parameters
+----------
+* ``datasource_configs`` (list[DataSourceConfig], required): List of 
datasource configurations
+* ``queries`` (list[str], required): List of SQL queries to run on the data
+* ``max_rows_check`` (int, optional): Maximum number of rows to check for each 
query. Default is 100. If any query returns more than this number of rows, it 
will be skipped in the results returned by the operator. This is to prevent 
returning too many rows in the results which can cause xcom rendering issues in 
Airflow UI.
+* ``engine`` (DataFusionEngine, optional): Query engine to use. Default is 
"datafusion". Currently, only "datafusion" is supported.
+* ``result_output_format`` (str, optional): Output format for the results. 
Default is ``tabulate``. Supported formats are ``tabulate``, ``json``.
+
+DataSourceConfig Parameters
+---------------------------
+
+* ``conn_id`` (str, required): Connection ID of the storage. e.g.: 
"aws_default" for S3.
+* ``uri`` (str, required): URI of the datasource.
+* ``format`` (str, required): Format of the data.
+* ``table_name`` (str, required): Name of the table. Note: This name can be 
any identifier and should match the table name used in the SQL queries.
+* ``storage_type`` (StorageType, optional): Type of storage. Default is None. 
If not provided, it will be inferred from the URI.
+* ``options`` (dict, optional): Additional options for the datasource. e.g.: if 
the datasource is partitioned, you can provide partitioning information in the 
options, e.g.: ``{"table_partition_cols": [("year", "integer")]}``
+
+S3 Storage
+----------
+.. exampleinclude:: 
/../../sql/src/airflow/providers/common/sql/example_dags/example_analytics.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_analytics_operator_with_s3]
+    :end-before: [END howto_analytics_operator_with_s3]
+
+Local File System Storage
+-------------------------
+.. exampleinclude:: 
/../../sql/src/airflow/providers/common/sql/example_dags/example_analytics.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_analytics_operator_with_local]
+    :end-before: [END howto_analytics_operator_with_local]
diff --git a/providers/common/sql/provider.yaml 
b/providers/common/sql/provider.yaml
index b3e7f8cf686..aca5da531af 100644
--- a/providers/common/sql/provider.yaml
+++ b/providers/common/sql/provider.yaml
@@ -103,6 +103,7 @@ operators:
     python-modules:
       - airflow.providers.common.sql.operators.sql
       - airflow.providers.common.sql.operators.generic_transfer
+      - airflow.providers.common.sql.operators.analytics
 
 dialects:
   - dialect-type: default
diff --git a/providers/common/sql/pyproject.toml 
b/providers/common/sql/pyproject.toml
index fb5b5fb9f60..0b5cd1d0f86 100644
--- a/providers/common/sql/pyproject.toml
+++ b/providers/common/sql/pyproject.toml
@@ -85,12 +85,19 @@ dependencies = [
 "sqlalchemy" = [
     "sqlalchemy>=1.4.54",
 ]
+"amazon" = [
+    "apache-airflow-providers-amazon"
+]
+"datafusion" =[
+    "datafusion>=50.0.0",
+]
 
 [dependency-groups]
 dev = [
     "apache-airflow",
     "apache-airflow-task-sdk",
     "apache-airflow-devel-common",
+    "apache-airflow-providers-amazon",
     "apache-airflow-providers-common-compat",
     "apache-airflow-providers-openlineage",
     # Additional devel dependencies (do not remove this line and add extra 
development dependencies)
@@ -100,7 +107,8 @@ dev = [
     "apache-airflow-providers-postgres",
     "apache-airflow-providers-odbc",
     "apache-airflow-providers-sqlite",
-    "apache-airflow-providers-common-sql[sqlalchemy]"
+    "apache-airflow-providers-common-sql[sqlalchemy]",
+    "datafusion>=50.0.0"
 ]
 
 # To build docs:
diff --git a/providers/common/sql/src/airflow/providers/common/sql/config.py 
b/providers/common/sql/src/airflow/providers/common/sql/config.py
new file mode 100644
index 00000000000..f16ddc5e50d
--- /dev/null
+++ b/providers/common/sql/src/airflow/providers/common/sql/config.py
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from enum import Enum
+from typing import Any
+
+
+@dataclass(frozen=True)
+class ConnectionConfig:
+    """Configuration for datafusion object store connections."""
+
+    conn_id: str
+    credentials: dict[str, Any] = field(default_factory=dict)
+    extra_config: dict[str, Any] = field(default_factory=dict)
+
+
+class FormatType(str, Enum):
+    """Supported data formats."""
+
+    PARQUET = "parquet"
+    CSV = "csv"
+    AVRO = "avro"
+
+
+class StorageType(str, Enum):
+    """Storage types for Data Fusion."""
+
+    S3 = "s3"
+    LOCAL = "local"
+
+
+@dataclass
+class DataSourceConfig:
+    """
+    Configuration for an input data source.
+
+    :param conn_id: The connection ID to use for accessing the data source.
+    :param uri: The URI of the data source (e.g., file path, S3 bucket, etc.).
+    :param format: The format of the data (e.g., 'parquet', 'csv').
+    :param table_name: The name of the table if applicable.
+    :param schema: A dictionary mapping column names to their types.
+    :param db_name: The database name if applicable.
+    :param storage_type: The type of storage (automatically inferred from URI).
+    :param options: Additional options for the data source. e.g.: you can set 
partition columns for any data source; they will be applied while registering 
the data.
+    """
+
+    conn_id: str
+    uri: str
+    format: str | None = None
+    table_name: str | None = None
+    storage_type: StorageType | None = None
+    options: dict[str, Any] = field(default_factory=dict)
+
+    def __post_init__(self):
+
+        if self.storage_type is None:
+            self.storage_type = self._extract_storage_type
+
+        if self.storage_type is not None and self.table_name is None:
+            raise ValueError("Table name must be provided for storage type")
+
+    @property
+    def _extract_storage_type(self) -> StorageType | None:
+        """Extract storage type."""
+        if self.uri.startswith("s3://"):
+            return StorageType.S3
+        if self.uri.startswith("file://"):
+            return StorageType.LOCAL
+        raise ValueError(f"Unsupported storage type for URI: {self.uri}")
diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/datafusion/__init__.py 
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ 
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/datafusion/base.py 
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/base.py
new file mode 100644
index 00000000000..496d4139835
--- /dev/null
+++ b/providers/common/sql/src/airflow/providers/common/sql/datafusion/base.py
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from typing import TYPE_CHECKING, Any
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+if TYPE_CHECKING:
+    from datafusion import SessionContext
+
+    from airflow.providers.common.sql.config import ConnectionConfig, 
FormatType, StorageType
+
+
+class ObjectStorageProvider(LoggingMixin, ABC):
+    """Abstract base class for object storage providers."""
+
+    @property
+    def get_storage_type(self) -> StorageType:
+        """Return storage type handled by this provider (e.g., 's3', 'gcs', 
'local')."""
+        raise NotImplementedError
+
+    @abstractmethod
+    def create_object_store(self, path: str, connection_config: 
ConnectionConfig | None = None) -> Any:
+        """Create and return a DataFusion object store instance."""
+        raise NotImplementedError
+
+    @abstractmethod
+    def get_scheme(self) -> str:
+        """Return URL scheme for this storage type (e.g., 's3://', 'gs://')."""
+        raise NotImplementedError
+
+    def get_bucket(self, path: str) -> str | None:
+        """Extract the bucket name from the given path."""
+        if path and path.startswith(self.get_scheme()):
+            path_parts = path[len(self.get_scheme()) :].split("/", 1)
+            return path_parts[0]
+        return None
+
+
+class FormatHandler(LoggingMixin, ABC):
+    """Abstract base class for format handlers."""
+
+    @property
+    def get_format(self) -> FormatType:
+        """Return file format type."""
+        raise NotImplementedError
+
+    @abstractmethod
+    def register_data_source_format(self, ctx: SessionContext, table_name: 
str, path: str) -> None:
+        """Register data source format."""
+        raise NotImplementedError
diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py 
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py
new file mode 100644
index 00000000000..498655e46e5
--- /dev/null
+++ b/providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py
@@ -0,0 +1,167 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+from datafusion import SessionContext
+
+from airflow.providers.common.compat.sdk import BaseHook, Connection
+from airflow.providers.common.sql.config import ConnectionConfig, 
DataSourceConfig, StorageType
+from airflow.providers.common.sql.datafusion.exceptions import (
+    ObjectStoreCreationException,
+    QueryExecutionException,
+)
+from airflow.providers.common.sql.datafusion.format_handlers import 
get_format_handler
+from airflow.providers.common.sql.datafusion.object_storage_provider import 
get_object_storage_provider
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class DataFusionEngine(LoggingMixin):
+    """Apache DataFusion engine."""
+
+    def __init__(self):
+        super().__init__()
+        # TODO: session context has additional parameters via SessionConfig 
see what's possible we can use Possible via DataFusionHook ?
+        self.df_ctx = SessionContext()
+        self.registered_tables: dict[str, str] = {}
+
+    @property
+    def session_context(self) -> SessionContext:
+        """Return the session context."""
+        return self.df_ctx
+
+    def register_datasource(self, datasource_config: DataSourceConfig):
+        """Register a datasource with the datafusion engine."""
+        if not isinstance(datasource_config, DataSourceConfig):
+            raise ValueError("datasource_config must be of type 
DataSourceConfig")
+
+        if datasource_config.storage_type == StorageType.LOCAL:
+            connection_config = None
+        else:
+            connection_config = 
self._get_connection_config(datasource_config.conn_id)
+
+        self._register_object_store(datasource_config, connection_config)
+        self._register_data_source_format(datasource_config)
+
+    def _register_object_store(
+        self, datasource_config: DataSourceConfig, connection_config: 
ConnectionConfig | None
+    ):
+        """Register object stores."""
+        if TYPE_CHECKING:
+            assert datasource_config.storage_type is not None
+
+        try:
+            storage_provider = 
get_object_storage_provider(datasource_config.storage_type)
+            object_store = storage_provider.create_object_store(
+                datasource_config.uri, connection_config=connection_config
+            )
+            schema = storage_provider.get_scheme()
+            self.session_context.register_object_store(schema=schema, 
store=object_store)
+            self.log.info("Registered object store for schema: %s", schema)
+        except Exception as e:
+            raise ObjectStoreCreationException(
+                f"Error while creating object store for 
{datasource_config.storage_type}: {e}"
+            )
+
+    def _register_data_source_format(self, datasource_config: 
DataSourceConfig):
+        """Register data source format."""
+        if TYPE_CHECKING:
+            assert datasource_config.table_name is not None
+            assert datasource_config.format is not None
+
+        if datasource_config.table_name in self.registered_tables:
+            raise ValueError(
+                f"Table {datasource_config.table_name} already registered for 
{self.registered_tables[datasource_config.table_name]}, please choose different 
name"
+            )
+
+        format_cls = get_format_handler(datasource_config.format, 
datasource_config.options)
+        format_cls.register_data_source_format(
+            self.session_context, datasource_config.table_name, 
datasource_config.uri
+        )
+        self.registered_tables[datasource_config.table_name] = 
datasource_config.uri
+        self.log.info(
+            "Registered data source format %s for table: %s",
+            datasource_config.format,
+            datasource_config.table_name,
+        )
+
+    def execute_query(self, query: str) -> dict[str, list[Any]]:
+        """Execute a query and return the result as a dictionary."""
+        try:
+            self.log.info("Executing query: %s", query)
+            df = self.session_context.sql(query)
+            return df.to_pydict()
+        except Exception as e:
+            raise QueryExecutionException(f"Error while executing query: {e}")
+
+    def _get_connection_config(self, conn_id: str) -> ConnectionConfig:
+
+        airflow_conn = BaseHook.get_connection(conn_id)
+
+        credentials, extra_config = self._get_credentials(airflow_conn)
+
+        return ConnectionConfig(
+            conn_id=airflow_conn.conn_id,
+            credentials=credentials,
+            extra_config=extra_config,
+        )
+
+    def _get_credentials(self, conn: Connection) -> tuple[dict[str, Any], 
dict[str, Any]]:
+
+        credentials = {}
+        extra_config = {}
+
+        def _fetch_extra_configs(keys: list[str]) -> dict[str, Any]:
+            conf = {}
+            extra_dejson = conn.extra_dejson
+            for key in keys:
+                if key in extra_dejson:
+                    conf[key] = conn.extra_dejson[key]
+            return conf
+
+        match conn.conn_type:
+            case "aws":
+                try:
+                    from airflow.providers.amazon.aws.hooks.base_aws import 
AwsGenericHook
+                except ImportError:
+                    from airflow.providers.common.compat.sdk import 
AirflowOptionalProviderFeatureException
+
+                    raise AirflowOptionalProviderFeatureException(
+                        "Failed to import AwsGenericHook. To use the S3 
storage functionality, please install the "
+                        "apache-airflow-providers-amazon package."
+                    )
+                aws_hook: AwsGenericHook = 
AwsGenericHook(aws_conn_id=conn.conn_id, client_type="s3")
+                creds = aws_hook.get_credentials()
+                credentials.update(
+                    {
+                        "access_key_id": conn.login or creds.access_key,
+                        "secret_access_key": conn.password or creds.secret_key,
+                        "session_token": creds.token if creds.token else None,
+                    }
+                )
+                credentials = self._remove_none_values(credentials)
+                extra_config = _fetch_extra_configs(["region", "endpoint"])
+
+            case _:
+                raise ValueError(f"Unknown connection type {conn.conn_type}")
+        return credentials, extra_config
+
+    @staticmethod
+    def _remove_none_values(params: dict[str, Any]) -> dict[str, Any]:
+        """Filter out None values from the dictionary."""
+        return {k: v for k, v in params.items() if v is not None}
diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/datafusion/exceptions.py
 
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/exceptions.py
new file mode 100644
index 00000000000..dea1e5c219b
--- /dev/null
+++ 
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/exceptions.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.providers.common.compat.sdk import AirflowException
+
+
+class ObjectStoreCreationException(AirflowException):
+    """Error while creating a DataFusion object store."""
+
+
+class FileFormatRegistrationException(AirflowException):
+    """Error while registering file format."""
+
+
+class QueryExecutionException(AirflowException):
+    """Error while executing query."""
diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/datafusion/format_handlers.py
 
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/format_handlers.py
new file mode 100644
index 00000000000..4dd0f461456
--- /dev/null
+++ 
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/format_handlers.py
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+from airflow.providers.common.sql.config import FormatType
+from airflow.providers.common.sql.datafusion.base import FormatHandler
+from airflow.providers.common.sql.datafusion.exceptions import 
FileFormatRegistrationException
+
+if TYPE_CHECKING:
+    from datafusion import SessionContext
+
+
+class ParquetFormatHandler(FormatHandler):
+    """
+    Parquet format handler.
+
+    :param options: Additional options for the Parquet format.
+        
https://datafusion.apache.org/python/autoapi/datafusion/context/index.html#datafusion.context.SessionContext.register_parquet
+    """
+
+    def __init__(self, options: dict[str, Any] | None = None):
+        self.options = options or {}
+
+    @property
+    def get_format(self) -> FormatType:
+        """Return the format type."""
+        return FormatType.PARQUET
+
+    def register_data_source_format(self, ctx: SessionContext, table_name: 
str, path: str):
+        """Register a data source format."""
+        try:
+            ctx.register_parquet(table_name, path, **self.options)
+        except Exception as e:
+            raise FileFormatRegistrationException(f"Failed to register Parquet 
data source: {e}")
+
+
+class CsvFormatHandler(FormatHandler):
+    """
+    CSV format handler.
+
+    :param options: Additional options for the CSV format.
+        
https://datafusion.apache.org/python/autoapi/datafusion/context/index.html#datafusion.context.SessionContext.register_csv
+    """
+
+    def __init__(self, options: dict[str, Any] | None = None):
+        self.options = options or {}
+
+    @property
+    def get_format(self) -> FormatType:
+        """Return the format type."""
+        return FormatType.CSV
+
+    def register_data_source_format(self, ctx: SessionContext, table_name: 
str, path: str):
+        """Register a data source format."""
+        try:
+            ctx.register_csv(table_name, path, **self.options)
+        except Exception as e:
+            raise FileFormatRegistrationException(f"Failed to register csv 
data source: {e}")
+
+
+class AvroFormatHandler(FormatHandler):
+    """
+    Avro format handler.
+
+    :param options: Additional options for the Avro format.
+        
https://datafusion.apache.org/python/autoapi/datafusion/context/index.html#datafusion.context.SessionContext.register_avro
+    """
+
+    def __init__(self, options: dict[str, Any] | None = None):
+        self.options = options or {}
+
+    @property
+    def get_format(self) -> FormatType:
+        """Return the format type."""
+        return FormatType.AVRO
+
+    def register_data_source_format(self, ctx: SessionContext, table_name: 
str, path: str) -> None:
+        """Register a data source format."""
+        try:
+            ctx.register_avro(table_name, path, **self.options)
+        except Exception as e:
+            raise FileFormatRegistrationException(f"Failed to register Avro 
data source: {e}")
+
+
+def get_format_handler(format_type: str, options: dict[str, Any] | None = 
None) -> FormatHandler:
+    """Get a format handler based on the format type."""
+    format_type = format_type.lower()
+
+    match format_type:
+        case "parquet":
+            return ParquetFormatHandler(options)
+        case "csv":
+            return CsvFormatHandler(options)
+        case "avro":
+            return AvroFormatHandler(options)
+        case _:
+            raise ValueError(f"Unsupported format: {format_type}")
diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/datafusion/object_storage_provider.py
 
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/object_storage_provider.py
new file mode 100644
index 00000000000..1ce917a1bb2
--- /dev/null
+++ 
b/providers/common/sql/src/airflow/providers/common/sql/datafusion/object_storage_provider.py
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datafusion.object_store import AmazonS3, LocalFileSystem
+
+from airflow.providers.common.sql.config import ConnectionConfig, StorageType
+from airflow.providers.common.sql.datafusion.base import ObjectStorageProvider
+from airflow.providers.common.sql.datafusion.exceptions import 
ObjectStoreCreationException
+
+
class S3ObjectStorageProvider(ObjectStorageProvider):
    """S3 Object Storage Provider using DataFusion's AmazonS3."""

    @property
    def get_storage_type(self) -> StorageType:
        """Return the storage type."""
        return StorageType.S3

    def create_object_store(self, path: str, connection_config: ConnectionConfig | None = None):
        """Create an S3 object store using DataFusion's AmazonS3.

        :param path: Object-store URI (e.g. ``s3://bucket/key``); the bucket name is
            extracted from it via ``self.get_bucket``.
        :param connection_config: Credentials and extra options for S3; required.
        :raises ValueError: if ``connection_config`` is not provided.
        :raises ObjectStoreCreationException: if the AmazonS3 store cannot be created.
        """
        if connection_config is None:
            # BUG FIX: the original passed two positional args to ValueError in
            # logging %-style, producing a tuple message instead of an
            # interpolated string. Use an f-string so the message renders.
            raise ValueError(f"connection_config must be provided for {self.get_storage_type}")

        try:
            credentials = connection_config.credentials
            bucket = self.get_bucket(path)

            s3_store = AmazonS3(**credentials, **connection_config.extra_config, bucket_name=bucket)
            self.log.info("Created S3 object store for bucket %s", bucket)

            return s3_store

        except Exception as e:
            # Chain the cause so the underlying AmazonS3 failure is not lost.
            raise ObjectStoreCreationException(f"Failed to create S3 object store: {e}") from e

    def get_scheme(self) -> str:
        """Return the scheme for S3."""
        return "s3://"
+
+
class LocalObjectStorageProvider(ObjectStorageProvider):
    """Local Object Storage Provider using DataFusion's LocalFileSystem."""

    @property
    def get_storage_type(self) -> StorageType:
        """Return the storage type handled by this provider."""
        return StorageType.LOCAL

    def create_object_store(self, path: str, connection_config: ConnectionConfig | None = None):
        """Create a Local object store.

        Both ``path`` and ``connection_config`` are accepted for interface
        compatibility but are not needed by the local filesystem store.
        """
        return LocalFileSystem()

    def get_scheme(self) -> str:
        """Return the URI scheme for the local filesystem."""
        return "file://"
+
+
def get_object_storage_provider(storage_type: StorageType) -> ObjectStorageProvider:
    """Get an object storage provider based on the storage type."""
    # TODO: Add support for GCS, Azure, HTTP: https://datafusion.apache.org/python/autoapi/datafusion/object_store/index.html
    registry: dict[StorageType, type] = {
        StorageType.S3: S3ObjectStorageProvider,
        StorageType.LOCAL: LocalObjectStorageProvider,
    }

    try:
        provider_cls = registry[storage_type]
    except KeyError:
        # `from None` keeps the error clean, matching a plain raise with no context.
        raise ValueError(
            f"Unsupported storage type: {storage_type}. Supported types: {list(registry.keys())}"
        ) from None

    return provider_cls()
diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/example_dags/__init__.py
 
b/providers/common/sql/src/airflow/providers/common/sql/example_dags/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ 
b/providers/common/sql/src/airflow/providers/common/sql/example_dags/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/example_dags/example_analytics.py
 
b/providers/common/sql/src/airflow/providers/common/sql/example_dags/example_analytics.py
new file mode 100644
index 00000000000..4871757b3bb
--- /dev/null
+++ 
b/providers/common/sql/src/airflow/providers/common/sql/example_dags/example_analytics.py
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime
+
+from airflow.providers.common.sql.config import DataSourceConfig
+from airflow.providers.common.sql.operators.analytics import AnalyticsOperator
+from airflow.sdk import DAG
+
# Data source backed by S3; resolves credentials from the "aws_default" connection.
datasource_config_s3 = DataSourceConfig(
    conn_id="aws_default", table_name="users_data", uri="s3://bucket/path/", format="parquet"
)

# Data source on the local filesystem; no connection needed, so conn_id is empty.
datasource_config_local = DataSourceConfig(
    conn_id="", table_name="users_data", uri="file:///path/to/", format="parquet"
)

# Please replace uri with appropriate value

with DAG(
    dag_id="example_analytics",
    schedule=datetime.timedelta(hours=4),
    start_date=datetime.datetime(2021, 1, 1),
    catchup=False,
    tags=["analytics", "common-sql"],
) as dag:
    # [START howto_analytics_operator_with_s3]
    # Register the S3-backed table and run two queries against it.
    analytics_with_s3 = AnalyticsOperator(
        task_id="analytics_with_s3",
        datasource_configs=[datasource_config_s3],
        queries=["SELECT * FROM users_data", "SELECT count(*) FROM users_data"],
    )

    # [END howto_analytics_operator_with_s3]

    # [START howto_analytics_operator_with_local]
    # Same queries, but the table is registered from a local parquet path.
    analytics_with_local = AnalyticsOperator(
        task_id="analytics_with_local",
        datasource_configs=[datasource_config_local],
        queries=["SELECT * FROM users_data", "SELECT count(*) FROM users_data"],
    )
    analytics_with_s3 >> analytics_with_local
    # [END howto_analytics_operator_with_local]
diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/get_provider_info.py 
b/providers/common/sql/src/airflow/providers/common/sql/get_provider_info.py
index e2f8332bd95..ca5892917f6 100644
--- a/providers/common/sql/src/airflow/providers/common/sql/get_provider_info.py
+++ b/providers/common/sql/src/airflow/providers/common/sql/get_provider_info.py
@@ -41,6 +41,7 @@ def get_provider_info():
                 "python-modules": [
                     "airflow.providers.common.sql.operators.sql",
                     "airflow.providers.common.sql.operators.generic_transfer",
+                    "airflow.providers.common.sql.operators.analytics",
                 ],
             }
         ],
diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/operators/analytics.py 
b/providers/common/sql/src/airflow/providers/common/sql/operators/analytics.py
new file mode 100644
index 00000000000..0ed922ed986
--- /dev/null
+++ 
b/providers/common/sql/src/airflow/providers/common/sql/operators/analytics.py
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+from collections.abc import Sequence
+from functools import cached_property
+from typing import TYPE_CHECKING, Any, Literal
+
+from airflow.providers.common.compat.sdk import BaseOperator, Context
+from airflow.providers.common.sql.datafusion.engine import DataFusionEngine
+
+if TYPE_CHECKING:
+    from airflow.providers.common.sql.config import DataSourceConfig
+
+
class AnalyticsOperator(BaseOperator):
    """
    Operator to run queries on various datasources stored in object stores like S3, GCS, Azure, etc.

    Each datasource is registered as a named table on a DataFusion engine; every query
    is then executed against the engine and the combined results are returned as a
    single formatted string.

    :param datasource_configs: List of datasource configurations to register.
    :param queries: List of SQL queries to execute.
    :param max_rows_check: Maximum number of rows allowed in query results. Queries exceeding this will be skipped.
    :param engine: Optional DataFusion engine instance.
    :param result_output_format: Output format for results. Supported: 'tabulate', 'json'. Default is 'tabulate'.
    """

    # NOTE(review): max_rows_check is templated; a Jinja-rendered value would arrive as
    # a string and break the numeric comparison below unless native rendering is on —
    # TODO confirm intended templating usage.
    template_fields: Sequence[str] = (
        "datasource_configs",
        "queries",
        "max_rows_check",
        "result_output_format",
    )

    def __init__(
        self,
        datasource_configs: list[DataSourceConfig],
        queries: list[str],
        max_rows_check: int = 100,
        engine: DataFusionEngine | None = None,
        result_output_format: Literal["tabulate", "json"] = "tabulate",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.datasource_configs = datasource_configs
        self.queries = queries
        self.engine = engine
        self.max_rows_check = max_rows_check
        self.result_output_format = result_output_format

    @cached_property
    def _df_engine(self) -> DataFusionEngine:
        # Lazily build a default engine so callers/tests can inject their own via `engine`.
        if self.engine is None:
            return DataFusionEngine()
        return self.engine

    def execute(self, context: Context) -> str:
        """Register all datasources, run every query, and return the formatted results."""
        results = []
        for datasource_config in self.datasource_configs:
            self._df_engine.register_datasource(datasource_config)

        # TODO make it parallel as there is no dependency between queries
        for query in self.queries:
            result_dict = self._df_engine.execute_query(query)
            results.append({"query": query, "data": result_dict})

        match self.result_output_format:
            case "tabulate":
                return self._build_tabulate_output(results)
            case "json":
                return self._build_json_output(results)
            case _:
                # Reachable despite the Literal hint because the field is templated.
                raise ValueError(f"Unsupported output format: {self.result_output_format}")

    def _is_result_too_large(self, result_dict: dict[str, Any]) -> tuple[bool, int]:
        """Check if a result exceeds the max_rows_check limit.

        Returns a ``(too_large, row_count)`` pair; an empty result is ``(False, 0)``.
        """
        if not result_dict:
            return False, 0
        # Column-oriented result: row count is the length of any one column's value list.
        num_rows = len(next(iter(result_dict.values())))
        max_rows_exceeded = num_rows > self.max_rows_check
        if max_rows_exceeded:
            self.log.warning(
                "Query returned %s rows, exceeding max_rows_check (%s). Skipping result output as large datasets are unsuitable for return.",
                num_rows,
                self.max_rows_check,
            )
        return max_rows_exceeded, num_rows

    def _build_tabulate_output(self, query_results: list[dict[str, Any]]) -> str:
        """Render each query's result as a GitHub-flavored markdown table."""
        from tabulate import tabulate

        output_parts = []
        for item in query_results:
            query = item["query"]
            result_dict = item["data"]
            too_large, row_count = self._is_result_too_large(result_dict)

            if too_large:
                # Emit a skip marker instead of the (oversized) table.
                output_parts.append(
                    f"\n### Results: {query}\n\n"
                    f"**Skipped**: {row_count} rows exceed max_rows_check ({self.max_rows_check})\n\n"
                    f"{'-' * 40}\n"
                )
                continue

            table_str = tabulate(
                self._get_rows(result_dict, row_count),
                headers="keys",
                tablefmt="github",
                showindex=True,
            )
            output_parts.append(f"\n### Results: {query}\n\n{table_str}\n\n{'-' * 40}\n")

        return "".join(output_parts)

    @staticmethod
    def _get_rows(result_dict: dict[str, Any], row_count: int) -> list[dict[str, Any]]:
        # Transpose the column-oriented result dict into a list of row dicts.
        return [{key: result_dict[key][i] for key in result_dict} for i in range(row_count)]

    def _build_json_output(self, query_results: list[dict[str, Any]]) -> str:
        """Serialize all query results as a JSON array, marking oversized results as skipped."""
        json_results = []

        for item in query_results:
            query = item["query"]
            result_dict = item["data"]
            max_rows_exceeded, row_count = self._is_result_too_large(result_dict)

            if max_rows_exceeded:
                json_results.append(
                    {
                        "query": query,
                        "status": "skipped_too_large",
                        "row_count": row_count,
                        "max_allowed": self.max_rows_check,
                    }
                )
                continue

            json_results.append(
                {
                    "query": query,
                    "data": self._get_rows(result_dict, row_count),
                }
            )

        # default=str stringifies non-JSON-native values (dates, decimals, ...).
        return json.dumps(json_results, default=str)
diff --git a/providers/common/sql/tests/unit/common/sql/datafusion/__init__.py 
b/providers/common/sql/tests/unit/common/sql/datafusion/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/common/sql/tests/unit/common/sql/datafusion/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/providers/common/sql/tests/unit/common/sql/datafusion/test_engine.py 
b/providers/common/sql/tests/unit/common/sql/datafusion/test_engine.py
new file mode 100644
index 00000000000..ef7413ca89d
--- /dev/null
+++ b/providers/common/sql/tests/unit/common/sql/datafusion/test_engine.py
@@ -0,0 +1,253 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+import tempfile
+from unittest.mock import MagicMock, patch
+
+import pytest
+from datafusion import SessionContext
+
+from airflow.models import Connection
+from airflow.providers.common.sql.config import ConnectionConfig, 
DataSourceConfig
+from airflow.providers.common.sql.datafusion.base import ObjectStorageProvider
+from airflow.providers.common.sql.datafusion.engine import DataFusionEngine
+from airflow.providers.common.sql.datafusion.exceptions import (
+    ObjectStoreCreationException,
+    QueryExecutionException,
+)
+
# Shared fixture config: fake AWS credentials matching the "aws_default" test connection.
TEST_CONNECTION_CONFIG = ConnectionConfig(
    conn_id="aws_default",
    credentials={
        "access_key_id": "test",
        "secret_access_key": "test",
        "session_token": None,
    },
    extra_config={"region_name": "us-east-1"},
)
+
+
class TestDataFusionEngine:
    @pytest.fixture(autouse=True)
    def setup_connections(self, create_connection_without_db):
        # Provide a fake "aws_default" Airflow connection for every test in this class.
        create_connection_without_db(
            Connection(
                conn_id="aws_default",
                conn_type="aws",
                login="fake_id",
                password="fake_secret",
                extra='{"region": "us-east-1"}',
            )
        )

    def test_init(self):
        # A new engine starts with a live context and an empty table registry.
        engine = DataFusionEngine()
        assert engine.df_ctx is not None
        assert engine.registered_tables == {}

    def test_session_context_property(self):
        engine = DataFusionEngine()
        assert isinstance(engine.session_context, SessionContext)
        # The property must expose the very same context object, not a copy.
        assert engine.session_context is engine.df_ctx

    def test_register_datasource_invalid_config(self):
        engine = DataFusionEngine()
        with pytest.raises(ValueError, match="datasource_config must be of type DataSourceConfig"):
            engine.register_datasource("invalid")

    @pytest.mark.parametrize(
        ("storage_type", "format", "scheme"),
        [("s3", "parquet", "s3"), ("s3", "csv", "s3"), ("s3", "avro", "s3")],
    )
    @patch("airflow.providers.common.sql.datafusion.engine.get_object_storage_provider", autospec=True)
    @patch.object(DataFusionEngine, "_get_connection_config")
    def test_register_datasource_success(self, mock_get_conn, mock_factory, storage_type, format, scheme):
        # Full registration path with the store factory and connection lookup mocked out.
        mock_get_conn.return_value = TEST_CONNECTION_CONFIG
        mock_provider = MagicMock(spec=ObjectStorageProvider)
        mock_store = MagicMock()
        mock_provider.create_object_store.return_value = mock_store
        mock_provider.get_scheme.return_value = scheme
        mock_factory.return_value = mock_provider

        engine = DataFusionEngine()

        datasource_config = DataSourceConfig(
            conn_id="aws_default", table_name="test_table", uri=f"{scheme}://bucket/path", format=format
        )

        # Replace the real context so register_* calls can be asserted on.
        engine.df_ctx = MagicMock(spec=SessionContext)

        engine.register_datasource(datasource_config)

        mock_factory.assert_called_once()
        mock_provider.create_object_store.assert_called_once_with(
            f"{scheme}://bucket/path", connection_config=mock_get_conn.return_value
        )
        engine.df_ctx.register_object_store.assert_called_once_with(schema=scheme, store=mock_store)

        # The configured format selects which SessionContext registration method runs.
        if format == "parquet":
            engine.df_ctx.register_parquet.assert_called_once_with("test_table", f"{scheme}://bucket/path")
        elif format == "csv":
            engine.df_ctx.register_csv.assert_called_once_with("test_table", f"{scheme}://bucket/path")
        elif format == "avro":
            engine.df_ctx.register_avro.assert_called_once_with("test_table", f"{scheme}://bucket/path")

        assert engine.registered_tables == {"test_table": f"{scheme}://bucket/path"}

    @patch("airflow.providers.common.sql.datafusion.engine.get_object_storage_provider", autospec=True)
    @patch.object(DataFusionEngine, "_get_connection_config")
    def test_register_datasource_object_store_exception(self, mock_get_conn, mock_factory):
        # Any provider failure must be wrapped in ObjectStoreCreationException.
        mock_get_conn.return_value = TEST_CONNECTION_CONFIG
        mock_factory.side_effect = Exception("Provider error")

        engine = DataFusionEngine()
        datasource_config = DataSourceConfig(
            conn_id="aws_default", table_name="test_table", uri="s3://bucket/path", format="parquet"
        )

        with pytest.raises(ObjectStoreCreationException, match="Error while creating object store"):
            engine.register_datasource(datasource_config)

    @patch.object(DataFusionEngine, "_get_connection_config")
    def test_register_datasource_duplicate_table(self, mock_get_conn):
        # Re-registering an existing table name must fail, even with a different URI.
        mock_get_conn.return_value = TEST_CONNECTION_CONFIG
        engine = DataFusionEngine()
        engine.registered_tables["test_table"] = "s3://old/path"

        datasource_config = DataSourceConfig(
            conn_id="aws_default", table_name="test_table", uri="s3://new/path", format="parquet"
        )

        with patch.object(engine, "_register_object_store"):
            with pytest.raises(ValueError, match="Table test_table already registered"):
                engine.register_datasource(datasource_config)

    def test_execute_query_success(self):
        engine = DataFusionEngine()
        engine.df_ctx = MagicMock(spec=SessionContext)
        mock_df = MagicMock()
        # execute_query returns the pydict form of the DataFusion dataframe.
        mock_df.to_pydict.return_value = {"col1": [1, 2]}
        engine.df_ctx.sql.return_value = mock_df

        result = engine.execute_query("SELECT * FROM test_table")

        engine.df_ctx.sql.assert_called_once_with("SELECT * FROM test_table")
        assert result == {"col1": [1, 2]}

    def test_execute_query_failure(self):
        # SQL errors are wrapped in QueryExecutionException.
        engine = DataFusionEngine()
        engine.df_ctx = MagicMock(spec=SessionContext)
        engine.df_ctx.sql.side_effect = Exception("SQL Error")

        with pytest.raises(QueryExecutionException, match="Error while executing query"):
            engine.execute_query("SELECT * FROM test_table")

    @patch.object(DataFusionEngine, "_get_connection_config")
    def test_execute_query_with_local_csv(self, mock_get_conn):
        # Integration-style test: query a real CSV on the local filesystem, engine unmocked.
        mock_get_conn.return_value = None

        with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f:
            f.write("name,age\nAlice,30\nBob,25\n")
            csv_path = f.name

        try:
            engine = DataFusionEngine()
            datasource_config = DataSourceConfig(
                table_name="test_csv",
                uri=f"file://{csv_path}",
                format="csv",
                storage_type="local",
                conn_id="",
            )

            engine.register_datasource(datasource_config)

            result = engine.execute_query("SELECT * FROM test_csv ORDER BY name")

            expected = {"name": ["Alice", "Bob"], "age": [30, 25]}
            assert result == expected
        finally:
            # delete=False above so the file survives the context manager; clean up here.
            os.unlink(csv_path)

    @patch("airflow.providers.common.sql.datafusion.engine.get_object_storage_provider", autospec=True)
    @patch.object(DataFusionEngine, "_get_connection_config")
    def test_register_datasource_with_options(self, mock_get_conn, mock_factory):
        # Extra options from DataSourceConfig must be forwarded to register_parquet.
        mock_get_conn.return_value = TEST_CONNECTION_CONFIG
        mock_provider = MagicMock(spec=ObjectStorageProvider)
        mock_store = MagicMock()
        mock_provider.create_object_store.return_value = mock_store
        mock_provider.get_scheme.return_value = "s3"
        mock_factory.return_value = mock_provider

        engine = DataFusionEngine()

        datasource_config = DataSourceConfig(
            conn_id="aws_default",
            table_name="test_table",
            uri="s3://bucket/path/",
            format="parquet",
            options={"table_partition_cols": [("year", "integer"), ("month", "integer")]},
        )

        engine.df_ctx = MagicMock(spec=SessionContext)

        engine.register_datasource(datasource_config)

        mock_factory.assert_called_once()
        mock_provider.create_object_store.assert_called_once_with(
            "s3://bucket/path/", connection_config=mock_get_conn.return_value
        )
        engine.df_ctx.register_object_store.assert_called_once_with(schema="s3", store=mock_store)

        engine.df_ctx.register_parquet.assert_called_once_with(
            "test_table",
            "s3://bucket/path/",
            table_partition_cols=[("year", "integer"), ("month", "integer")],
        )

        assert engine.registered_tables == {"test_table": "s3://bucket/path/"}

    def test_remove_none_values(self):
        # None-valued entries are stripped; everything else is preserved.
        result = DataFusionEngine._remove_none_values({"a": 1, "b": None, "c": "test", "d": None})
        assert result == {"a": 1, "c": "test"}

    def test_get_connection_config(self):
        # Uses the fake connection registered by the autouse fixture above.
        engine = DataFusionEngine()

        result = engine._get_connection_config("aws_default")
        expected = ConnectionConfig(
            conn_id="aws_default",
            credentials={
                "access_key_id": "fake_id",
                "secret_access_key": "fake_secret",
            },
            extra_config={"region": "us-east-1"},
        )
        assert result.conn_id == expected.conn_id
        assert result.credentials == expected.credentials
        assert result.extra_config == expected.extra_config

    def test_get_credentials_unknown_type(self):
        mock_conn = MagicMock()
        mock_conn.conn_type = "dummy"
        engine = DataFusionEngine()

        with pytest.raises(ValueError, match="Unknown connection type dummy"):
            engine._get_credentials(mock_conn)
diff --git 
a/providers/common/sql/tests/unit/common/sql/datafusion/test_format_handlers.py 
b/providers/common/sql/tests/unit/common/sql/datafusion/test_format_handlers.py
new file mode 100644
index 00000000000..2518864b17a
--- /dev/null
+++ 
b/providers/common/sql/tests/unit/common/sql/datafusion/test_format_handlers.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from airflow.providers.common.sql.config import FormatType
+from airflow.providers.common.sql.datafusion.exceptions import 
FileFormatRegistrationException
+from airflow.providers.common.sql.datafusion.format_handlers import (
+    AvroFormatHandler,
+    CsvFormatHandler,
+    ParquetFormatHandler,
+    get_format_handler,
+)
+
+
class TestFormatHandlers:
    @pytest.fixture
    def session_context_mock(self):
        # A loose mock standing in for datafusion.SessionContext.
        return MagicMock()

    def test_parquet_handler_success(self, session_context_mock):
        # Options must be forwarded as keyword arguments to register_parquet.
        handler = ParquetFormatHandler(options={"key": "value"})
        handler.register_data_source_format(session_context_mock, "table_name", "path/to/file")
        session_context_mock.register_parquet.assert_called_once_with(
            "table_name", "path/to/file", key="value"
        )
        assert handler.get_format == FormatType.PARQUET

    def test_parquet_handler_failure(self, session_context_mock):
        # Registration errors are wrapped in FileFormatRegistrationException.
        session_context_mock.register_parquet.side_effect = Exception("Error")
        handler = ParquetFormatHandler()
        with pytest.raises(FileFormatRegistrationException, match="Failed to register Parquet data source"):
            handler.register_data_source_format(session_context_mock, "table_name", "path/to/file")

    def test_csv_handler_success(self, session_context_mock):
        handler = CsvFormatHandler(options={"delimiter": ","})
        handler.register_data_source_format(session_context_mock, "table_name", "path/to/file")
        session_context_mock.register_csv.assert_called_once_with("table_name", "path/to/file", delimiter=",")
        assert handler.get_format == FormatType.CSV

    def test_csv_handler_failure(self, session_context_mock):
        session_context_mock.register_csv.side_effect = Exception("Error")
        handler = CsvFormatHandler()
        with pytest.raises(FileFormatRegistrationException, match="Failed to register csv data source"):
            handler.register_data_source_format(session_context_mock, "table_name", "path/to/file")

    def test_avro_handler_success(self, session_context_mock):
        handler = AvroFormatHandler(options={"key": "value"})
        handler.register_data_source_format(session_context_mock, "table_name", "path/to/file")
        session_context_mock.register_avro.assert_called_once_with("table_name", "path/to/file", key="value")
        assert handler.get_format == FormatType.AVRO

    def test_avro_handler_failure(self, session_context_mock):
        session_context_mock.register_avro.side_effect = Exception("Error")
        handler = AvroFormatHandler()
        with pytest.raises(FileFormatRegistrationException, match="Failed to register Avro data source"):
            handler.register_data_source_format(session_context_mock, "table_name", "path/to/file")

    def test_get_format_handler(self):
        # The factory maps format names to handler classes and rejects unknown ones.
        assert isinstance(get_format_handler("parquet"), ParquetFormatHandler)
        assert isinstance(get_format_handler("csv"), CsvFormatHandler)
        assert isinstance(get_format_handler("avro"), AvroFormatHandler)

        with pytest.raises(ValueError, match="Unsupported format"):
            get_format_handler("invalid")
diff --git 
a/providers/common/sql/tests/unit/common/sql/datafusion/test_object_storage_provider.py
 
b/providers/common/sql/tests/unit/common/sql/datafusion/test_object_storage_provider.py
new file mode 100644
index 00000000000..9b0ff756a1f
--- /dev/null
+++ 
b/providers/common/sql/tests/unit/common/sql/datafusion/test_object_storage_provider.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import patch
+
+import pytest
+
+from airflow.providers.common.sql.config import ConnectionConfig, StorageType
+from airflow.providers.common.sql.datafusion.exceptions import 
ObjectStoreCreationException
+from airflow.providers.common.sql.datafusion.object_storage_provider import (
+    LocalObjectStorageProvider,
+    S3ObjectStorageProvider,
+    get_object_storage_provider,
+)
+
+
class TestObjectStorageProvider:
    """Unit tests for the object-storage providers and their factory function."""

    @patch("airflow.providers.common.sql.datafusion.object_storage_provider.AmazonS3")
    def test_s3_provider_success(self, mock_s3):
        """The S3 provider builds an AmazonS3 store from the URI bucket and credentials."""
        s3_provider = S3ObjectStorageProvider()
        config = ConnectionConfig(
            conn_id="aws_default",
            credentials={"access_key_id": "fake_key", "secret_access_key": "fake_secret"},
        )

        store = s3_provider.create_object_store("s3://demo-data/path", config)

        assert store == mock_s3.return_value
        assert s3_provider.get_storage_type == StorageType.S3
        assert s3_provider.get_scheme() == "s3://"
        mock_s3.assert_called_once_with(
            access_key_id="fake_key", secret_access_key="fake_secret", bucket_name="demo-data"
        )

    def test_s3_provider_failure(self):
        """Any error while building the S3 store becomes ObjectStoreCreationException."""
        s3_provider = S3ObjectStorageProvider()
        config = ConnectionConfig(conn_id="aws_default")
        target = "airflow.providers.common.sql.datafusion.object_storage_provider.AmazonS3"

        with patch(target, side_effect=Exception("Error")):
            with pytest.raises(ObjectStoreCreationException, match="Failed to create S3 object store"):
                s3_provider.create_object_store("s3://demo-data/path", config)

    @patch("airflow.providers.common.sql.datafusion.object_storage_provider.LocalFileSystem")
    def test_local_provider(self, mock_local):
        """The local provider reports the file:// scheme and wraps LocalFileSystem."""
        local_provider = LocalObjectStorageProvider()
        store = local_provider.create_object_store("file://path")

        assert local_provider.get_storage_type == StorageType.LOCAL
        assert local_provider.get_scheme() == "file://"
        assert store == mock_local.return_value

    def test_get_object_storage_provider(self):
        """The factory resolves each StorageType to its provider class."""
        expected_providers = {
            StorageType.S3: S3ObjectStorageProvider,
            StorageType.LOCAL: LocalObjectStorageProvider,
        }
        for storage_type, provider_cls in expected_providers.items():
            assert isinstance(get_object_storage_provider(storage_type), provider_cls)

        with pytest.raises(ValueError, match="Unsupported storage type"):
            get_object_storage_provider("invalid")
diff --git 
a/providers/common/sql/tests/unit/common/sql/operators/test_analytics.py 
b/providers/common/sql/tests/unit/common/sql/operators/test_analytics.py
new file mode 100644
index 00000000000..c1ded538337
--- /dev/null
+++ b/providers/common/sql/tests/unit/common/sql/operators/test_analytics.py
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import os
+import tempfile
+from unittest.mock import MagicMock
+
+import pytest
+
+from airflow.providers.common.sql.config import DataSourceConfig, StorageType
+from airflow.providers.common.sql.operators.analytics import AnalyticsOperator
+
+
class TestAnalyticsOperator:
    """Unit tests for AnalyticsOperator: query execution, row limits and output formats."""

    @staticmethod
    def _build_operator(engine, **extra_kwargs):
        # Shared factory: one parquet datasource on S3 plus a single SELECT query.
        config = DataSourceConfig(
            conn_id="aws_default", table_name="users_data", uri="s3://bucket/path", format="parquet"
        )
        return AnalyticsOperator(
            task_id="test_analytics",
            datasource_configs=[config],
            queries=["SELECT * FROM users_data"],
            engine=engine,
            **extra_kwargs,
        )

    @pytest.fixture
    def mock_engine(self):
        return MagicMock()

    @pytest.fixture
    def operator(self, mock_engine):
        return self._build_operator(mock_engine)

    def test_execute_success(self, operator, mock_engine):
        """execute registers the datasource, runs the query and renders both columns."""
        mock_engine.execute_query.return_value = {
            "col1": [1, 2, 3, 4, 5],
            "col2": ["dave", "bob", "alice", "carol", "eve"],
        }

        rendered = operator.execute(context={})

        mock_engine.register_datasource.assert_called_once()
        mock_engine.execute_query.assert_called_once_with("SELECT * FROM users_data")
        assert "col1" in rendered
        assert "col2" in rendered

    def test_execute_max_rows_exceeded(self, operator, mock_engine):
        """Result sets above max_rows_check are skipped with an explanatory message."""
        operator.max_rows_check = 3
        mock_engine.execute_query.return_value = {"col1": [1, 2, 3, 4]}

        rendered = operator.execute(context={})

        assert "Skipped" in rendered
        assert "4 rows exceed max_rows_check (3)" in rendered

    def test_json_output_format(self, mock_engine):
        """result_output_format='json' serializes each query result as row dicts."""
        operator = self._build_operator(mock_engine, result_output_format="json")
        mock_engine.execute_query.return_value = {
            "id": [1, 2, 3],
            "name": ["A", "B", "C"],
            "value": [10.1, 20.2, 30.3],
        }

        parsed = json.loads(operator.execute(context={}))

        assert len(parsed) == 1
        assert parsed[0]["query"] == "SELECT * FROM users_data"
        expected_rows = [
            {"id": 1, "name": "A", "value": 10.1},
            {"id": 2, "name": "B", "value": 20.2},
            {"id": 3, "name": "C", "value": 30.3},
        ]
        assert parsed[0]["data"] == expected_rows

    def test_tabulate_output_format(self, mock_engine):
        """result_output_format='tabulate' renders a table headed by the query text."""
        operator = self._build_operator(mock_engine, result_output_format="tabulate")
        mock_engine.execute_query.return_value = {
            "product": ["apple", "banana", "cherry"],
            "quantity": [10, 20, 15],
        }

        rendered = operator.execute(context={})

        assert "product" in rendered
        assert "Results: SELECT * FROM users_data" in rendered

    def test_unsupported_output_format(self, mock_engine):
        """An unrecognized result_output_format raises ValueError at execute time."""
        operator = self._build_operator(mock_engine, result_output_format=["invalid"])  # type: ignore

        with pytest.raises(ValueError, match="Unsupported output format"):
            operator.execute(context={})

    def test_execute_with_local_csv(self):
        """End-to-end run against a real local CSV file (no mocked engine)."""
        with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f:
            f.write("name,age\nAlice,30\nBob,25\n")
            csv_path = f.name
        try:
            local_config = DataSourceConfig(
                conn_id="",
                table_name="test_csv",
                uri=f"file://{csv_path}",
                format="csv",
                storage_type=StorageType.LOCAL,
            )
            local_operator = AnalyticsOperator(
                task_id="test_analytics",
                datasource_configs=[local_config],
                queries=["SELECT * FROM test_csv ORDER BY name"],
                engine=None,
            )

            rendered = local_operator.execute(context={})

            assert "Alice" in rendered
            assert "Bob" in rendered
            assert "Results: SELECT * FROM test_csv ORDER BY name" in rendered
        finally:
            os.unlink(csv_path)
diff --git a/providers/common/sql/tests/unit/common/sql/test_config.py 
b/providers/common/sql/tests/unit/common/sql/test_config.py
new file mode 100644
index 00000000000..abe020b6ad1
--- /dev/null
+++ b/providers/common/sql/tests/unit/common/sql/test_config.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.providers.common.sql.config import ConnectionConfig, 
DataSourceConfig, StorageType
+
+
class TestDataSourceConfig:
    """Unit tests for DataSourceConfig creation, storage-type inference and validation."""

    def test_successful_creation(self):
        """A valid S3 URI populates all fields and infers StorageType.S3."""
        config = DataSourceConfig(conn_id="test_conn", uri="s3://bucket/key", table_name="my_table")
        assert config.conn_id == "test_conn"
        assert config.uri == "s3://bucket/key"
        assert config.table_name == "my_table"
        assert config.storage_type == StorageType.S3

    @pytest.mark.parametrize(
        ("uri", "expected_type"),
        [
            ("s3://bucket/path", StorageType.S3),
            ("file:///path/to/file", StorageType.LOCAL),
        ],
    )
    def test_extract_storage_type(self, uri, expected_type):
        """The storage type is derived from the URI scheme."""
        # Fixed: previously passed `table_name="a_table" if expected_type else None`,
        # a dead conditional -- expected_type is always a truthy StorageType member
        # in the parametrize list, so the ternary never produced None.
        config = DataSourceConfig(conn_id="test", uri=uri, table_name="a_table")
        assert config.storage_type == expected_type

    def test_invalid_storage_type_raises_error(self):
        """Unknown URI schemes are rejected with a ValueError."""
        with pytest.raises(ValueError, match="Unsupported storage type for URI"):
            DataSourceConfig(conn_id="test", uri="unknown://bucket/path", table_name="a_table")

    def test_missing_table_name_raises_error(self):
        """A table name is mandatory for this storage type."""
        with pytest.raises(ValueError, match="Table name must be provided for storage type"):
            DataSourceConfig(conn_id="test", uri="s3://bucket/path")

    def test_parquet_with_partition_cols(self):
        """Partition columns can be passed through the free-form options mapping."""
        partition_cols = [("year", "integer"), ("month", "integer")]
        config = DataSourceConfig(
            conn_id="test_conn",
            uri="s3://bucket/path",
            table_name="my_table",
            format="parquet",
            options={"table_partition_cols": partition_cols},
        )
        assert config.conn_id == "test_conn"
        assert config.uri == "s3://bucket/path"
        assert config.table_name == "my_table"
        assert config.format == "parquet"
        assert config.options == {"table_partition_cols": partition_cols}
        assert config.storage_type == StorageType.S3
+
+
class TestConnectionConfig:
    """Unit tests for ConnectionConfig construction and defaults."""

    def test_connection_config_creation(self):
        """Explicitly supplied credentials and extra_config are stored unchanged."""
        creds = {"key": "value"}
        extras = {"timeout": 30}
        config = ConnectionConfig(conn_id="my_conn", credentials=creds, extra_config=extras)

        assert config.conn_id == "my_conn"
        assert config.credentials == {"key": "value"}
        assert config.extra_config == {"timeout": 30}

    def test_connection_config_defaults(self):
        """Credentials and extra_config default to empty mappings."""
        config = ConnectionConfig(conn_id="my_conn")

        assert config.credentials == {}
        assert config.extra_config == {}

Reply via email to