[
https://issues.apache.org/jira/browse/BEAM-3645?focusedWorklogId=279656&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-279656
]
ASF GitHub Bot logged work on BEAM-3645:
----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Jul/19 11:12
Start Date: 19/Jul/19 11:12
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #8979: [BEAM-3645]
add multiplexing for python FnApiRunner
URL: https://github.com/apache/beam/pull/8979#discussion_r305301687
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
##########
@@ -92,34 +92,78 @@ class
BeamFnControlServicer(beam_fn_api_pb2_grpc.BeamFnControlServicer):
_DONE_MARKER = object()
def __init__(self):
- self._push_queue = queue.Queue()
self._futures_by_id = dict()
- self._read_thread = threading.Thread(
- name='beam_control_read', target=self._read)
+ self._read_thread = None
self._uid_counter = 0
self._state = self.UNSTARTED_STATE
self._lock = threading.Lock()
+ self._multi_worker_worker_handler = None
+ self._req_worker_mapping = {}
+ self._req_sent = collections.defaultdict(int)
+ self._req_received = collections.defaultdict(int)
+ self._logging_level = logging.getLevelName(
+ logging.getLogger().getEffectiveLevel())
def Control(self, iterator, context):
with self._lock:
if self._state == self.DONE_STATE:
return
else:
self._state = self.STARTED_STATE
- self._inputs = iterator
+
+ metadata = dict((k, v) for k, v in context.invocation_metadata())
+ worker_id = metadata.get('worker_id')
+ if not worker_id:
+ raise RuntimeError('All workers communicate through gRPC should have '
+ 'worker_id. Received None.')
+
+ # wait until worker_handlers are added to MultiWorkerWorkerHandler
+ while not self._multi_worker_worker_handler:
+ time.sleep(0.5)
+ logging.info('Runner: Waiting for worker handlers are added to '
+ 'MultiWorkerWorkerHandler.')
+
+ worker_handler = self._multi_worker_worker_handler\
+ ._get_worker_handler(worker_id)
+ worker = worker_handler._worker_info
+ worker._set_input(iterator)
# Note: We only support one client for now.
+ self._read_thread = threading.Thread(
+ name='beam_control_read', target=self._read,
+ args=(worker,))
self._read_thread.start()
+
while True:
- to_push = self._push_queue.get()
+ to_push = worker._get_req()
if to_push is self._DONE_MARKER:
return
yield to_push
+ if self._logging_level == 'DEBUG':
+ self._req_sent[to_push.instruction_id] += 1
- def _read(self):
- for data in self._inputs:
- self._futures_by_id.pop(data.instruction_id).set(data)
+ def _read(self, worker):
+ for data in worker._get_input():
+ with self._lock:
+ if data.instruction_id in self._futures_by_id:
+ self._futures_by_id.pop(data.instruction_id).set(data)
+ response_type = data.WhichOneof('response')
+ if response_type == 'process_bundle':
+ worker._change_work_load(-1)
+ if self._logging_level == 'DEBUG':
+ self._req_received[data.instruction_id] += 1
+
+ def _dispatch(self, item, dest_worker_id):
+ if dest_worker_id:
+ worker_handler = self._multi_worker_worker_handler._get_worker_handler(
+ dest_worker_id)
+ worker_handler._worker_info._push_req(item)
Review comment:
Generally, leading underscores denote private (non-public) members. It is preferable
not to reach into them from other classes (e.g. _get_worker_handler, _worker_info, _push_req).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 279656)
Time Spent: 24.5h (was: 24h 20m)
> Support multi-process execution on the FnApiRunner
> --------------------------------------------------
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Affects Versions: 2.2.0, 2.3.0
> Reporter: Charles Chen
> Assignee: Hannah Jiang
> Priority: Major
> Fix For: 2.15.0
>
> Time Spent: 24.5h
> Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance
> gain over the previous DirectRunner. We can do even better in multi-core
> environments by supporting multi-process execution in the FnApiRunner,
> which would let it scale past the limitations of Python's GIL.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)