[ https://issues.apache.org/jira/browse/BEAM-3645?focusedWorklogId=279933&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-279933 ]

ASF GitHub Bot logged work on BEAM-3645:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Jul/19 19:59
            Start Date: 19/Jul/19 19:59
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on pull request #8979: [BEAM-3645] add multiplexing for python FnApiRunner
URL: https://github.com/apache/beam/pull/8979#discussion_r305505143
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##########
 @@ -1480,42 +1664,54 @@ def process_bundle(self, inputs, expected_outputs):
 
     # Register the bundle descriptor, if needed - noop if already registered.
     registration_future = self._register_bundle_descriptor()
+    # Check that the bundle was successfully registered.
+    if registration_future and registration_future.get().error:
+      raise RuntimeError(registration_future.get().error)
 
     split_manager = self._select_split_manager()
-    if not split_manager:
-      # If there is no split_manager, write all input data to the channel.
+    if not split_manager and isinstance(self._worker_handler,
+                                        EmbeddedWorkerHandler):
+      # If there is no split_manager, write all input data to the channel. For
+      # EmbeddedWorkerHandler, send data first, then send request.
       for transform_id, elements in inputs.items():
         self._send_input_to_worker(
             process_bundle_id, transform_id, elements)
 
-    # Check that the bundle was successfully registered.
-    if registration_future and registration_future.get().error:
-      raise RuntimeError(registration_future.get().error)
-
     # Actually start the bundle.
     process_bundle_req = beam_fn_api_pb2.InstructionRequest(
         instruction_id=process_bundle_id,
         process_bundle=beam_fn_api_pb2.ProcessBundleRequest(
             process_bundle_descriptor_reference=self._bundle_descriptor.id))
-    result_future = self._controller.control_handler.push(process_bundle_req)
+    result_future = self._worker_handler.control_handler.push(
+        process_bundle_req, self._worker_handler._worker_id)
+
+    if not split_manager and isinstance(self._worker_handler,
 
 Review comment:
   Using the ideas above, we should avoid the need to send the request before 
the data in the gRPC case. (It is true that split_manager is incompatible with 
EmbeddedWorkerHandler; it wouldn't hurt to assert this.)
   
   (Nit: it's not clear that `isinstance(GrpcWorkerHandler)` and 
`isinstance(EmbeddedWorkerHandler)` cover both cases, e.g. someone could add a 
third type without knowing they had to fix the logic here. If we continued with 
this, it'd be better to do `isinstance(GrpcWorkerHandler)` vs. `not 
isinstance(GrpcWorkerHandler)`)
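   A minimal sketch of the branching the nit suggests, plus the suggested 
assertion. It is not Beam's code: the stub classes and the process_bundle 
function below are hypothetical stand-ins that only record the order in which 
input data and the ProcessBundle request would be sent. The point is that 
testing `isinstance(..., GrpcWorkerHandler)` against its negation makes the two 
branches exhaustive, so a new handler type falls into a defined branch instead 
of silently skipping the data.

```python
class WorkerHandler:
    """Hypothetical stand-in for the runner's worker handler base class."""


class EmbeddedWorkerHandler(WorkerHandler):
    """Hypothetical stand-in: SDK harness running in-process."""


class GrpcWorkerHandler(WorkerHandler):
    """Hypothetical stand-in: SDK harness reached over gRPC."""


def process_bundle(worker_handler, split_manager, inputs):
    """Return the send order of data and request for one bundle (sketch).

    `inputs` maps transform_id -> elements, as in the diff above. When a
    split_manager is present, data is assumed to be sent elsewhere, so this
    sketch only records the request in that case.
    """
    if split_manager:
        # The review notes split_manager is incompatible with the embedded
        # handler; fail loudly rather than misbehave.
        assert not isinstance(worker_handler, EmbeddedWorkerHandler), (
            'split_manager is incompatible with EmbeddedWorkerHandler')

    # A single predicate and its negation: every handler type, including
    # ones added later, lands in exactly one of the two data branches.
    is_grpc = isinstance(worker_handler, GrpcWorkerHandler)
    events = []
    if not split_manager and not is_grpc:
        # Non-gRPC (e.g. embedded): send data first, then the request.
        events.extend(('data', transform_id) for transform_id in inputs)
    events.append(('request', None))
    if not split_manager and is_grpc:
        # gRPC: the request goes out before the data.
        events.extend(('data', transform_id) for transform_id in inputs)
    return events
```

   As the comment itself says, the preferred fix is to remove the need for 
the gRPC request-before-data ordering altogether; this sketch only shows the 
safer form of the check if the two-branch approach were kept.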
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 279933)
    Time Spent: 27h  (was: 26h 50m)

> Support multi-process execution on the FnApiRunner
> --------------------------------------------------
>
>                 Key: BEAM-3645
>                 URL: https://issues.apache.org/jira/browse/BEAM-3645
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>    Affects Versions: 2.2.0, 2.3.0
>            Reporter: Charles Chen
>            Assignee: Hannah Jiang
>            Priority: Major
>             Fix For: 2.15.0
>
>          Time Spent: 27h
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance 
> gain over the previous DirectRunner.  We can do even better in multi-core 
> environments by supporting multi-process execution in the FnApiRunner, to 
> scale past Python GIL limitations.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
