ashb commented on code in PR #49005:
URL: https://github.com/apache/airflow/pull/49005#discussion_r2035350348
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -209,6 +209,52 @@ def _get_variable(key: str, deserialize_json: bool) -> Any:
return variable.value
+def _set_variable(key: str, value: Any, description: str | None = None,
serialize_json: bool = False) -> None:
+ # TODO: This should probably be moved to a separate module like
`airflow.sdk.execution_time.comms`
+ # or `airflow.sdk.execution_time.variable`
+ # A reason to not move it to `airflow.sdk.execution_time.comms` is that
it
+ # will make that module depend on Task SDK, which is not ideal because
we intend to
+ # keep Task SDK as a separate package from execution-time modules.
+ import json
+
+ from airflow.sdk.execution_time.comms import PutVariable
+ from airflow.sdk.execution_time.supervisor import
ensure_secrets_backend_loaded
+ from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
+
+ # check for write conflicts on the worker
+ for secrets_backend in ensure_secrets_backend_loaded():
+ try:
+ var_val = secrets_backend.get_variable(key=key)
+ if var_val is not None:
+ _backend_name = type(secrets_backend).__name__
+ log.warning(
+ "The variable %s is defined in the %s secrets backend,
which takes "
+ "precedence over reading from the database. The value in
the database will be "
+ "updated, but to read it you have to delete the
conflicting variable "
+ "from %s",
+ key,
+ _backend_name,
+ _backend_name,
+ )
+ except Exception:
+ log.exception(
+ "Unable to retrieve variable from secrets backend (%s).
Checking subsequent secrets backend.",
+ type(secrets_backend).__name__,
+ )
+
+ try:
+ if serialize_json:
+ value = json.dumps(value, indent=2)
+ except Exception as e:
+ log.exception(e)
+ # Since Triggers can hit this code path via `sync_to_async` (which uses
threads internally)
+ # we need to make sure that we "atomically" send a request and get the
response to that
+ # back so that two triggers don't end up interleaving requests and create
a possible
+ # race condition where the wrong trigger reads the response.
+ with SUPERVISOR_COMMS.lock:
Review Comment:
Best to have this everywhere tbh. A lock is no good if _one_ place skips it
and then it later matters.
(Yes, we should encapsulate send+get resp into a single message, and also
expect every message to have a response. Right now there is no way to pass
errors back to the client (task/dag file whatever) if the underlying API call
fails. https://github.com/apache/airflow/issues/46426)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]