This is an automated email from the ASF dual-hosted git repository. not-in-ldap pushed a commit to branch tpollard/streamasync in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit a58996467542be85ffc1200729bac1b2211b4d41
Author: Tom Pollard <[email protected]>
AuthorDate: Wed Oct 2 11:30:10 2019 +0100

    Move sched notification poll to loop reader
---
 src/buildstream/_scheduler/scheduler.py | 28 +++++++++++++++-------------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index e90efdb..122ba37 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -196,15 +196,16 @@ class Scheduler():
         # Handle unix signals while running
         self._connect_signals()
 
-        # Add notification handler
-        if self._notify_back_queue:
-            self.loop.call_later(0.01, self._loop)
+        # Add notification listener if in subprocess
+        self._start_listening()
 
         # Start the profiler
         with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
             # Run the queues
             self._sched()
             self.loop.run_forever()
+            # Stop listening for notifications
+            self._stop_listening()
             self.loop.close()
 
         # Stop handling unix signals
@@ -592,16 +593,17 @@ class Scheduler():
             raise ValueError("Unrecognised notification type received")
 
     def _loop(self):
-        assert self._notify_back_queue
-        # Check for and process new messages
-        while True:
-            try:
-                notification = self._notify_back_queue.get_nowait()
-                self._notification_handler(notification)
-            except queue.Empty:
-                notification = None
-                break
-        self.loop.call_later(0.01, self._loop)
+        while not self._notify_back_queue.empty():
+            notification = self._notify_back_queue.get_nowait()
+            self._notification_handler(notification)
+
+    def _start_listening(self):
+        if self._notify_back_queue:
+            self.loop.add_reader(self._notify_back_queue._reader.fileno(), self._loop)
+
+    def _stop_listening(self):
+        if self._notify_back_queue:
+            self.loop.remove_reader(self._notify_back_queue._reader.fileno())
 
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
