[
https://issues.apache.org/jira/browse/BEAM-3645?focusedWorklogId=266431&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-266431
]
ASF GitHub Bot logged work on BEAM-3645:
----------------------------------------
Author: ASF GitHub Bot
Created on: 25/Jun/19 08:33
Start Date: 25/Jun/19 08:33
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #8872: [BEAM-3645] add ParallelBundleManager
URL: https://github.com/apache/beam/pull/8872#discussion_r297063791
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
##########
@@ -1506,6 +1545,52 @@ def process_bundle(self, inputs, expected_outputs):
     return result, split_results
+class ParallelBundleManager(BundleManager):
+
+  def _check_inputs_split(self, expected_outputs):
+    # We skip splitting inputs when timer is set, because operations are not
+    # triggered until we sent inputs for timers.
+    for _, pcoll_id in expected_outputs.items():
+      kind = split_buffer_id(pcoll_id)[0]
+      if kind in ['timers']:
+        return False
+
+    return True
+
+  def process_bundle(self, inputs, expected_outputs, num_workers=None):
+    num_workers = num_workers or self._num_workers
+    param_list = []
+
+    if self._check_inputs_split(expected_outputs):
+      for name, input in inputs.items():
+        for part in input.partition(num_workers):
+          param_list.append(({name : part}, expected_outputs))
Review comment:
OK, I think this is where the bug above is. Timers are a case where there is
more than one set of inputs, but this loop adds a new element to param_list for
every part of every input, so each bundle receives only a single input.
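Instead, I think you need something like
```
# One dict of {input name: part} per worker, so that each worker receives
# its share of *all* inputs in a single bundle.
partitioned_inputs = [{} for _ in range(num_workers)]
for name, input in inputs.items():
  for ix, part in enumerate(input.partition(num_workers)):
    partitioned_inputs[ix][name] = part
```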
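The difference between the two loops can be checked with a small standalone sketch. `FakeBuffer`, `buggy_params`, `fixed_params` and `run_bundle` are hypothetical names, not Beam APIs. `FakeBuffer.partition(n)` is assumed to return exactly `n` lists split round-robin, and the runner's real buffers may split differently. `ThreadPoolExecutor` only stands in for fanning bundles out to workers.

```python
from concurrent.futures import ThreadPoolExecutor


class FakeBuffer(object):
  """Hypothetical stand-in for the runner's input buffers.

  Assumption: partition(n) returns exactly n lists, split round-robin.
  """

  def __init__(self, elements):
    self._elements = list(elements)

  def partition(self, n):
    return [self._elements[ix::n] for ix in range(n)]


def buggy_params(inputs, expected_outputs, num_workers):
  # Mirrors the loop under review: one bundle per (input, part) pair,
  # so each bundle sees only a single input.
  param_list = []
  for name, buf in inputs.items():
    for part in buf.partition(num_workers):
      param_list.append(({name: part}, expected_outputs))
  return param_list


def fixed_params(inputs, expected_outputs, num_workers):
  # Reviewer's suggestion: one dict per worker holding its share of every
  # input, so the main input and the timers travel in the same bundle.
  partitioned_inputs = [{} for _ in range(num_workers)]
  for name, buf in inputs.items():
    for ix, part in enumerate(buf.partition(num_workers)):
      partitioned_inputs[ix][name] = part
  return [(part_inputs, expected_outputs)
          for part_inputs in partitioned_inputs]


def run_bundle(params):
  # Hypothetical worker: report which inputs this bundle received.
  part_inputs, _ = params
  return sorted(part_inputs)


inputs = {'main': FakeBuffer([1, 2, 3, 4]), 'timers': FakeBuffer(['t1', 't2'])}
expected_outputs = {}

print(len(buggy_params(inputs, expected_outputs, 2)))  # 4
with ThreadPoolExecutor(max_workers=2) as executor:
  # Each of the 2 bundles now sees both inputs.
  print(list(executor.map(run_bundle,
                          fixed_params(inputs, expected_outputs, 2))))
  # [['main', 'timers'], ['main', 'timers']]
```

With two inputs and two workers, the original loop yields four single-input bundles. The fixed version yields one bundle per worker, each carrying its share of both inputs.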
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
Issue Time Tracking
-------------------
Worklog Id: (was: 266431)
Time Spent: 10h (was: 9h 50m)
> Support multi-process execution on the FnApiRunner
> --------------------------------------------------
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Affects Versions: 2.2.0, 2.3.0
> Reporter: Charles Chen
> Assignee: Hannah Jiang
> Priority: Major
> Time Spent: 10h
> Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance
> gain over the previous DirectRunner. We can do even better in multi-core
> environments by supporting multi-process execution in the FnApiRunner, to
> scale past Python GIL limitations.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)