This is an automated email from the ASF dual-hosted git repository.

villebro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git


The following commit(s) were added to refs/heads/master by this push:
     new 60cd2550a7 feat: cancel impala query on stop (#30412)
60cd2550a7 is described below

commit 60cd2550a71ddafdfe671be3533b48eee633b398
Author: wugeer <[email protected]>
AuthorDate: Wed Oct 30 07:44:22 2024 +0800

    feat: cancel impala query on stop (#30412)
---
 superset/db_engine_specs/base.py                | 20 ++++++-
 superset/db_engine_specs/impala.py              | 42 ++++++++++++-
 tests/unit_tests/db_engine_specs/test_impala.py | 79 ++++++++++++++++++++++++-
 3 files changed, 135 insertions(+), 6 deletions(-)

diff --git a/superset/db_engine_specs/base.py b/superset/db_engine_specs/base.py
index dcdfff6c3f..2e555e32f1 100644
--- a/superset/db_engine_specs/base.py
+++ b/superset/db_engine_specs/base.py
@@ -58,8 +58,8 @@ from sqlalchemy.sql.expression import ColumnClause, Select, TextAsFrom, TextClau
 from sqlalchemy.types import TypeEngine
 from sqlparse.tokens import CTE
 
-from superset import sql_parse
-from superset.constants import TimeGrain as TimeGrainConstants
+from superset import db, sql_parse
+from superset.constants import QUERY_CANCEL_KEY, TimeGrain as TimeGrainConstants
 from superset.databases.utils import get_table_metadata, make_url_safe
 from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
 from superset.exceptions import DisallowedSQLFunction, OAuth2Error, OAuth2RedirectError
@@ -437,6 +437,14 @@ class BaseEngineSpec:  # pylint: disable=too-many-public-methods
     # Driver-specific exception that should be mapped to OAuth2RedirectError
     oauth2_exception = OAuth2RedirectError
 
+    # Does the query id related to the connection?
+    # The default value is True, which means that the query id is determined when
+    # the connection is created.
+    # When this is changed to false in a DB engine spec it means the query id
+    # is determined only after the specific query is executed and it will update
+    # the `cancel_query` value in the `extra` field of the `query` object
+    has_query_id_before_execute = True
+
     @classmethod
     def is_oauth2_enabled(cls) -> bool:
         return (
@@ -1316,6 +1324,7 @@ class BaseEngineSpec:  # pylint: disable=too-many-public-methods
         # TODO: Fix circular import error caused by importing sql_lab.Query
 
     @classmethod
+    # pylint: disable=consider-using-transaction
     def execute_with_cursor(
         cls,
         cursor: Any,
@@ -1333,6 +1342,13 @@ class BaseEngineSpec:  # pylint: disable=too-many-public-methods
         """
         logger.debug("Query %d: Running query: %s", query.id, sql)
         cls.execute(cursor, sql, query.database, async_=True)
+        if not cls.has_query_id_before_execute:
+            cancel_query_id = query.database.db_engine_spec.get_cancel_query_id(
+                cursor, query
+            )
+            if cancel_query_id is not None:
+                query.set_extra_json_key(QUERY_CANCEL_KEY, cancel_query_id)
+                db.session.commit()
         logger.debug("Query %d: Handling cursor", query.id)
         cls.handle_cursor(cursor, query)
 
diff --git a/superset/db_engine_specs/impala.py b/superset/db_engine_specs/impala.py
index ea74df8316..ce34ae5648 100644
--- a/superset/db_engine_specs/impala.py
+++ b/superset/db_engine_specs/impala.py
@@ -21,8 +21,9 @@ import logging
 import re
 import time
 from datetime import datetime
-from typing import Any, TYPE_CHECKING
+from typing import Any, Optional, TYPE_CHECKING
 
+import requests
 from flask import current_app
 from sqlalchemy import types
 from sqlalchemy.engine.reflection import Inspector
@@ -57,6 +58,8 @@ class ImpalaEngineSpec(BaseEngineSpec):
         TimeGrain.YEAR: "TRUNC({col}, 'YYYY')",
     }
 
+    has_query_id_before_execute = False
+
     @classmethod
     def epoch_to_dttm(cls) -> str:
         return "from_unixtime({col})"
@@ -91,7 +94,7 @@ class ImpalaEngineSpec(BaseEngineSpec):
         :see: handle_cursor
         """
 
-        return True
+        return False
 
     @classmethod
     def execute(
@@ -160,3 +163,38 @@ class ImpalaEngineSpec(BaseEngineSpec):
         except Exception:  # pylint: disable=broad-except
             logger.debug("Call to status() failed ")
             return
+
+    @classmethod
+    def get_cancel_query_id(cls, cursor: Any, query: Query) -> Optional[str]:
+        """
+        Get Impala Query ID that will be used to cancel the running
+        queries to release impala resources.
+
+        :param cursor: Cursor instance in which the query will be executed
+        :param query: Query instance
+        :return: Impala Query ID
+        """
+        last_operation = getattr(cursor, "_last_operation", None)
+        if not last_operation:
+            return None
+        guid = last_operation.handle.operationId.guid[::-1].hex()
+        return f"{guid[-16:]}:{guid[:16]}"
+
+    @classmethod
+    def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool:
+        """
+        Cancel query in the underlying database.
+
+        :param cursor: New cursor instance to the db of the query
+        :param query: Query instance
+        :param cancel_query_id: impala db not need
+        :return: True if query cancelled successfully, False otherwise
+        """
+        try:
+            impala_host = query.database.url_object.host
+            url = f"http://{impala_host}:25000/cancel_query?query_id={cancel_query_id}"
+            response = requests.post(url, timeout=3)
+        except Exception:  # pylint: disable=broad-except
+            return False
+
+        return bool(response and response.status_code == 200)
diff --git a/tests/unit_tests/db_engine_specs/test_impala.py b/tests/unit_tests/db_engine_specs/test_impala.py
index efaed81cba..543db24368 100644
--- a/tests/unit_tests/db_engine_specs/test_impala.py
+++ b/tests/unit_tests/db_engine_specs/test_impala.py
@@ -17,9 +17,13 @@
 
 from datetime import datetime
 from typing import Optional
+from unittest.mock import Mock, patch
 
 import pytest
 
+from superset.db_engine_specs.impala import ImpalaEngineSpec as spec
+from superset.models.core import Database
+from superset.models.sql_lab import Query
 from tests.unit_tests.db_engine_specs.utils import assert_convert_dttm
 from tests.unit_tests.fixtures.common import dttm  # noqa: F401
 
@@ -37,6 +41,77 @@ def test_convert_dttm(
     expected_result: Optional[str],
     dttm: datetime,  # noqa: F811
 ) -> None:
-    from superset.db_engine_specs.impala import ImpalaEngineSpec as spec
-
     assert_convert_dttm(spec, target_type, expected_result, dttm)
+
+
+def test_get_cancel_query_id() -> None:
+    query = Query()
+
+    cursor_mock = Mock()
+    last_operation_mock = Mock()
+    cursor_mock._last_operation = last_operation_mock
+
+    guid = bytes(reversed(bytes.fromhex("9fbdba20000000006940643a2731718b")))
+    last_operation_mock.handle.operationId.guid = guid
+
+    assert (
+        spec.get_cancel_query_id(cursor_mock, query)
+        == "6940643a2731718b:9fbdba2000000000"
+    )
+
+
+@patch("requests.post")
+def test_cancel_query(post_mock: Mock) -> None:
+    query = Query()
+    database = Database(
+        database_name="test_impala", sqlalchemy_uri="impala://localhost:21050/default"
+    )
+    query.database = database
+
+    response_mock = Mock()
+    response_mock.status_code = 200
+    post_mock.return_value = response_mock
+
+    result = spec.cancel_query(None, query, "6940643a2731718b:9fbdba2000000000")
+
+    post_mock.assert_called_once_with(
+        "http://localhost:25000/cancel_query?query_id=6940643a2731718b:9fbdba2000000000",
+        timeout=3,
+    )
+    assert result is True
+
+
+@patch("requests.post")
+def test_cancel_query_failed(post_mock: Mock) -> None:
+    query = Query()
+    database = Database(
+        database_name="test_impala", sqlalchemy_uri="impala://localhost:21050/default"
+    )
+    query.database = database
+
+    response_mock = Mock()
+    response_mock.status_code = 500
+    post_mock.return_value = response_mock
+
+    result = spec.cancel_query(None, query, "6940643a2731718b:9fbdba2000000000")
+
+    post_mock.assert_called_once_with(
+        "http://localhost:25000/cancel_query?query_id=6940643a2731718b:9fbdba2000000000",
+        timeout=3,
+    )
+    assert result is False
+
+
+@patch("requests.post")
+def test_cancel_query_exception(post_mock: Mock) -> None:
+    query = Query()
+    database = Database(
+        database_name="test_impala", sqlalchemy_uri="impala://localhost:21050/default"
+    )
+    query.database = database
+
+    post_mock.side_effect = Exception("Network error")
+
+    result = spec.cancel_query(None, query, "6940643a2731718b:9fbdba2000000000")
+
+    assert result is False

Reply via email to