This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch bschubert/remove-pipe-job
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 01b47778a5139a81f00498d994132434f474f1d4
Author: Benjamin Schubert <[email protected]>
AuthorDate: Mon Jan 11 11:48:35 2021 +0000

    job.py: Stop using the queue to send data between the child and parent
    
    This removes the need to have all messages processed in the master
    thread, and instead allows them to be done in any thread.
    
    * _messenger.py:
    
      - Store optional job information in the thread local
        storage and expand the message with it if it is present.
    
      - Make the message handler something global and remove the need to
        have a thread-specific one.
    
      - Have message filter out silenced and LOG messages from jobs
    
    * job.py: Remove the job-specific message handler
---
 src/buildstream/_messenger.py          | 56 ++++++++++++++++++++++++++--------
 src/buildstream/_scheduler/jobs/job.py | 45 ++++-----------------------
 2 files changed, 49 insertions(+), 52 deletions(-)

diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 01a8cfd..edb79ec 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -25,7 +25,7 @@ from typing import Optional, Callable, Iterator, TextIO
 
 from . import _signals
 from ._exceptions import BstError
-from ._message import Message, MessageType
+from ._message import Message, MessageType, unconditional_messages
 from ._state import State, Task
 
 
@@ -48,6 +48,13 @@ class _TimeData:
         self.start_time: datetime.datetime = start_time
 
 
+class _JobInfo:
+    def __init__(self, action_name: str, element_name: str, element_key: str) 
-> None:
+        self.action_name = action_name
+        self.element_name = element_name
+        self.element_key = element_key
+
+
 # _MessengerLocal
 #
 # Thread local storage for the messenger
@@ -56,13 +63,6 @@ class _MessengerLocal(threading.local):
     def __init__(self) -> None:
         super().__init__()
 
-        # The callback to call when propagating messages
-        #
-        # FIXME: The message handler is currently not strongly typed,
-        #        as it uses a kwarg, we cannot declare it with Callable.
-        #        We can use `Protocol` to strongly type this with python >= 3.8
-        self.message_handler = None
-
         # The open file handle for this task
         self.log_handle: Optional[TextIO] = None
 
@@ -72,6 +72,9 @@ class _MessengerLocal(threading.local):
         # Level of silent messages depth in this task
         self.silence_scope_depth: int = 0
 
+        # Job
+        self.job: Optional[_JobInfo] = None
+
 
 # Messenger()
 #
@@ -97,8 +100,16 @@ class Messenger:
         # Thread local storage
         self._locals: _MessengerLocal = _MessengerLocal()
 
-    def setup_new_action_context(self) -> None:
+        # The callback to call when propagating messages
+        #
+        # FIXME: The message handler is currently not strongly typed,
+        #        as it uses a kwarg, we cannot declare it with Callable.
+        #        We can use `Protocol` to strongly type this with python >= 3.8
+        self._message_handler = None
+
+    def setup_new_action_context(self, action_name: str, element_name: str, 
element_key: str) -> None:
         self._locals.silence_scope_depth = 0
+        self._locals.job = _JobInfo(action_name, element_name, element_key)
 
     # set_message_handler()
     #
@@ -106,7 +117,7 @@ class Messenger:
     # the messenger.
     #
     def set_message_handler(self, handler) -> None:
-        self._locals.message_handler = handler
+        self._message_handler = handler
 
     # set_state()
     #
@@ -140,12 +151,31 @@ class Messenger:
         # If we are recording messages, dump a copy into the open log file.
         self._record_message(message)
 
+        # Always add the log filename automatically
+        message.logfile = self._locals.log_filename
+
+        is_silenced = self._silent_messages()
+        job = self._locals.job
+
+        if job is not None:
+            # Automatically add message information from the job context
+            message.action_name = job.action_name
+            message.task_element_name = job.element_name
+            message.task_element_key = job.element_key
+
+            # Don't forward LOG messages from jobs
+            if message.message_type == MessageType.LOG:
+                return
+
+            # Don't forward JOB messages if they are currently silent
+            if is_silenced and (message.message_type not in 
unconditional_messages):
+                return
+
         # Send it off to the log handler (can be the frontend,
         # or it can be the child task which will propagate
         # to the frontend)
-        assert self._locals.message_handler
-
-        self._locals.message_handler(message, 
is_silenced=self._silent_messages())
+        assert self._message_handler
+        self._message_handler(message, is_silenced=is_silenced)
 
     # status():
     #
diff --git a/src/buildstream/_scheduler/jobs/job.py 
b/src/buildstream/_scheduler/jobs/job.py
index 227d3a2..b6d7e6c 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -31,7 +31,7 @@ import traceback
 # BuildStream toplevel imports
 from ... import utils
 from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
-from ..._message import Message, MessageType, unconditional_messages
+from ..._message import Message, MessageType
 from ...types import FastEnum
 from ._job import terminate_thread
 from ..._signals import TerminateException
@@ -360,13 +360,12 @@ class Job:
     def _parent_process_pipe(self):
         while self._pipe_r.poll():
             try:
-                message = self._pipe_r.recv()
+                self._pipe_r.recv()
+                assert False, "No message should be received anymore"
             except EOFError:
                 self._parent_stop_listening()
                 break
 
-            self._messenger.message(message)
-
     # _parent_recv()
     #
     # A callback to handle I/O events from the message
@@ -493,8 +492,9 @@ class ChildJob:
         # Set the global message handler in this child
         # process to forward messages to the parent process
         self._pipe_w = pipe_w
-        self._messenger.setup_new_action_context()
-        self._messenger.set_message_handler(self._child_message_handler)
+        self._messenger.setup_new_action_context(
+            self.action_name, self._message_element_name, 
self._message_element_key
+        )
 
         # Time, log and and run the action function
         #
@@ -593,36 +593,3 @@ class ChildJob:
                 return
 
         terminate_thread(self._thread_id)
-
-    #######################################################
-    #                  Local Private Methods              #
-    #######################################################
-
-    # _child_message_handler()
-    #
-    # A Context delegate for handling messages, this replaces the
-    # frontend's main message handler in the context of a child task
-    # and performs local logging to the local log file before sending
-    # the message back to the parent process for further propagation.
-    # The related element display key is added to the message for
-    # widget rendering if not already set for an element childjob.
-    #
-    # Args:
-    #    message     (Message): The message to log
-    #    is_silenced (bool)   : Whether messages are silenced
-    #
-    def _child_message_handler(self, message, is_silenced):
-
-        message.action_name = self.action_name
-        message.task_element_name = self._message_element_name
-        message.task_element_key = self._message_element_key
-
-        # Send to frontend if appropriate
-        if is_silenced and (message.message_type not in 
unconditional_messages):
-            return
-
-        # Don't bother propagating these to the frontend
-        if message.message_type == MessageType.LOG:
-            return
-
-        self._pipe_w.send(message)

Reply via email to