joe-clickhouse commented on code in PR #67080:
URL: https://github.com/apache/airflow/pull/67080#discussion_r3262249915


##########
providers/clickhouse/src/airflow/providers/clickhouse/hooks/clickhouse.py:
##########
@@ -0,0 +1,434 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import json
+from typing import TYPE_CHECKING, Any
+from urllib.parse import quote_plus
+
+from airflow.providers.common.sql.hooks.sql import DbApiHook
+
+if TYPE_CHECKING:
+    import clickhouse_connect.driver.client
+    from clickhouse_connect.dbapi.cursor import Cursor as ClickHouseDbApiCursor
+
+# Optional scalar parameters forwarded verbatim to 
clickhouse_connect.get_client()
+# Note: client_name is handled separately by _build_client_name() so Airflow 
version
+# info is always embedded in the HTTP User-Agent / system.query_log entry.
+_OPTIONAL_CLIENT_KWARGS = (
+    "connect_timeout",
+    "send_receive_timeout",
+    "compress",
+)
+
+# kwargs that the hook always manages itself.  User-supplied client_kwargs that
+# overlap with these keys are dropped and logged at DEBUG so the hook's own 
values always win.
+_HOOK_MANAGED_KWARGS: frozenset[str] = frozenset(
+    {
+        "host",
+        "port",
+        "username",
+        "password",
+        "database",
+        "secure",
+        "verify",
+        "client_name",
+        "settings",
+    }
+)
+
+
+def _build_client_name(custom: str | None = None) -> str:
+    """
+    Return the ``client_name`` string passed to 
``clickhouse_connect.get_client()``.
+
+    The value is prepended to the HTTP ``User-Agent`` header that ClickHouse 
records
+    in ``system.query_log``, so every query is traceable back to its Airflow 
source.
+
+    Format (no custom label)::
+
+        apache-airflow/<airflow_version> 
apache-airflow-providers-clickhouse/<provider_version>
+
+    Format (with the ``client_name`` extra field set to ``"my-pipeline"``)::
+
+        apache-airflow/<airflow_version> 
apache-airflow-providers-clickhouse/<provider_version> (my-pipeline)
+
+    The ``clickhouse_connect`` library appends its own token and OS 
information, so
+    the full ``User-Agent`` looks like::
+
+        apache-airflow/X.Y.Z apache-airflow-providers-clickhouse/X.Y.Z
+        clickhouse-connect/X.Y.Z (lv:py/X.Y.Z; mode:sync; os:linux)
+    """
+    import airflow
+    from airflow.providers.clickhouse import __version__ as provider_version
+
+    name = f"apache-airflow/{airflow.__version__} 
apache-airflow-providers-clickhouse/{provider_version}"
+    if custom:
+        name = f"{name} ({custom.strip()})"
+    return name
+
+
+class ClickHouseConnection:
+    """
+    Minimal DB-API 2.0 connection adapter wrapping a ``clickhouse_connect`` 
Client.
+
+    SQL execution is delegated to ``clickhouse_connect.dbapi.cursor.Cursor``, 
which
+    routes each statement to ``client.query()`` or ``client.command()`` 
automatically
+    by inspecting the SQL keyword after stripping comments — the same logic 
used by
+    the ``clickhouse-connect`` SQLAlchemy dialect.
+
+    ClickHouse has no multi-statement transactions.  Every statement is 
effectively
+    auto-committed, so ``commit()`` and ``rollback()`` are intentional no-ops 
and
+    the ``autocommit`` attribute is always ``True``.  ``DbApiHook.run()`` 
checks
+    ``conn.autocommit`` via :meth:`ClickHouseHook.get_autocommit` and skips the
+    ``conn.commit()`` call when it is truthy.
+    """
+
+    # Signals to DbApiHook.get_autocommit() that no explicit commit is needed.
+    autocommit: bool = True
+
+    def __init__(self, client: clickhouse_connect.driver.client.Client) -> 
None:
+        self._client = client
+
+    def cursor(self) -> ClickHouseDbApiCursor:
+        from clickhouse_connect.dbapi.cursor import Cursor
+
+        return Cursor(self._client)
+
+    def close(self) -> None:
+        self._client.close()
+
+    def commit(self) -> None:
+        pass  # ClickHouse has no multi-statement transactions
+
+    def rollback(self) -> None:
+        pass  # ClickHouse has no multi-statement transactions
+
+
+class ClickHouseHook(DbApiHook):
+    """
+    Interact with ClickHouse via the HTTP interface (``clickhouse-connect``).
+
+    This hook wraps ``clickhouse_connect.get_client()`` behind a thin DB-API 
2.0
+    adapter (:class:`ClickHouseConnection`), so all standard
+    
:class:`~airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator`
+    features work out of the box (templating, ``handler``, 
``split_statements``,
+    etc.).
+
+    :param database: Optional database name.  Overrides the ``schema`` field 
of the
+        Airflow connection.  Useful when one connection points to a ClickHouse 
cluster
+        and individual tasks need to target different databases.
+    :param session_settings: Optional dict of ClickHouse session-level settings
+        (e.g. ``{"max_execution_time": 60, "max_threads": 4}``).  Values 
supplied
+        here are **merged on top of** any ``session_settings`` dict already 
present
+        in the connection's ``extra`` JSON field, with the constructor argument
+        taking precedence. For a full list of available session settings visit 
https://clickhouse.com/docs/operations/settings/settings
+    :param client_kwargs: Optional dict of additional keyword arguments 
forwarded
+        verbatim to ``clickhouse_connect.get_client()``.  Use this to pass any
+        parameter not otherwise exposed by the hook (e.g. ``http_proxy``,
+        ``pool_mgr_params``).  Values supplied here are **merged on top of** 
any
+        ``client_kwargs`` dict already present in the connection's ``extra`` 
JSON
+        field, with the constructor argument taking precedence on conflicting 
keys.
+        Keys that the hook manages itself (``host``, ``port``, ``username``,
+        ``password``, ``database``, ``secure``, ``verify``, ``client_name``,
+        ``settings``) are silently ignored so that hook-managed values always
+        take precedence.
+
+    .. seealso::
+        - `clickhouse-connect documentation 
<https://clickhouse.com/docs/en/integrations/python>`_
+        - `clickhouse session settings documentation 
<https://clickhouse.com/docs/operations/settings/settings>`_
+        - :ref:`howto/connection:clickhouse`
+
+    """
+
+    conn_name_attr = "clickhouse_conn_id"
+    default_conn_name = "clickhouse_default"
+    conn_type = "clickhouse"
+    hook_name = "ClickHouse"
+
+    # ClickHouse has no multi-statement transactions; every statement is 
auto-committed.
+    supports_autocommit = True
+    _test_connection_sql = "SELECT 1"
+
+    def set_autocommit(self, conn: ClickHouseConnection, autocommit: bool) -> 
None:
+        """No-op: ClickHouse has no transaction support."""
+
+    def get_autocommit(self, conn: ClickHouseConnection) -> bool:
+        """Return ``True``: ClickHouse auto-commits every statement."""
+        return True
+
+    def __init__(
+        self,
+        *args,
+        database: str | None = None,
+        session_settings: dict[str, Any] | None = None,
+        client_kwargs: dict[str, Any] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.database = database
+        self.session_settings: dict[str, Any] = session_settings or {}
+        self.client_kwargs: dict[str, Any] = client_kwargs or {}
+
+    @classmethod
+    def get_connection_form_widgets(cls) -> dict[str, Any]:
+        """Return extra connection form widgets shown in the Airflow UI."""
+        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
+        from flask_babel import lazy_gettext
+        from wtforms import BooleanField, IntegerField, StringField
+
+        return {
+            "secure": BooleanField(
+                label=lazy_gettext("Use TLS (HTTPS)"),
+                description="Enable TLS/HTTPS for the connection (default: 
False).",
+            ),
+            "verify": BooleanField(
+                label=lazy_gettext("Verify SSL Certificate"),
+                description="Verify the server SSL certificate when TLS is 
enabled (default: True).",
+            ),
+            "connect_timeout": IntegerField(
+                lazy_gettext("Connection Timeout (seconds)"),
+                widget=BS3TextFieldWidget(),
+            ),
+            "send_receive_timeout": IntegerField(
+                lazy_gettext("Query Timeout (seconds)"),
+                widget=BS3TextFieldWidget(),
+            ),
+            "compress": BooleanField(
+                label=lazy_gettext("Enable LZ4 Compression"),
+                description="Compress query results with LZ4 (default: True).",
+            ),
+            "client_name": StringField(
+                lazy_gettext("Client Name (label)"),
+                widget=BS3TextFieldWidget(),
+                description=(
+                    "Optional label appended to the Airflow version identifier 
in the "
+                    "ClickHouse HTTP User-Agent and system.query_log 
client_name field. "
+                    "The hook always prepends apache-airflow/<version> "
+                    "apache-airflow-providers-clickhouse/<version>."
+                ),
+            ),
+            "session_settings": StringField(
+                lazy_gettext("Session Settings (JSON)"),
+                widget=BS3TextFieldWidget(),
+                description=(
+                    "ClickHouse session-level settings as a JSON object, "
+                    'e.g. {"max_execution_time": 60, "max_threads": 4}.'
+                ),
+            ),
+            "client_kwargs": StringField(
+                lazy_gettext("Client kwargs (JSON)"),
+                widget=BS3TextFieldWidget(),
+                description=(
+                    "Additional keyword arguments forwarded to 
clickhouse_connect.get_client() "
+                    'as a JSON object, e.g. {"http_proxy": 
"http://proxy:8080"}. '
+                    "Hook-managed keys (host, port, username, password, 
database, secure, "
+                    "verify, client_name, settings) are ignored."
+                ),
+            ),
+        }
+
+    @classmethod
+    def get_ui_field_behaviour(cls) -> dict[str, Any]:
+        """Return custom field behaviour for the connection form."""
+        return {
+            "hidden_fields": [],
+            "relabeling": {
+                "schema": "Database",
+            },
+            "placeholders": {
+                "host": "localhost",
+                "port": "8123",
+                "login": "default",
+                "schema": "default",
+                "extra": json.dumps(
+                    {
+                        "secure": False,
+                        "verify": True,
+                        "connect_timeout": 10,
+                        "send_receive_timeout": 300,
+                        "compress": True,
+                        "client_name": "airflow",
+                        "session_settings": {
+                            "max_execution_time": 300,
+                            "max_memory_usage": 10000000000,
+                        },
+                    },
+                    indent=1,
+                ),
+                "secure": "false",
+                "verify": "true",
+                "connect_timeout": "10",
+                "send_receive_timeout": "300",
+                "compress": "true",
+                "client_name": "airflow",
+                "session_settings": '{"max_execution_time": 300}',
+                "client_kwargs": '{"http_proxy": "http://proxy:8080"}',
+            },
+        }
+
+    def _get_client_kwargs(self) -> dict[str, Any]:
+        """
+        Build the keyword-argument dict passed to 
``clickhouse_connect.get_client()``.
+
+        Construction order (last write wins):
+
+        1. ``client_kwargs`` from the connection ``extra`` JSON — 
connection-level
+           passthrough kwargs.
+        2. ``client_kwargs`` constructor argument — task-level overrides 
merged on top.
+        3. Hook-managed keys (``host``, ``port``, ``username``, ``password``,
+           ``database``, ``secure``, ``verify``, ``client_name``, 
``settings``) —
+           always override anything in ``client_kwargs``.
+
+        Optional tuning parameters (``connect_timeout``, 
``send_receive_timeout``,
+        ``compress``) are forwarded only when explicitly set in the ``extra`` 
JSON
+        field so that the driver's own defaults apply otherwise.
+
+        ``client_name`` is always set to a string that identifies the Airflow
+        installation (see :func:`_build_client_name`).  Any ``client_name`` 
value in
+        ``extra`` is treated as a human-readable label and appended as a 
comment so
+        it appears alongside the Airflow version info in ``system.query_log``.
+
+        ``session_settings`` from ``extra`` and from the constructor 
``session_settings``
+        argument are **merged**, with the constructor argument taking 
precedence on
+        conflicting keys.
+        """
+        conn = self.get_connection(self.get_conn_id())
+        extra: dict[str, Any] = conn.extra_dejson
+
+        # Merge client_kwargs: extra values are the base, constructor values 
override.
+        raw_client_kwargs = extra.get("client_kwargs")
+        if isinstance(raw_client_kwargs, str):
+            raw_client_kwargs = json.loads(raw_client_kwargs)
+        merged_client_kwargs: dict[str, Any] = {**(raw_client_kwargs or {}), 
**self.client_kwargs}
+
+        # Strip hook-managed keys so they cannot overwrite connection 
credentials
+        # or versioning info.  Log dropped keys at DEBUG to aid 
troubleshooting.
+        dropped = _HOOK_MANAGED_KWARGS.intersection(merged_client_kwargs)
+        if dropped:
+            self.log.debug(
+                "Ignoring hook-managed keys in client_kwargs: %s",
+                sorted(dropped),
+            )
+        kwargs: dict[str, Any] = {
+            k: v for k, v in merged_client_kwargs.items() if k not in 
_HOOK_MANAGED_KWARGS
+        }
+
+        # Hook-managed connection parameters always take precedence.
+        kwargs.update(
+            {
+                "host": conn.host or "localhost",
+                "port": int(conn.port) if conn.port else 8123,
+                "username": conn.login or "default",
+                "password": conn.password or "",
+                "database": self.database or conn.schema or "default",
+                "secure": bool(extra.get("secure", False)),
+                "verify": bool(extra.get("verify", True)),
+            }
+        )
+
+        for key in _OPTIONAL_CLIENT_KWARGS:
+            if key in extra:
+                kwargs[key] = extra[key]
+
+        # Always embed Airflow + provider version; user's 'client_name' extra 
is appended to the User-Agent header.
+        kwargs["client_name"] = _build_client_name(extra.get("client_name"))
+
+        # Merge session_settings: extra values are the base, constructor 
values override.
+        raw = extra.get("session_settings")
+        if isinstance(raw, str):
+            raw = json.loads(raw)
+        merged: dict[str, Any] = {**(raw or {}), **self.session_settings}
+        if merged:
+            kwargs["settings"] = merged
+
+        return kwargs
+
+    def get_conn(self) -> ClickHouseConnection:
+        """Return a DB-API 2.0 compatible connection backed by 
``clickhouse_connect``."""
+        import clickhouse_connect
+
+        client = clickhouse_connect.get_client(**self._get_client_kwargs())
+        return ClickHouseConnection(client)
+
+    def get_client(self) -> clickhouse_connect.driver.client.Client:
+        """
+        Return the raw ``clickhouse_connect`` Client for ClickHouse-specific 
operations.
+
+        Use this for bulk inserts, streaming queries, or any operation that
+        benefits from the native ``clickhouse_connect`` API rather than DB-API
+        cursors.  The caller is responsible for closing the client.
+        """
+        import clickhouse_connect
+
+        return clickhouse_connect.get_client(**self._get_client_kwargs())
+
+    def bulk_insert_rows(
+        self,
+        table: str,
+        rows: list[tuple],
+        column_names: list[str],
+        commit_every: int = 5000,
+    ) -> None:
+        """
+        Insert rows into a ClickHouse table using the native columnar insert.
+
+        Uses ``clickhouse_connect``'s optimized insert path, which is
+        significantly faster than row-by-row cursor inserts for large datasets.
+
+        :param table: Target table name.
+        :param rows: List of row tuples to insert.
+        :param column_names: Column names matching each position in the row 
tuples.
+        :param commit_every: Batch size (rows per insert call). Defaults to 
5000.
+        """
+        if not rows:
+            self.log.warning(
+                "bulk_insert_rows called with an empty rows list — nothing was 
inserted into %s.", table
+            )
+            return
+
+        client = self.get_client()
+        try:
+            for i in range(0, len(rows), commit_every):
+                batch = rows[i : i + commit_every]
+                client.insert(table, batch, column_names=column_names)

Review Comment:
   `client.insert()` already streams data to ClickHouse in adaptive ~2MB blocks as a single `Transfer-Encoding: chunked` HTTP request. Batching at the Python layer here doesn't help and could even hurt: each `client.insert()` call without a reusable `context` issues a `DESCRIBE TABLE` to resolve column types, even when `column_names` is supplied (`column_names` is only used to filter/order the `DESCRIBE` result).
   
   I'd recommend either:
   1. Just call `client.insert(table, rows, column_names=column_names)` once 
and let the driver handle block-level streaming internally.
   2. If batching is genuinely needed (e.g. memory pressure on extremely large inputs), build the insert context once and reuse it across batches:
     ```python
     ctx = client.create_insert_context(table, column_names=column_names)
     for i in range(0, len(rows), commit_every):
         client.insert(data=rows[i:i+commit_every], context=ctx)
     ```
   
   Side note: I think `commit_every` is a misnomer, since inserts are _not_ transactional. `batch_size` might be a more appropriate name.



##########
providers/clickhouse/tests/unit/clickhouse/hooks/test_clickhouse.py:
##########
@@ -0,0 +1,1344 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import re
+from unittest.mock import ANY, MagicMock, patch
+
+import pytest
+
+from airflow.models import Connection
+from airflow.providers.clickhouse.hooks.clickhouse import (
+    ClickHouseConnection,
+    ClickHouseHook,
+)
+
+# ---------------------------------------------------------------------------
+# Fixtures / shared connection definitions
+# ---------------------------------------------------------------------------
+
+BASE_CONN = Connection(
+    conn_id="clickhouse_test",
+    conn_type="clickhouse",
+    host="clickhouse-host",
+    port=8123,
+    login="user",
+    password="secret",
+    schema="analytics",
+)
+
+BASE_CONN_EXTRA = Connection(
+    conn_id="clickhouse_test_extra",
+    conn_type="clickhouse",
+    host="secure-host",
+    port=8443,
+    login="admin",
+    password="topsecret",
+    schema="prod",
+    extra=json.dumps(
+        {
+            "secure": True,
+            "verify": False,
+            "connect_timeout": 30,
+            "send_receive_timeout": 600,
+            "compress": False,
+            "client_name": "my-airflow",
+        }
+    ),
+)
+
+MINIMAL_CONN = Connection(
+    conn_id="clickhouse_minimal",
+    conn_type="clickhouse",
+)
+
+CONN_WITH_CLIENT_KWARGS = Connection(
+    conn_id="clickhouse_client_kwargs",
+    conn_type="clickhouse",
+    host="ch-host",
+    port=8123,
+    login="user",
+    password="pass",
+    schema="db",
+    extra=json.dumps(
+        {
+            "client_kwargs": {
+                "http_proxy": "http://proxy:8080";,
+                "pool_mgr_params": {"num_pools": 4},
+            }
+        }
+    ),
+)
+
+CONN_WITH_CLIENT_KWARGS_STR = Connection(
+    conn_id="clickhouse_client_kwargs_str",
+    conn_type="clickhouse",
+    host="ch-host",
+    port=8123,
+    login="user",
+    password="pass",
+    schema="db",
+    extra=json.dumps(
+        {
+            # client_kwargs stored as a JSON string (as the UI text field 
stores it)
+            "client_kwargs": '{"http_proxy": "http://proxy:8080"}'
+        }
+    ),
+)
+
+CONN_WITH_SESSION_SETTINGS = Connection(
+    conn_id="clickhouse_session",
+    conn_type="clickhouse",
+    host="ch-host",
+    port=8123,
+    login="user",
+    password="pass",
+    schema="db",
+    extra=json.dumps(
+        {
+            "session_settings": {
+                "max_execution_time": 120,
+                "max_threads": 4,
+            }
+        }
+    ),
+)
+
+CONN_WITH_SESSION_SETTINGS_STR = Connection(
+    conn_id="clickhouse_session_str",
+    conn_type="clickhouse",
+    host="ch-host",
+    port=8123,
+    login="user",
+    password="pass",
+    schema="db",
+    extra=json.dumps(
+        {
+            # session_settings stored as a JSON string (as the UI text field 
stores it)
+            "session_settings": '{"max_execution_time": 60}'
+        }
+    ),
+)
+
+
+# ---------------------------------------------------------------------------
+# Tests: get_conn
+# ---------------------------------------------------------------------------
+
+
+class TestClickHouseHookGetConn:
+    
@patch("airflow.providers.clickhouse.hooks.clickhouse.ClickHouseHook.get_connection")
+    @patch("clickhouse_connect.get_client")
+    def test_get_conn_basic(self, mock_get_client, mock_get_connection):
+        mock_get_connection.return_value = BASE_CONN
+        hook = ClickHouseHook(clickhouse_conn_id="clickhouse_test")
+        conn = hook.get_conn()
+
+        mock_get_client.assert_called_once_with(
+            host="clickhouse-host",
+            port=8123,
+            username="user",
+            password="secret",
+            database="analytics",
+            secure=False,
+            verify=True,
+            client_name=ANY,  # always set; format verified in 
TestClickHouseHookClientName
+        )
+        assert isinstance(conn, ClickHouseConnection)
+
+    
@patch("airflow.providers.clickhouse.hooks.clickhouse.ClickHouseHook.get_connection")
+    @patch("clickhouse_connect.get_client")
+    def test_get_conn_with_extra_params(self, mock_get_client, 
mock_get_connection):
+        mock_get_connection.return_value = BASE_CONN_EXTRA
+        hook = ClickHouseHook(clickhouse_conn_id="clickhouse_test_extra")
+        hook.get_conn()
+
+        mock_get_client.assert_called_once_with(
+            host="secure-host",
+            port=8443,
+            username="admin",
+            password="topsecret",
+            database="prod",
+            secure=True,
+            verify=False,
+            connect_timeout=30,
+            send_receive_timeout=600,
+            compress=False,
+            client_name=ANY,  # contains Airflow version + "(my-airflow)" 
comment
+        )
+
+    
@patch("airflow.providers.clickhouse.hooks.clickhouse.ClickHouseHook.get_connection")
+    @patch("clickhouse_connect.get_client")
+    def test_get_conn_minimal_defaults(self, mock_get_client, 
mock_get_connection):
+        """All connection fields absent → sensible defaults applied."""
+        mock_get_connection.return_value = MINIMAL_CONN
+        hook = ClickHouseHook(clickhouse_conn_id="clickhouse_minimal")
+        hook.get_conn()
+
+        mock_get_client.assert_called_once_with(
+            host="localhost",
+            port=8123,
+            username="default",
+            password="",
+            database="default",
+            secure=False,
+            verify=True,
+            client_name=ANY,  # always set; format verified in 
TestClickHouseHookClientName
+        )
+
+    
@patch("airflow.providers.clickhouse.hooks.clickhouse.ClickHouseHook.get_connection")
+    @patch("clickhouse_connect.get_client")
+    def test_get_conn_database_overrides_connection_schema(self, 
mock_get_client, mock_get_connection):
+        """database constructor arg must override the schema field from the 
connection."""
+        mock_get_connection.return_value = BASE_CONN  # schema="analytics"
+        hook = ClickHouseHook(clickhouse_conn_id="clickhouse_test", 
database="overridden_db")
+        hook.get_conn()
+
+        call_kwargs = mock_get_client.call_args.kwargs
+        assert call_kwargs["database"] == "overridden_db"
+
+    
@patch("airflow.providers.clickhouse.hooks.clickhouse.ClickHouseHook.get_connection")
+    @patch("clickhouse_connect.get_client")
+    def test_get_conn_no_database_falls_back_to_connection_schema(self, 
mock_get_client, mock_get_connection):
+        """Without a database constructor arg the connection schema is used."""
+        mock_get_connection.return_value = BASE_CONN  # schema="analytics"
+        hook = ClickHouseHook(clickhouse_conn_id="clickhouse_test")
+        hook.get_conn()
+
+        call_kwargs = mock_get_client.call_args.kwargs
+        assert call_kwargs["database"] == "analytics"
+
+
+# ---------------------------------------------------------------------------
+# Tests: get_uri
+# ---------------------------------------------------------------------------
+
+
+class TestClickHouseHookGetUri:
+    
@patch("airflow.providers.clickhouse.hooks.clickhouse.ClickHouseHook.get_connection")
+    def test_get_uri_with_password(self, mock_get_connection):
+        mock_get_connection.return_value = BASE_CONN
+        hook = ClickHouseHook(clickhouse_conn_id="clickhouse_test")
+        uri = hook.get_uri()
+
+        assert uri == 
"clickhousedb://user:secret@clickhouse-host:8123/analytics"
+
+    
@patch("airflow.providers.clickhouse.hooks.clickhouse.ClickHouseHook.get_connection")
+    def test_get_uri_without_password(self, mock_get_connection):
+        conn = Connection(
+            conn_id="clickhouse_nopass",
+            conn_type="clickhouse",
+            host="myhost",
+            port=8123,
+            login="reader",
+            schema="logs",
+        )
+        mock_get_connection.return_value = conn
+        hook = ClickHouseHook(clickhouse_conn_id="clickhouse_nopass")
+        uri = hook.get_uri()
+
+        assert uri == "clickhousedb://reader@myhost:8123/logs"
+
+    
@patch("airflow.providers.clickhouse.hooks.clickhouse.ClickHouseHook.get_connection")
+    def test_get_uri_password_url_encoded(self, mock_get_connection):
+        """Special characters in the password must be percent-encoded."""
+        conn = Connection(
+            conn_id="clickhouse_special",
+            conn_type="clickhouse",
+            host="host",
+            port=8123,
+            login="user",
+            password="p@ss/w0rd!",
+            schema="db",
+        )
+        mock_get_connection.return_value = conn
+        hook = ClickHouseHook(clickhouse_conn_id="clickhouse_special")
+        uri = hook.get_uri()
+
+        assert "@" not in uri.split("://")[1].split("@")[0]  # password part 
is encoded
+        assert "p%40ss" in uri or "p%40" in uri  # @ is encoded
+
+    
@patch("airflow.providers.clickhouse.hooks.clickhouse.ClickHouseHook.get_connection")
+    def test_get_uri_defaults(self, mock_get_connection):
+        mock_get_connection.return_value = MINIMAL_CONN
+        hook = ClickHouseHook(clickhouse_conn_id="clickhouse_minimal")
+        uri = hook.get_uri()
+
+        assert uri == "clickhousedb://default@localhost:8123/default"
+
+    
@patch("airflow.providers.clickhouse.hooks.clickhouse.ClickHouseHook.get_connection")
+    def test_get_uri_database_overrides_connection_schema(self, 
mock_get_connection):
+        """database constructor arg must appear in the URI instead of the 
connection schema."""
+        mock_get_connection.return_value = BASE_CONN  # schema="analytics"
+        hook = ClickHouseHook(clickhouse_conn_id="clickhouse_test", 
database="overridden_db")
+        uri = hook.get_uri()
+
+        assert uri.endswith("/overridden_db")
+        assert "analytics" not in uri
+
+    
@patch("airflow.providers.clickhouse.hooks.clickhouse.ClickHouseHook.get_connection")
+    def test_get_uri_secure_uses_clickhousedbs_scheme(self, 
mock_get_connection):
+        """secure=True in extra must produce the clickhousedbs:// (HTTPS) 
scheme."""
+        conn = Connection(
+            conn_id="ch_secure",
+            conn_type="clickhouse",
+            host="secure-host",
+            port=8443,
+            login="user",
+            password="pass",
+            schema="db",
+            extra=json.dumps({"secure": True}),
+        )
+        mock_get_connection.return_value = conn
+        uri = ClickHouseHook(clickhouse_conn_id="ch_secure").get_uri()
+        assert uri == "clickhousedbs://user:pass@secure-host:8443/db"

Review Comment:
   This test can be deleted or reworked in light of my other comment explaining that `clickhousedbs` isn't a valid scheme.



##########
providers/clickhouse/src/airflow/providers/clickhouse/hooks/clickhouse.py:
##########
@@ -0,0 +1,434 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import json
+from typing import TYPE_CHECKING, Any
+from urllib.parse import quote_plus
+
+from airflow.providers.common.sql.hooks.sql import DbApiHook
+
+if TYPE_CHECKING:
+    import clickhouse_connect.driver.client
+    from clickhouse_connect.dbapi.cursor import Cursor as ClickHouseDbApiCursor
+
+# Optional scalar parameters forwarded verbatim to 
clickhouse_connect.get_client()
+# Note: client_name is handled separately by _build_client_name() so Airflow 
version
+# info is always embedded in the HTTP User-Agent / system.query_log entry.
+_OPTIONAL_CLIENT_KWARGS = (
+    "connect_timeout",
+    "send_receive_timeout",
+    "compress",
+)
+
+# kwargs that the hook always manages itself.  User-supplied client_kwargs that
+# overlap with these keys are dropped and logged at DEBUG so the hook's own 
values always win.
+_HOOK_MANAGED_KWARGS: frozenset[str] = frozenset(
+    {
+        "host",
+        "port",
+        "username",
+        "password",
+        "database",
+        "secure",
+        "verify",
+        "client_name",
+        "settings",
+    }
+)
+
+
+def _build_client_name(custom: str | None = None) -> str:
+    """
+    Return the ``client_name`` string passed to 
``clickhouse_connect.get_client()``.
+
+    The value is prepended to the HTTP ``User-Agent`` header that ClickHouse 
records
+    in ``system.query_log``, so every query is traceable back to its Airflow 
source.
+
+    Format (no custom label)::
+
+        apache-airflow/<airflow_version> 
apache-airflow-providers-clickhouse/<provider_version>
+
+    Format (with the ``client_name`` extra field set to ``"my-pipeline"``)::
+
+        apache-airflow/<airflow_version> 
apache-airflow-providers-clickhouse/<provider_version> (my-pipeline)
+
+    The ``clickhouse_connect`` library appends its own token and OS 
information, so
+    the full ``User-Agent`` looks like::
+
+        apache-airflow/X.Y.Z apache-airflow-providers-clickhouse/X.Y.Z
+        clickhouse-connect/X.Y.Z (lv:py/X.Y.Z; mode:sync; os:linux)
+    """
+    import airflow
+    from airflow.providers.clickhouse import __version__ as provider_version
+
+    name = f"apache-airflow/{airflow.__version__} 
apache-airflow-providers-clickhouse/{provider_version}"
+    if custom:
+        name = f"{name} ({custom.strip()})"
+    return name
+
+
+class ClickHouseConnection:
+    """
+    Minimal DB-API 2.0 connection adapter wrapping a ``clickhouse_connect`` 
Client.
+
+    SQL execution is delegated to ``clickhouse_connect.dbapi.cursor.Cursor``, 
which
+    routes each statement to ``client.query()`` or ``client.command()`` 
automatically
+    by inspecting the SQL keyword after stripping comments — the same logic 
used by
+    the ``clickhouse-connect`` SQLAlchemy dialect.
+
+    ClickHouse has no multi-statement transactions.  Every statement is 
effectively
+    auto-committed, so ``commit()`` and ``rollback()`` are intentional no-ops 
and
+    the ``autocommit`` attribute is always ``True``.  ``DbApiHook.run()`` 
checks
+    ``conn.autocommit`` via :meth:`ClickHouseHook.get_autocommit` and skips the
+    ``conn.commit()`` call when it is truthy.
+    """
+
+    # Signals to DbApiHook.get_autocommit() that no explicit commit is needed.
+    autocommit: bool = True
+
+    def __init__(self, client: clickhouse_connect.driver.client.Client) -> 
None:
+        self._client = client
+
+    def cursor(self) -> ClickHouseDbApiCursor:
+        from clickhouse_connect.dbapi.cursor import Cursor
+
+        return Cursor(self._client)
+
+    def close(self) -> None:
+        self._client.close()
+
+    def commit(self) -> None:
+        pass  # ClickHouse has no multi-statement transactions
+
+    def rollback(self) -> None:
+        pass  # ClickHouse has no multi-statement transactions
+
+
+class ClickHouseHook(DbApiHook):
+    """
+    Interact with ClickHouse via the HTTP interface (``clickhouse-connect``).
+
+    This hook wraps ``clickhouse_connect.get_client()`` behind a thin DB-API 
2.0
+    adapter (:class:`ClickHouseConnection`), so all standard
+    
:class:`~airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator`
+    features work out of the box (templating, ``handler``, 
``split_statements``,
+    etc.).
+
+    :param database: Optional database name.  Overrides the ``schema`` field 
of the
+        Airflow connection.  Useful when one connection points to a ClickHouse 
cluster
+        and individual tasks need to target different databases.
+    :param session_settings: Optional dict of ClickHouse session-level settings
+        (e.g. ``{"max_execution_time": 60, "max_threads": 4}``).  Values 
supplied
+        here are **merged on top of** any ``session_settings`` dict already 
present
+        in the connection's ``extra`` JSON field, with the constructor argument
+        taking precedence. For a full list of available session settings visit 
https://clickhouse.com/docs/operations/settings/settings
+    :param client_kwargs: Optional dict of additional keyword arguments 
forwarded
+        verbatim to ``clickhouse_connect.get_client()``.  Use this to pass any
+        parameter not otherwise exposed by the hook (e.g. ``http_proxy``,
+        ``pool_mgr_params``).  Values supplied here are **merged on top of** 
any
+        ``client_kwargs`` dict already present in the connection's ``extra`` 
JSON
+        field, with the constructor argument taking precedence on conflicting 
keys.
+        Keys that the hook manages itself (``host``, ``port``, ``username``,
+        ``password``, ``database``, ``secure``, ``verify``, ``client_name``,
+        ``settings``) are silently ignored so that hook-managed values always
+        take precedence.
+
+    .. seealso::
+        - `clickhouse-connect documentation 
<https://clickhouse.com/docs/en/integrations/python>`_
+        - `clickhouse session settings documentation 
<https://clickhouse.com/docs/operations/settings/settings>`_
+        - :ref:`howto/connection:clickhouse`
+
+    """
+
+    conn_name_attr = "clickhouse_conn_id"
+    default_conn_name = "clickhouse_default"
+    conn_type = "clickhouse"
+    hook_name = "ClickHouse"
+
+    # ClickHouse has no multi-statement transactions; every statement is 
auto-committed.
+    supports_autocommit = True
+    _test_connection_sql = "SELECT 1"
+
+    def set_autocommit(self, conn: ClickHouseConnection, autocommit: bool) -> 
None:
+        """No-op: ClickHouse has no transaction support."""
+
+    def get_autocommit(self, conn: ClickHouseConnection) -> bool:
+        """Return ``True``: ClickHouse auto-commits every statement."""
+        return True
+
+    def __init__(
+        self,
+        *args,
+        database: str | None = None,
+        session_settings: dict[str, Any] | None = None,
+        client_kwargs: dict[str, Any] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.database = database
+        self.session_settings: dict[str, Any] = session_settings or {}
+        self.client_kwargs: dict[str, Any] = client_kwargs or {}
+
+    @classmethod
+    def get_connection_form_widgets(cls) -> dict[str, Any]:
+        """Return extra connection form widgets shown in the Airflow UI."""
+        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
+        from flask_babel import lazy_gettext
+        from wtforms import BooleanField, IntegerField, StringField
+
+        return {
+            "secure": BooleanField(
+                label=lazy_gettext("Use TLS (HTTPS)"),
+                description="Enable TLS/HTTPS for the connection (default: 
False).",
+            ),
+            "verify": BooleanField(
+                label=lazy_gettext("Verify SSL Certificate"),
+                description="Verify the server SSL certificate when TLS is 
enabled (default: True).",
+            ),
+            "connect_timeout": IntegerField(
+                lazy_gettext("Connection Timeout (seconds)"),
+                widget=BS3TextFieldWidget(),
+            ),
+            "send_receive_timeout": IntegerField(
+                lazy_gettext("Query Timeout (seconds)"),
+                widget=BS3TextFieldWidget(),
+            ),
+            "compress": BooleanField(
+                label=lazy_gettext("Enable LZ4 Compression"),
+                description="Compress query results with LZ4 (default: True).",
+            ),
+            "client_name": StringField(
+                lazy_gettext("Client Name (label)"),
+                widget=BS3TextFieldWidget(),
+                description=(
+                    "Optional label appended to the Airflow version identifier 
in the "
+                    "ClickHouse HTTP User-Agent and system.query_log 
client_name field. "
+                    "The hook always prepends apache-airflow/<version> "
+                    "apache-airflow-providers-clickhouse/<version>."
+                ),
+            ),
+            "session_settings": StringField(
+                lazy_gettext("Session Settings (JSON)"),
+                widget=BS3TextFieldWidget(),
+                description=(
+                    "ClickHouse session-level settings as a JSON object, "
+                    'e.g. {"max_execution_time": 60, "max_threads": 4}.'
+                ),
+            ),
+            "client_kwargs": StringField(
+                lazy_gettext("Client kwargs (JSON)"),
+                widget=BS3TextFieldWidget(),
+                description=(
+                    "Additional keyword arguments forwarded to 
clickhouse_connect.get_client() "
+                    'as a JSON object, e.g. {"http_proxy": 
"http://proxy:8080"}. '
+                    "Hook-managed keys (host, port, username, password, 
database, secure, "
+                    "verify, client_name, settings) are ignored."
+                ),
+            ),
+        }
+
+    @classmethod
+    def get_ui_field_behaviour(cls) -> dict[str, Any]:
+        """Return custom field behaviour for the connection form."""
+        return {
+            "hidden_fields": [],
+            "relabeling": {
+                "schema": "Database",
+            },
+            "placeholders": {
+                "host": "localhost",
+                "port": "8123",
+                "login": "default",
+                "schema": "default",
+                "extra": json.dumps(
+                    {
+                        "secure": False,
+                        "verify": True,
+                        "connect_timeout": 10,
+                        "send_receive_timeout": 300,
+                        "compress": True,
+                        "client_name": "airflow",
+                        "session_settings": {
+                            "max_execution_time": 300,
+                            "max_memory_usage": 10000000000,
+                        },
+                    },
+                    indent=1,
+                ),
+                "secure": "false",
+                "verify": "true",
+                "connect_timeout": "10",
+                "send_receive_timeout": "300",
+                "compress": "true",
+                "client_name": "airflow",
+                "session_settings": '{"max_execution_time": 300}',
+                "client_kwargs": '{"http_proxy": "http://proxy:8080"}',
+            },
+        }
+
+    def _get_client_kwargs(self) -> dict[str, Any]:
+        """
+        Build the keyword-argument dict passed to 
``clickhouse_connect.get_client()``.
+
+        Construction order (last write wins):
+
+        1. ``client_kwargs`` from the connection ``extra`` JSON — 
connection-level
+           passthrough kwargs.
+        2. ``client_kwargs`` constructor argument — task-level overrides 
merged on top.
+        3. Hook-managed keys (``host``, ``port``, ``username``, ``password``,
+           ``database``, ``secure``, ``verify``, ``client_name``, 
``settings``) —
+           always override anything in ``client_kwargs``.
+
+        Optional tuning parameters (``connect_timeout``, 
``send_receive_timeout``,
+        ``compress``) are forwarded only when explicitly set in the ``extra`` 
JSON
+        field so that the driver's own defaults apply otherwise.
+
+        ``client_name`` is always set to a string that identifies the Airflow
+        installation (see :func:`_build_client_name`).  Any ``client_name`` 
value in
+        ``extra`` is treated as a human-readable label and appended as a 
comment so
+        it appears alongside the Airflow version info in ``system.query_log``.
+
+        ``session_settings`` from ``extra`` and from the constructor 
``session_settings``
+        argument are **merged**, with the constructor argument taking 
precedence on
+        conflicting keys.
+        """
+        conn = self.get_connection(self.get_conn_id())
+        extra: dict[str, Any] = conn.extra_dejson
+
+        # Merge client_kwargs: extra values are the base, constructor values 
override.
+        raw_client_kwargs = extra.get("client_kwargs")
+        if isinstance(raw_client_kwargs, str):
+            raw_client_kwargs = json.loads(raw_client_kwargs)
+        merged_client_kwargs: dict[str, Any] = {**(raw_client_kwargs or {}), 
**self.client_kwargs}
+
+        # Strip hook-managed keys so they cannot overwrite connection 
credentials
+        # or versioning info.  Log dropped keys at DEBUG to aid 
troubleshooting.
+        dropped = _HOOK_MANAGED_KWARGS.intersection(merged_client_kwargs)
+        if dropped:
+            self.log.debug(
+                "Ignoring hook-managed keys in client_kwargs: %s",
+                sorted(dropped),
+            )
+        kwargs: dict[str, Any] = {
+            k: v for k, v in merged_client_kwargs.items() if k not in 
_HOOK_MANAGED_KWARGS
+        }
+
+        # Hook-managed connection parameters always take precedence.
+        kwargs.update(
+            {
+                "host": conn.host or "localhost",
+                "port": int(conn.port) if conn.port else 8123,
+                "username": conn.login or "default",
+                "password": conn.password or "",
+                "database": self.database or conn.schema or "default",
+                "secure": bool(extra.get("secure", False)),
+                "verify": bool(extra.get("verify", True)),
+            }
+        )
+
+        for key in _OPTIONAL_CLIENT_KWARGS:
+            if key in extra:
+                kwargs[key] = extra[key]
+
+        # Always embed Airflow + provider version; user's 'client_name' extra 
is appended to the User-Agent header.
+        kwargs["client_name"] = _build_client_name(extra.get("client_name"))
+
+        # Merge session_settings: extra values are the base, constructor 
values override.
+        raw = extra.get("session_settings")
+        if isinstance(raw, str):
+            raw = json.loads(raw)
+        merged: dict[str, Any] = {**(raw or {}), **self.session_settings}
+        if merged:
+            kwargs["settings"] = merged
+
+        return kwargs
+
+    def get_conn(self) -> ClickHouseConnection:
+        """Return a DB-API 2.0 compatible connection backed by 
``clickhouse_connect``."""
+        import clickhouse_connect
+
+        client = clickhouse_connect.get_client(**self._get_client_kwargs())
+        return ClickHouseConnection(client)
+
+    def get_client(self) -> clickhouse_connect.driver.client.Client:
+        """
+        Return the raw ``clickhouse_connect`` Client for ClickHouse-specific 
operations.
+
+        Use this for bulk inserts, streaming queries, or any operation that
+        benefits from the native ``clickhouse_connect`` API rather than DB-API
+        cursors.  The caller is responsible for closing the client.
+        """
+        import clickhouse_connect
+
+        return clickhouse_connect.get_client(**self._get_client_kwargs())
+
+    def bulk_insert_rows(
+        self,
+        table: str,
+        rows: list[tuple],
+        column_names: list[str],
+        commit_every: int = 5000,
+    ) -> None:
+        """
+        Insert rows into a ClickHouse table using the native columnar insert.
+
+        Uses ``clickhouse_connect``'s optimized insert path, which is
+        significantly faster than row-by-row cursor inserts for large datasets.
+
+        :param table: Target table name.
+        :param rows: List of row tuples to insert.
+        :param column_names: Column names matching each position in the row 
tuples.
+        :param commit_every: Batch size (rows per insert call). Defaults to 
5000.
+        """
+        if not rows:
+            self.log.warning(
+                "bulk_insert_rows called with an empty rows list — nothing was 
inserted into %s.", table
+            )
+            return
+
+        client = self.get_client()
+        try:
+            for i in range(0, len(rows), commit_every):
+                batch = rows[i : i + commit_every]
+                client.insert(table, batch, column_names=column_names)
+            self.log.info("Inserted %d rows into %s", len(rows), table)
+        finally:
+            client.close()
+
+    def get_uri(self) -> str:
+        """
+        Return a SQLAlchemy-compatible URI for ``get_sqlalchemy_engine()``.
+
+        Uses the ``clickhousedb://`` scheme (HTTP) or ``clickhousedbs://`` 
scheme
+        (HTTPS/TLS) provided by ``clickhouse-connect``, chosen based on the
+        ``secure`` field in the connection's ``extra`` JSON.
+        """
+        conn = self.get_connection(self.get_conn_id())
+        extra: dict[str, Any] = conn.extra_dejson
+        username = conn.login or "default"
+        password = quote_plus(conn.password) if conn.password else ""
+        host = conn.host or "localhost"
+        port = int(conn.port) if conn.port else 8123
+        database = self.database or conn.schema or "default"
+        scheme = "clickhousedbs" if bool(extra.get("secure", False)) else 
"clickhousedb"

Review Comment:
   `clickhousedbs` isn't a registered scheme. clickhouse-connect only registers 
`clickhousedb` (and `clickhousedb+connect`) as a SQLAlchemy dialect. For a TLS 
connection, use that single scheme with `secure` passed as a query parameter, 
which the DB-API `Connection.__init__` forwards via `generic_args` to 
`create_client`, e.g. 
`clickhousedb://user:pw@host:port/db?secure=true&verify=true`.
   
   It's probably worth wiring the other tuning params (`connect_timeout`, 
`send_receive_timeout`, `compress`, etc.) through the query string the same way; 
otherwise users on the SQLAlchemy path silently lose the ability to tune these 
settings, which users on the DB-API path have.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to