This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new d1af2452166 [v3-1-test] Add support for "reconnecting" Supervisor Comms and logs in task processes (#57212) (#58263)
d1af2452166 is described below
commit d1af245216645b4a3d85c65fcef9e78db9b27d4b
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Thu Nov 13 13:05:33 2025 +0000
[v3-1-test] Add support for "reconnecting" Supervisor Comms and logs in task processes (#57212) (#58263)
This change relates to #51422 and #54706. It gives us a single function that we
can call from the Python virtualenv script to re-connect the SUPERVISOR_COMMS
and logs socket, so that variables and connections are accessible from within
the venv task, as long as task-sdk is installed.
This also fixes logs produced by a venv task so they are no longer double
"encoded" (i.e. we see the actual log line directly, not a stringification of
the entire log line with two timestamps etc.).
There is a matching change to the Venv script to use this function, but it is
separated into a different PR so that we don't merge core/sdk and provider
changes in one PR.
The crux of the change in the venv operator (which I've already tested) is this:
```diff
+try:
+ from airflow.sdk.execution_time import task_runner
+except ModuleNotFoundError:
+ pass
+else:
+ reinit_supervisor_comms = getattr(task_runner, "reinit_supervisor_comms")
+ if reinit_supervisor_comms:
+ reinit_supervisor_comms()
```
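One caveat worth noting for anyone copying that snippet (the variant below is a sketch, not part of this commit): two-argument `getattr` raises `AttributeError` when the attribute is missing, so the `if` guard never actually fires on an older task-sdk that predates `reinit_supervisor_comms`. Passing `None` as the default makes the guard meaningful:

```python
# Sketch of a more defensive variant of the venv-script guard above;
# tolerates task-sdk versions that do not yet have reinit_supervisor_comms.
try:
    from airflow.sdk.execution_time import task_runner
except ModuleNotFoundError:
    pass  # task-sdk not installed in the venv; run without Supervisor Comms
else:
    # The default of None keeps the truthiness check meaningful
    reinit_supervisor_comms = getattr(task_runner, "reinit_supervisor_comms", None)
    if reinit_supervisor_comms is not None:
        reinit_supervisor_comms()
```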
(cherry picked from commit ff4e47172385bd6a71f3fc0b883bf94f9802eb7c)
Co-authored-by: Amogh Desai <[email protected]>
---
airflow-core/src/airflow/settings.py | 15 +++--
.../src/airflow/sdk/execution_time/task_runner.py | 45 ++++++++++-----
.../task_sdk/execution_time/test_supervisor.py | 67 ++++++++++++++++++++++
3 files changed, 106 insertions(+), 21 deletions(-)
diff --git a/airflow-core/src/airflow/settings.py b/airflow-core/src/airflow/settings.py
index 9a54060ea5c..e2d96d96f52 100644
--- a/airflow-core/src/airflow/settings.py
+++ b/airflow-core/src/airflow/settings.py
@@ -700,17 +700,16 @@ def initialize():
     configure_adapters()
     # The webservers import this file from models.py with the default settings.
-    if not os.environ.get("PYTHON_OPERATORS_VIRTUAL_ENV_MODE", None):
-        is_worker = os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1"
-        if not is_worker:
-            configure_orm()
-            configure_action_logging()
-
     # Configure secrets masker before masking secrets
     _configure_secrets_masker()
-    # mask the sensitive_config_values
-    conf.mask_secrets()
+    is_worker = os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1"
+    if not os.environ.get("PYTHON_OPERATORS_VIRTUAL_ENV_MODE", None) and not is_worker:
+        configure_orm()
+
+    # mask the sensitive_config_values
+    conf.mask_secrets()
+    configure_action_logging()
     # Run any custom runtime checks that needs to be executed for providers
     run_providers_custom_runtime_checks()
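To make the reordering easier to read than the interleaved hunk above, here is a paraphrased sketch (with stub functions, not the literal settings.py contents) of the control flow `initialize()` has after this change: secrets masking and action logging now run in every process, and only the ORM setup stays gated behind the two environment-variable checks.

```python
# Paraphrased sketch of the post-change initialization order; the three
# callees are stubs standing in for the real Airflow functions.
import os

def configure_orm(): print("ORM configured")
def configure_action_logging(): print("action logging configured")
def mask_secrets(): print("secrets masked")

def initialize_flow():
    is_worker = os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1"
    venv_mode = os.environ.get("PYTHON_OPERATORS_VIRTUAL_ENV_MODE", None)
    if not venv_mode and not is_worker:
        configure_orm()           # skipped for venv-mode and re-executed workers
    mask_secrets()                # now unconditional
    configure_action_logging()   # now unconditional (was gated before)
```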
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 1b789fe59b0..41b11084d7f 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -694,20 +694,15 @@ def startup() -> tuple[RuntimeTaskInstance, Context, Logger]:
     # in response to us sending a request.
     log = structlog.get_logger(logger_name="task")
-    if os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1" and os.environ.get("_AIRFLOW__STARTUP_MSG"):
+    if os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1" and (
+        msgjson := os.environ.get("_AIRFLOW__STARTUP_MSG")
+    ):
         # Clear any Kerberos replace cache if there is one, so new process can't reuse it.
         os.environ.pop("KRB5CCNAME", None)
         # entrypoint of re-exec process
-        msg = TypeAdapter(StartupDetails).validate_json(os.environ["_AIRFLOW__STARTUP_MSG"])
-        logs = SUPERVISOR_COMMS.send(ResendLoggingFD())
-        if isinstance(logs, SentFDs):
-            from airflow.sdk.log import configure_logging
-
-            log_io = os.fdopen(logs.fds[0], "wb", buffering=0)
-            configure_logging(json_output=True, output=log_io, sending_to_supervisor=True)
-        else:
-            print("Unable to re-configure logging after sudo, we didn't get an FD", file=sys.stderr)
+        msg: StartupDetails = TypeAdapter(StartupDetails).validate_json(msgjson)
+        reinit_supervisor_comms()
         # We delay this message until _after_ we've got the logging re-configured, otherwise it will show up
         # on stdout
@@ -716,8 +711,9 @@ def startup() -> tuple[RuntimeTaskInstance, Context, Logger]:
         # normal entry point
         msg = SUPERVISOR_COMMS._get_response()  # type: ignore[assignment]
-        if not isinstance(msg, StartupDetails):
-            raise RuntimeError(f"Unhandled startup message {type(msg)} {msg}")
+    if not isinstance(msg, StartupDetails):
+        raise RuntimeError(f"Unhandled startup message {type(msg)} {msg}")
+
     # setproctitle causes issue on Mac OS: https://github.com/benoitc/gunicorn/issues/3021
     os_type = sys.platform
     if os_type == "darwin":
@@ -1443,7 +1439,6 @@ def finalize(
 def main():
-    # TODO: add an exception here, it causes an oof of a stack trace if it happens to early!
     log = structlog.get_logger(logger_name="task")
     global SUPERVISOR_COMMS
@@ -1472,5 +1467,29 @@ def main():
         SUPERVISOR_COMMS.socket.close()


+def reinit_supervisor_comms() -> None:
+    """
+    Re-initialize supervisor comms and logging channel in subprocess.
+
+    This is not needed for most cases, but is used when either we re-launch the process via sudo for
+    run_as_user, or from inside the python code in a virtualenv (et al.) operator to re-connect so those tasks
+    can continue to access variables etc.
+    """
+    if "SUPERVISOR_COMMS" not in globals():
+        global SUPERVISOR_COMMS
+        log = structlog.get_logger(logger_name="task")
+
+        SUPERVISOR_COMMS = CommsDecoder[ToTask, ToSupervisor](log=log)
+
+    logs = SUPERVISOR_COMMS.send(ResendLoggingFD())
+    if isinstance(logs, SentFDs):
+        from airflow.sdk.log import configure_logging
+
+        log_io = os.fdopen(logs.fds[0], "wb", buffering=0)
+        configure_logging(json_output=True, output=log_io, sending_to_supervisor=True)
+    else:
+        print("Unable to re-configure logging after sudo, we didn't get an FD", file=sys.stderr)
+
+
 if __name__ == "__main__":
     main()
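For context on what the `ResendLoggingFD`/`SentFDs` exchange and the `os.fdopen(logs.fds[0], "wb", buffering=0)` call rely on, here is a standalone sketch (the underlying OS mechanism, not Airflow's actual CommsDecoder protocol): the supervisor passes an already-open file descriptor to the task process over a Unix-domain socket using SCM_RIGHTS ancillary data, and the receiver reopens it as a writable stream.

```python
# Standalone sketch of SCM_RIGHTS fd-passing over a Unix socketpair -- the
# mechanism the logging-FD resend builds on. Unix-only; not Airflow code.
import array
import os
import socket

def send_fds(sock: socket.socket, fds: list[int]) -> None:
    # One byte of real payload; the descriptors ride in ancillary data
    sock.sendmsg([b"x"], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", fds))])

def recv_fds(sock: socket.socket, maxfds: int = 1) -> list[int]:
    fds = array.array("i")
    _, ancdata, _, _ = sock.recvmsg(1, socket.CMSG_LEN(maxfds * fds.itemsize))
    for level, ctype, data in ancdata:
        if level == socket.SOL_SOCKET and ctype == socket.SCM_RIGHTS:
            # Truncate to a whole number of ints before decoding
            fds.frombytes(data[: len(data) - (len(data) % fds.itemsize)])
    return list(fds)

if __name__ == "__main__":
    supervisor_side, task_side = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
    r, w = os.pipe()
    send_fds(supervisor_side, [w])             # "supervisor" sends the write end
    (w2,) = recv_fds(task_side)                # "task" receives a duplicate fd
    log_io = os.fdopen(w2, "wb", buffering=0)  # reopen as an unbuffered stream
    log_io.write(b"hello from the task\n")
    log_io.close()
    os.close(w)
    print(os.read(r, 100))
```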
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index d9032701cec..141bb1967e9 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -25,12 +25,14 @@ import re
import selectors
import signal
import socket
+import subprocess
import sys
import time
from contextlib import nullcontext
from dataclasses import dataclass, field
from operator import attrgetter
from random import randint
+from textwrap import dedent
from time import sleep
from typing import TYPE_CHECKING, Any
from unittest import mock
@@ -2539,3 +2541,68 @@ def test_remote_logging_conn_caches_connection_not_client(monkeypatch):
     gc.collect()
     assert backend.calls == 1, "Connection should be cached, not fetched multiple times"
     assert all(ref() is None for ref in clients), "Client instances should be garbage collected"
+
+
+def test_reinit_supervisor_comms(monkeypatch, client_with_ti_start, caplog):
+    def subprocess_main():
+        # This is run in the subprocess!
+
+        # Ensure we follow the "protocol" and get the startup message before we do anything else
+        c = CommsDecoder()
+        c._get_response()
+
+        # This mirrors what the VirtualEnvProvider puts in its script
+        script = """
+        import os
+        import sys
+        import structlog
+
+        from airflow.sdk import Connection
+        from airflow.sdk.execution_time.task_runner import reinit_supervisor_comms
+
+        reinit_supervisor_comms()
+
+        Connection.get("a")
+        print("ok")
+        sys.stdout.flush()
+
+        structlog.get_logger().info("is connected")
+        """
+        # Now we launch a new process, as VirtualEnvOperator will do
+        subprocess.check_call([sys.executable, "-c", dedent(script)])
+
+    client_with_ti_start.connections.get.return_value = ConnectionResult(
+        conn_id="test_conn", conn_type="mysql", login="a", password="password1"
+    )
+    proc = ActivitySubprocess.start(
+        dag_rel_path=os.devnull,
+        bundle_info=FAKE_BUNDLE,
+        what=TaskInstance(
+            id="4d828a62-a417-4936-a7a6-2b3fabacecab",
+            task_id="b",
+            dag_id="c",
+            run_id="d",
+            try_number=1,
+            dag_version_id=uuid7(),
+        ),
+        client=client_with_ti_start,
+        target=subprocess_main,
+    )
+
+    rc = proc.wait()
+
+    assert rc == 0, caplog.text
+    # Check that the log messages are right: we should expect stdout to appear correctly and, crucially, we
+    # should expect logs from the venv process to appear without extra "wrapping"
+    assert {
+        "logger": "task.stdout",
+        "event": "ok",
+        "log_level": "info",
+        "timestamp": mock.ANY,
+    } in caplog, caplog.text
+    assert {
+        "logger_name": "task",
+        "log_level": "info",
+        "event": "is connected",
+        "timestamp": mock.ANY,
+    } in caplog, caplog.text