iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r570716312



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -582,6 +861,8 @@ def create(
       weighted: (optional) if set to True, the combiner produces weighted
         quantiles. The input elements are then expected to be tuples of values
         with the corresponding weight.
+      batch_input: (optional) if set to True, inputs are expected to be batches
+        of elements.

Review comment:
       Yes, the logic of the Munro-Paterson algorithm is used here. Switching
to the calculation from 4.5 would allow us to reduce the size of the (full)
accumulator, but that is probably out of scope for this PR. Should I leave a
TODO?

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,126 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if reverse and key is None:
+      self.less_than = lambda a, b: a > b
+    elif reverse:
+      self.less_than = lambda a, b: key(a) > key(b)
+    elif key is None:
+      self.less_than = lambda a, b: a < b
+    else:
+      self.less_than = lambda a, b: key(a) < key(b)
+
+  def get_argsort_key(self, elements):
+    # type: (List) -> Any

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to