Copilot commented on code in PR #63387:
URL: https://github.com/apache/airflow/pull/63387#discussion_r3066475280
##########
task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py:
##########
@@ -113,6 +113,26 @@ def get_variable(self, key: str, team_name: str | None =
None) -> str | None:
if isinstance(msg, VariableResult):
return msg.value # Already a string | None
return None
+ except RuntimeError as e:
+ # TriggerCommsDecoder.send() uses async_to_sync internally, which
raises RuntimeError
+ # when called within an async event loop. In greenback portal
contexts (triggerer),
+ # we catch this and use greenback to call the async version
instead.
+ if str(e).startswith("You cannot use AsyncToSync in the same
thread as an async event loop"):
+ import asyncio
+
+ import greenback
+
+ task = asyncio.current_task()
+ if greenback.has_portal(task):
+ import warnings
+
+ warnings.warn(
+ "You should not use sync calls here -- use `await
aget_variable` instead",
+ stacklevel=2,
+ )
+ return greenback.await_(self.aget_variable(key))
+ # Fall through to the general exception handler for other
RuntimeErrors
+ return None
Review Comment:
`import greenback` is executed inside the `except RuntimeError` handler. If
`greenback` is not installed/available in some client contexts, this will raise
`ImportError` *while handling the original RuntimeError* and will escape the
function (i.e., it will no longer return `None` and allow other backends to be
tried). Wrap the greenback import/usage in a local try/except `ImportError`
(and return `None` on failure), so the fallback remains best-effort and
behavior stays consistent with the outer 'return None to allow fallback' intent.
##########
task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py:
##########
@@ -113,6 +113,26 @@ def get_variable(self, key: str, team_name: str | None =
None) -> str | None:
if isinstance(msg, VariableResult):
return msg.value # Already a string | None
return None
+ except RuntimeError as e:
+ # TriggerCommsDecoder.send() uses async_to_sync internally, which
raises RuntimeError
+ # when called within an async event loop. In greenback portal
contexts (triggerer),
+ # we catch this and use greenback to call the async version
instead.
+ if str(e).startswith("You cannot use AsyncToSync in the same
thread as an async event loop"):
+ import asyncio
+
+ import greenback
+
+ task = asyncio.current_task()
+ if greenback.has_portal(task):
+ import warnings
+
+ warnings.warn(
+ "You should not use sync calls here -- use `await
aget_variable` instead",
+ stacklevel=2,
+ )
+ return greenback.await_(self.aget_variable(key))
+ # Fall through to the general exception handler for other
RuntimeErrors
Review Comment:
The comment says 'Fall through to the general exception handler', but the
code returns `None` immediately and cannot fall through to the `except
Exception` block. Update the comment to match the control flow (e.g., 'For
other RuntimeErrors, return None to allow fallback to other backends') or
remove it to avoid misleading future readers.
```suggestion
# For other RuntimeErrors, return None to allow fallback to
other backends.
```
##########
task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py:
##########
@@ -113,6 +113,26 @@ def get_variable(self, key: str, team_name: str | None =
None) -> str | None:
if isinstance(msg, VariableResult):
return msg.value # Already a string | None
return None
+ except RuntimeError as e:
+ # TriggerCommsDecoder.send() uses async_to_sync internally, which
raises RuntimeError
+ # when called within an async event loop. In greenback portal
contexts (triggerer),
+ # we catch this and use greenback to call the async version
instead.
+ if str(e).startswith("You cannot use AsyncToSync in the same
thread as an async event loop"):
+ import asyncio
+
+ import greenback
+
+ task = asyncio.current_task()
+ if greenback.has_portal(task):
+ import warnings
+
+ warnings.warn(
+ "You should not use sync calls here -- use `await
aget_variable` instead",
+ stacklevel=2,
+ )
+ return greenback.await_(self.aget_variable(key))
Review Comment:
The greenback portal detection + warning + `await_` fallback logic is now
duplicated between `get_connection()` and `get_variable()`. Consider extracting
this into a small private helper (e.g., `_greenback_sync_fallback(kind, coro)`
or similar) to reduce duplication and keep the warning text / detection logic
consistent across the two code paths.
##########
task-sdk/tests/task_sdk/execution_time/test_secrets.py:
##########
@@ -178,6 +178,49 @@ async def mock_aget_connection(self, conn_id):
# Verify send was attempted first (and raised RuntimeError)
mock_supervisor_comms.send.assert_called_once()
+ def test_get_variable_runtime_error_triggers_greenback_fallback(self,
mocker, mock_supervisor_comms):
+ """
+ Test that RuntimeError from async_to_sync triggers greenback fallback
for variables.
+
+ Same as the connection test but for get_variable — verifies the fix
for #61676:
+ triggers calling Variable.get() fail because SUPERVISOR_COMMS.send()
raises
+ RuntimeError in the async event loop, but the greenback fallback was
missing.
+ """
+ expected_value = "10"
+
+ # Simulate the RuntimeError that triggers greenback fallback
+ mock_supervisor_comms.send.side_effect = RuntimeError(
+ "You cannot use AsyncToSync in the same thread as an async event
loop"
+ )
+
+ # Mock the greenback and asyncio modules
+ mocker.patch("greenback.has_portal", return_value=True)
+ mocker.patch("asyncio.current_task")
+
+ import asyncio
+
+ def greenback_await_side_effect(coro):
+ loop = asyncio.new_event_loop()
+ try:
+ return loop.run_until_complete(coro)
+ finally:
+ loop.close()
+
+ mock_greenback_await = mocker.patch("greenback.await_",
side_effect=greenback_await_side_effect)
+
+ # Mock aget_variable to return the expected value
+ async def mock_aget_variable(self, key):
+ return expected_value
+
+ mocker.patch.object(ExecutionAPISecretsBackend, "aget_variable",
mock_aget_variable)
+
+ backend = ExecutionAPISecretsBackend()
+ result = backend.get_variable("retries")
+
+ assert result == expected_value
+ mock_greenback_await.assert_called_once()
+ mock_supervisor_comms.send.assert_called_once()
Review Comment:
This test exercises the greenback fallback, but it doesn’t assert the
`warnings.warn(...)` behavior introduced by the fallback path. Adding an
assertion (e.g., using `pytest.warns` and matching the message) would ensure
the warning remains present and correct, and prevent future regressions where
the fallback works but the guidance to use `await aget_variable` is
accidentally removed/changed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]