This is an automated email from the ASF dual-hosted git repository.

justinpark pushed a commit to branch 5.0-extensions
in repository https://gitbox.apache.org/repos/asf/superset.git


The following commit(s) were added to refs/heads/5.0-extensions by this push:
     new 3a6bd39dd7 fix(trino): update query progress using cursor stats 
(#36872)
3a6bd39dd7 is described below

commit 3a6bd39dd76206b72f258ca158f3621078034351
Author: justinpark <[email protected]>
AuthorDate: Mon Jan 5 14:02:36 2026 -0800

    fix(trino): update query progress using cursor stats (#36872)
    
    (cherry picked from commit 12a266fd2f06c7f4f548307dd0e944068413f285)
---
 superset/db_engine_specs/trino.py              |  61 ++++-
 tests/unit_tests/db_engine_specs/test_trino.py | 309 ++++++++++++++++++++++++-
 2 files changed, 358 insertions(+), 12 deletions(-)

diff --git a/superset/db_engine_specs/trino.py 
b/superset/db_engine_specs/trino.py
index beb7c0a604..68d1856e18 100644
--- a/superset/db_engine_specs/trino.py
+++ b/superset/db_engine_specs/trino.py
@@ -18,6 +18,7 @@ from __future__ import annotations
 
 import contextlib
 import logging
+import math
 import threading
 import time
 from typing import Any, TYPE_CHECKING
@@ -29,6 +30,7 @@ from sqlalchemy.engine.url import URL
 from sqlalchemy.exc import NoSuchTableError
 
 from superset import db
+from superset.common.db_query_status import QueryStatus
 from superset.constants import QUERY_CANCEL_KEY, QUERY_EARLY_CANCEL_KEY, 
USER_AGENT
 from superset.databases.utils import make_url_safe
 from superset.db_engine_specs.base import BaseEngineSpec, 
convert_inspector_columns
@@ -205,6 +207,9 @@ class TrinoEngineSpec(PrestoBaseEngineSpec):
         `execute_with_cursor` instead, to handle this asynchronously.
         """
 
+        execute_result = getattr(cursor, "_execute_result", None)
+        execute_event = getattr(cursor, "_execute_event", None)
+
         # Adds the executed query id to the extra payload so the query can be 
cancelled
         cancel_query_id = cursor.query_id
         logger.debug("Query %d: queryId %s found in cursor", query.id, 
cancel_query_id)
@@ -215,17 +220,51 @@ class TrinoEngineSpec(PrestoBaseEngineSpec):
 
         db.session.commit()  # pylint: disable=consider-using-transaction
 
-        # if query cancelation was requested prior to the handle_cursor call, 
but
-        # the query was still executed, trigger the actual query cancelation 
now
-        if query.extra.get(QUERY_EARLY_CANCEL_KEY):
-            cls.cancel_query(
-                cursor=cursor,
-                query=query,
-                cancel_query_id=cancel_query_id,
-            )
-
         super().handle_cursor(cursor=cursor, query=query)
 
+        terminal_states = {"FINISHED", "FAILED", "CANCELED"}
+        state = "QUEUED"
+        progress = 0.0
+        poll_interval = 
current_app.config["DB_POLL_INTERVAL_SECONDS"].get(cls.engine, 1)
+        max_wait_time = current_app.config.get("SQLLAB_ASYNC_TIME_LIMIT_SEC", 
21600)
+        start_time = time.time()
+        while state not in terminal_states:
+            if time.time() - start_time > max_wait_time:
+                logger.warning("Query %d: Progress polling timed out", 
query.id)
+                break
+            # Check for errors raised in execute_thread
+            if execute_result is not None and execute_result.get("error"):
+                break
+
+            # Check if execute_event is set (thread completed)
+            if execute_event is not None and execute_event.is_set():
+                break
+
+            # if query cancelation was requested prior to the handle_cursor 
call, but
+            # the query was still executed, trigger the actual query 
cancelation now
+            if query.extra.get(QUERY_EARLY_CANCEL_KEY) or query.status in [
+                QueryStatus.STOPPED,
+                QueryStatus.TIMED_OUT,
+            ]:
+                cls.cancel_query(
+                    cursor=cursor,
+                    query=query,
+                    cancel_query_id=cancel_query_id,
+                )
+                break
+
+            info = getattr(cursor, "stats", {}) or {}
+            state = info.get("state", "UNKNOWN")
+            completed_splits = float(info.get("completedSplits", 0))
+            total_splits = float(info.get("totalSplits", 1) or 1)
+            progress = math.floor((completed_splits / (total_splits or 1)) * 
100)
+
+            if progress != query.progress:
+                query.progress = progress
+                db.session.commit()  # pylint: 
disable=consider-using-transaction
+
+            time.sleep(poll_interval)
+
     @classmethod
     def execute_with_cursor(
         cls,
@@ -290,6 +329,10 @@ class TrinoEngineSpec(PrestoBaseEngineSpec):
         while not cursor.query_id and not execute_event.is_set():
             time.sleep(0.1)
 
+        # Pass additional attributes to check whether an error occurred in the
+        # execute thread running in parallel while updating progress through 
the cursor.
+        cursor._execute_result = execute_result
+        cursor._execute_event = execute_event
         logger.debug("Query %d: Handling cursor", query_id)
         cls.handle_cursor(cursor, query)
 
diff --git a/tests/unit_tests/db_engine_specs/test_trino.py 
b/tests/unit_tests/db_engine_specs/test_trino.py
index cbf3bdd5de..04703dd567 100644
--- a/tests/unit_tests/db_engine_specs/test_trino.py
+++ b/tests/unit_tests/db_engine_specs/test_trino.py
@@ -383,21 +383,30 @@ def test_prepare_cancel_query(
 
 
 @pytest.mark.parametrize("cancel_early", [True, False])
+@patch("superset.db_engine_specs.presto.PrestoBaseEngineSpec.handle_cursor")
 @patch("superset.db_engine_specs.trino.TrinoEngineSpec.cancel_query")
-@patch("sqlalchemy.engine.Engine.connect")
+@patch("superset.db_engine_specs.trino.db")
+@patch("superset.db_engine_specs.trino.app")
 def test_handle_cursor_early_cancel(
-    engine_mock: Mock,
+    mock_app: Mock,
+    mock_db: Mock,
     cancel_query_mock: Mock,
+    mock_presto_handle_cursor: Mock,
     cancel_early: bool,
     mocker: MockerFixture,
 ) -> None:
     from superset.db_engine_specs.trino import TrinoEngineSpec
     from superset.models.sql_lab import Query
 
+    mock_app.config = {"DB_POLL_INTERVAL_SECONDS": {"trino": 0}}
+
     query_id = "myQueryId"
 
-    cursor_mock = engine_mock.return_value.__enter__.return_value
+    # Use spec to prevent MagicMock from creating attributes automatically
+    cursor_mock = mocker.MagicMock(spec=["query_id", "stats", "info_uri"])
     cursor_mock.query_id = query_id
+    # Set stats to FINISHED so the progress loop exits after its first iteration
+    cursor_mock.stats = {"state": "FINISHED", "completedSplits": 0, 
"totalSplits": 0}
 
     query = Query()
 
@@ -910,3 +919,297 @@ def test_timegrain_expressions(time_grain: str, 
expected_result: str) -> None:
         spec.get_timestamp_expr(col=column("col"), pdf=None, 
time_grain=time_grain)
     )
     assert actual == expected_result
+
+
+@patch("superset.db_engine_specs.presto.PrestoBaseEngineSpec.handle_cursor")
+@patch("superset.db_engine_specs.trino.TrinoEngineSpec.cancel_query")
+@patch("superset.db_engine_specs.trino.db")
+@patch("superset.db_engine_specs.trino.app")
+def test_handle_cursor_progress_updates(
+    mock_app: Mock,
+    mock_db: Mock,
+    mock_cancel_query: Mock,
+    mock_presto_handle_cursor: Mock,
+    mocker: MockerFixture,
+) -> None:
+    """Test that handle_cursor updates query progress based on cursor stats."""
+    from superset.db_engine_specs.trino import TrinoEngineSpec
+    from superset.models.sql_lab import Query
+
+    mock_app.config = {"DB_POLL_INTERVAL_SECONDS": {"trino": 0}}
+
+    cursor_mock = mocker.MagicMock(spec=["query_id", "stats", "info_uri"])
+    cursor_mock.query_id = "test-query-id"
+
+    # Simulate progress: 0/10 -> 5/10 -> 10/10 (FINISHED)
+    cursor_mock.stats = {"state": "RUNNING", "completedSplits": 0, 
"totalSplits": 10}
+
+    call_count = 0
+
+    def update_stats(*args, **kwargs):
+        nonlocal call_count
+        call_count += 1
+        if call_count == 1:
+            cursor_mock.stats = {
+                "state": "RUNNING",
+                "completedSplits": 5,
+                "totalSplits": 10,
+            }
+        elif call_count >= 2:
+            cursor_mock.stats = {
+                "state": "FINISHED",
+                "completedSplits": 10,
+                "totalSplits": 10,
+            }
+
+    with patch("superset.db_engine_specs.trino.time.sleep", 
side_effect=update_stats):
+        query = Query()
+        query.status = "running"
+        TrinoEngineSpec.handle_cursor(cursor=cursor_mock, query=query)
+
+    assert query.progress == 100.0
+    assert mock_db.session.commit.called
+
+
+@patch("superset.db_engine_specs.presto.PrestoBaseEngineSpec.handle_cursor")
+@patch("superset.db_engine_specs.trino.TrinoEngineSpec.cancel_query")
+@patch("superset.db_engine_specs.trino.db")
+@patch("superset.db_engine_specs.trino.app")
+def test_handle_cursor_cancels_on_stopped_status(
+    mock_app: Mock,
+    mock_db: Mock,
+    mock_cancel_query: Mock,
+    mock_presto_handle_cursor: Mock,
+    mocker: MockerFixture,
+) -> None:
+    """Test that handle_cursor cancels query when status is STOPPED."""
+    from superset.common.db_query_status import QueryStatus
+    from superset.db_engine_specs.trino import TrinoEngineSpec
+    from superset.models.sql_lab import Query
+
+    mock_app.config = {"DB_POLL_INTERVAL_SECONDS": {"trino": 0}}
+
+    cursor_mock = mocker.MagicMock(spec=["query_id", "stats", "info_uri"])
+    cursor_mock.query_id = "test-query-id"
+    cursor_mock.stats = {"state": "RUNNING", "completedSplits": 0, 
"totalSplits": 10}
+
+    query = Query()
+    query.status = QueryStatus.STOPPED
+
+    with patch("superset.db_engine_specs.trino.time.sleep"):
+        TrinoEngineSpec.handle_cursor(cursor=cursor_mock, query=query)
+
+    mock_cancel_query.assert_called_once_with(
+        cursor=cursor_mock,
+        query=query,
+        cancel_query_id="test-query-id",
+    )
+
+
+@patch("superset.db_engine_specs.presto.PrestoBaseEngineSpec.handle_cursor")
+@patch("superset.db_engine_specs.trino.TrinoEngineSpec.cancel_query")
+@patch("superset.db_engine_specs.trino.db")
+@patch("superset.db_engine_specs.trino.app")
+def test_handle_cursor_cancels_on_timed_out_status(
+    mock_app: Mock,
+    mock_db: Mock,
+    mock_cancel_query: Mock,
+    mock_presto_handle_cursor: Mock,
+    mocker: MockerFixture,
+) -> None:
+    """Test that handle_cursor cancels query when status is TIMED_OUT."""
+    from superset.common.db_query_status import QueryStatus
+    from superset.db_engine_specs.trino import TrinoEngineSpec
+    from superset.models.sql_lab import Query
+
+    mock_app.config = {"DB_POLL_INTERVAL_SECONDS": {"trino": 0}}
+
+    cursor_mock = mocker.MagicMock(spec=["query_id", "stats", "info_uri"])
+    cursor_mock.query_id = "test-query-id"
+    cursor_mock.stats = {"state": "RUNNING", "completedSplits": 0, 
"totalSplits": 10}
+
+    query = Query()
+    query.status = QueryStatus.TIMED_OUT
+
+    with patch("superset.db_engine_specs.trino.time.sleep"):
+        TrinoEngineSpec.handle_cursor(cursor=cursor_mock, query=query)
+
+    mock_cancel_query.assert_called_once_with(
+        cursor=cursor_mock,
+        query=query,
+        cancel_query_id="test-query-id",
+    )
+
+
+@patch("superset.db_engine_specs.presto.PrestoBaseEngineSpec.handle_cursor")
+@patch("superset.db_engine_specs.trino.TrinoEngineSpec.cancel_query")
+@patch("superset.db_engine_specs.trino.db")
+@patch("superset.db_engine_specs.trino.app")
+def test_handle_cursor_breaks_on_execute_error(
+    mock_app: Mock,
+    mock_db: Mock,
+    mock_cancel_query: Mock,
+    mock_presto_handle_cursor: Mock,
+    mocker: MockerFixture,
+) -> None:
+    """Test that handle_cursor breaks the loop when execute_result has an 
error."""
+    from superset.db_engine_specs.trino import TrinoEngineSpec
+    from superset.models.sql_lab import Query
+
+    mock_app.config = {"DB_POLL_INTERVAL_SECONDS": {"trino": 0}}
+
+    cursor_mock = mocker.MagicMock(
+        spec=["query_id", "stats", "info_uri", "_execute_result", 
"_execute_event"]
+    )
+    cursor_mock.query_id = "test-query-id"
+    cursor_mock.stats = {"state": "RUNNING", "completedSplits": 0, 
"totalSplits": 10}
+    cursor_mock._execute_result = {"error": Exception("Test error")}
+    cursor_mock._execute_event = None
+
+    query = Query()
+    query.status = "running"
+
+    # Should break immediately due to error in execute_result
+    TrinoEngineSpec.handle_cursor(cursor=cursor_mock, query=query)
+
+    # cancel_query should not be called since we broke due to error, not status
+    mock_cancel_query.assert_not_called()
+
+
+@patch("superset.db_engine_specs.presto.PrestoBaseEngineSpec.handle_cursor")
+@patch("superset.db_engine_specs.trino.TrinoEngineSpec.cancel_query")
+@patch("superset.db_engine_specs.trino.db")
+@patch("superset.db_engine_specs.trino.app")
+def test_handle_cursor_breaks_on_execute_event_set(
+    mock_app: Mock,
+    mock_db: Mock,
+    mock_cancel_query: Mock,
+    mock_presto_handle_cursor: Mock,
+    mocker: MockerFixture,
+) -> None:
+    """Test that handle_cursor breaks the loop when execute_event is set."""
+    import threading
+
+    from superset.db_engine_specs.trino import TrinoEngineSpec
+    from superset.models.sql_lab import Query
+
+    mock_app.config = {"DB_POLL_INTERVAL_SECONDS": {"trino": 0}}
+
+    cursor_mock = mocker.MagicMock(
+        spec=["query_id", "stats", "info_uri", "_execute_result", 
"_execute_event"]
+    )
+    cursor_mock.query_id = "test-query-id"
+    cursor_mock.stats = {"state": "RUNNING", "completedSplits": 5, 
"totalSplits": 10}
+
+    execute_event = threading.Event()
+    execute_event.set()  # Simulate thread completion
+
+    cursor_mock._execute_result = {}
+    cursor_mock._execute_event = execute_event
+
+    query = Query()
+    query.status = "running"
+
+    # Should break immediately since execute_event is set
+    TrinoEngineSpec.handle_cursor(cursor=cursor_mock, query=query)
+
+    # cancel_query should not be called since we broke due to event being set
+    mock_cancel_query.assert_not_called()
+
+
+@patch("superset.db_engine_specs.presto.PrestoBaseEngineSpec.handle_cursor")
+@patch("superset.db_engine_specs.trino.TrinoEngineSpec.cancel_query")
+@patch("superset.db_engine_specs.trino.db")
+@patch("superset.db_engine_specs.trino.app")
+def test_handle_cursor_handles_zero_total_splits(
+    mock_app: Mock,
+    mock_db: Mock,
+    mock_cancel_query: Mock,
+    mock_presto_handle_cursor: Mock,
+    mocker: MockerFixture,
+) -> None:
+    """Test that handle_cursor handles zero totalSplits without division 
error."""
+    from superset.db_engine_specs.trino import TrinoEngineSpec
+    from superset.models.sql_lab import Query
+
+    mock_app.config = {"DB_POLL_INTERVAL_SECONDS": {"trino": 0}}
+
+    cursor_mock = mocker.MagicMock(spec=["query_id", "stats", "info_uri"])
+    cursor_mock.query_id = "test-query-id"
+
+    call_count = 0
+
+    def update_stats(*args, **kwargs):
+        nonlocal call_count
+        call_count += 1
+        if call_count >= 1:
+            cursor_mock.stats = {
+                "state": "FINISHED",
+                "completedSplits": 0,
+                "totalSplits": 0,
+            }
+
+    # Start with zero splits
+    cursor_mock.stats = {"state": "RUNNING", "completedSplits": 0, 
"totalSplits": 0}
+
+    with patch("superset.db_engine_specs.trino.time.sleep", 
side_effect=update_stats):
+        query = Query()
+        query.status = "running"
+        # Should not raise ZeroDivisionError
+        TrinoEngineSpec.handle_cursor(cursor=cursor_mock, query=query)
+
+    # Progress should be 0/1 = 0 when totalSplits is 0
+    assert query.progress == 0.0
+
+
+@patch("superset.db_engine_specs.presto.PrestoBaseEngineSpec.handle_cursor")
+@patch("superset.db_engine_specs.trino.TrinoEngineSpec.cancel_query")
+@patch("superset.db_engine_specs.trino.db")
+@patch("superset.db_engine_specs.trino.app")
+def test_handle_cursor_only_commits_on_progress_change(
+    mock_app: Mock,
+    mock_db: Mock,
+    mock_cancel_query: Mock,
+    mock_presto_handle_cursor: Mock,
+    mocker: MockerFixture,
+) -> None:
+    """Test that handle_cursor only commits when progress changes."""
+    from superset.db_engine_specs.trino import TrinoEngineSpec
+    from superset.models.sql_lab import Query
+
+    mock_app.config = {"DB_POLL_INTERVAL_SECONDS": {"trino": 0}}
+
+    cursor_mock = mocker.MagicMock(spec=["query_id", "stats", "info_uri"])
+    cursor_mock.query_id = "test-query-id"
+
+    call_count = 0
+
+    def update_stats(*args, **kwargs):
+        nonlocal call_count
+        call_count += 1
+        # Keep same progress for first two iterations, then finish
+        if call_count < 3:
+            cursor_mock.stats = {
+                "state": "RUNNING",
+                "completedSplits": 5,
+                "totalSplits": 10,
+            }
+        else:
+            cursor_mock.stats = {
+                "state": "FINISHED",
+                "completedSplits": 10,
+                "totalSplits": 10,
+            }
+
+    cursor_mock.stats = {"state": "RUNNING", "completedSplits": 5, 
"totalSplits": 10}
+
+    with patch("superset.db_engine_specs.trino.time.sleep", 
side_effect=update_stats):
+        query = Query()
+        query.status = "running"
+        TrinoEngineSpec.handle_cursor(cursor=cursor_mock, query=query)
+
+    # Initial commit from set_extra_json_key, then commits only when progress 
changes
+    # Progress changes: None->50, 50->50 (no commit), 50->100
+    # So we expect: 1 (initial) + 1 (50) + 1 (100) = 3 commits total
+    commit_calls = mock_db.session.commit.call_count
+    assert commit_calls >= 2  # At least initial commit and one progress update

Reply via email to