This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch backport-ff4e471-v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 56ad62682440857e12b500bbb960b17f202a8353
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Tue Oct 28 15:56:17 2025 +0000

    [v3-1-test] Add support for "reconnecting" Supervisor Comms and logs in task processes (#57212)

    This change relates to #51422, #54706 and gives us a single function that
    we can call from the Python Virtual env script to re-connect the
    SUPERVISOR_COMMS and logs socket, so variables and connections are
    accessible from within the VEnv task, as long as task-sdk is installed.

    This also fixes logs produced by a venv task so that they are not double
    "encoded" (i.e. so that we directly see the actual log, not a
    stringification of the entire log line with two timestamps etc.)

    There is a matching change to the Venv script to use this function, but it
    is separated into a different PR so that we don't merge core/sdk and
    provider changes in one PR.

    The crux of the change in the venv operator (that I've already tested) is
    this:

    ```diff
    +try:
    +    from airflow.sdk.execution_time import task_runner
    +except ModuleNotFoundError:
    +    pass
    +else:
    +    reinit_supervisor_comms = getattr(task_runner, "reinit_supervisor_comms", None)
    +    if reinit_supervisor_comms:
    +        reinit_supervisor_comms()
    ```

    (cherry picked from commit ff4e47172385bd6a71f3fc0b883bf94f9802eb7c)

    Co-authored-by: Ash Berlin-Taylor <[email protected]>
    Co-authored-by: Amogh Desai <[email protected]>
---
 airflow-core/src/airflow/settings.py               | 15 +++--
 .../src/airflow/sdk/execution_time/task_runner.py  | 45 ++++++++++-----
 .../task_sdk/execution_time/test_supervisor.py     | 67 ++++++++++++++++++++++
 3 files changed, 106 insertions(+), 21 deletions(-)

diff --git a/airflow-core/src/airflow/settings.py b/airflow-core/src/airflow/settings.py
index 9a54060ea5c..e2d96d96f52 100644
--- a/airflow-core/src/airflow/settings.py
+++ b/airflow-core/src/airflow/settings.py
@@ -700,17 +700,16 @@ def initialize():
     configure_adapters()
 
     # The webservers import this file from models.py with the default settings.
-    if not os.environ.get("PYTHON_OPERATORS_VIRTUAL_ENV_MODE", None):
-        is_worker = os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1"
-        if not is_worker:
-            configure_orm()
-    configure_action_logging()
-
     # Configure secrets masker before masking secrets
     _configure_secrets_masker()
-    # mask the sensitive_config_values
-    conf.mask_secrets()
-
+    is_worker = os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1"
+    if not os.environ.get("PYTHON_OPERATORS_VIRTUAL_ENV_MODE", None) and not is_worker:
+        configure_orm()
+
+    # mask the sensitive_config_values
+    conf.mask_secrets()
+
+    configure_action_logging()
 
     # Run any custom runtime checks that needs to be executed for providers
     run_providers_custom_runtime_checks()
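To make the settings.py reordering concrete: after this change the secrets masker runs for every process type, while `configure_orm()` is skipped for venv-mode and re-executed (worker) processes. Below is a minimal, standalone sketch of that gating; `configure_orm` here is a hypothetical stub standing in for the real helper, not Airflow code.

```python
import os


def configure_orm():
    """Hypothetical stand-in for airflow.settings.configure_orm()."""
    print("ORM configured")


def initialize_sketch():
    # In the real initialize(), secrets masking happens unconditionally
    # before this point, so every process type masks sensitive values.
    is_venv_mode = bool(os.environ.get("PYTHON_OPERATORS_VIRTUAL_ENV_MODE"))
    is_worker = os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1"
    # Only plain (non-venv, non-reexecuted) processes touch the database.
    if not is_venv_mode and not is_worker:
        configure_orm()


if __name__ == "__main__":
    initialize_sketch()
```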
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 1b789fe59b0..41b11084d7f 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -694,20 +694,15 @@ def startup() -> tuple[RuntimeTaskInstance, Context, Logger]:
     # in response to us sending a request.
     log = structlog.get_logger(logger_name="task")
 
-    if os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1" and os.environ.get("_AIRFLOW__STARTUP_MSG"):
+    if os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1" and (
+        msgjson := os.environ.get("_AIRFLOW__STARTUP_MSG")
+    ):
         # Clear any Kerberos credential cache if there is one, so the new process can't reuse it.
         os.environ.pop("KRB5CCNAME", None)
         # entrypoint of re-exec process
-        msg = TypeAdapter(StartupDetails).validate_json(os.environ["_AIRFLOW__STARTUP_MSG"])
-        logs = SUPERVISOR_COMMS.send(ResendLoggingFD())
-        if isinstance(logs, SentFDs):
-            from airflow.sdk.log import configure_logging
-
-            log_io = os.fdopen(logs.fds[0], "wb", buffering=0)
-            configure_logging(json_output=True, output=log_io, sending_to_supervisor=True)
-        else:
-            print("Unable to re-configure logging after sudo, we didn't get an FD", file=sys.stderr)
+        msg: StartupDetails = TypeAdapter(StartupDetails).validate_json(msgjson)
+        reinit_supervisor_comms()
 
         # We delay this message until _after_ we've got the logging re-configured, otherwise it will show up
         # on stdout
@@ -716,8 +711,9 @@ def startup() -> tuple[RuntimeTaskInstance, Context, Logger]:
         # normal entry point
         msg = SUPERVISOR_COMMS._get_response()  # type: ignore[assignment]
 
-        if not isinstance(msg, StartupDetails):
-            raise RuntimeError(f"Unhandled startup message {type(msg)} {msg}")
+    if not isinstance(msg, StartupDetails):
+        raise RuntimeError(f"Unhandled startup message {type(msg)} {msg}")
+
     # setproctitle causes issue on Mac OS: https://github.com/benoitc/gunicorn/issues/3021
     os_type = sys.platform
     if os_type == "darwin":
@@ -1443,7 +1439,6 @@ def finalize(
 
 
 def main():
-    # TODO: add an exception here, it causes an oof of a stack trace if it happens to early!
     log = structlog.get_logger(logger_name="task")
 
     global SUPERVISOR_COMMS
@@ -1472,5 +1467,29 @@ def main():
             SUPERVISOR_COMMS.socket.close()
 
 
+def reinit_supervisor_comms() -> None:
+    """
+    Re-initialize supervisor comms and the logging channel in a subprocess.
+
+    This is not needed in most cases, but is used when we re-launch the process via sudo
+    for run_as_user, or from inside the python code of a virtualenv (et al.) operator, to
+    re-connect so those tasks can continue to access variables etc.
+    """
+    if "SUPERVISOR_COMMS" not in globals():
+        global SUPERVISOR_COMMS
+        log = structlog.get_logger(logger_name="task")
+
+        SUPERVISOR_COMMS = CommsDecoder[ToTask, ToSupervisor](log=log)
+
+    logs = SUPERVISOR_COMMS.send(ResendLoggingFD())
+    if isinstance(logs, SentFDs):
+        from airflow.sdk.log import configure_logging
+
+        log_io = os.fdopen(logs.fds[0], "wb", buffering=0)
+        configure_logging(json_output=True, output=log_io, sending_to_supervisor=True)
+    else:
+        print("Unable to re-configure logging after sudo, we didn't get an FD", file=sys.stderr)
+
+
 if __name__ == "__main__":
     main()
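For callers outside task_runner.py, the "crux" diff in the commit message shows the intended usage. Here is a sketch of how a venv-generated script might guard the call, assuming only that task-sdk may or may not be importable in the virtualenv; the `reinit` local name and the stderr message are illustrative, not part of the commit.

```python
import sys

try:
    from airflow.sdk.execution_time import task_runner
except ModuleNotFoundError:
    task_runner = None  # task-sdk not installed in this venv; comms stay unavailable

if task_runner is not None:
    # Older task-sdk releases may lack this function, hence the None default.
    reinit = getattr(task_runner, "reinit_supervisor_comms", None)
    if reinit is not None:
        reinit()  # re-connects SUPERVISOR_COMMS and the logging FD
    else:
        print("task-sdk too old to re-connect supervisor comms", file=sys.stderr)

# Once reconnected, Connections and Variables resolve through the supervisor,
# e.g.: from airflow.sdk import Connection; Connection.get("my_conn_id")
```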
+ """ + if "SUPERVISOR_COMMS" not in globals(): + global SUPERVISOR_COMMS + log = structlog.get_logger(logger_name="task") + + SUPERVISOR_COMMS = CommsDecoder[ToTask, ToSupervisor](log=log) + + logs = SUPERVISOR_COMMS.send(ResendLoggingFD()) + if isinstance(logs, SentFDs): + from airflow.sdk.log import configure_logging + + log_io = os.fdopen(logs.fds[0], "wb", buffering=0) + configure_logging(json_output=True, output=log_io, sending_to_supervisor=True) + else: + print("Unable to re-configure logging after sudo, we didn't get an FD", file=sys.stderr) + + if __name__ == "__main__": main() diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index d9032701cec..141bb1967e9 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -25,12 +25,14 @@ import re import selectors import signal import socket +import subprocess import sys import time from contextlib import nullcontext from dataclasses import dataclass, field from operator import attrgetter from random import randint +from textwrap import dedent from time import sleep from typing import TYPE_CHECKING, Any from unittest import mock @@ -2539,3 +2541,68 @@ def test_remote_logging_conn_caches_connection_not_client(monkeypatch): gc.collect() assert backend.calls == 1, "Connection should be cached, not fetched multiple times" assert all(ref() is None for ref in clients), "Client instances should be garbage collected" + + +def test_reinit_supervisor_comms(monkeypatch, client_with_ti_start, caplog): + def subprocess_main(): + # This is run in the subprocess! + + # Ensure we follow the "protocol" and get the startup message before we do anything else + c = CommsDecoder() + c._get_response() + + # This mirrors what the VirtualEnvProvider puts in it's script + script = """ + import os + import sys + import structlog + + from airflow.sdk import Connection + from airflow.sdk.execution_time.task_runner import reinit_supervisor_comms + + reinit_supervisor_comms() + + Connection.get("a") + print("ok") + sys.stdout.flush() + + structlog.get_logger().info("is connected") + """ + # Now we launch a new process, as VirtualEnvOperator will do + subprocess.check_call([sys.executable, "-c", dedent(script)]) + + client_with_ti_start.connections.get.return_value = ConnectionResult( + conn_id="test_conn", conn_type="mysql", login="a", password="password1" + ) + proc = ActivitySubprocess.start( + dag_rel_path=os.devnull, + bundle_info=FAKE_BUNDLE, + what=TaskInstance( + id="4d828a62-a417-4936-a7a6-2b3fabacecab", + task_id="b", + dag_id="c", + run_id="d", + try_number=1, + dag_version_id=uuid7(), + ), + client=client_with_ti_start, + target=subprocess_main, + ) + + rc = proc.wait() + + assert rc == 0, caplog.text + # Check that the log messages are write. We should expect stdout to apper right, and crucially, we should + # expect logs from the venv process to appear without extra "wrapping" + assert { + "logger": "task.stdout", + "event": "ok", + "log_level": "info", + "timestamp": mock.ANY, + } in caplog, caplog.text + assert { + "logger_name": "task", + "log_level": "info", + "event": "is connected", + "timestamp": mock.ANY, + } in caplog, caplog.text
