This is an automated email from the ASF dual-hosted git repository.
jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ccf34ff384f Mark Triggerer supervisor as a server context so it can read metastore connections (#64022)
ccf34ff384f is described below
commit ccf34ff384f09ddbe34e3e36e0ff3b21ddfaa08a
Author: Pranay Kumar Karvi <[email protected]>
AuthorDate: Mon Jun 8 16:11:07 2026 +0530
Mark Triggerer supervisor as a server context so it can read metastore connections (#64022)
* fix: set server context in triggerer so MetastoreBackend is included for deadline callbacks
* fix: replace redundant test with triggerer_command test per review feedback
* fix: move server context setup to triggerer _execute() per review feedback
* fix: move import os to top-level in triggerer_job_runner per review
* fix: set client context in TriggerRunner subprocess to prevent inheriting server privileges
* fix: restore _AIRFLOW_PROCESS_CONTEXT after triggerer execution and clarify docstring
* fix: capture _AIRFLOW_PROCESS_CONTEXT during execution in test
* fix: correct typo subtproc -> subprocess in comment
* fix: add client-context test and update arun patch to use async side_effect
---
.../src/airflow/jobs/triggerer_job_runner.py | 32 +++++++++++---
airflow-core/tests/unit/jobs/test_triggerer_job.py | 49 ++++++++++++++++++++++
.../src/airflow/sdk/execution_time/supervisor.py | 6 +--
3 files changed, 78 insertions(+), 9 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index e8e1f66e787..326ab14d7a4 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -233,12 +233,20 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
sys.exit(os.EX_SOFTWARE)
def _execute(self) -> int | None:
+ # Mark as server context for secrets backend detection when handling GetConnection
+ # requests from the TriggerRunner subprocess (needs MetastoreBackend).
+ # The subprocess explicitly sets _AIRFLOW_PROCESS_CONTEXT=client to prevent
+ # inheriting server privileges (runs user trigger/callback code).
+ # Similar to DagProcessorManager / DagProcessor child pattern.
+ _prev_ctx = os.environ.get("_AIRFLOW_PROCESS_CONTEXT")
+ os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
self.log.info("Starting the triggerer")
self.register_signals()
stats.initialize(
factory=stats_utils.get_stats_factory(),
export_legacy_names=conf.getboolean("metrics", "legacy_names_on"),
)
+ self.trigger_runner = None
try:
# Kick off runner sub-process without DB access
self.trigger_runner = TriggerRunnerSupervisor.start(
@@ -248,7 +256,6 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
queues=self.queues,
team_name=self.team_name,
)
-
# Run the main DB comms loop in this process
self.trigger_runner.run()
return self.trigger_runner._exit_code
@@ -257,13 +264,16 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
raise
finally:
self.log.info("Waiting for triggers to clean up")
- # Tell the subtproc to stop and then wait for it.
+ # Tell the subprocess to stop and then wait for it.
# If the user interrupts/terms again, _graceful_exit will allow them
# to force-kill here. trigger_runner may be None if start() raised.
if self.trigger_runner is not None:
self.trigger_runner.kill(escalation_delay=10, force=True)
self.log.info("Exited trigger loop")
- return None
+ if _prev_ctx is None:
+ os.environ.pop("_AIRFLOW_PROCESS_CONTEXT", None)
+ else:
+ os.environ["_AIRFLOW_PROCESS_CONTEXT"] = _prev_ctx
log: FilteringBoundLogger = structlog.get_logger(logger_name=__name__)
@@ -1103,9 +1113,19 @@ class TriggerRunner:
def run(self):
"""Sync entrypoint - just run arun in an async loop."""
- signal.signal(signal.SIGINT, self._handle_signal)
- signal.signal(signal.SIGTERM, self._handle_signal)
- asyncio.run(self.arun())
+ # Mark as client-side (runs user trigger/callback code)
+ # Prevents inheriting server context from parent TriggerRunnerSupervisor
+ prev_ctx = os.environ.get("_AIRFLOW_PROCESS_CONTEXT")
+ os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "client"
+ try:
+ signal.signal(signal.SIGINT, self._handle_signal)
+ signal.signal(signal.SIGTERM, self._handle_signal)
+ asyncio.run(self.arun())
+ finally:
+ if prev_ctx is None:
+ os.environ.pop("_AIRFLOW_PROCESS_CONTEXT", None)
+ else:
+ os.environ["_AIRFLOW_PROCESS_CONTEXT"] = prev_ctx
async def arun(self):
"""
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 4984c0864a8..6464274a938 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -1886,6 +1886,55 @@ class TestTriggererJobRunner:
call_kwargs = stats_init_mock.call_args.kwargs
assert "factory" in call_kwargs
+ @patch.object(TriggerRunnerSupervisor, "start")
+ def test_execute_sets_server_process_context(self, mock_supervisor_start,
session, monkeypatch):
+ """_execute marks triggerer as server context for secrets backend
detection."""
+ captured_context = {}
+
+ def capture_env(*args, **kwargs):
+ captured_context["value"] =
os.environ.get("_AIRFLOW_PROCESS_CONTEXT")
+ mock_supervisor = MagicMock(spec=TriggerRunnerSupervisor)
+ mock_supervisor._exit_code = 0
+ return mock_supervisor
+
+ mock_supervisor_start.side_effect = capture_env
+
+ job = Job()
+ session.add(job)
+ session.flush()
+
+ monkeypatch.delenv("_AIRFLOW_PROCESS_CONTEXT", raising=False)
+ job_runner = TriggererJobRunner(job)
+
+ with (
+ patch.object(job_runner, "register_signals"),
+ patch("airflow.jobs.triggerer_job_runner.stats.initialize"),
+ ):
+ job_runner._execute()
+
+ assert captured_context["value"] == "server"
+ # Verify env var is restored after _execute() returns.
+ assert os.environ.get("_AIRFLOW_PROCESS_CONTEXT") is None
+
+ def test_trigger_runner_sets_client_process_context(self, monkeypatch):
+ """TriggerRunner.run() marks subprocess as client context to prevent
inheriting server privileges."""
+ captured_context = {}
+
+ async def capture_env(*args, **kwargs):
+ captured_context["value"] =
os.environ.get("_AIRFLOW_PROCESS_CONTEXT")
+
+ monkeypatch.delenv("_AIRFLOW_PROCESS_CONTEXT", raising=False)
+ runner = TriggerRunner()
+ with (
+ patch.object(runner, "arun", side_effect=capture_env),
+ patch("signal.signal"),
+ ):
+ runner.run()
+
+ assert captured_context["value"] == "client"
+ # Verify env var is restored after run() returns.
+ assert os.environ.get("_AIRFLOW_PROCESS_CONTEXT") is None
+
class TestTriggererMessageTypes:
def test_message_types_in_triggerer(self):
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 52821579759..6527e651041 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -2349,8 +2349,8 @@ def ensure_secrets_backend_loaded() -> list[BaseSecretsBackend]:
2. _AIRFLOW_PROCESS_CONTEXT=server env var → server chain
(MetastoreBackend)
3. Neither → fallback chain (only env vars + external backends, no
MetastoreBackend)
- Client contexts: task runner in worker (has SUPERVISOR_COMMS)
- Server contexts: API server, scheduler (set
_AIRFLOW_PROCESS_CONTEXT=server)
+ Client contexts: task runner in worker (has SUPERVISOR_COMMS), triggerer
runner subprocess (set _AIRFLOW_PROCESS_CONTEXT=client)
+ Server contexts: API server, scheduler, triggerer supervisor (set
_AIRFLOW_PROCESS_CONTEXT=server)
Fallback contexts: supervisor, unknown contexts (no SUPERVISOR_COMMS, no
env var)
The fallback chain ensures supervisor can use external secrets (AWS
Secrets Manager,
@@ -2373,7 +2373,7 @@ def ensure_secrets_backend_loaded() -> list[BaseSecretsBackend]:
# 2. Check for explicit server context
if os.environ.get("_AIRFLOW_PROCESS_CONTEXT") == "server":
- # Server context: API server, scheduler
+ # Server context: API server, scheduler, triggerer
# uses the default server list
return ensure_secrets_loaded()