iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r570716312
##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -582,6 +861,8 @@ def create(
weighted: (optional) if set to True, the combiner produces weighted
quantiles. The input elements are then expected to be tuples of values
with the corresponding weight.
+ batch_input: (optional) if set to True, inputs are expected to be batches
+ of elements.
Review comment:
Yes, the logic of the Munro-Paterson algorithm is used here. Switching
to the calculation from 4.5 would allow reducing the size of the (full)
accumulator. But that is probably out of scope for this PR; should I leave a
TODO?
##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,126 @@ class PerKey(PTransform):
weighted: (optional) if set to True, the transform returns weighted
quantiles. The input PCollection is then expected to contain tuples of
input values with the corresponding weight.
+ batch_input: (optional) if set to True, the transform expects each element
+ of input PCollection to be a batch. Provides a way to accumulate
+ multiple elements at a time more efficiently.
"""
- def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+ def __init__(
+ self,
+ num_quantiles,
+ key=None,
+ reverse=False,
+ weighted=False,
+ batch_input=False):
self._num_quantiles = num_quantiles
self._key = key
self._reverse = reverse
self._weighted = weighted
+ self._batch_input = batch_input
def expand(self, pcoll):
return pcoll | CombinePerKey(
ApproximateQuantilesCombineFn.create(
num_quantiles=self._num_quantiles,
key=self._key,
reverse=self._reverse,
- weighted=self._weighted))
+ weighted=self._weighted,
+ batch_input=self._batch_input))
def display_data(self):
return ApproximateQuantiles._display_data(
num_quantiles=self._num_quantiles,
key=self._key,
reverse=self._reverse,
- weighted=self._weighted)
+ weighted=self._weighted,
+ batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+ """Quantiles computation specifications."""
+ def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+ # type: (int, int, bool, Any, bool) -> None
+ self.buffer_size = buffer_size
+ self.num_buffers = num_buffers
+ self.weighted = weighted
+ self.key = key
+ self.reverse = reverse
+
+ # Used to sort tuples of values and weights.
+ self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+ # Used to compare values.
+ if reverse and key is None:
+ self.less_than = lambda a, b: a > b
+ elif reverse:
+ self.less_than = lambda a, b: key(a) > key(b)
+ elif key is None:
+ self.less_than = lambda a, b: a < b
+ else:
+ self.less_than = lambda a, b: key(a) < key(b)
+
+ def get_argsort_key(self, elements):
+ # type: (List) -> Any
Review comment:
Done.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]