dlesco commented on issue #35806:
URL: https://github.com/apache/airflow/issues/35806#issuecomment-1828601815

   I drafted some code, but won't be able to test it today.
   
   ```patch
   diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
   index bb151b32cc..8e1228be7e 100644
   --- a/airflow/jobs/triggerer_job_runner.py
   +++ b/airflow/jobs/triggerer_job_runner.py
   @@ -369,6 +369,8 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
                self.emit_metrics()
                # Idle sleep
                time.sleep(1)
   +            # Check watchdog
   +            self.trigger_runner.check_watchdog()
   
        def load_triggers(self):
            """Query the database for the triggers we're supposed to be running and update the runner."""
   @@ -445,6 +447,17 @@ class TriggerRunner(threading.Thread, LoggingMixin):
        # Should-we-stop flag
        stop: bool = False
   
   +    # Intervals between block_watchdog runs
   +    watchdog_stats: deque[float]
   +
   +    # Next time to do watchdog checks
   +    next_watchdog_check_time: float
   +
   +    # Watchdog constants
   +    WATCHDOG_INTERVAL = 0.1
   +    WATCHDOG_ERROR_THRESHOLD = 0.2
   +    WATCHDOG_CHECK_INTERVAL = 10.0
   +
        def __init__(self):
            super().__init__()
            self.triggers = {}
   @@ -459,6 +472,37 @@ class TriggerRunner(threading.Thread, LoggingMixin):
            """Sync entrypoint - just run a run in an async loop."""
            asyncio.run(self.arun())
   
   +    def check_watchdog(self):
   +        """Main thread loop calls this to check on block_watchdog delays."""
   +
   +        now = time.monotonic()
   +        if now < self.next_watchdog_check_time:
   +            return
   +        self.next_watchdog_check_time = now + self.WATCHDOG_CHECK_INTERVAL
   +        stats_len = len(self.watchdog_stats)
   +        if not stats_len:
   +            self.log.error(
   +                "Triggerer's async thread was blocked for %.2f seconds, "
   +                "likely by a badly-written trigger. Set PYTHONASYNCIODEBUG=1 "
   +                "to get more information on overrunning coroutines.",
   +                self.WATCHDOG_CHECK_INTERVAL,
   +            )
   +            Stats.incr("triggers.blocked_main_thread")
   +            return
   +        stats = list()
   +        for i in range(stats_len):
   +            stats.append(self.watchdog_stats.popleft())
   +        stats.sort()
   +        median_time = stats[len(stats)//2]
   +        if median_time > self.WATCHDOG_ERROR_THRESHOLD:
   +            self.log.error(
   +                "Triggerer's async thread median task scheduling time was %.2f seconds, "
   +                "likely because of a badly-written trigger. Set PYTHONASYNCIODEBUG=1 "
   +                "to get more information on overrunning coroutines.",
   +                median_time,
   +            )
   +            Stats.incr("triggers.blocked_main_thread")
   +
        async def arun(self):
            """
            Run trigger addition/deletion/cleanup; main (asynchronous) logic loop.
   @@ -568,18 +612,9 @@ class TriggerRunner(threading.Thread, LoggingMixin):
            """
            while not self.stop:
                last_run = time.monotonic()
   -            await asyncio.sleep(0.1)
   -            # We allow a generous amount of buffer room for now, since it might
   -            # be a busy event loop.
   +            await asyncio.sleep(self.WATCHDOG_INTERVAL)
                time_elapsed = time.monotonic() - last_run
   -            if time_elapsed > 0.2:
   -                self.log.info(
   -                    "Triggerer's async thread was blocked for %.2f seconds, "
   -                    "likely by a badly-written trigger. Set PYTHONASYNCIODEBUG=1 "
   -                    "to get more information on overrunning coroutines.",
   -                    time_elapsed,
   -                )
   -                Stats.incr("triggers.blocked_main_thread")
   +            self.watchdog_stats.append(time_elapsed)
   
        @staticmethod
        def set_individual_trigger_logging(trigger):
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to