This is an automated email from the ASF dual-hosted git repository.

jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bd04e2bd900 Fix: Handle SQLAlchemy_URI for Apache Impala  (#57153)
bd04e2bd900 is described below

commit bd04e2bd9008444b1e8f28865d18eff7edd3ac75
Author: Piyush Mudgal <[email protected]>
AuthorDate: Tue Oct 28 14:37:02 2025 +0530

    Fix: Handle SQLAlchemy_URI for Apache Impala  (#57153)
    
    * Added SQL alchemy compatible code in impala.py
    
    * Updating Test
    
    * adding two test
    
    * Completed test
    
    * Finish writing test
    
    * Impala-hook
    
    * changed Not in -> !=
---
 .../providers/apache/impala/hooks/impala.py        |  28 +++
 .../unit/apache/impala/hooks/test_impala_sql.py    | 225 +++++++++++++++++++++
 2 files changed, 253 insertions(+)

diff --git 
a/providers/apache/impala/src/airflow/providers/apache/impala/hooks/impala.py 
b/providers/apache/impala/src/airflow/providers/apache/impala/hooks/impala.py
index aaa51094555..fc63a9b97cf 100644
--- 
a/providers/apache/impala/src/airflow/providers/apache/impala/hooks/impala.py
+++ 
b/providers/apache/impala/src/airflow/providers/apache/impala/hooks/impala.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 from typing import TYPE_CHECKING
 
 from impala.dbapi import connect
+from sqlalchemy.engine import URL
 
 from airflow.providers.common.sql.hooks.sql import DbApiHook
 
@@ -45,3 +46,30 @@ class ImpalaHook(DbApiHook):
             database=connection.schema,
             **connection.extra_dejson,
         )
+
+    @property
+    def sqlalchemy_url(self) -> URL:
+        """Return a `sqlalchemy.engine.URL` object constructed from the 
connection."""
+        conn = self.get_connection(self.get_conn_id())
+        extra = conn.extra_dejson or {}
+
+        required_attrs = ["host", "login"]
+        for attr in required_attrs:
+            if getattr(conn, attr) is None:
+                raise ValueError(f"Impala Connection Error: '{attr}' is 
missing in the connection")
+
+        query = {k: str(v) for k, v in extra.items() if v is not None and k != 
"__extra__"}
+
+        return URL.create(
+            drivername="impala",
+            username=conn.login,
+            password=conn.password or "",
+            host=str(conn.host),
+            port=conn.port or 21050,
+            database=conn.schema,
+            query=query,
+        )
+
+    def get_uri(self) -> str:
+        """Return a SQLAlchemy engine URL as a string."""
+        return self.sqlalchemy_url.render_as_string(hide_password=False)
diff --git 
a/providers/apache/impala/tests/unit/apache/impala/hooks/test_impala_sql.py 
b/providers/apache/impala/tests/unit/apache/impala/hooks/test_impala_sql.py
new file mode 100644
index 00000000000..c2f50466618
--- /dev/null
+++ b/providers/apache/impala/tests/unit/apache/impala/hooks/test_impala_sql.py
@@ -0,0 +1,225 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+from unittest import mock
+from unittest.mock import MagicMock, patch
+
+import pytest
+from sqlalchemy.engine.url import make_url
+
+from airflow.models import Connection
+from airflow.providers.apache.impala.hooks.impala import ImpalaHook
+
+DEFAULT_CONN_ID = "impala_default"
+DEFAULT_HOST = "localhost"
+DEFAULT_PORT = 21050
+DEFAULT_LOGIN = "user"
+DEFAULT_PASSWORD = "pass"
+DEFAULT_SCHEMA = "default_db"
+
+
[email protected]
+def mock_connection(create_connection_without_db) -> Connection:
+    """create a mocked Airflow connection for Impala."""
+    conn = Connection(
+        conn_id=DEFAULT_CONN_ID,
+        conn_type="impala",
+        host=DEFAULT_HOST,
+        login=DEFAULT_LOGIN,
+        password=None,
+        port=DEFAULT_PORT,
+        schema=DEFAULT_SCHEMA,
+    )
+    create_connection_without_db(conn)
+    return conn
+
+
[email protected]
+def impala_hook() -> ImpalaHook:
+    """Fixture for ImpalaHook with mocked connection"""
+    return ImpalaHook(impala_conn_id=DEFAULT_CONN_ID)
+
+
+def get_cursor_descriptions(fields: list[str]) -> list[tuple[str]]:
+    return [(field,) for field in fields]
+
+
[email protected](
+    "host, login, password, port, schema, extra_dict, expected_query",
+    [
+        (
+            "localhost",
+            "user",
+            "pass",
+            21050,
+            "default_db",
+            {},
+            {},
+        ),
+        (
+            "impala-secure.company.com",
+            "secure_user",
+            "secret",
+            21050,
+            "analytics",
+            {"use_ssl": "True"},
+            {"use_ssl": "True"},
+        ),
+        (
+            "impala-kerberos.company.com",
+            "kerb_user",
+            None,
+            21050,
+            "sales",
+            {"auth_mechanism": "GSSAPI", "kerberos_service_name": "impala"},
+            {"auth_mechanism": "GSSAPI", "kerberos_service_name": "impala"},
+        ),
+        (
+            "impala.company.com",
+            "timeout_user",
+            "pw123",
+            21050,
+            "warehouse",
+            {"timeout": 30},
+            {"timeout": "30"},
+        ),
+    ],
+)
+def test_sqlalchemy_url_property(
+    impala_hook, mock_connection, host, login, password, port, schema, 
extra_dict, expected_query
+):
+    """Tests various custom configurations passed via the 'extra' field."""
+    mock_connection.host = host
+    mock_connection.login = login
+    mock_connection.password = password
+    mock_connection.port = port
+    mock_connection.schema = schema
+    mock_connection.extra = json.dumps(extra_dict) if extra_dict else None
+
+    with patch.object(impala_hook, "get_connection", 
return_value=mock_connection):
+        url = impala_hook.sqlalchemy_url
+    expected_password = password or ""
+    assert url.drivername == "impala"
+    assert url.username == login
+    assert url.password == expected_password
+    assert url.host == host
+    assert url.port == port
+    assert url.database == schema
+    assert url.query == expected_query
+
+
[email protected](
+    "sql, expected_rows",
+    [
+        ("SELECT * FROM users", [("Alice", 1), ("Bob", 2)]),
+        ("SELECT 1", [(1,)]),
+    ],
+)
+def test_impala_run_query(impala_hook, mock_connection, sql, expected_rows):
+    cursor = MagicMock()
+    cursor.fetchall.return_value = expected_rows
+    cursor.description = get_cursor_descriptions([f"col{i}" for i in 
range(len(expected_rows[0]))])
+
+    type(cursor).rowcount = mock.PropertyMock(return_value=len(expected_rows))
+    mock_conn = MagicMock()
+    mock_conn.host = mock_connection.host
+    mock_conn.login = mock_connection.login
+    mock_conn.password = mock_connection.password
+    mock_conn.schema = mock_connection.schema
+    mock_conn.cursor.return_value = cursor
+    with patch("airflow.providers.apache.impala.hooks.impala.connect", 
return_value=mock_conn):
+        with patch.object(impala_hook, "get_connection", 
return_value=mock_conn):
+            result = impala_hook.run(sql, handler=lambda cur: cur.fetchall())
+
+    cursor.execute.assert_called_once_with(sql)
+    assert result == expected_rows
+
+
+def test_get_sqlalchemy_engine(impala_hook, mock_connection, mocker):
+    mock_create_engine = 
mocker.patch("airflow.providers.common.sql.hooks.sql.create_engine", 
autospec=True)
+    mock_engine = MagicMock()
+    mock_create_engine.return_value = mock_engine
+
+    with patch.object(impala_hook, "get_connection", 
return_value=mock_connection):
+        engine = impala_hook.get_sqlalchemy_engine()
+
+    assert engine is mock_engine
+
+    call_args = mock_create_engine.call_args[1]
+    actual_url = call_args["url"]
+
+    assert actual_url.drivername == "impala"
+    assert actual_url.host == DEFAULT_HOST
+    assert actual_url.username == DEFAULT_LOGIN
+    assert actual_url.password == (mock_connection.password or "")
+    assert actual_url.port == DEFAULT_PORT
+    assert actual_url.database == DEFAULT_SCHEMA
+    assert actual_url.query == {}
+
+
+def test_get_url(impala_hook, mock_connection):
+    """Ensure get_uri() returns correct formatted URI for Impala connection"""
+
+    mock_connection.host = "impala.company.com"
+    mock_connection.port = 21050
+    mock_connection.login = "user"
+    mock_connection.password = "secret"
+    mock_connection.schema = "analytics"
+    mock_connection.extra = json.dumps({"use_ssl": "True", "auth_mechanism": 
"PLAIN"})
+
+    with patch.object(impala_hook, "get_connection", 
return_value=mock_connection):
+        uri = impala_hook.get_uri()
+
+    expected_uri = 
"impala://user:[email protected]:21050/analytics?use_ssl=True&auth_mechanism=PLAIN"
+
+    assert make_url(uri) == make_url(expected_uri)
+
+
[email protected]("sql", ["", " ", "\n"])
+def test_run_with_empty_sql(impala_hook, sql):
+    """Test that running an empty SQL string."""
+    with pytest.raises(ValueError, match="List of SQL statements is empty"):
+        impala_hook.run(sql)
+
+
[email protected]
+def impala_hook_with_timeout(create_connection_without_db):
+    conn = Connection(
+        conn_id="impala_with_timeout",
+        conn_type="impala",
+        host=DEFAULT_HOST,
+        login=DEFAULT_LOGIN,
+        password=DEFAULT_PASSWORD,
+        port=DEFAULT_PORT,
+        schema=DEFAULT_SCHEMA,
+        extra=json.dumps({"timeout": 10}),
+    )
+    create_connection_without_db(conn)
+    return ImpalaHook(impala_conn_id="impala_with_timeout")
+
+
+def test_execution_timeout_exceeded(impala_hook_with_timeout):
+    test_sql = "SELECT * FROM big_table"
+
+    with patch(
+        "airflow.providers.apache.impala.hooks.impala.ImpalaHook.run",
+        side_effect=TimeoutError("Query exceeded execution timeout"),
+    ):
+        with pytest.raises(TimeoutError, match="Query exceeded execution 
timeout"):
+            impala_hook_with_timeout.run(sql=test_sql)

Reply via email to