This is an automated email from the ASF dual-hosted git repository.
justinpark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git
The following commit(s) were added to refs/heads/master by this push:
new 12a266fd2f fix(trino): update query progress using cursor stats
(#36872)
12a266fd2f is described below
commit 12a266fd2f06c7f4f548307dd0e944068413f285
Author: JUST.in DO IT <[email protected]>
AuthorDate: Mon Jan 5 13:19:20 2026 -0800
fix(trino): update query progress using cursor stats (#36872)
---
superset/db_engine_specs/trino.py | 61 ++++-
tests/unit_tests/db_engine_specs/test_trino.py | 309 ++++++++++++++++++++++++-
2 files changed, 358 insertions(+), 12 deletions(-)
diff --git a/superset/db_engine_specs/trino.py
b/superset/db_engine_specs/trino.py
index e5d0789ff2..4792ea7292 100644
--- a/superset/db_engine_specs/trino.py
+++ b/superset/db_engine_specs/trino.py
@@ -18,6 +18,7 @@ from __future__ import annotations
import contextlib
import logging
+import math
import threading
import time
from typing import Any, TYPE_CHECKING
@@ -29,6 +30,7 @@ from sqlalchemy.engine.url import URL
from sqlalchemy.exc import NoSuchTableError
from superset import db
+from superset.common.db_query_status import QueryStatus
from superset.constants import QUERY_CANCEL_KEY, QUERY_EARLY_CANCEL_KEY
from superset.db_engine_specs.base import BaseEngineSpec,
convert_inspector_columns
from superset.db_engine_specs.exceptions import (
@@ -177,6 +179,9 @@ class TrinoEngineSpec(PrestoBaseEngineSpec):
`execute_with_cursor` instead, to handle this asynchronously.
"""
+ execute_result = getattr(cursor, "_execute_result", None)
+ execute_event = getattr(cursor, "_execute_event", None)
+
# Adds the executed query id to the extra payload so the query can be
cancelled
cancel_query_id = cursor.query_id
logger.debug("Query %d: queryId %s found in cursor", query.id,
cancel_query_id)
@@ -187,17 +192,51 @@ class TrinoEngineSpec(PrestoBaseEngineSpec):
db.session.commit() # pylint: disable=consider-using-transaction
- # if query cancelation was requested prior to the handle_cursor call,
but
- # the query was still executed, trigger the actual query cancelation
now
- if query.extra.get(QUERY_EARLY_CANCEL_KEY):
- cls.cancel_query(
- cursor=cursor,
- query=query,
- cancel_query_id=cancel_query_id,
- )
-
super().handle_cursor(cursor=cursor, query=query)
+ terminal_states = {"FINISHED", "FAILED", "CANCELED"}
+ state = "QUEUED"
+ progress = 0.0
+ poll_interval = app.config["DB_POLL_INTERVAL_SECONDS"].get(cls.engine,
1)
+ max_wait_time = app.config.get("SQLLAB_ASYNC_TIME_LIMIT_SEC", 21600)
+ start_time = time.time()
+ while state not in terminal_states:
+ if time.time() - start_time > max_wait_time:
+ logger.warning("Query %d: Progress polling timed out",
query.id)
+ break
+ # Check for errors raised in execute_thread
+ if execute_result is not None and execute_result.get("error"):
+ break
+
+ # Check if execute_event is set (thread completed)
+ if execute_event is not None and execute_event.is_set():
+ break
+
+ # if query cancelation was requested prior to the handle_cursor
call (but the query was still executed), or the query has since been
+ # marked STOPPED or TIMED_OUT, trigger the actual query
cancelation now
+ if query.extra.get(QUERY_EARLY_CANCEL_KEY) or query.status in [
+ QueryStatus.STOPPED,
+ QueryStatus.TIMED_OUT,
+ ]:
+ cls.cancel_query(
+ cursor=cursor,
+ query=query,
+ cancel_query_id=cancel_query_id,
+ )
+ break
+
+ info = getattr(cursor, "stats", {}) or {}
+ state = info.get("state", "UNKNOWN")
+ completed_splits = float(info.get("completedSplits", 0))
+ total_splits = float(info.get("totalSplits", 1) or 1)
+ progress = math.floor((completed_splits / (total_splits or 1)) *
100)
+
+ if progress != query.progress:
+ query.progress = progress
+ db.session.commit() # pylint:
disable=consider-using-transaction
+
+ time.sleep(poll_interval)
+
@classmethod
def execute_with_cursor(
cls,
@@ -262,6 +301,10 @@ class TrinoEngineSpec(PrestoBaseEngineSpec):
while not cursor.query_id and not execute_event.is_set():
time.sleep(0.1)
+ # Pass additional attributes to check whether an error occurred in the
+ # execute thread running in parallel while updating progress through
the cursor.
+ cursor._execute_result = execute_result
+ cursor._execute_event = execute_event
logger.debug("Query %d: Handling cursor", query_id)
cls.handle_cursor(cursor, query)
diff --git a/tests/unit_tests/db_engine_specs/test_trino.py
b/tests/unit_tests/db_engine_specs/test_trino.py
index af7b59030e..1942b3310e 100644
--- a/tests/unit_tests/db_engine_specs/test_trino.py
+++ b/tests/unit_tests/db_engine_specs/test_trino.py
@@ -384,21 +384,30 @@ def test_prepare_cancel_query(
@pytest.mark.parametrize("cancel_early", [True, False])
+@patch("superset.db_engine_specs.presto.PrestoBaseEngineSpec.handle_cursor")
@patch("superset.db_engine_specs.trino.TrinoEngineSpec.cancel_query")
-@patch("sqlalchemy.engine.Engine.connect")
+@patch("superset.db_engine_specs.trino.db")
+@patch("superset.db_engine_specs.trino.app")
def test_handle_cursor_early_cancel(
- engine_mock: Mock,
+ mock_app: Mock,
+ mock_db: Mock,
cancel_query_mock: Mock,
+ mock_presto_handle_cursor: Mock,
cancel_early: bool,
mocker: MockerFixture,
) -> None:
from superset.db_engine_specs.trino import TrinoEngineSpec
from superset.models.sql_lab import Query
+ mock_app.config = {"DB_POLL_INTERVAL_SECONDS": {"trino": 0}}
+
query_id = "myQueryId"
- cursor_mock = engine_mock.return_value.__enter__.return_value
+ # Use spec to prevent MagicMock from creating attributes automatically
+ cursor_mock = mocker.MagicMock(spec=["query_id", "stats", "info_uri"])
cursor_mock.query_id = query_id
+ # Set stats to FINISHED so the progress loop exits after a single pass
+ cursor_mock.stats = {"state": "FINISHED", "completedSplits": 0,
"totalSplits": 0}
query = Query()
@@ -911,3 +920,297 @@ def test_timegrain_expressions(time_grain: str,
expected_result: str) -> None:
spec.get_timestamp_expr(col=column("col"), pdf=None,
time_grain=time_grain)
)
assert actual == expected_result
+
+
+@patch("superset.db_engine_specs.presto.PrestoBaseEngineSpec.handle_cursor")
+@patch("superset.db_engine_specs.trino.TrinoEngineSpec.cancel_query")
+@patch("superset.db_engine_specs.trino.db")
+@patch("superset.db_engine_specs.trino.app")
+def test_handle_cursor_progress_updates(
+ mock_app: Mock,
+ mock_db: Mock,
+ mock_cancel_query: Mock,
+ mock_presto_handle_cursor: Mock,
+ mocker: MockerFixture,
+) -> None:
+ """Test that handle_cursor updates query progress based on cursor stats."""
+ from superset.db_engine_specs.trino import TrinoEngineSpec
+ from superset.models.sql_lab import Query
+
+ mock_app.config = {"DB_POLL_INTERVAL_SECONDS": {"trino": 0}}
+
+ cursor_mock = mocker.MagicMock(spec=["query_id", "stats", "info_uri"])
+ cursor_mock.query_id = "test-query-id"
+
+ # Simulate progress: 0/10 -> 5/10 -> 10/10 (FINISHED)
+ cursor_mock.stats = {"state": "RUNNING", "completedSplits": 0,
"totalSplits": 10}
+
+ call_count = 0
+
+ def update_stats(*args, **kwargs):
+ nonlocal call_count
+ call_count += 1
+ if call_count == 1:
+ cursor_mock.stats = {
+ "state": "RUNNING",
+ "completedSplits": 5,
+ "totalSplits": 10,
+ }
+ elif call_count >= 2:
+ cursor_mock.stats = {
+ "state": "FINISHED",
+ "completedSplits": 10,
+ "totalSplits": 10,
+ }
+
+ with patch("superset.db_engine_specs.trino.time.sleep",
side_effect=update_stats):
+ query = Query()
+ query.status = "running"
+ TrinoEngineSpec.handle_cursor(cursor=cursor_mock, query=query)
+
+ assert query.progress == 100.0
+ assert mock_db.session.commit.called
+
+
+@patch("superset.db_engine_specs.presto.PrestoBaseEngineSpec.handle_cursor")
+@patch("superset.db_engine_specs.trino.TrinoEngineSpec.cancel_query")
+@patch("superset.db_engine_specs.trino.db")
+@patch("superset.db_engine_specs.trino.app")
+def test_handle_cursor_cancels_on_stopped_status(
+ mock_app: Mock,
+ mock_db: Mock,
+ mock_cancel_query: Mock,
+ mock_presto_handle_cursor: Mock,
+ mocker: MockerFixture,
+) -> None:
+ """Test that handle_cursor cancels query when status is STOPPED."""
+ from superset.common.db_query_status import QueryStatus
+ from superset.db_engine_specs.trino import TrinoEngineSpec
+ from superset.models.sql_lab import Query
+
+ mock_app.config = {"DB_POLL_INTERVAL_SECONDS": {"trino": 0}}
+
+ cursor_mock = mocker.MagicMock(spec=["query_id", "stats", "info_uri"])
+ cursor_mock.query_id = "test-query-id"
+ cursor_mock.stats = {"state": "RUNNING", "completedSplits": 0,
"totalSplits": 10}
+
+ query = Query()
+ query.status = QueryStatus.STOPPED
+
+ with patch("superset.db_engine_specs.trino.time.sleep"):
+ TrinoEngineSpec.handle_cursor(cursor=cursor_mock, query=query)
+
+ mock_cancel_query.assert_called_once_with(
+ cursor=cursor_mock,
+ query=query,
+ cancel_query_id="test-query-id",
+ )
+
+
+@patch("superset.db_engine_specs.presto.PrestoBaseEngineSpec.handle_cursor")
+@patch("superset.db_engine_specs.trino.TrinoEngineSpec.cancel_query")
+@patch("superset.db_engine_specs.trino.db")
+@patch("superset.db_engine_specs.trino.app")
+def test_handle_cursor_cancels_on_timed_out_status(
+ mock_app: Mock,
+ mock_db: Mock,
+ mock_cancel_query: Mock,
+ mock_presto_handle_cursor: Mock,
+ mocker: MockerFixture,
+) -> None:
+ """Test that handle_cursor cancels query when status is TIMED_OUT."""
+ from superset.common.db_query_status import QueryStatus
+ from superset.db_engine_specs.trino import TrinoEngineSpec
+ from superset.models.sql_lab import Query
+
+ mock_app.config = {"DB_POLL_INTERVAL_SECONDS": {"trino": 0}}
+
+ cursor_mock = mocker.MagicMock(spec=["query_id", "stats", "info_uri"])
+ cursor_mock.query_id = "test-query-id"
+ cursor_mock.stats = {"state": "RUNNING", "completedSplits": 0,
"totalSplits": 10}
+
+ query = Query()
+ query.status = QueryStatus.TIMED_OUT
+
+ with patch("superset.db_engine_specs.trino.time.sleep"):
+ TrinoEngineSpec.handle_cursor(cursor=cursor_mock, query=query)
+
+ mock_cancel_query.assert_called_once_with(
+ cursor=cursor_mock,
+ query=query,
+ cancel_query_id="test-query-id",
+ )
+
+
+@patch("superset.db_engine_specs.presto.PrestoBaseEngineSpec.handle_cursor")
+@patch("superset.db_engine_specs.trino.TrinoEngineSpec.cancel_query")
+@patch("superset.db_engine_specs.trino.db")
+@patch("superset.db_engine_specs.trino.app")
+def test_handle_cursor_breaks_on_execute_error(
+ mock_app: Mock,
+ mock_db: Mock,
+ mock_cancel_query: Mock,
+ mock_presto_handle_cursor: Mock,
+ mocker: MockerFixture,
+) -> None:
+ """Test that handle_cursor breaks the loop when execute_result has an
error."""
+ from superset.db_engine_specs.trino import TrinoEngineSpec
+ from superset.models.sql_lab import Query
+
+ mock_app.config = {"DB_POLL_INTERVAL_SECONDS": {"trino": 0}}
+
+ cursor_mock = mocker.MagicMock(
+ spec=["query_id", "stats", "info_uri", "_execute_result",
"_execute_event"]
+ )
+ cursor_mock.query_id = "test-query-id"
+ cursor_mock.stats = {"state": "RUNNING", "completedSplits": 0,
"totalSplits": 10}
+ cursor_mock._execute_result = {"error": Exception("Test error")}
+ cursor_mock._execute_event = None
+
+ query = Query()
+ query.status = "running"
+
+ # Should break immediately due to error in execute_result
+ TrinoEngineSpec.handle_cursor(cursor=cursor_mock, query=query)
+
+ # cancel_query should not be called since we broke due to error, not status
+ mock_cancel_query.assert_not_called()
+
+
+@patch("superset.db_engine_specs.presto.PrestoBaseEngineSpec.handle_cursor")
+@patch("superset.db_engine_specs.trino.TrinoEngineSpec.cancel_query")
+@patch("superset.db_engine_specs.trino.db")
+@patch("superset.db_engine_specs.trino.app")
+def test_handle_cursor_breaks_on_execute_event_set(
+ mock_app: Mock,
+ mock_db: Mock,
+ mock_cancel_query: Mock,
+ mock_presto_handle_cursor: Mock,
+ mocker: MockerFixture,
+) -> None:
+ """Test that handle_cursor breaks the loop when execute_event is set."""
+ import threading
+
+ from superset.db_engine_specs.trino import TrinoEngineSpec
+ from superset.models.sql_lab import Query
+
+ mock_app.config = {"DB_POLL_INTERVAL_SECONDS": {"trino": 0}}
+
+ cursor_mock = mocker.MagicMock(
+ spec=["query_id", "stats", "info_uri", "_execute_result",
"_execute_event"]
+ )
+ cursor_mock.query_id = "test-query-id"
+ cursor_mock.stats = {"state": "RUNNING", "completedSplits": 5,
"totalSplits": 10}
+
+ execute_event = threading.Event()
+ execute_event.set() # Simulate thread completion
+
+ cursor_mock._execute_result = {}
+ cursor_mock._execute_event = execute_event
+
+ query = Query()
+ query.status = "running"
+
+ # Should break immediately since execute_event is set
+ TrinoEngineSpec.handle_cursor(cursor=cursor_mock, query=query)
+
+ # cancel_query should not be called since we broke due to event being set
+ mock_cancel_query.assert_not_called()
+
+
+@patch("superset.db_engine_specs.presto.PrestoBaseEngineSpec.handle_cursor")
+@patch("superset.db_engine_specs.trino.TrinoEngineSpec.cancel_query")
+@patch("superset.db_engine_specs.trino.db")
+@patch("superset.db_engine_specs.trino.app")
+def test_handle_cursor_handles_zero_total_splits(
+ mock_app: Mock,
+ mock_db: Mock,
+ mock_cancel_query: Mock,
+ mock_presto_handle_cursor: Mock,
+ mocker: MockerFixture,
+) -> None:
+ """Test that handle_cursor handles zero totalSplits without division
error."""
+ from superset.db_engine_specs.trino import TrinoEngineSpec
+ from superset.models.sql_lab import Query
+
+ mock_app.config = {"DB_POLL_INTERVAL_SECONDS": {"trino": 0}}
+
+ cursor_mock = mocker.MagicMock(spec=["query_id", "stats", "info_uri"])
+ cursor_mock.query_id = "test-query-id"
+
+ call_count = 0
+
+ def update_stats(*args, **kwargs):
+ nonlocal call_count
+ call_count += 1
+ if call_count >= 1:
+ cursor_mock.stats = {
+ "state": "FINISHED",
+ "completedSplits": 0,
+ "totalSplits": 0,
+ }
+
+ # Start with zero splits
+ cursor_mock.stats = {"state": "RUNNING", "completedSplits": 0,
"totalSplits": 0}
+
+ with patch("superset.db_engine_specs.trino.time.sleep",
side_effect=update_stats):
+ query = Query()
+ query.status = "running"
+ # Should not raise ZeroDivisionError
+ TrinoEngineSpec.handle_cursor(cursor=cursor_mock, query=query)
+
+ # Progress should be 0/1 = 0 when totalSplits is 0
+ assert query.progress == 0.0
+
+
+@patch("superset.db_engine_specs.presto.PrestoBaseEngineSpec.handle_cursor")
+@patch("superset.db_engine_specs.trino.TrinoEngineSpec.cancel_query")
+@patch("superset.db_engine_specs.trino.db")
+@patch("superset.db_engine_specs.trino.app")
+def test_handle_cursor_only_commits_on_progress_change(
+ mock_app: Mock,
+ mock_db: Mock,
+ mock_cancel_query: Mock,
+ mock_presto_handle_cursor: Mock,
+ mocker: MockerFixture,
+) -> None:
+ """Test that handle_cursor only commits when progress changes."""
+ from superset.db_engine_specs.trino import TrinoEngineSpec
+ from superset.models.sql_lab import Query
+
+ mock_app.config = {"DB_POLL_INTERVAL_SECONDS": {"trino": 0}}
+
+ cursor_mock = mocker.MagicMock(spec=["query_id", "stats", "info_uri"])
+ cursor_mock.query_id = "test-query-id"
+
+ call_count = 0
+
+ def update_stats(*args, **kwargs):
+ nonlocal call_count
+ call_count += 1
+ # Keep same progress for first two iterations, then finish
+ if call_count < 3:
+ cursor_mock.stats = {
+ "state": "RUNNING",
+ "completedSplits": 5,
+ "totalSplits": 10,
+ }
+ else:
+ cursor_mock.stats = {
+ "state": "FINISHED",
+ "completedSplits": 10,
+ "totalSplits": 10,
+ }
+
+ cursor_mock.stats = {"state": "RUNNING", "completedSplits": 5,
"totalSplits": 10}
+
+ with patch("superset.db_engine_specs.trino.time.sleep",
side_effect=update_stats):
+ query = Query()
+ query.status = "running"
+ TrinoEngineSpec.handle_cursor(cursor=cursor_mock, query=query)
+
+ # Initial commit from set_extra_json_key, then commits only when progress
changes
+ # Progress changes (percent): None->50, 50->50 (no commit), 50->100
+ # So we expect: 1 (initial) + 1 (50) + 1 (100) = 3 commits total
+ commit_calls = mock_db.session.commit.call_count
+ assert commit_calls >= 2 # At least initial commit and one progress update