[ 
https://issues.apache.org/jira/browse/BEAM-3645?focusedWorklogId=282587&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-282587
 ]

ASF GitHub Bot logged work on BEAM-3645:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Jul/19 10:06
            Start Date: 25/Jul/19 10:06
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on pull request #8979: [BEAM-3645] 
add multiplexing for python FnApiRunner
URL: https://github.com/apache/beam/pull/8979#discussion_r307217109
 
 

 ##########
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##########
 @@ -69,22 +69,78 @@ def close(self):
       self._close_callback(self.get())
 
 
-class DataChannel(with_metaclass(abc.ABCMeta, object)):
-  """Represents a channel for reading and writing data over the data plane.
+class DataChannel(object):
+
+  def __init__(self):
+    self.data_conn = DataChannelConnection()
+
+
+class InMemoryDataChannel(DataChannel):
+  """An in-memory implementation of a DataChannel.
+
+  This channel is two-sided.  What is written to one side is read by the other.
+  The inverse() method returns the other side of an instance.
+  """
+
+  def __init__(self, inverse=None, data_conn=None):
+    self.data_conn = data_conn or InMemoryDataChannelConnection()
+    self._inverse = inverse or InMemoryDataChannel(
+        self, self.data_conn.inverse())
+
+  def inverse(self):
+    return self._inverse
 
-  Read from this channel with the input_elements method::
+
+class GrpcClientDataChannel(DataChannel):
+  """A DataChannel wrapping the client side of a BeamFnData connection."""
+
+  def __init__(self, data_stub):
+    self.data_conn = GrpcDataChannelConnection()
+    self.data_conn._start_reader(data_stub.Data(
+        self.data_conn._write_outputs()))
+
+
+class GrpcServerDataChannel(
+    beam_fn_api_pb2_grpc.BeamFnDataServicer, DataChannel):
+  """A DataChannel wrapping the server side of a BeamFnData connection."""
+
+  def __init__(self):
+    self.data_conn = GrpcDataChannelConnection()
 
 Review comment:
   I'm not following this--nothing has connected to the service at this point. 
   
   I'm sorry I wasn't clear enough (this kind of thing is much more easily 
explained in person at a whiteboard). Here the notion of Channel is already 
equivalent to Connection. 
   
   If you look at 
https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/data_plane.py#L300
 we have GrpcServerDataChannel inheriting from both 
beam_fn_api_pb2_grpc.BeamFnDataServicer and _GrpcDataChannel, because these 
used to be in a 1:1 relationship. What we need to do is split them apart, such 
that the server class `GrpcServerDataChannel` (rename to BeamFnDataServicer or 
similar) holds a [default]dict of `_GrpcDataChannel`s indexed by worker id, and 
starts the appropriate reader thread on each call to `Data()`. Consumers would 
then go from the data servicer to the channel associated with the worker they 
care about, rather than assuming the two are one and the same.
   
   I think this should be a much smaller change. 
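
A rough sketch of the split described above, to make the shape concrete. It is not the actual Beam code: `WorkerDataChannel` and `DataServicer` are hypothetical stand-ins for `_GrpcDataChannel` and the renamed servicer, no gRPC is involved, and `worker_id` is passed to `Data()` directly (how it would be obtained from the real gRPC call is left out). A plain dict guarded by a lock plays the role of the [default]dict.

```python
import queue
import threading


class WorkerDataChannel(object):
    """Hypothetical stand-in for _GrpcDataChannel: one instance per worker."""

    def __init__(self, worker_id):
        self.worker_id = worker_id
        self.received = queue.Queue()  # elements read from this worker

    def read_from(self, elements_iterator):
        # Drain the incoming stream into this worker's own queue.
        for element in elements_iterator:
            self.received.put(element)


class DataServicer(object):
    """Hypothetical servicer that holds one channel per worker id."""

    def __init__(self):
        self._lock = threading.Lock()
        self._channels = {}

    def get_channel(self, worker_id):
        # Create the channel lazily so consumers and Data() calls can
        # arrive in either order and still share the same instance.
        with self._lock:
            if worker_id not in self._channels:
                self._channels[worker_id] = WorkerDataChannel(worker_id)
            return self._channels[worker_id]

    def Data(self, elements_iterator, worker_id):
        # Assumption: worker_id is supplied by the caller in this sketch.
        channel = self.get_channel(worker_id)
        reader = threading.Thread(
            target=channel.read_from, args=(elements_iterator,))
        reader.daemon = True
        reader.start()
        return channel, reader
```

A consumer would then call `servicer.get_channel(worker_id)` for the worker it cares about instead of treating the servicer itself as the channel.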
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 282587)
    Time Spent: 30h 50m  (was: 30h 40m)

> Support multi-process execution on the FnApiRunner
> --------------------------------------------------
>
>                 Key: BEAM-3645
>                 URL: https://issues.apache.org/jira/browse/BEAM-3645
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>    Affects Versions: 2.2.0, 2.3.0
>            Reporter: Charles Chen
>            Assignee: Hannah Jiang
>            Priority: Major
>             Fix For: 2.15.0
>
>          Time Spent: 30h 50m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
