[ 
https://issues.apache.org/jira/browse/BEAM-3645?focusedWorklogId=281090&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-281090
 ]

ASF GitHub Bot logged work on BEAM-3645:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Jul/19 15:20
            Start Date: 23/Jul/19 15:20
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on pull request #8979: [BEAM-3645] add multiplexing for python FnApiRunner
URL: https://github.com/apache/beam/pull/8979#discussion_r306360537
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##########
 @@ -92,34 +140,56 @@ class BeamFnControlServicer(beam_fn_api_pb2_grpc.BeamFnControlServicer):
   _DONE_MARKER = object()
 
   def __init__(self):
-    self._push_queue = queue.Queue()
-    self._futures_by_id = dict()
-    self._read_thread = threading.Thread(
-        name='beam_control_read', target=self._read)
+    self._lock = threading.Lock()
     self._uid_counter = 0
     self._state = self.UNSTARTED_STATE
-    self._lock = threading.Lock()
+    # following self._req_* variables are used for debugging purpose, data is
+    # added only when self._log_req is True.
+    self._req_sent = collections.defaultdict(int)
+    self._req_worker_mapping = {}
+    self._log_req = True if logging.getLevelName(
+        logging.getLogger().getEffectiveLevel()) == 'DEBUG' else False
+    self._conn_handler = ConnectionHandler()
 
   def Control(self, iterator, context):
     with self._lock:
       if self._state == self.DONE_STATE:
         return
       else:
         self._state = self.STARTED_STATE
-    self._inputs = iterator
-    # Note: We only support one client for now.
-    self._read_thread.start()
+
+    worker_id = dict(context.invocation_metadata()).get('worker_id')
+    if not worker_id:
+      raise RuntimeError('All workers communicate through gRPC should have '
+                         'worker_id. Received None.')
+
+    control_conn = self._conn_handler.get_conn_handler_by_id(worker_id)
+    control_conn.set_input(iterator)
+    if not control_conn.read_thread.isAlive():
+      control_conn.read_thread.start()
 
 Review comment:
   Starting the thread belongs in set_input, since the connection handler 
owns the thread. The thread itself could then be private as well, reducing 
the public API surface of ConnectionHandler (generally a good thing). 
   
   (Also, it might be good to make it an error to set the input twice, so 
there's no need to check whether the thread is alive.)
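
   A minimal sketch of what this suggestion could look like, not the PR's 
actual code. The class name ControlConnection, the `received` list, and the 
`join` helper are hypothetical and exist only for illustration; the sketch 
assumes the connection object reads from a single input iterator on its own 
thread, as the diff above implies.

```python
import threading


class ControlConnection(object):
  """Hypothetical per-worker connection that owns its read thread."""

  def __init__(self):
    self._lock = threading.Lock()
    self._inputs = None
    # The thread is private: callers never start or inspect it directly.
    self._read_thread = threading.Thread(
        name='beam_control_read', target=self._read)
    self._read_thread.daemon = True
    self.received = []  # Illustration only: records what the thread read.

  def _read(self):
    for item in self._inputs:
      self.received.append(item)

  def set_input(self, input_iterator):
    """Sets the input and starts reading; a second call is an error."""
    with self._lock:
      if self._inputs is not None:
        raise RuntimeError('set_input may only be called once.')
      self._inputs = input_iterator
      self._read_thread.start()

  def join(self, timeout=None):
    # Illustration only: lets a caller wait for the reader to finish.
    self._read_thread.join(timeout)
```

   With something along these lines, Control() would only need to call 
control_conn.set_input(iterator), and the liveness check on the thread 
would go away.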
   
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 281090)
    Time Spent: 28h 20m  (was: 28h 10m)

> Support multi-process execution on the FnApiRunner
> --------------------------------------------------
>
>                 Key: BEAM-3645
>                 URL: https://issues.apache.org/jira/browse/BEAM-3645
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>    Affects Versions: 2.2.0, 2.3.0
>            Reporter: Charles Chen
>            Assignee: Hannah Jiang
>            Priority: Major
>             Fix For: 2.15.0
>
>          Time Spent: 28h 20m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
