pabloem commented on a change in pull request #15753:
URL: https://github.com/apache/beam/pull/15753#discussion_r737778631
##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -723,12 +723,33 @@ class Reshuffle(PTransform):
Reshuffle is experimental. No backwards compatibility guarantees.
"""
+ def __init__(self, num_buckets=None):
+ """
+ :param num_buckets: If set, specifies the maximum random keys that would be
+ generated.
+ """
+ self.num_buckets = num_buckets
+
+ valid_buckets = isinstance(num_buckets, int) and num_buckets > 0
+ if not (num_buckets is None or valid_buckets):
+ raise ValueError(
+ 'If `num_buckets` is set, it has to be an '
+ 'integer greater than 0, got %s' % num_buckets)
+
def expand(self, pcoll):
# type: (pvalue.PValue) -> pvalue.PCollection
+ if self.num_buckets:
+ keyed = pcoll | 'AddRandomKeys' >> Map(
+ lambda t: (random.randint(0, self.num_buckets), t)).with_input_types(
Review comment:
can we avoid having two paths?
perhaps set `_DEFAULT_NUM_BUCKETS = 1 << 32` and use `randint` for both?
##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -723,12 +723,33 @@ class Reshuffle(PTransform):
Reshuffle is experimental. No backwards compatibility guarantees.
"""
+ def __init__(self, num_buckets=None):
+ """
+ :param num_buckets: If set, specifies the maximum random keys that would be
+ generated.
+ """
+ self.num_buckets = num_buckets
+
+ valid_buckets = isinstance(num_buckets, int) and num_buckets > 0
+ if not (num_buckets is None or valid_buckets):
+ raise ValueError(
+ 'If `num_buckets` is set, it has to be an '
+ 'integer greater than 0, got %s' % num_buckets)
+
def expand(self, pcoll):
# type: (pvalue.PValue) -> pvalue.PCollection
+ if self.num_buckets:
+ keyed = pcoll | 'AddRandomKeys' >> Map(
+ lambda t: (random.randint(0, self.num_buckets), t)).with_input_types(
Review comment:
other than that, LGTM
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]