This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit f39f4ef0e541f50911fb695e7848a2e6482f2011 Author: Phil Dawson <[email protected]> AuthorDate: Fri Jul 5 14:21:30 2019 +0100 fixup! Move subprocess machinery into a method --- src/buildstream/_stream.py | 94 +++++++++++----------------------------------- 1 file changed, 22 insertions(+), 72 deletions(-) diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index e4f6bc0..1de8364 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -47,46 +47,6 @@ from . import utils, _yaml, _site from . import Scope, Consistency -def _subprocessed(self, *args, **kwargs): - assert self - print("Args: {}".format([*args])) - print("Kwargs: {}".format(list(kwargs.items()))) - assert not self._subprocess - - global notification_count - notification_count = 0 - # TODO use functools to pass arguments to func to make target for subprocess - - # Start subprocessed work - mp_context = mp.get_context(method='spawn') - process_name = "stream-{}".format(func.__name__) - print("launchinglaunching subprocess:", process_name) - print(func.__module__, func.__name__) - import buildstream - try: - assert func is buildstream._stream.Stream.build or func is Stream.build - except AssertionError: - print(func, func.__qualname__, func.__name__, func.__module__, id(func)) - self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name) - self._subprocess.start() - - # TODO connect signal handlers - - self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name) - - print("Starting loop...") - while not self._subprocess.exitcode: - self._loop() - print("Stopping loop...") - - try: - while True: - notification = self.notification_queue.get() - self._scheduler_notification_handler(notification) - except queue.Empty: - pass - - # Stream() # # This is the main, toplevel calling interface in BuildStream core. 
@@ -146,15 +106,15 @@ class Stream(): def init(self): self._artifacts = self._context.artifactcache self._sourcecache = self._context.sourcecache - print(Stream.build, Stream.build.__qualname__, Stream.build.__name__, Stream.build.__module__, id(Stream.build)) - + print(Stream.build, Stream.build.__qualname__, Stream.build.__name__, Stream.build.__module__, + id(Stream.build)) def run_in_subprocess(self, func, *args, **kwargs): print("Args: {}".format([*args])) print("Kwargs: {}".format(list(kwargs.items()))) assert not self._subprocess - global notification_count + global notification_count notification_count = 0 # TODO use functools to pass arguments to func to make target for subprocess @@ -162,30 +122,22 @@ class Stream(): mp_context = mp.get_context(method='fork') process_name = "stream-{}".format(func.__name__) print("launchinglaunching subprocess:", process_name) - print(func.__module__, func.__name__) - import buildstream - try: - assert func is buildstream._stream.Stream.build or func is Stream.build - except AssertionError: - print(func, func.__qualname__, func.__name__, func.__module__, id(func)) self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name) self._subprocess.start() # TODO connect signal handlers - - self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name) - - print("Starting loop...") - while not self._subprocess.exitcode: + while self._subprocess.exitcode is not None: + self._subprocess.join(0.1) self._loop() print("Stopping loop...") - try: - while True: - notification = self.notification_queue.get() - self._scheduler_notification_handler(notification) - except queue.Empty: - pass + # try: + # while True: + # notification = self._notification_queue.get_nowait() + # self._scheduler_notification_handler(notification) + # except queue.Empty: + # print("Finished processing notifications") + # pass # cleanup() # @@ -312,6 +264,7 @@ class Stream(): def build(self, 
*args, **kwargs): self.run_in_subprocess(self._build, *args, **kwargs) + # build() # # Builds (assembles) elements in the pipeline. @@ -1701,15 +1654,12 @@ class Stream(): # work to a subprocess with the @subprocessed decorator def _loop(self): assert self._notification_queue - - # Check for new messages - try: - notification = self._notification_queue.get(block=True, timeout=0.1) - except queue.Empty: - notification = None - print("queue empty, continuing...") - - # Process new messages - if notification: - print("handling notifications") - self._scheduler_notification_handler(notification) + # Check for and process new messages + while True: + try: + notification = self._notification_queue.get_nowait() + print("handling notifications") + self._scheduler_notification_handler(notification) + except queue.Empty: + notification = None + break
