dlesco commented on issue #35806:
URL: https://github.com/apache/airflow/issues/35806#issuecomment-1828601815
I drafted some code, but won't be able to test it today.
```patch
diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index bb151b32cc..8e1228be7e 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -369,6 +369,8 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
self.emit_metrics()
# Idle sleep
time.sleep(1)
+ # Check watchdog
+ self.trigger_runner.check_watchdog()
def load_triggers(self):
"""Query the database for the triggers we're supposed to be running and update the runner."""
@@ -445,6 +447,17 @@ class TriggerRunner(threading.Thread, LoggingMixin):
# Should-we-stop flag
stop: bool = False
+ # Intervals between block_watchdog runs
+ watchdog_stats: deque[float]
+
+ # Next time to do watchdog checks
+ next_watchdog_check_time: float
+
+ # Watchdog constants
+ WATCHDOG_INTERVAL = 0.1
+ WATCHDOG_ERROR_THRESHOLD = 0.2
+ WATCHDOG_CHECK_INTERVAL = 10.0
+
def __init__(self):
super().__init__()
self.triggers = {}
@@ -459,6 +472,37 @@ class TriggerRunner(threading.Thread, LoggingMixin):
"""Sync entrypoint - just run a run in an async loop."""
asyncio.run(self.arun())
+ def check_watchdog(self):
+ """Main thread loop calls this to check on block_watchdog delays."""
+
+ now = time.monotonic()
+ if now < self.next_watchdog_check_time:
+ return
+ self.next_watchdog_check_time = now + self.WATCHDOG_CHECK_INTERVAL
+ stats_len = len(self.watchdog_stats)
+ if not stats_len:
+ self.log.error(
+ "Triggerer's async thread was blocked for %.2f seconds, "
+ "likely by a badly-written trigger. Set PYTHONASYNCIODEBUG=1 "
+ "to get more information on overrunning coroutines.",
+ self.WATCHDOG_CHECK_INTERVAL,
+ )
+ Stats.incr("triggers.blocked_main_thread")
+ return
+ stats = list()
+ for i in range(stats_len):
+ stats.append(self.watchdog_stats.popleft())
+ stats.sort()
+ median_time = stats[len(stats)//2]
+ if median_time > self.WATCHDOG_ERROR_THRESHOLD:
+ self.log.error(
+ "Triggerer's async thread median task scheduling time was %.2f seconds, "
+ "likely because of a badly-written trigger. Set PYTHONASYNCIODEBUG=1 "
+ "to get more information on overrunning coroutines.",
+ median_time,
+ )
+ Stats.incr("triggers.blocked_main_thread")
+
async def arun(self):
"""
Run trigger addition/deletion/cleanup; main (asynchronous) logic loop.
@@ -568,18 +612,9 @@ class TriggerRunner(threading.Thread, LoggingMixin):
"""
while not self.stop:
last_run = time.monotonic()
- await asyncio.sleep(0.1)
- # We allow a generous amount of buffer room for now, since it might
- # be a busy event loop.
+ await asyncio.sleep(self.WATCHDOG_INTERVAL)
time_elapsed = time.monotonic() - last_run
- if time_elapsed > 0.2:
- self.log.info(
- "Triggerer's async thread was blocked for %.2f seconds, "
- "likely by a badly-written trigger. Set PYTHONASYNCIODEBUG=1 "
- "to get more information on overrunning coroutines.",
- time_elapsed,
- )
- Stats.incr("triggers.blocked_main_thread")
+ self.watchdog_stats.append(time_elapsed)
@staticmethod
def set_individual_trigger_logging(trigger):
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]