Copilot commented on code in PR #63387:
URL: https://github.com/apache/airflow/pull/63387#discussion_r3066475280


##########
task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py:
##########
@@ -113,6 +113,26 @@ def get_variable(self, key: str, team_name: str | None = 
None) -> str | None:
             if isinstance(msg, VariableResult):
                 return msg.value  # Already a string | None
             return None
+        except RuntimeError as e:
+            # TriggerCommsDecoder.send() uses async_to_sync internally, which 
raises RuntimeError
+            # when called within an async event loop. In greenback portal 
contexts (triggerer),
+            # we catch this and use greenback to call the async version 
instead.
+            if str(e).startswith("You cannot use AsyncToSync in the same 
thread as an async event loop"):
+                import asyncio
+
+                import greenback
+
+                task = asyncio.current_task()
+                if greenback.has_portal(task):
+                    import warnings
+
+                    warnings.warn(
+                        "You should not use sync calls here -- use `await 
aget_variable` instead",
+                        stacklevel=2,
+                    )
+                    return greenback.await_(self.aget_variable(key))
+            # Fall through to the general exception handler for other 
RuntimeErrors
+            return None

Review Comment:
   `import greenback` is executed inside the `except RuntimeError` handler. If 
`greenback` is not installed/available in some client contexts, this will raise 
`ImportError` *while handling the original RuntimeError* and will escape the 
function (i.e., it will no longer return `None` and allow other backends to be 
tried). Wrap the greenback import/usage in a local try/except `ImportError` 
(and return `None` on failure), so the fallback remains best-effort and 
behavior stays consistent with the outer 'return None to allow fallback' intent.



##########
task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py:
##########
@@ -113,6 +113,26 @@ def get_variable(self, key: str, team_name: str | None = 
None) -> str | None:
             if isinstance(msg, VariableResult):
                 return msg.value  # Already a string | None
             return None
+        except RuntimeError as e:
+            # TriggerCommsDecoder.send() uses async_to_sync internally, which 
raises RuntimeError
+            # when called within an async event loop. In greenback portal 
contexts (triggerer),
+            # we catch this and use greenback to call the async version 
instead.
+            if str(e).startswith("You cannot use AsyncToSync in the same 
thread as an async event loop"):
+                import asyncio
+
+                import greenback
+
+                task = asyncio.current_task()
+                if greenback.has_portal(task):
+                    import warnings
+
+                    warnings.warn(
+                        "You should not use sync calls here -- use `await 
aget_variable` instead",
+                        stacklevel=2,
+                    )
+                    return greenback.await_(self.aget_variable(key))
+            # Fall through to the general exception handler for other 
RuntimeErrors

Review Comment:
   The comment says 'Fall through to the general exception handler', but the 
code returns `None` immediately and cannot fall through to the `except 
Exception` block. Update the comment to match the control flow (e.g., 'For 
other RuntimeErrors, return None to allow fallback to other backends') or 
remove it to avoid misleading future readers.
   ```suggestion
               # For other RuntimeErrors, return None to allow fallback to 
other backends.
   ```



##########
task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py:
##########
@@ -113,6 +113,26 @@ def get_variable(self, key: str, team_name: str | None = 
None) -> str | None:
             if isinstance(msg, VariableResult):
                 return msg.value  # Already a string | None
             return None
+        except RuntimeError as e:
+            # TriggerCommsDecoder.send() uses async_to_sync internally, which 
raises RuntimeError
+            # when called within an async event loop. In greenback portal 
contexts (triggerer),
+            # we catch this and use greenback to call the async version 
instead.
+            if str(e).startswith("You cannot use AsyncToSync in the same 
thread as an async event loop"):
+                import asyncio
+
+                import greenback
+
+                task = asyncio.current_task()
+                if greenback.has_portal(task):
+                    import warnings
+
+                    warnings.warn(
+                        "You should not use sync calls here -- use `await 
aget_variable` instead",
+                        stacklevel=2,
+                    )
+                    return greenback.await_(self.aget_variable(key))

Review Comment:
   The greenback portal detection + warning + `await_` fallback logic is now 
duplicated between `get_connection()` and `get_variable()`. Consider extracting 
this into a small private helper (e.g., `_greenback_sync_fallback(kind, coro)` 
or similar) to reduce duplication and keep the warning text / detection logic 
consistent across the two code paths.



##########
task-sdk/tests/task_sdk/execution_time/test_secrets.py:
##########
@@ -178,6 +178,49 @@ async def mock_aget_connection(self, conn_id):
         # Verify send was attempted first (and raised RuntimeError)
         mock_supervisor_comms.send.assert_called_once()
 
+    def test_get_variable_runtime_error_triggers_greenback_fallback(self, 
mocker, mock_supervisor_comms):
+        """
+        Test that RuntimeError from async_to_sync triggers greenback fallback 
for variables.
+
+        Same as the connection test but for get_variable — verifies the fix 
for #61676:
+        triggers calling Variable.get() fail because SUPERVISOR_COMMS.send() 
raises
+        RuntimeError in the async event loop, but the greenback fallback was 
missing.
+        """
+        expected_value = "10"
+
+        # Simulate the RuntimeError that triggers greenback fallback
+        mock_supervisor_comms.send.side_effect = RuntimeError(
+            "You cannot use AsyncToSync in the same thread as an async event 
loop"
+        )
+
+        # Mock the greenback and asyncio modules
+        mocker.patch("greenback.has_portal", return_value=True)
+        mocker.patch("asyncio.current_task")
+
+        import asyncio
+
+        def greenback_await_side_effect(coro):
+            loop = asyncio.new_event_loop()
+            try:
+                return loop.run_until_complete(coro)
+            finally:
+                loop.close()
+
+        mock_greenback_await = mocker.patch("greenback.await_", 
side_effect=greenback_await_side_effect)
+
+        # Mock aget_variable to return the expected value
+        async def mock_aget_variable(self, key):
+            return expected_value
+
+        mocker.patch.object(ExecutionAPISecretsBackend, "aget_variable", 
mock_aget_variable)
+
+        backend = ExecutionAPISecretsBackend()
+        result = backend.get_variable("retries")
+
+        assert result == expected_value
+        mock_greenback_await.assert_called_once()
+        mock_supervisor_comms.send.assert_called_once()

Review Comment:
   This test exercises the greenback fallback, but it doesn’t assert the 
`warnings.warn(...)` behavior introduced by the fallback path. Adding an 
assertion (e.g., using `pytest.warns` and matching the message) would ensure 
the warning remains present and correct, and prevent future regressions where 
the fallback works but the guidance to use `await aget_variable` is 
accidentally removed/changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to