[
https://issues.apache.org/jira/browse/BEAM-3645?focusedWorklogId=266442&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-266442
]
ASF GitHub Bot logged work on BEAM-3645:
----------------------------------------
Author: ASF GitHub Bot
Created on: 25/Jun/19 08:33
Start Date: 25/Jun/19 08:33
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #8872: [BEAM-3645]
add ParallelBundleManager
URL: https://github.com/apache/beam/pull/8872#discussion_r297055821
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
##########
@@ -181,14 +196,32 @@ def __iter__(self):
windowed_key_values = trigger_driver.process_entire_key
coder_impl = self._post_grouped_coder.get_impl()
key_coder_impl = self._key_coder.get_impl()
- for encoded_key, windowed_values in self._table.items():
+ n = min(n, len(self._table))
+ output_stream_list = []
+ for _ in range(n):
+ output_stream_list.append(create_OutputStream())
+ for idx, (encoded_key, windowed_values) in enumerate(self._table.items()):
key = key_coder_impl.decode(encoded_key)
for wkvs in windowed_key_values(key, windowed_values):
- coder_impl.encode_to_stream(wkvs, output_stream, True)
- self._grouped_output = [output_stream.get()]
+ coder_impl.encode_to_stream(wkvs, output_stream_list[idx % n], True)
+ for output_stream in output_stream_list:
+ self._grouped_output.append([output_stream.get()])
self._table = None
return iter(self._grouped_output)
+ def __iter__(self):
+ """ Since partition() returns a list of list, added this __iter__ to return
+ a list to simplify code when we need to iterate through ALL elements of
+ _GroupingBuffer.
+ """
+ if len(self._grouped_output) == 0:
+ self.partition(1)
+ iter_result = []
+ for output in self._grouped_output:
+ for out in output:
+ iter_result.append(out)
Review comment:
Lists support + for concatenation, so you can write
`return iter(sum(self.partition(1), []))`
to sum the lists into one full list. (The second argument to sum is the
"start" element.) You could also use `itertools.chain(*self.partition(1))`,
which might be cleaner because there's no need to write iter() at all.
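
As a rough illustration of the two suggestions above, here is a minimal standalone sketch. It does not use the Beam classes: `partitions` is a hypothetical stand-in for the list of lists that `partition()` is described as returning, and the byte strings are made-up placeholder values.

```python
import itertools

# Hypothetical stand-in for the list of lists returned by partition();
# the contents are placeholders, not real encoded Beam output.
partitions = [[b"a", b"b"], [b"c"], []]

# Option 1: sum() with an empty list as the "start" element.
# Each + builds a new list, so the inputs are left unmodified.
flat_sum = sum(partitions, [])

# Option 2: itertools.chain already returns an iterator,
# so no extra iter() call is needed.
flat_chain = list(itertools.chain(*partitions))

# chain.from_iterable is equivalent and avoids unpacking the outer list.
flat_from_iterable = list(itertools.chain.from_iterable(partitions))

print(flat_sum)
print(flat_chain)
print(flat_from_iterable)
```

All three forms yield the same flattened sequence. For a small number of partitions the difference is mostly stylistic; `sum` re-copies the accumulated list on every step, so the `itertools.chain` form tends to scale better when there are many sublists.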
Issue Time Tracking
-------------------
Worklog Id: (was: 266442)
> Support multi-process execution on the FnApiRunner
> --------------------------------------------------
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Affects Versions: 2.2.0, 2.3.0
> Reporter: Charles Chen
> Assignee: Hannah Jiang
> Priority: Major
> Time Spent: 11.5h
> Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance
> gain over the previous DirectRunner. We can do even better in multi-core
> environments by supporting multi-process execution in the FnApiRunner, to
> scale past Python GIL limitations.