This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ef0b144661ea396b4c120a6fd6189633b34c628c Author: Karthikeyan Singaravelan <[email protected]> AuthorDate: Thu Sep 11 09:49:08 2025 +0530 Differentiate between triggers and watchers currently running for better visibility. (#55376) * Differentiate between triggers and watchers currently running for better visibility. * Add test. (cherry picked from commit d88f192b4cb516280548c555ec4e2cd241dd21ab) --- .../src/airflow/jobs/triggerer_job_runner.py | 7 ++++-- airflow-core/tests/unit/jobs/test_triggerer_job.py | 26 ++++++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index a1f8b3a0974..06f12ce3191 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -870,8 +870,10 @@ class TriggerRunner: await asyncio.sleep(1) # Every minute, log status if (now := time.monotonic()) - last_status >= 60: - count = len(self.triggers) - self.log.info("%i triggers currently running", count) + watchers = len([trigger for trigger in self.triggers.values() if trigger["is_watcher"]]) + triggers = len(self.triggers) - watchers + self.log.info("%i triggers currently running", triggers) + self.log.info("%i watchers currently running", watchers) last_status = now except Exception: @@ -956,6 +958,7 @@ class TriggerRunner: "task": asyncio.create_task( self.run_trigger(trigger_id, trigger_instance), name=trigger_name ), + "is_watcher": isinstance(trigger_instance, events.BaseEventTrigger), "name": trigger_name, "events": 0, } diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index 121088c645e..2ec8733992e 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -19,6 +19,7 @@ from __future__ import annotations import asyncio import datetime +import 
itertools import os import selectors import time @@ -52,6 +53,7 @@ from airflow.models.variable import Variable from airflow.models.xcom import XComModel from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.operators.python import PythonOperator +from airflow.providers.standard.triggers.file import FileDeleteTrigger from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.sdk import BaseHook, BaseOperator from airflow.serialization.serialized_objects import LazyDeserializedDAG @@ -266,6 +268,30 @@ def test_trigger_lifecycle(spy_agency: SpyAgency, session, testing_dag_bundle): trigger_runner_supervisor.kill(force=False) +@pytest.mark.parametrize( + "trigger, watcher_count, trigger_count", + [(TimeDeltaTrigger(datetime.timedelta(days=7)), 0, 1), (FileDeleteTrigger("/tmp/foo.txt"), 1, 0)], +) +@patch("time.monotonic", side_effect=itertools.count(start=1, step=60)) +def test_trigger_log(mock_monotonic, trigger, watcher_count, trigger_count, session, capsys): + """ + Checks that the triggerer will log watcher and trigger in separate lines. + """ + create_trigger_in_db(session, trigger) + + trigger_runner_supervisor = TriggerRunnerSupervisor.start(job=Job(id=12345), capacity=10) + trigger_runner_supervisor.load_triggers() + + for _ in range(10): + trigger_runner_supervisor._service_subprocess(0.1) + + stdout = capsys.readouterr().out + assert f"{trigger_count} triggers currently running" in stdout + assert f"{watcher_count} watchers currently running" in stdout + + trigger_runner_supervisor.kill(force=False) + + class TestTriggerRunner: @pytest.mark.asyncio async def test_run_inline_trigger_canceled(self, session) -> None:
