This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/streamasync
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit d7d056bff1e90dae359d8feb0f287ce0a5f098a8
Author: Tom Pollard <[email protected]>
AuthorDate: Fri Oct 11 10:45:58 2019 +0100

    basic async in stream
---
 src/buildstream/_scheduler/scheduler.py |  1 +
 src/buildstream/_stream.py              | 77 ++++++++++++++++++++++++---------
 2 files changed, 57 insertions(+), 21 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 122ba37..0d06500 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -68,6 +68,7 @@ class NotificationType(FastEnum):
     START = "start"
     TASK_GROUPS = "task_groups"
     ELEMENT_TOTALS = "element_totals"
+    FINISH = "finish"
 
 
 # Notification()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 74f7755..f0f6138 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -74,6 +74,7 @@ class Stream():
         self.queues = []             # Queue objects
         self.len_session_elements = None
         self.len_total_elements = None
+        self.loop = None
 
         #
         # Private members
@@ -141,26 +142,37 @@ class Stream():
         self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args,
                                               kwargs=kwargs, name=process_name)
 
+
         self._subprocess.start()
 
+        # We can now launch another async
+        self.loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(self.loop)
+        self._start_listening()
+        #raise ValueError("started listening")
+        self.loop.run_forever()
+
+        # Run forever needs to be forcefully stopped, else we never exit the statement
+
+        #raise ValueError("run_forever")
         # TODO connect signal handlers with asyncio
-        while self._subprocess.exitcode is None:
+        #while self._subprocess.exitcode is None:
             # check every given time interval on subprocess state
-            self._subprocess.join(0.01)
-            # if no exit code, go back to checking the message queue
-            self._loop()
-
+            #self._subprocess.join(0.01)
+        # Scheduler has stopped running, so safe to still have async here
+        self._stop_listening()
+        #print("closing the loop")
+        #raise ValueError("closing loop")
+        #self.loop.stop()
+        self.loop.close()
+        self.loop = None
         # Set main process back
         utils._reset_main_pid()
 
         # Ensure no more notifcations to process
-        try:
-            while True:
-                notification = self._notify_front_queue.get_nowait()
-                self._notification_handler(notification)
-        except queue.Empty:
-            pass
-
+        while not self._notify_front_queue.empty():
+            notification = self._notify_front_queue.get_nowait()
+            self._notification_handler(notification)
 
     # cleanup()
     #
@@ -1456,6 +1468,9 @@ class Stream():
 
         status = self._scheduler.run(self.queues)
 
+        # Scheduler has finished running, send frontend notification
+        self._notify_front(Notification(NotificationType.FINISH))
+
         if status == SchedStatus.ERROR:
             raise StreamError()
         if status == SchedStatus.TERMINATED:
@@ -1774,12 +1789,18 @@ class Stream():
         elif notification.notification_type == NotificationType.TASK_ERROR:
             set_last_task_error(*notification.task_error)
         elif notification.notification_type == NotificationType.EXCEPTION:
+            # If we're looping, stop
+            if self.loop:
+                self.loop.stop()
             # Regenerate the exception here, so we don't have to pickle it
             raise SubprocessException(**notification.exception)
         elif notification.notification_type == NotificationType.START:
             self._session_start_callback()
         elif notification.notification_type == NotificationType.ELEMENT_TOTALS:
             self.len_session_elements, self.len_total_elements = notification.element_totals
+        elif notification.notification_type == NotificationType.FINISH:
+            if self.loop:
+                self.loop.stop()
         else:
             raise StreamError("Unrecognised notification type received")
 
@@ -1797,16 +1818,30 @@ class Stream():
 
     # The code to be run by the Stream's event loop while delegating
     # work to a subprocess with the @subprocessed decorator
-    def _loop(self):
-        assert self._notify_front_queue
+    #def _loop(self):
+        #assert self._notify_front_queue
         # Check for and process new messages
-        while True:
-            try:
-                notification = self._notify_front_queue.get_nowait()
-                self._notification_handler(notification)
-            except queue.Empty:
-                notification = None
-                break
+        #while True:
+            #try:
+                #notification = self._notify_front_queue.get_nowait()
+                #self._notification_handler(notification)
+            #except queue.Empty:
+                #notification = None
+                #break
+
+    def _loop(self):
+        while not self._notify_front_queue.empty():
+            notification = self._notify_front_queue.get_nowait()
+            self._notification_handler(notification)
+
+    def _start_listening(self):
+        if self._notify_front_queue:
+            self.loop.add_reader(self._notify_front_queue._reader.fileno(), self._loop)
+
+    def _stop_listening(self):
+        if self._notify_front_queue:
+            self.loop.remove_reader(self._notify_front_queue._reader.fileno())
+
 
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing

Reply via email to