This is an automated email from the ASF dual-hosted git repository.
udim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a4abc91 [BEAM-12434] Implement side_input for num_shards in iobase (#14916)
a4abc91 is described below
commit a4abc91436eadf78abda304664a3e0f64019ea61
Author: hoshimura <[email protected]>
AuthorDate: Fri Jun 4 19:19:20 2021 +0200
[BEAM-12434] Implement side_input for num_shards in iobase (#14916)
Co-authored-by: Udi Meiri <[email protected]>
Co-authored-by: Johan Sternby <[email protected]>
---
sdks/python/apache_beam/io/iobase.py | 16 ++++++----------
1 file changed, 6 insertions(+), 10 deletions(-)
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 71d8037..5d8e5df 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -1125,7 +1125,7 @@ class WriteImpl(ptransform.PTransform):
if min_shards == 1:
keyed_pcoll = pcoll | core.Map(lambda x: (None, x))
else:
- keyed_pcoll = pcoll | core.ParDo(_RoundRobinKeyFn(min_shards))
+ keyed_pcoll = pcoll | core.ParDo(_RoundRobinKeyFn(), count=min_shards)
write_result_coll = (
keyed_pcoll
| core.WindowInto(window.GlobalWindows())
@@ -1226,17 +1226,13 @@ def _finalize_write(
class _RoundRobinKeyFn(core.DoFn):
- def __init__(self, count):
- # type: (int) -> None
- self.count = count
-
def start_bundle(self):
- self.counter = random.randint(0, self.count - 1)
+ self.counter = None
- def process(self, element):
- self.counter += 1
- if self.counter >= self.count:
- self.counter -= self.count
+ def process(self, element, count):
+ if self.counter is None:
+ self.counter = random.randrange(0, count)
+ self.counter = (1 + self.counter) % count
yield self.counter, element