This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f969e6374daa8469938169be16a28f7c073a5ce9
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Oct 23 16:10:51 2025 +0100

    Fix connection access in triggerer for deferrable operators (#57154)
    
    When deferrable operators run in the triggerer's async event loop and
    synchronously access connections (e.g., via @cached_property), the
    `ExecutionAPISecretsBackend` failed silently. This occurred because
    `SUPERVISOR_COMMS.send()` uses `async_to_sync`, which raises `RuntimeError`
    when called within an existing event loop in a greenback portal context.
    
    Add specific RuntimeError handling in `ExecutionAPISecretsBackend` that
    detects this scenario and uses `greenback.await_()` to call the async
    versions (aget_connection/aget_variable) as a fallback.
    
    It was originally fixed in https://github.com/apache/airflow/pull/55799 for 
3.1.0
    but https://github.com/apache/airflow/pull/56602 introduced a bug.
    
    Ideally all providers handle this better and have better written Triggers. 
Example
    PR for Databricks: https://github.com/apache/airflow/pull/55568
    
    Fixes https://github.com/apache/airflow/issues/57145
    
    (cherry picked from commit da32b682d1b0df5d5e2078392cf8626f8fdb00ff)
---
 task-sdk/src/airflow/sdk/definitions/connection.py | 19 -----------
 .../sdk/execution_time/secrets/execution_api.py    | 20 ++++++++++++
 .../tests/task_sdk/execution_time/test_secrets.py  | 38 ++++++++++++++++++++++
 3 files changed, 58 insertions(+), 19 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/definitions/connection.py 
b/task-sdk/src/airflow/sdk/definitions/connection.py
index b1fb1190bc3..27888f5bf08 100644
--- a/task-sdk/src/airflow/sdk/definitions/connection.py
+++ b/task-sdk/src/airflow/sdk/definitions/connection.py
@@ -17,7 +17,6 @@
 # under the License.
 from __future__ import annotations
 
-import asyncio
 import json
 import logging
 from json import JSONDecodeError
@@ -226,24 +225,6 @@ class Connection:
             return _get_connection(conn_id)
         except AirflowRuntimeError as e:
             cls._handle_connection_error(e, conn_id)
-        except RuntimeError as e:
-            # The error from async_to_sync is a RuntimeError, so we have to 
fall back to text matching
-            if str(e).startswith("You cannot use AsyncToSync in the same 
thread as an async event loop"):
-                import greenback
-
-                task = asyncio.current_task()
-                if greenback.has_portal(task):
-                    import warnings
-
-                    warnings.warn(
-                        "You should not use sync calls here -- use `await 
Conn.async_get` instead",
-                        stacklevel=2,
-                    )
-
-                    return greenback.await_(cls.async_get(conn_id))
-
-            log.exception("async_to_sync failed")
-            raise
 
     @classmethod
     async def async_get(cls, conn_id: str) -> Any:
diff --git a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py 
b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py
index 8f32282c2bc..3dfe691c263 100644
--- a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py
+++ b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py
@@ -63,6 +63,26 @@ class ExecutionAPISecretsBackend(BaseSecretsBackend):
 
             # Convert ExecutionAPI response to SDK Connection
             return _process_connection_result_conn(msg)
+        except RuntimeError as e:
+            # TriggerCommsDecoder.send() uses async_to_sync internally, which 
raises RuntimeError
+            # when called within an async event loop. In greenback portal 
contexts (triggerer),
+            # we catch this and use greenback to call the async version 
instead.
+            if str(e).startswith("You cannot use AsyncToSync in the same 
thread as an async event loop"):
+                import asyncio
+
+                import greenback
+
+                task = asyncio.current_task()
+                if greenback.has_portal(task):
+                    import warnings
+
+                    warnings.warn(
+                        "You should not use sync calls here -- use `await 
aget_connection` instead",
+                        stacklevel=2,
+                    )
+                    return greenback.await_(self.aget_connection(conn_id))
+            # Fall through to the general exception handler for other 
RuntimeErrors
+            return None
         except Exception:
             # If SUPERVISOR_COMMS fails for any reason, return None
             # to allow fallback to other backends
diff --git a/task-sdk/tests/task_sdk/execution_time/test_secrets.py 
b/task-sdk/tests/task_sdk/execution_time/test_secrets.py
index bda87a6a64a..b19ab3b1003 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_secrets.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_secrets.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 
 import pytest
 
+from airflow.sdk.definitions.connection import Connection
 from airflow.sdk.execution_time.secrets.execution_api import 
ExecutionAPISecretsBackend
 
 
@@ -120,6 +121,43 @@ class TestExecutionAPISecretsBackend:
         with pytest.raises(NotImplementedError, match="Use get_connection 
instead"):
             backend.get_conn_value("test_conn")
 
+    def test_runtime_error_triggers_greenback_fallback(self, mocker, 
mock_supervisor_comms):
+        """
+        Test that RuntimeError from async_to_sync triggers greenback fallback.
+
+        This test verifies the fix for issue #57145: when 
SUPERVISOR_COMMS.send()
+        raises the specific RuntimeError about async_to_sync in an event loop,
+        the backend catches it and uses greenback to call aget_connection().
+        """
+
+        # Expected connection to be returned
+        expected_conn = Connection(
+            conn_id="databricks_default",
+            conn_type="databricks",
+            host="example.databricks.com",
+        )
+
+        # Simulate the RuntimeError that triggers greenback fallback
+        mock_supervisor_comms.send.side_effect = RuntimeError(
+            "You cannot use AsyncToSync in the same thread as an async event 
loop"
+        )
+
+        # Mock the greenback and asyncio modules that are imported inside the 
exception handler
+        mocker.patch("greenback.has_portal", return_value=True)
+        mock_greenback_await = mocker.patch("greenback.await_", 
return_value=expected_conn)
+        mocker.patch("asyncio.current_task")
+
+        backend = ExecutionAPISecretsBackend()
+        conn = backend.get_connection("databricks_default")
+
+        # Verify we got the expected connection
+        assert conn is not None
+        assert conn.conn_id == "databricks_default"
+        # Verify the greenback fallback was called
+        mock_greenback_await.assert_called_once()
+        # Verify send was attempted first (and raised RuntimeError)
+        mock_supervisor_comms.send.assert_called_once()
+
 
 class TestContextDetection:
     """Test context detection in ensure_secrets_backend_loaded."""

Reply via email to