This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e51e6867ad Add support for influx3 (#58929)
8e51e6867ad is described below

commit 8e51e6867adc799f4aab72a5558c42cbf9dfd113
Author: Arpit Rathore <[email protected]>
AuthorDate: Thu May 14 16:48:33 2026 +0530

    Add support for influx3 (#58929)
    
    * Add apache-airflow-providers-influxdb3 provider
    
    This commit adds a new provider package for InfluxDB 3.x support,
    separate from the existing influxdb provider which supports InfluxDB 2.x.
    
    InfluxDB 3.x uses SQL queries and a different API compared to InfluxDB 2.x
    which uses Flux queries. This separation allows for:
    - Clean API design specific to InfluxDB 3.x
    - Independent evolution of both providers
    - No breaking changes to existing InfluxDB 2.x users
    
    The provider includes:
    - InfluxDB3Hook for connecting to InfluxDB 3.x databases
    - InfluxDB3Operator for executing SQL queries
    - Comprehensive unit and system tests
    - Complete documentation
    
    Closes #58610
    
    * Fix test mocks to use pandas DataFrame instead of empty list
    
    - Update test_query to properly mock and verify DataFrame return type
    - Update test_execute in operator tests to use DataFrame
    - Add test for ImportError when influxdb3-python is not available
    - Ensure all tests properly validate return types
    
    * Update InfluxDB3 provider: simplify implementation and set min Airflow to 2.11.0
    
    - Simplify InfluxDB3Hook.query() to use client's native pandas DataFrame return
    - Simplify InfluxDB3Operator to handle DataFrame to JSON conversion
    - Update tests to match simplified implementation
    - Update minimum Airflow version requirement to 2.11.0 per provider policy
    - Update documentation to reflect Airflow 2.11.0+ requirement
    
    * Improve InfluxDB3Hook connection parameter handling and error messages
    
    - Add docstring explaining how connection parameters are read from form widgets
    - Improve error messages to reference connection form fields explicitly
    - Add explicit handling for optional org parameter
    - Clarify that form widgets automatically populate extras JSON field
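
The lookup order described in this change (token from extras with the password field as fallback, a required database, an optional org) can be sketched roughly as follows. This is a minimal illustration, not the hook's actual code path: `resolve_influxdb3_params` is a hypothetical helper name, and the error messages are shortened.

```python
from __future__ import annotations

from typing import Any


def resolve_influxdb3_params(extras: dict[str, Any], password: str | None) -> dict[str, Any]:
    """Hypothetical helper mirroring the parameter lookup order described above."""
    params = dict(extras)  # copy so the caller's dict is not mutated

    # Token: prefer the value stored in extras, fall back to the password field.
    if not params.get("token"):
        if password:
            params["token"] = password
        else:
            raise ValueError("token is required for InfluxDB 3.x")

    # Database: required; "db" is accepted as an alias for "database".
    database = params.get("database") or params.get("db")
    if not database:
        raise ValueError("database is required for InfluxDB 3.x")
    params["database"] = database

    # Org: optional, defaults to an empty string when absent.
    params.setdefault("org", "")
    return params
```

With extras of `{"db": "metrics"}` and a password of `"secret"`, the sketch yields a token of `"secret"`, a database of `"metrics"`, and an empty org.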
    
    * Fix PR review issues: add return type annotation and fix test error message
    
    - Add return type annotation to InfluxDB3Operator.execute() method
    - Add docstring to execute() method explaining return value
    - Fix test error message to match actual error message from hook
    - Improve code formatting consistency
    
    * Fix PR review issues: security fix and type annotations
    
    - Fix CodeQL security issue: use endswith() instead of 'in' for hostname check
    - Add proper type annotations for MyPy compliance:
      - self.connection: Connection | None
      - self.extras: dict[str, Any]
      - self.uri: str | None
      - get_client kwargs: dict[str, Any]
    - Add None safety check for conn.host in get_uri()
    - Add Point availability check in write() method
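
To illustrate why the hostname check moved from a substring test to `endswith()`, here is a small sketch. The function names and the attacker-style hostname are made up for illustration; only the `.influxdb.io` suffix and the None safety check come from this change.

```python
from __future__ import annotations

SUFFIX = ".influxdb.io"


def substring_check(host: str) -> bool:
    """Naive check: matches the suffix anywhere in the hostname."""
    return SUFFIX in host.lower()


def suffix_check(host: str | None) -> bool:
    """Stricter check: the hostname must actually end with the suffix."""
    return bool(host) and host.lower().endswith(SUFFIX)


# A hostname that merely contains the suffix passes the naive check
# but is rejected by the suffix check.
lookalike = "cluster.influxdb.io.attacker.example"
print(substring_check(lookalike), suffix_check(lookalike))
```

The suffix check also tolerates a missing host, which corresponds to the None safety check listed above.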
    
    * Fix MyPy type annotation issue: remove Connection type from instance variable
    
    Remove Connection type annotation from self.connection to match pattern
    used in other providers. Connection is only imported under TYPE_CHECKING
    and MyPy has issues with instance variable type annotations in this case.
    Function parameter type annotations work fine with TYPE_CHECKING imports.
    
    * Add InfluxDB 3.x support to existing influxdb provider
    
    Consolidates InfluxDB 3.x (Core/Enterprise/Cloud Dedicated) hooks,
    operators, tests, and docs into apache-airflow-providers-influxdb,
    removing the draft separate influxdb3 provider in favour of a single
    unified package per reviewer feedback (eladkal).
    
    - Add InfluxDB3Hook: SQL queries via influxdb3-python, returns DataFrame
    - Add InfluxDB3Operator: executes SQL, returns JSON-serialisable results
    - Add influxdb3 connection type with form widgets (token, database, org)
    - Add unit tests for hook and operator
    - Add system/example DAG
    - Add connection-type documentation
    - Update provider.yaml, pyproject.toml, get_provider_info.py
    
    Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
    
    * Fix MyPy Context import and test failures in InfluxDB3 provider
    
    - Replace try/except Context import with direct sdk import (fixes MyPy attr-defined/no-redef errors)
    - Remove assignments to read-only extra_dejson property in tests
    - Fix test_query and test_write to return client mock from get_conn
    
    * Complete influxdb3 connection type config in provider.yaml
    
    Add hook-name, ui-field-behaviour, and conn-fields (token, database, org)
    to the influxdb3 connection type entry, matching the structure of the
    existing influxdb connection type.
    
    * Fix pandas as optional import and skip pandas tests when unavailable
    
    Move 'import pandas' inside query() so the hook imports cleanly without
    pandas installed, fixing lowest-deps CI failure. Guard pandas under
    TYPE_CHECKING for the return type annotation. Use pytest.importorskip
    in tests that require pandas so they skip gracefully in environments
    without it.
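
The optional-import pattern described here can be sketched as below. `rows_to_frame` is a hypothetical function used only for illustration; the point is that the module imports cleanly without pandas, because the annotation stays a string and the real import is deferred until the function is called.

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated only by type checkers, never at runtime.
    import pandas as pd


def rows_to_frame(rows: list[dict]) -> pd.DataFrame:
    """Hypothetical helper: pandas is imported only when the function runs."""
    import pandas as pd  # deferred import, fails here rather than at module import

    return pd.DataFrame(rows)
```

Tests that call such a function can guard themselves with `pytest.importorskip("pandas")`, as this change does, so they are skipped rather than failing when pandas is absent.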
    
    * Fix spelling: celsius -> Celsius in docstring example
    
    * Fix CI checks for InfluxDB 3 provider
    
    Sync regenerated provider build files (get_provider_info.py, docs/index.rst,
    uv.lock) so pip check, the update-providers-build-files prek hook, and the
    quick image build pass. Also fix the InfluxDB3Hook docstrings so Sphinx no
    longer warns about the bullet list and example block, which was failing both
    the docs build and the spellcheck job.
    
    ---------
    
    Co-authored-by: Arpit Rathore <[email protected]>
    Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
 providers/influxdb/docs/connections/influxdb3.rst  |  51 +++++
 providers/influxdb/docs/index.rst                  |   4 +-
 providers/influxdb/docs/operators/index.rst        |  15 ++
 providers/influxdb/provider.yaml                   |  41 ++++
 providers/influxdb/pyproject.toml                  |   1 +
 .../providers/influxdb/get_provider_info.py        |  42 +++-
 .../airflow/providers/influxdb/hooks/influxdb3.py  | 247 +++++++++++++++++++++
 .../providers/influxdb/operators/influxdb3.py      |  72 ++++++
 .../tests/system/influxdb/example_influxdb3.py     |  82 +++++++
 .../tests/unit/influxdb/hooks/test_influxdb3.py    | 123 ++++++++++
 .../unit/influxdb/operators/test_influxdb3.py      |  59 +++++
 uv.lock                                            |  18 ++
 12 files changed, 750 insertions(+), 5 deletions(-)

diff --git a/providers/influxdb/docs/connections/influxdb3.rst b/providers/influxdb/docs/connections/influxdb3.rst
new file mode 100644
index 00000000000..b5a0beae494
--- /dev/null
+++ b/providers/influxdb/docs/connections/influxdb3.rst
@@ -0,0 +1,51 @@
+
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+.. _howto/connection:influxdb3:
+
+InfluxDB 3 Connection
+=====================
+The InfluxDB 3 connection type provides connection to an InfluxDB 3.x database
+(Core/Enterprise/Cloud Dedicated).
+
+InfluxDB 3.x uses SQL queries and a different API compared to InfluxDB 2.x.
+For InfluxDB 2.x support, use the ``influxdb`` connection type instead.
+
+Configuring the Connection
+--------------------------
+Host (required)
+    The host to connect to (e.g., ``https://us-east-1-1.aws.cloud2.influxdata.com``).
+
+Extra (required)
+    Specify the extra parameters (as json dictionary) that can be used in InfluxDB 3
+    connection.
+
+    Example "extras" field:
+
+    .. code-block:: JSON
+
+      {
+        "token": "your-api-token-here",
+        "database": "my_database",
+        "org": "my_org"
+      }
+
+    Parameters:
+        * ``token`` (required): API token for authentication
+        * ``database`` (required): Database name
+        * ``org`` (optional): Organization name
diff --git a/providers/influxdb/docs/index.rst b/providers/influxdb/docs/index.rst
index c6aff0166c1..0f991db9854 100644
--- a/providers/influxdb/docs/index.rst
+++ b/providers/influxdb/docs/index.rst
@@ -34,7 +34,8 @@
     :maxdepth: 1
     :caption: Guides
 
-    Connection types <connections/influxdb>
+    Connection types (InfluxDB 2.x) <connections/influxdb>
+    Connection types (InfluxDB 3.x) <connections/influxdb3>
     Operators <operators/index>
 
 .. toctree::
@@ -103,6 +104,7 @@ PIP package                                 Version required
 ``apache-airflow``                          ``>=2.11.0``
 ``apache-airflow-providers-common-compat``  ``>=1.8.0``
 ``influxdb-client``                         ``>=1.19.0``
+``influxdb3-python``                        ``>=0.7.0``
 ``requests``                                ``>=2.32.0,<3``
 ==========================================  ==================
 
diff --git a/providers/influxdb/docs/operators/index.rst b/providers/influxdb/docs/operators/index.rst
index f0c6255e52b..8f0eeff1271 100644
--- a/providers/influxdb/docs/operators/index.rst
+++ b/providers/influxdb/docs/operators/index.rst
@@ -31,3 +31,18 @@ An example of running the query using the operator:
     :language: python
     :start-after: [START howto_operator_influxdb]
     :end-before: [END howto_operator_influxdb]
+
+.. _howto/operator:InfluxDB3Operator:
+
+InfluxDB3Operator
+-----------------
+
+The :class:`~airflow.providers.influxdb.operators.influxdb3.InfluxDB3Operator`
+executes SQL queries in an InfluxDB 3.x database.
+
+Example usage:
+
+.. exampleinclude:: /../../influxdb/tests/system/influxdb/example_influxdb3.py
+    :language: python
+    :start-after: [START howto_operator_influxdb3]
+    :end-before: [END howto_operator_influxdb3]
diff --git a/providers/influxdb/provider.yaml b/providers/influxdb/provider.yaml
index a3c1c3b6bcb..d88d99573b6 100644
--- a/providers/influxdb/provider.yaml
+++ b/providers/influxdb/provider.yaml
@@ -69,16 +69,25 @@ integrations:
     external-doc-url: https://www.influxdata.com/
     logo: /docs/integration-logos/Influxdb.svg
     tags: [software]
+  - integration-name: InfluxDB 3
+    external-doc-url: https://www.influxdata.com/
+    tags: [software]
 
 hooks:
   - integration-name: Influxdb
     python-modules:
       - airflow.providers.influxdb.hooks.influxdb
+  - integration-name: InfluxDB 3
+    python-modules:
+      - airflow.providers.influxdb.hooks.influxdb3
 
 operators:
   - integration-name: Influxdb
     python-modules:
       - airflow.providers.influxdb.operators.influxdb
+  - integration-name: InfluxDB 3
+    python-modules:
+      - airflow.providers.influxdb.operators.influxdb3
 
 connection-types:
   - hook-class-name: airflow.providers.influxdb.hooks.influxdb.InfluxDBHook
@@ -106,3 +115,35 @@ connection-types:
             - string
             - 'null'
           default: ''
+  - hook-class-name: airflow.providers.influxdb.hooks.influxdb3.InfluxDB3Hook
+    hook-name: "InfluxDB 3"
+    connection-type: influxdb3
+    ui-field-behaviour:
+      hidden-fields:
+        - login
+        - password
+      relabeling: {}
+      placeholders: {}
+    conn-fields:
+      token:
+        label: Token
+        schema:
+          type:
+            - string
+            - 'null'
+          format: password
+          default: ''
+      database:
+        label: Database
+        schema:
+          type:
+            - string
+            - 'null'
+          default: ''
+      org:
+        label: Organization Name (optional)
+        schema:
+          type:
+            - string
+            - 'null'
+          default: ''
diff --git a/providers/influxdb/pyproject.toml b/providers/influxdb/pyproject.toml
index fcf40af4dc5..7ba3fe7de82 100644
--- a/providers/influxdb/pyproject.toml
+++ b/providers/influxdb/pyproject.toml
@@ -62,6 +62,7 @@ dependencies = [
     "apache-airflow>=2.11.0",
     "apache-airflow-providers-common-compat>=1.8.0",
     "influxdb-client>=1.19.0",
+    "influxdb3-python>=0.7.0",
     "requests>=2.32.0,<3",
 ]
 
diff --git a/providers/influxdb/src/airflow/providers/influxdb/get_provider_info.py b/providers/influxdb/src/airflow/providers/influxdb/get_provider_info.py
index f9f0a30e5a8..f8334178029 100644
--- a/providers/influxdb/src/airflow/providers/influxdb/get_provider_info.py
+++ b/providers/influxdb/src/airflow/providers/influxdb/get_provider_info.py
@@ -32,16 +32,29 @@ def get_provider_info():
                 "external-doc-url": "https://www.influxdata.com/",
                 "logo": "/docs/integration-logos/Influxdb.svg",
                 "tags": ["software"],
-            }
+            },
+            {
+                "integration-name": "InfluxDB 3",
+                "external-doc-url": "https://www.influxdata.com/",
+                "tags": ["software"],
+            },
         ],
         "hooks": [
-            {"integration-name": "Influxdb", "python-modules": ["airflow.providers.influxdb.hooks.influxdb"]}
+            {"integration-name": "Influxdb", "python-modules": ["airflow.providers.influxdb.hooks.influxdb"]},
+            {
+                "integration-name": "InfluxDB 3",
+                "python-modules": ["airflow.providers.influxdb.hooks.influxdb3"],
+            },
         ],
         "operators": [
             {
                 "integration-name": "Influxdb",
                 "python-modules": ["airflow.providers.influxdb.operators.influxdb"],
-            }
+            },
+            {
+                "integration-name": "InfluxDB 3",
+                "python-modules": ["airflow.providers.influxdb.operators.influxdb3"],
+            },
         ],
         "connection-types": [
             {
@@ -63,6 +76,27 @@ def get_provider_info():
                         "schema": {"type": ["string", "null"], "default": ""},
                     },
                 },
-            }
+            },
+            {
+                "hook-class-name": "airflow.providers.influxdb.hooks.influxdb3.InfluxDB3Hook",
+                "hook-name": "InfluxDB 3",
+                "connection-type": "influxdb3",
+                "ui-field-behaviour": {
+                    "hidden-fields": ["login", "password"],
+                    "relabeling": {},
+                    "placeholders": {},
+                },
+                "conn-fields": {
+                    "token": {
+                        "label": "Token",
+                        "schema": {"type": ["string", "null"], "format": "password", "default": ""},
+                    },
+                    "database": {"label": "Database", "schema": {"type": ["string", "null"], "default": ""}},
+                    "org": {
+                        "label": "Organization Name (optional)",
+                        "schema": {"type": ["string", "null"], "default": ""},
+                    },
+                },
+            },
         ],
     }
diff --git a/providers/influxdb/src/airflow/providers/influxdb/hooks/influxdb3.py b/providers/influxdb/src/airflow/providers/influxdb/hooks/influxdb3.py
new file mode 100644
index 00000000000..0659e056813
--- /dev/null
+++ b/providers/influxdb/src/airflow/providers/influxdb/hooks/influxdb3.py
@@ -0,0 +1,247 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This module allows to connect to an InfluxDB 3 database.
+
+InfluxDB 3.x (Core/Enterprise/Cloud Dedicated) uses SQL queries and a different
+API compared to InfluxDB 2.x.
+"""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+try:
+    from influxdb3 import InfluxDBClient3, Point
+
+    INFLUXDB_CLIENT_3_AVAILABLE = True
+except ImportError:
+    try:
+        # Alternative import path
+        from influxdb_client_3 import InfluxDBClient3, Point
+
+        INFLUXDB_CLIENT_3_AVAILABLE = True
+    except ImportError:
+        INFLUXDB_CLIENT_3_AVAILABLE = False
+        InfluxDBClient3 = None  # type: ignore[assignment, misc]
+        Point = None  # type: ignore[assignment, misc]
+
+from airflow.providers.common.compat.sdk import BaseHook
+
+if TYPE_CHECKING:
+    import pandas as pd
+
+    from airflow.models import Connection
+
+
+class InfluxDB3Hook(BaseHook):
+    """
+    Interact with InfluxDB 3.x (Core/Enterprise/Cloud Dedicated).
+
+    Performs a connection to InfluxDB 3.x and retrieves client.
+
+    :param influxdb3_conn_id: Reference to :ref:`InfluxDB 3 connection id <howto/connection:influxdb3>`.
+    """
+
+    conn_name_attr = "influxdb3_conn_id"
+    default_conn_name = "influxdb3_default"
+    conn_type = "influxdb3"
+    hook_name = "InfluxDB 3"
+
+    def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.influxdb3_conn_id = conn_id
+        self.connection = kwargs.pop("connection", None)
+        self.client: Any = None
+        self.extras: dict[str, Any] = {}
+        self.uri: str | None = None
+
+    @classmethod
+    def get_connection_form_widgets(cls) -> dict[str, Any]:
+        """Return connection widgets to add to connection form."""
+        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
+        from flask_babel import lazy_gettext
+        from wtforms import StringField
+
+        return {
+            "token": StringField(lazy_gettext("Token"), widget=BS3TextFieldWidget(), default=""),
+            "database": StringField(
+                lazy_gettext("Database"),
+                widget=BS3TextFieldWidget(),
+                default="",
+            ),
+            "org": StringField(
+                lazy_gettext("Organization Name (optional)"),
+                widget=BS3TextFieldWidget(),
+                default="",
+            ),
+        }
+
+    def get_client(self, uri: str, kwargs: dict[str, Any]) -> InfluxDBClient3:
+        """Get InfluxDB 3.x client."""
+        if not INFLUXDB_CLIENT_3_AVAILABLE:
+            raise ImportError(
+                "influxdb3-python is required for InfluxDB 3.x support. "
+                "Install it with: pip install influxdb3-python"
+            )
+
+        database = kwargs.pop("database", None) or kwargs.pop("db", None)
+        if not database:
+            raise ValueError("database parameter is required for InfluxDB 3.x")
+
+        return InfluxDBClient3(
+            host=uri,
+            token=kwargs.get("token"),
+            database=database,
+            org=kwargs.get("org", ""),
+        )
+
+    def get_uri(self, conn: Connection) -> str:
+        """Build URI from connection parameters."""
+        conn_scheme = "https" if conn.schema is None else conn.schema
+
+        # Use appropriate default port based on scheme
+        if conn.port is None:
+            conn_port = 443 if conn_scheme == "https" else 8086
+        else:
+            conn_port = conn.port
+
+        # For InfluxDB Cloud Dedicated, if host ends with .influxdb.io and using HTTPS,
+        # default to port 443 if port is 8086 (common misconfiguration)
+        if (
+            conn_scheme == "https"
+            and conn.host
+            and conn.host.lower().endswith(".influxdb.io")
+            and conn_port == 8086
+        ):
+            self.log.warning(
+                "InfluxDB Cloud Dedicated detected with HTTPS but port 8086. "
+                "Switching to port 443. If this is incorrect, explicitly set the port in connection."
+            )
+            conn_port = 443
+
+        host = conn.host or ""
+        return f"{conn_scheme}://{host}:{conn_port}"
+
+    def get_conn(self) -> InfluxDBClient3:
+        """
+        Initiate a new InfluxDB 3.x connection with token and database.
+
+        Reads connection parameters from:
+
+        - Custom form fields (token, database, org) - automatically stored in extras
+        - Connection password field (as fallback for token)
+        - Connection extras JSON (for manual configuration)
+        """
+        self.connection = self.get_connection(self.influxdb3_conn_id)
+        self.extras = self.connection.extra_dejson.copy()
+
+        self.uri = self.get_uri(self.connection)
+        self.log.info("URI: %s", self.uri)
+
+        if self.client is not None:
+            return self.client
+
+        # Token: prefer extras (from form widget), fallback to password field
+        if "token" not in self.extras or not self.extras.get("token"):
+            token = getattr(self.connection, "password", None)
+            if token:
+                self.extras["token"] = token
+            elif not self.extras.get("token"):
+                raise ValueError(
+                    "token is required for InfluxDB 3.x. "
+                    "Set it in the 'Token' field of the connection form or in connection extras."
+                )
+
+        # Database: required for InfluxDB 3.x (from form widget or extras)
+        database = self.extras.get("database") or self.extras.get("db")
+        if not database:
+            raise ValueError(
+                "database is required for InfluxDB 3.x. "
+                "Set it in the 'Database' field of the connection form or in connection extras."
+            )
+        self.extras["database"] = database
+
+        # Org: optional (from form widget or extras)
+        if "org" not in self.extras:
+            self.extras["org"] = ""
+
+        self.client = self.get_client(self.uri, self.extras)
+
+        return self.client
+
+    def query(self, query: str) -> pd.DataFrame:
+        """
+        Run a SQL query and return results as a pandas DataFrame.
+
+        :param query: SQL query string
+        :return: pandas DataFrame with query results
+        """
+        import pandas as pd
+
+        client = self.get_conn()
+        result = client.query(query=query, language="sql", mode="pandas")
+
+        if not isinstance(result, pd.DataFrame):
+            raise ValueError(
+                f"Query did not return a DataFrame. "
+                f"Result type: {type(result).__module__}.{type(result).__name__}"
+            )
+
+        return result
+
+    def write(
+        self,
+        measurement: str,
+        tags: dict[str, str] | None = None,
+        fields: dict[str, Any] | None = None,
+    ) -> None:
+        """
+        Write a Point to the database.
+
+        :param measurement: Measurement name
+        :param tags: Dictionary of tags (key-value pairs)
+        :param fields: Dictionary of fields (key-value pairs)
+
+        .. code-block:: python
+
+            hook.write(
+                measurement="temperature",
+                tags={"location": "Prague", "sensor": "A1"},
+                fields={"value": 25.3, "unit": "Celsius"},
+            )
+        """
+        if not INFLUXDB_CLIENT_3_AVAILABLE or Point is None:
+            raise ImportError(
+                "influxdb3-python is required for InfluxDB 3.x support. "
+                "Install it with: pip install influxdb3-python"
+            )
+        if not fields:
+            raise ValueError("At least one field is required")
+
+        client = self.get_conn()
+
+        # Create Point
+        point = Point(measurement)
+        if tags:
+            for tag_key, tag_val in tags.items():
+                point = point.tag(tag_key, tag_val)
+        for field_key, field_val in fields.items():
+            point = point.field(field_key, field_val)
+
+        client.write(record=point)
diff --git a/providers/influxdb/src/airflow/providers/influxdb/operators/influxdb3.py b/providers/influxdb/src/airflow/providers/influxdb/operators/influxdb3.py
new file mode 100644
index 00000000000..f863a057906
--- /dev/null
+++ b/providers/influxdb/src/airflow/providers/influxdb/operators/influxdb3.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Operator for executing SQL queries in InfluxDB 3.x."""
+
+from __future__ import annotations
+
+import json
+from collections.abc import Sequence
+from typing import TYPE_CHECKING, Any
+
+from airflow.providers.common.compat.sdk import BaseOperator
+from airflow.providers.influxdb.hooks.influxdb3 import InfluxDB3Hook
+
+if TYPE_CHECKING:
+    from airflow.sdk.definitions.context import Context
+
+
+class InfluxDB3Operator(BaseOperator):
+    """
+    Execute SQL query in InfluxDB 3.x database.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:InfluxDB3Operator`
+
+    :param sql: The SQL query to be executed
+    :param influxdb3_conn_id: Reference to :ref:`InfluxDB 3 connection id <howto/connection:influxdb3>`.
+    """
+
+    template_fields: Sequence[str] = ("sql",)
+
+    def __init__(
+        self,
+        *,
+        sql: str,
+        influxdb3_conn_id: str = "influxdb3_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.influxdb3_conn_id = influxdb3_conn_id
+        self.sql = sql
+
+    def execute(self, context: Context) -> list[dict[str, Any]]:
+        """
+        Execute SQL query and return results as JSON-serializable list of dictionaries.
+
+        :param context: Airflow context
+        :return: List of dictionaries representing query results
+        """
+        self.log.info("Executing SQL query: %s", self.sql)
+        hook = InfluxDB3Hook(conn_id=self.influxdb3_conn_id)
+        result = hook.query(self.sql)
+
+        self.log.info("Query executed successfully. Rows returned: %d", len(result))
+
+        json_str = result.to_json(orient="records", date_format="iso")
+        return json.loads(json_str)
diff --git a/providers/influxdb/tests/system/influxdb/example_influxdb3.py b/providers/influxdb/tests/system/influxdb/example_influxdb3.py
new file mode 100644
index 00000000000..10dc16a5023
--- /dev/null
+++ b/providers/influxdb/tests/system/influxdb/example_influxdb3.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example DAG demonstrating InfluxDB 3.x integration.
+
+This example shows how to:
+1. Write data points to InfluxDB 3.x
+2. Query data using SQL
+"""
+
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+try:
+    from airflow.sdk import task
+except ImportError:
+    # Airflow 2 path
+    from airflow.decorators import task  # type: ignore[attr-defined,no-redef]
+from airflow.models.dag import DAG
+from airflow.providers.influxdb.hooks.influxdb3 import InfluxDB3Hook
+from airflow.providers.influxdb.operators.influxdb3 import InfluxDB3Operator
+
+
+@task(task_id="write_data")
+def write_to_influxdb3():
+    """Write sample data to InfluxDB 3.x."""
+    hook = InfluxDB3Hook()
+    hook.write(
+        measurement="temperature",
+        tags={"location": "Prague", "sensor": "A1"},
+        fields={"value": 25.3, "unit": "Celsius"},
+    )
+    print("Data written successfully")
+
+
+# [START howto_operator_influxdb3]
+query_task = InfluxDB3Operator(
+    task_id="query_data",
+    sql="SELECT * FROM \"temperature\" WHERE time > now() - INTERVAL '1 hour'",
+    influxdb3_conn_id="influxdb3_default",
+)
+# [END howto_operator_influxdb3]
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "influxdb3_example_dag"
+
+with DAG(
+    dag_id=DAG_ID,
+    schedule=None,
+    start_date=datetime(2021, 1, 1),
+    max_active_runs=1,
+    tags=["example", "influxdb3"],
+) as dag:
+    write_task = write_to_influxdb3()
+    write_task >> query_task
+
+    from tests_common.test_utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/providers/influxdb/tests/unit/influxdb/hooks/test_influxdb3.py b/providers/influxdb/tests/unit/influxdb/hooks/test_influxdb3.py
new file mode 100644
index 00000000000..afc7a3e1164
--- /dev/null
+++ b/providers/influxdb/tests/unit/influxdb/hooks/test_influxdb3.py
@@ -0,0 +1,123 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.models import Connection
+from airflow.providers.influxdb.hooks.influxdb3 import InfluxDB3Hook
+
+
+class TestInfluxDB3Hook:
+    def setup_method(self):
+        self.influxdb3_hook = InfluxDB3Hook()
+        extra = {}
+        extra["token"] = "123456789"
+        extra["database"] = "test_db"
+        extra["org"] = "test_org"
+
+        self.connection = Connection(schema="https", host="localhost", port=8086, extra=extra)
+
+    @mock.patch("airflow.providers.influxdb.hooks.influxdb3.InfluxDBClient3")
+    def test_get_conn(self, influx_db_client_3):
+        """Test connection to InfluxDB 3.x."""
+        self.influxdb3_hook.get_connection = mock.Mock()
+        self.influxdb3_hook.get_connection.return_value = self.connection
+
+        self.influxdb3_hook.get_conn()
+
+        assert self.influxdb3_hook.uri == "https://localhost:8086"
+
+        assert self.influxdb3_hook.get_connection.return_value.schema == "https"
+        assert self.influxdb3_hook.get_connection.return_value.host == "localhost"
+        influx_db_client_3.assert_called_once_with(
+            host="https://localhost:8086", token="123456789", database="test_db", org="test_org"
+        )
+
+        assert self.influxdb3_hook.get_client is not None
+
+    def test_get_conn_missing_database(self):
+        """Test that InfluxDB 3.x requires database parameter."""
+        extra = {}
+        extra["token"] = "123456789"
+
+        connection = Connection(schema="https", host="localhost", extra=extra)
+        influxdb3_hook = InfluxDB3Hook()
+
+        influxdb3_hook.get_connection = mock.Mock()
+        influxdb3_hook.get_connection.return_value = connection
+
+        with pytest.raises(ValueError, match="database is required"):
+            influxdb3_hook.get_conn()
+
+    def test_query(self):
+        """Test query with InfluxDB 3.x."""
+        pd = pytest.importorskip("pandas")
+
+        self.influxdb3_hook.client = mock.Mock()
+        mock_df = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
+        self.influxdb3_hook.client.query = mock.Mock(return_value=mock_df)
+        self.influxdb3_hook.get_conn = mock.Mock(return_value=self.influxdb3_hook.client)
+
+        influxdb_query = 'SELECT "duration" FROM "pyexample"'
+        result = self.influxdb3_hook.query(influxdb_query)
+
+        self.influxdb3_hook.get_conn.assert_called()
+        self.influxdb3_hook.client.query.assert_called_once_with(
+            query=influxdb_query, language="sql", mode="pandas"
+        )
+        assert isinstance(result, pd.DataFrame)
+        assert len(result) == 2
+
+    def test_write(self):
+        """Test write with InfluxDB 3.x."""
+        self.influxdb3_hook.client = mock.Mock()
+        self.influxdb3_hook.client.write = mock.Mock()
+        self.influxdb3_hook.get_conn = mock.Mock(return_value=self.influxdb3_hook.client)
+
+        self.influxdb3_hook.write(
+            measurement="test_measurement",
+            tags={"location": "Prague"},
+            fields={"temperature": 25.3},
+        )
+
+        self.influxdb3_hook.client.write.assert_called_once()
+        # Verify the point was created correctly
+        call_args = self.influxdb3_hook.client.write.call_args
+        assert call_args is not None
+        assert "record" in call_args.kwargs
+
+    def test_write_no_fields(self):
+        """Test that write requires at least one field."""
+        self.influxdb3_hook.get_connection = mock.Mock()
+        self.influxdb3_hook.get_connection.return_value = self.connection
+        self.influxdb3_hook.get_conn = mock.Mock()
+        self.influxdb3_hook.client = mock.Mock()
+
+        with pytest.raises(ValueError, match="At least one field is required"):
+            self.influxdb3_hook.write(measurement="test", tags={"tag": "value"}, fields=None)
+
+    @mock.patch("airflow.providers.influxdb.hooks.influxdb3.INFLUXDB_CLIENT_3_AVAILABLE", False)
+    def test_get_client_missing_library(self):
+        """Test that ImportError is raised when influxdb3-python is not installed."""
+        self.influxdb3_hook.get_connection = mock.Mock()
+        self.influxdb3_hook.get_connection.return_value = self.connection
+
+        with pytest.raises(ImportError, match="influxdb3-python is required"):
+            self.influxdb3_hook.get_conn()
diff --git a/providers/influxdb/tests/unit/influxdb/operators/test_influxdb3.py b/providers/influxdb/tests/unit/influxdb/operators/test_influxdb3.py
new file mode 100644
index 00000000000..d67028ad03a
--- /dev/null
+++ b/providers/influxdb/tests/unit/influxdb/operators/test_influxdb3.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.providers.influxdb.operators.influxdb3 import InfluxDB3Operator
+
+
+class TestInfluxDB3Operator:
+    def setup_method(self):
+        self.operator = InfluxDB3Operator(
+            task_id="test_task",
+            sql='SELECT "duration" FROM "pyexample"',
+            influxdb3_conn_id="influxdb3_default",
+        )
+
+    def test_init(self):
+        """Test operator initialization."""
+        assert self.operator.sql == 'SELECT "duration" FROM "pyexample"'
+        assert self.operator.influxdb3_conn_id == "influxdb3_default"
+        assert "sql" in self.operator.template_fields
+
+    @mock.patch("airflow.providers.influxdb.operators.influxdb3.InfluxDB3Hook")
+    def test_execute(self, mock_hook_class):
+        """Test operator execution."""
+
+        pd = pytest.importorskip("pandas")
+
+        mock_hook = mock.Mock()
+        mock_df = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
+        mock_hook.query.return_value = mock_df
+        mock_hook_class.return_value = mock_hook
+
+        result = self.operator.execute(context={})
+
+        mock_hook_class.assert_called_once_with(conn_id="influxdb3_default")
+        mock_hook.query.assert_called_once_with('SELECT "duration" FROM "pyexample"')
+        assert isinstance(result, list)
+        assert len(result) == 2
+        assert isinstance(result[0], dict)
+        assert "col1" in result[0]
+        assert "col2" in result[0]
diff --git a/uv.lock b/uv.lock
index 4066b76d5c0..8a83e5f1755 100644
--- a/uv.lock
+++ b/uv.lock
@@ -5624,6 +5624,7 @@ dependencies = [
     { name = "apache-airflow" },
     { name = "apache-airflow-providers-common-compat" },
     { name = "influxdb-client" },
+    { name = "influxdb3-python" },
     { name = "requests" },
 ]
 
@@ -5643,6 +5644,7 @@ requires-dist = [
     { name = "apache-airflow", editable = "." },
     { name = "apache-airflow-providers-common-compat", editable = "providers/common/compat" },
     { name = "influxdb-client", specifier = ">=1.19.0" },
+    { name = "influxdb3-python", specifier = ">=0.7.0" },
     { name = "requests", specifier = ">=2.32.0,<3" },
 ]
 
@@ -13801,6 +13803,22 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/7f/ec/6b120b4a86f6fadc7ddb1d7c7cdcb15bcfb332deb022ab60df51bcd4494c/influxdb_client-1.50.0-py3-none-any.whl", hash = "sha256:f172975cf7f0c95bfe74f288b31273393b164d2c58a948de55497d9956ab49be", size = 746289, upload-time = "2026-01-23T09:39:37.377Z" },
 ]
 
+[[package]]
+name = "influxdb3-python"
+version = "0.19.0"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+    { name = "certifi" },
+    { name = "pyarrow" },
+    { name = "python-dateutil" },
+    { name = "reactivex" },
+    { name = "urllib3" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/e7/fb/93d9df24f2ae288dd43892c4aaf6f31505b377754b8eb974f7803fdd1e51/influxdb3_python-0.19.0.tar.gz", hash = "sha256:4654f15cb56116321446daf3bbde6aefe075888014660218c2ef423da9679c3b", size = 101159, upload-time = "2026-04-23T04:25:46.705Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/f9/86/ac8a67a8754b2f0f9743fb4d7b510bc9740e71301545377c1106b168aaac/influxdb3_python-0.19.0-py3-none-any.whl", hash = "sha256:0ae81565f3ba1428ede4b88c476f006e0b055fe0bf18ccbfeec2dea18440f614", size = 95981, upload-time = "2026-04-23T04:25:45.242Z" },
+]
+
 [[package]]
 name = "iniconfig"
 version = "2.3.0"

