kaxil commented on code in PR #61630:
URL: https://github.com/apache/airflow/pull/61630#discussion_r3295454330
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -291,9 +291,26 @@ def _get_variable(key: str, deserialize_json: bool) -> Any:
)
# If no backend found the variable, raise a not found error (mirrors
_get_connection)
- from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType
+ from airflow.sdk.execution_time import task_runner
from airflow.sdk.execution_time.comms import ErrorResponse
+ if not hasattr(task_runner, "SUPERVISOR_COMMS"):
+ raise AirflowRuntimeError(
+ ErrorResponse(
+ error=ErrorType.VARIABLE_NOT_FOUND,
+ detail={
+ "message": (
+ f"Variable '{key}' not found. Note: SUPERVISOR_COMMS
is not available, "
+ "which means this code is running outside a task
execution context "
+ "(e.g., at the top level of a DAG file). "
+ "Consider using environment variables
(AIRFLOW_VAR_<key>), "
+ "Jinja templates ({{ var.value.<key> }}), "
+ "or move the Variable.get() call inside a task
function."
+ )
+ },
+ )
+ )
+
raise AirflowRuntimeError(
ErrorResponse(error=ErrorType.VARIABLE_NOT_FOUND, detail={"message":
f"Variable {key} not found"})
)
Review Comment:
Related: `_get_variable_keys` below (line 329) still has the bare `from
airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS` at the top of
the function. Since `SUPERVISOR_COMMS` is now a bare annotation in
`task_runner.py` (`SUPERVISOR_COMMS: CommsDecoder[...]` at line 930), this
import raises `ImportError` at function entry whenever the function is called
outside a task context -- exactly the opaque failure mode this PR is trying to
wrap with a helpful message.
`Variable.keys()` from a DAG top-level (or any non-task caller) will hit the
original `ImportError`, not the friendly message added to `_get_variable` here.
Worth applying the same `hasattr(task_runner, "SUPERVISOR_COMMS")` guard there
for symmetry, or switching to `from airflow.sdk.execution_time import
task_runner` + `task_runner.SUPERVISOR_COMMS` like `_set_variable` /
`_delete_variable` now do.
##########
airflow-core/tests/unit/serialization/test_serialized_objects.py:
##########
@@ -1063,8 +1063,11 @@ def test_logging_propogated_by_default(self, caplog):
BaseOperator(task_id="test").log.warning("test")
# This looks like "how could it fail" but this actually checks that
the handler called `emit`. Testing
# the other case (that when we have set_context it goes to the file is
harder to achieve without
- # leaking a lot of state)
- assert caplog.messages == ["test"]
+ # leaking a lot of state). Only assert on the operator's logger so
other loggers (e.g. OTLP trace
+ # export errors in CI) do not affect the test.
+ operator_logger_prefix = "airflow.task.operators"
+ operator_messages = [r.message for r in caplog.records if
r.name.startswith(operator_logger_prefix)]
Review Comment:
This OTLP-noise filter is unrelated to the `SUPERVISOR_COMMS` error-handling
scope of the PR. It's a legitimate CI-flake fix (other loggers polluting
`caplog.messages`), but bundling it here makes the change harder to bisect
later if it ever causes regressions.
Either split it into its own PR or call it out explicitly in the description
so reviewers know it's intentional scope creep.
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -371,7 +388,21 @@ def _set_variable(key: str, value: Any, description: str |
None = None, serializ
except Exception as e:
log.exception(e)
- SUPERVISOR_COMMS.send(PutVariable(key=key, value=value,
description=description))
+ if not hasattr(task_runner, "SUPERVISOR_COMMS"):
Review Comment:
This guard fires *after* the conflict-check loop above has already iterated
every secrets backend and potentially emitted the `log.warning("The variable %s
is defined in the %s secrets backend, which takes precedence...")` message --
even though we're about to bail out because there is no `SUPERVISOR_COMMS` to
send `PutVariable` through.
Compare with `_delete_variable` just below, which guards at the top of the
function before any work. For consistency (and to avoid the misleading warning
about a write that never happens), hoist this `hasattr` check above the
conflict-check loop.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]