This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f969e6374daa8469938169be16a28f7c073a5ce9 Author: Kaxil Naik <[email protected]> AuthorDate: Thu Oct 23 16:10:51 2025 +0100 Fix connection access in triggerer for deferrable operators (#57154) When deferrable operators run in the triggerer's async event loop and synchronously access connections (e.g., via @cached_property), the `ExecutionAPISecretsBackend` failed silently. This occurred because `SUPERVISOR_COMMS.send()` uses `async_to_sync`, which raises `RuntimeError` when called within an existing event loop in a greenback portal context. Add specific RuntimeError handling in `ExecutionAPISecretsBackend` that detects this scenario and uses `greenback.await_()` to call the async versions (aget_connection/aget_variable) as a fallback. It was originally fixed in https://github.com/apache/airflow/pull/55799 for 3.1.0 but https://github.com/apache/airflow/pull/56602 introduced a bug. Ideally all providers handle this better and have better written Triggers. Example PR for Databricks: https://github.com/apache/airflow/pull/55568 Fixes https://github.com/apache/airflow/issues/57145 (cherry picked from commit da32b682d1b0df5d5e2078392cf8626f8fdb00ff) --- task-sdk/src/airflow/sdk/definitions/connection.py | 19 ----------- .../sdk/execution_time/secrets/execution_api.py | 20 ++++++++++++ .../tests/task_sdk/execution_time/test_secrets.py | 38 ++++++++++++++++++++++ 3 files changed, 58 insertions(+), 19 deletions(-) diff --git a/task-sdk/src/airflow/sdk/definitions/connection.py b/task-sdk/src/airflow/sdk/definitions/connection.py index b1fb1190bc3..27888f5bf08 100644 --- a/task-sdk/src/airflow/sdk/definitions/connection.py +++ b/task-sdk/src/airflow/sdk/definitions/connection.py @@ -17,7 +17,6 @@ # under the License. from __future__ import annotations -import asyncio import json import logging from json import JSONDecodeError @@ -226,24 +225,6 @@ class Connection: return _get_connection(conn_id) except AirflowRuntimeError as e: cls._handle_connection_error(e, conn_id) - except RuntimeError as e: - # The error from async_to_sync is a RuntimeError, so we have to fall back to text matching - if str(e).startswith("You cannot use AsyncToSync in the same thread as an async event loop"): - import greenback - - task = asyncio.current_task() - if greenback.has_portal(task): - import warnings - - warnings.warn( - "You should not use sync calls here -- use `await Conn.async_get` instead", - stacklevel=2, - ) - - return greenback.await_(cls.async_get(conn_id)) - - log.exception("async_to_sync failed") - raise @classmethod async def async_get(cls, conn_id: str) -> Any: diff --git a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py index 8f32282c2bc..3dfe691c263 100644 --- a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py +++ b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py @@ -63,6 +63,26 @@ class ExecutionAPISecretsBackend(BaseSecretsBackend): # Convert ExecutionAPI response to SDK Connection return _process_connection_result_conn(msg) + except RuntimeError as e: + # TriggerCommsDecoder.send() uses async_to_sync internally, which raises RuntimeError + # when called within an async event loop. In greenback portal contexts (triggerer), + # we catch this and use greenback to call the async version instead. + if str(e).startswith("You cannot use AsyncToSync in the same thread as an async event loop"): + import asyncio + + import greenback + + task = asyncio.current_task() + if greenback.has_portal(task): + import warnings + + warnings.warn( + "You should not use sync calls here -- use `await aget_connection` instead", + stacklevel=2, + ) + return greenback.await_(self.aget_connection(conn_id)) + # Fall through to the general exception handler for other RuntimeErrors + return None except Exception: # If SUPERVISOR_COMMS fails for any reason, return None # to allow fallback to other backends diff --git a/task-sdk/tests/task_sdk/execution_time/test_secrets.py b/task-sdk/tests/task_sdk/execution_time/test_secrets.py index bda87a6a64a..b19ab3b1003 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_secrets.py +++ b/task-sdk/tests/task_sdk/execution_time/test_secrets.py @@ -19,6 +19,7 @@ from __future__ import annotations import pytest +from airflow.sdk.definitions.connection import Connection from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend @@ -120,6 +121,43 @@ class TestExecutionAPISecretsBackend: with pytest.raises(NotImplementedError, match="Use get_connection instead"): backend.get_conn_value("test_conn") + def test_runtime_error_triggers_greenback_fallback(self, mocker, mock_supervisor_comms): + """ + Test that RuntimeError from async_to_sync triggers greenback fallback. + + This test verifies the fix for issue #57145: when SUPERVISOR_COMMS.send() + raises the specific RuntimeError about async_to_sync in an event loop, + the backend catches it and uses greenback to call aget_connection(). + """ + + # Expected connection to be returned + expected_conn = Connection( + conn_id="databricks_default", + conn_type="databricks", + host="example.databricks.com", + ) + + # Simulate the RuntimeError that triggers greenback fallback + mock_supervisor_comms.send.side_effect = RuntimeError( + "You cannot use AsyncToSync in the same thread as an async event loop" + ) + + # Mock the greenback and asyncio modules that are imported inside the exception handler + mocker.patch("greenback.has_portal", return_value=True) + mock_greenback_await = mocker.patch("greenback.await_", return_value=expected_conn) + mocker.patch("asyncio.current_task") + + backend = ExecutionAPISecretsBackend() + conn = backend.get_connection("databricks_default") + + # Verify we got the expected connection + assert conn is not None + assert conn.conn_id == "databricks_default" + # Verify the greenback fallback was called + mock_greenback_await.assert_called_once() + # Verify send was attempted first (and raised RuntimeError) + mock_supervisor_comms.send.assert_called_once() + class TestContextDetection: """Test context detection in ensure_secrets_backend_loaded."""
