This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ef0b144661ea396b4c120a6fd6189633b34c628c
Author: Karthikeyan Singaravelan <[email protected]>
AuthorDate: Thu Sep 11 09:49:08 2025 +0530

    Differentiate between triggers and watchers currently running for better visibility. (#55376)
    
    * Differentiate between triggers and watchers currently running for better visibility.
    
    * Add test.
    
    (cherry picked from commit d88f192b4cb516280548c555ec4e2cd241dd21ab)
---
 .../src/airflow/jobs/triggerer_job_runner.py       |  7 ++++--
 airflow-core/tests/unit/jobs/test_triggerer_job.py | 26 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index a1f8b3a0974..06f12ce3191 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -870,8 +870,10 @@ class TriggerRunner:
                 await asyncio.sleep(1)
                 # Every minute, log status
                 if (now := time.monotonic()) - last_status >= 60:
-                    count = len(self.triggers)
-                    self.log.info("%i triggers currently running", count)
+                    watchers = len([trigger for trigger in self.triggers.values() if trigger["is_watcher"]])
+                    triggers = len(self.triggers) - watchers
+                    self.log.info("%i triggers currently running", triggers)
+                    self.log.info("%i watchers currently running", watchers)
                     last_status = now
 
         except Exception:
@@ -956,6 +958,7 @@ class TriggerRunner:
                 "task": asyncio.create_task(
                     self.run_trigger(trigger_id, trigger_instance), name=trigger_name
                 ),
+                "is_watcher": isinstance(trigger_instance, events.BaseEventTrigger),
                 "name": trigger_name,
                 "events": 0,
             }
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 121088c645e..2ec8733992e 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 
 import asyncio
 import datetime
+import itertools
 import os
 import selectors
 import time
@@ -52,6 +53,7 @@ from airflow.models.variable import Variable
 from airflow.models.xcom import XComModel
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.python import PythonOperator
+from airflow.providers.standard.triggers.file import FileDeleteTrigger
 from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
 from airflow.sdk import BaseHook, BaseOperator
 from airflow.serialization.serialized_objects import LazyDeserializedDAG
@@ -266,6 +268,30 @@ def test_trigger_lifecycle(spy_agency: SpyAgency, session, testing_dag_bundle):
         trigger_runner_supervisor.kill(force=False)
 
 
+@pytest.mark.parametrize(
+    "trigger, watcher_count, trigger_count",
+    [(TimeDeltaTrigger(datetime.timedelta(days=7)), 0, 1), (FileDeleteTrigger("/tmp/foo.txt"), 1, 0)],
+)
+@patch("time.monotonic", side_effect=itertools.count(start=1, step=60))
+def test_trigger_log(mock_monotonic, trigger, watcher_count, trigger_count, session, capsys):
+    """
+    Checks that the triggerer will log watcher and trigger in separate lines.
+    """
+    create_trigger_in_db(session, trigger)
+
+    trigger_runner_supervisor = TriggerRunnerSupervisor.start(job=Job(id=12345), capacity=10)
+    trigger_runner_supervisor.load_triggers()
+
+    for _ in range(10):
+        trigger_runner_supervisor._service_subprocess(0.1)
+
+    stdout = capsys.readouterr().out
+    assert f"{trigger_count} triggers currently running" in stdout
+    assert f"{watcher_count} watchers currently running" in stdout
+
+    trigger_runner_supervisor.kill(force=False)
+
+
 class TestTriggerRunner:
     @pytest.mark.asyncio
     async def test_run_inline_trigger_canceled(self, session) -> None:

Reply via email to