kaxil commented on code in PR #61630:
URL: https://github.com/apache/airflow/pull/61630#discussion_r3295454330


##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -291,9 +291,26 @@ def _get_variable(key: str, deserialize_json: bool) -> Any:
             )
 
     # If no backend found the variable, raise a not found error (mirrors _get_connection)
-    from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType
+    from airflow.sdk.execution_time import task_runner
     from airflow.sdk.execution_time.comms import ErrorResponse
 
+    if not hasattr(task_runner, "SUPERVISOR_COMMS"):
+        raise AirflowRuntimeError(
+            ErrorResponse(
+                error=ErrorType.VARIABLE_NOT_FOUND,
+                detail={
+                    "message": (
+                        f"Variable '{key}' not found. Note: SUPERVISOR_COMMS is not available, "
+                        "which means this code is running outside a task execution context "
+                        "(e.g., at the top level of a DAG file). "
+                        "Consider using environment variables (AIRFLOW_VAR_<key>), "
+                        "Jinja templates ({{ var.value.<key> }}), "
+                        "or move the Variable.get() call inside a task function."
+                    )
+                },
+            )
+        )
+
     raise AirflowRuntimeError(
         ErrorResponse(error=ErrorType.VARIABLE_NOT_FOUND, detail={"message": f"Variable {key} not found"})
     )

Review Comment:
   Related: `_get_variable_keys` below (line 329) still has the bare `from 
airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS` at the top of 
the function. Since `SUPERVISOR_COMMS` is now a bare annotation in 
`task_runner.py` (`SUPERVISOR_COMMS: CommsDecoder[...]` at line 930), this 
import raises `ImportError` at function entry whenever the function is called 
outside a task context -- exactly the opaque failure mode this PR is trying to 
wrap with a helpful message.
   
   `Variable.keys()` from a DAG top-level (or any non-task caller) will hit the 
original `ImportError`, not the friendly message added to `_get_variable` here. 
Worth applying the same `hasattr(task_runner, "SUPERVISOR_COMMS")` guard there 
for symmetry, or switching to `from airflow.sdk.execution_time import 
task_runner` + `task_runner.SUPERVISOR_COMMS` like `_set_variable` / 
`_delete_variable` now do.
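
A minimal sketch of the failure mode described above, assuming a hypothetical `fake_task_runner` module in place of the real `task_runner.py`. It only illustrates that a bare module-level annotation binds no attribute, so a by-name import raises `ImportError` while a `hasattr` guard on the module does not; it is not the actual Airflow code.

```python
import sys
import types

# Hypothetical stand-in for task_runner.py: the module body is only a bare
# annotation, which records a type hint but binds no module attribute.
fake_runner = types.ModuleType("fake_task_runner")
exec("SUPERVISOR_COMMS: object", fake_runner.__dict__)
sys.modules["fake_task_runner"] = fake_runner


def import_by_name() -> str:
    """Mimic the bare `from ... import SUPERVISOR_COMMS` inside a function."""
    try:
        from fake_task_runner import SUPERVISOR_COMMS  # noqa: F401
    except ImportError:
        return "ImportError"
    return "imported"


def guarded_lookup() -> str:
    """Mimic the `hasattr(task_runner, "SUPERVISOR_COMMS")` guard."""
    import fake_task_runner

    if not hasattr(fake_task_runner, "SUPERVISOR_COMMS"):
        return "missing"
    return "present"
```

While the name is unbound, `import_by_name()` reports the `ImportError` and `guarded_lookup()` reports `"missing"`. Once something assigns `fake_runner.SUPERVISOR_COMMS`, both paths succeed.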



##########
airflow-core/tests/unit/serialization/test_serialized_objects.py:
##########
@@ -1063,8 +1063,11 @@ def test_logging_propogated_by_default(self, caplog):
         BaseOperator(task_id="test").log.warning("test")
         # This looks like "how could it fail" but this actually checks that the handler called `emit`. Testing
         # the other case (that when we have set_context it goes to the file is harder to achieve without
-        # leaking a lot of state)
-        assert caplog.messages == ["test"]
+        # leaking a lot of state). Only assert on the operator's logger so other loggers (e.g. OTLP trace
+        # export errors in CI) do not affect the test.
+        operator_logger_prefix = "airflow.task.operators"
+        operator_messages = [r.message for r in caplog.records if r.name.startswith(operator_logger_prefix)]

Review Comment:
   This OTLP-noise filter is unrelated to the `SUPERVISOR_COMMS` error-handling 
scope of the PR. It's a legitimate CI-flake fix (other loggers polluting 
`caplog.messages`), but bundling it here makes the change harder to bisect 
later if it ever causes regressions.
   
   Either split it into its own PR or call it out explicitly in the description 
so reviewers know it's intentional scope creep.
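
For whoever splits this out, a standalone sketch of the filtering idea using only the standard library. The `ListHandler` class and both logger names are illustrative assumptions; the real test relies on pytest's `caplog` fixture rather than a custom handler.

```python
import logging


class ListHandler(logging.Handler):
    """Collect emitted records in memory, roughly like pytest's caplog."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


def messages_from(records: list[logging.LogRecord], prefix: str) -> list[str]:
    """Keep only messages whose logger name starts with `prefix`."""
    return [r.getMessage() for r in records if r.name.startswith(prefix)]


def demo() -> tuple[list[str], list[str]]:
    handler = ListHandler()
    # Both logger names below are illustrative assumptions.
    operator_log = logging.getLogger("airflow.task.operators.demo")
    noisy_log = logging.getLogger("hypothetical.otlp.exporter")
    for logger in (operator_log, noisy_log):
        logger.addHandler(handler)
    try:
        noisy_log.warning("export failed")
        operator_log.warning("test")
    finally:
        for logger in (operator_log, noisy_log):
            logger.removeHandler(handler)
    all_messages = [r.getMessage() for r in handler.records]
    return all_messages, messages_from(handler.records, "airflow.task.operators")
```

An assertion on the unfiltered list would see both messages, whereas the prefix-filtered list contains only the operator's message.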



##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -371,7 +388,21 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ
     except Exception as e:
         log.exception(e)
 
-    SUPERVISOR_COMMS.send(PutVariable(key=key, value=value, description=description))
+    if not hasattr(task_runner, "SUPERVISOR_COMMS"):

Review Comment:
   This guard fires *after* the conflict-check loop above has already iterated 
every secrets backend and potentially emitted the `log.warning("The variable %s 
is defined in the %s secrets backend, which takes precedence...")` message -- 
even though we're about to bail out because there is no `SUPERVISOR_COMMS` to 
send `PutVariable` through.
   
   Compare with `_delete_variable` just below, which guards at the top of the 
function before any work. For consistency (and to avoid the misleading warning 
about a write that never happens), hoist this `hasattr` check above the 
conflict-check loop.
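
A rough sketch of the guard-first ordering suggested above. Everything here is a simplified assumption for illustration: `task_runner` is a plain namespace, `backends` is a dict of dicts, and warnings are appended to a list instead of being logged. The real `_set_variable` signature and backend API differ.

```python
import types

# Hypothetical stand-in for the task_runner module; SUPERVISOR_COMMS is unset,
# as it would be outside a task execution context.
task_runner = types.SimpleNamespace()


def set_variable(
    key: str,
    value: str,
    backends: dict[str, dict[str, str]],
    warnings: list[str],
) -> None:
    # Guard first, so nothing below runs when the write cannot happen.
    if not hasattr(task_runner, "SUPERVISOR_COMMS"):
        raise RuntimeError(f"Cannot set variable {key!r}: SUPERVISOR_COMMS is not available")

    # Conflict check: only reached once we know the write will be attempted.
    for name, store in backends.items():
        if key in store:
            warnings.append(f"The variable {key} is defined in the {name} secrets backend")

    task_runner.SUPERVISOR_COMMS.send((key, value))
```

With this ordering, a call made without `SUPERVISOR_COMMS` raises before any backend is consulted, so no precedence warning is recorded for a write that never happens.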


