This is an automated email from the ASF dual-hosted git repository.

jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ccf34ff384f Mark Triggerer supervisor as a server context so it can read metastore connections (#64022)
ccf34ff384f is described below

commit ccf34ff384f09ddbe34e3e36e0ff3b21ddfaa08a
Author: Pranay Kumar Karvi <[email protected]>
AuthorDate: Mon Jun 8 16:11:07 2026 +0530

    Mark Triggerer supervisor as a server context so it can read metastore connections (#64022)
    
    * fix: set server context in triggerer so MetastoreBackend is included for deadline callbacks
    
    * fix: replace redundant test with triggerer_command test per review feedback
    
    * fix: move server context setup to triggerer _execute() per review feedback
    
    * fix: move import os to top-level in triggerer_job_runner per review
    
    * fix: set client context in TriggerRunner subprocess to prevent inheriting server privileges
    
    * fix: restore _AIRFLOW_PROCESS_CONTEXT after triggerer execution and clarify docstring
    
    * fix: capture _AIRFLOW_PROCESS_CONTEXT during execution in test
    
    * fix: correct typo subtproc -> subprocess in comment
    
    * fix: add client-context test and update arun patch to use async side_effect
---
 .../src/airflow/jobs/triggerer_job_runner.py       | 32 +++++++++++---
 airflow-core/tests/unit/jobs/test_triggerer_job.py | 49 ++++++++++++++++++++++
 .../src/airflow/sdk/execution_time/supervisor.py   |  6 +--
 3 files changed, 78 insertions(+), 9 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index e8e1f66e787..326ab14d7a4 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -233,12 +233,20 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
             sys.exit(os.EX_SOFTWARE)
 
     def _execute(self) -> int | None:
+        # Mark as server context for secrets backend detection when handling GetConnection
+        # requests from the TriggerRunner subprocess (needs MetastoreBackend).
+        # The subprocess explicitly sets _AIRFLOW_PROCESS_CONTEXT=client to prevent
+        # inheriting server privileges (runs user trigger/callback code).
+        # Similar to DagProcessorManager / DagProcessor child pattern.
+        _prev_ctx = os.environ.get("_AIRFLOW_PROCESS_CONTEXT")
+        os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
         self.log.info("Starting the triggerer")
         self.register_signals()
         stats.initialize(
             factory=stats_utils.get_stats_factory(),
             export_legacy_names=conf.getboolean("metrics", "legacy_names_on"),
         )
+        self.trigger_runner = None
         try:
             # Kick off runner sub-process without DB access
             self.trigger_runner = TriggerRunnerSupervisor.start(
@@ -248,7 +256,6 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
                 queues=self.queues,
                 team_name=self.team_name,
             )
-
             # Run the main DB comms loop in this process
             self.trigger_runner.run()
             return self.trigger_runner._exit_code
@@ -257,13 +264,16 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
             raise
         finally:
             self.log.info("Waiting for triggers to clean up")
-            # Tell the subtproc to stop and then wait for it.
+            # Tell the subprocess to stop and then wait for it.
             # If the user interrupts/terms again, _graceful_exit will allow them
             # to force-kill here. trigger_runner may be None if start() raised.
             if self.trigger_runner is not None:
                 self.trigger_runner.kill(escalation_delay=10, force=True)
             self.log.info("Exited trigger loop")
-        return None
+            if _prev_ctx is None:
+                os.environ.pop("_AIRFLOW_PROCESS_CONTEXT", None)
+            else:
+                os.environ["_AIRFLOW_PROCESS_CONTEXT"] = _prev_ctx
 
 
 log: FilteringBoundLogger = structlog.get_logger(logger_name=__name__)
@@ -1103,9 +1113,19 @@ class TriggerRunner:
 
     def run(self):
         """Sync entrypoint - just run arun in an async loop."""
-        signal.signal(signal.SIGINT, self._handle_signal)
-        signal.signal(signal.SIGTERM, self._handle_signal)
-        asyncio.run(self.arun())
+        # Mark as client-side (runs user trigger/callback code)
+        # Prevents inheriting server context from parent TriggerRunnerSupervisor
+        prev_ctx = os.environ.get("_AIRFLOW_PROCESS_CONTEXT")
+        os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "client"
+        try:
+            signal.signal(signal.SIGINT, self._handle_signal)
+            signal.signal(signal.SIGTERM, self._handle_signal)
+            asyncio.run(self.arun())
+        finally:
+            if prev_ctx is None:
+                os.environ.pop("_AIRFLOW_PROCESS_CONTEXT", None)
+            else:
+                os.environ["_AIRFLOW_PROCESS_CONTEXT"] = prev_ctx
 
     async def arun(self):
         """
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 4984c0864a8..6464274a938 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -1886,6 +1886,55 @@ class TestTriggererJobRunner:
         call_kwargs = stats_init_mock.call_args.kwargs
         assert "factory" in call_kwargs
 
+    @patch.object(TriggerRunnerSupervisor, "start")
+    def test_execute_sets_server_process_context(self, mock_supervisor_start, session, monkeypatch):
+        """_execute marks triggerer as server context for secrets backend detection."""
+        captured_context = {}
+
+        def capture_env(*args, **kwargs):
+            captured_context["value"] = os.environ.get("_AIRFLOW_PROCESS_CONTEXT")
+            mock_supervisor = MagicMock(spec=TriggerRunnerSupervisor)
+            mock_supervisor._exit_code = 0
+            return mock_supervisor
+
+        mock_supervisor_start.side_effect = capture_env
+
+        job = Job()
+        session.add(job)
+        session.flush()
+
+        monkeypatch.delenv("_AIRFLOW_PROCESS_CONTEXT", raising=False)
+        job_runner = TriggererJobRunner(job)
+
+        with (
+            patch.object(job_runner, "register_signals"),
+            patch("airflow.jobs.triggerer_job_runner.stats.initialize"),
+        ):
+            job_runner._execute()
+
+        assert captured_context["value"] == "server"
+        # Verify env var is restored after _execute() returns.
+        assert os.environ.get("_AIRFLOW_PROCESS_CONTEXT") is None
+
+    def test_trigger_runner_sets_client_process_context(self, monkeypatch):
+        """TriggerRunner.run() marks subprocess as client context to prevent inheriting server privileges."""
+        captured_context = {}
+
+        async def capture_env(*args, **kwargs):
+            captured_context["value"] = os.environ.get("_AIRFLOW_PROCESS_CONTEXT")
+
+        monkeypatch.delenv("_AIRFLOW_PROCESS_CONTEXT", raising=False)
+        runner = TriggerRunner()
+        with (
+            patch.object(runner, "arun", side_effect=capture_env),
+            patch("signal.signal"),
+        ):
+            runner.run()
+
+        assert captured_context["value"] == "client"
+        # Verify env var is restored after run() returns.
+        assert os.environ.get("_AIRFLOW_PROCESS_CONTEXT") is None
+
 
 class TestTriggererMessageTypes:
     def test_message_types_in_triggerer(self):
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 52821579759..6527e651041 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -2349,8 +2349,8 @@ def ensure_secrets_backend_loaded() -> list[BaseSecretsBackend]:
     2. _AIRFLOW_PROCESS_CONTEXT=server env var → server chain (MetastoreBackend)
     3. Neither → fallback chain (only env vars + external backends, no MetastoreBackend)
 
-    Client contexts: task runner in worker (has SUPERVISOR_COMMS)
-    Server contexts: API server, scheduler (set _AIRFLOW_PROCESS_CONTEXT=server)
+    Client contexts: task runner in worker (has SUPERVISOR_COMMS), triggerer runner subprocess (set _AIRFLOW_PROCESS_CONTEXT=client)
+    Server contexts: API server, scheduler, triggerer supervisor (set _AIRFLOW_PROCESS_CONTEXT=server)
     Fallback contexts: supervisor, unknown contexts (no SUPERVISOR_COMMS, no env var)
 
     The fallback chain ensures supervisor can use external secrets (AWS Secrets Manager,
@@ -2373,7 +2373,7 @@ def ensure_secrets_backend_loaded() -> list[BaseSecretsBackend]:
 
     # 2. Check for explicit server context
     if os.environ.get("_AIRFLOW_PROCESS_CONTEXT") == "server":
-        # Server context: API server, scheduler
+        # Server context: API server, scheduler, triggerer
         # uses the default server list
         return ensure_secrets_loaded()
 

Reply via email to