tvalentyn commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r563427603



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -327,27 +330,39 @@ class Globally(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each 
element
+        of input PCollection to be a batch. Provides a way to accumulate

Review comment:
       1. Can you please comment on the structure of a 'batch' here? in 
particular for the weighted case. Consider also adding an example to line 295.
   
   2. Looking at the tests, it seems that for weighted case with batches, we 
expect users to provide  elements and weights as separate lists. From 
API/usability standpoint, what is the rationale on providing weights as a 
separate list in as opposed to augmenting the weight to the element in a tuple, 
which is how elements are represented for non-batched case?

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -501,6 +781,8 @@ class ApproximateQuantilesCombineFn(CombineFn, Generic[T]):
     weighted: (optional) if set to True, the combiner produces weighted
       quantiles. The input elements are then expected to be tuples of input
       values with the corresponding weight.
+    batch_input: (optional) if set to True, inputs are expected to be batches 
of

Review comment:
       Re: line 766: could you please clarify what is N in the docstring
   
   Also, can you please note that the algorithm referenced in the paper is 
generalized to compute weighted quantiles.

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -582,6 +861,8 @@ def create(
       weighted: (optional) if set to True, the combiner produces weighted
         quantiles. The input elements are then expected to be tuples of values
         with the corresponding weight.
+      batch_input: (optional) if set to True, inputs are expected to be batches
+        of elements.

Review comment:
       I am not sure how k, b are computed here (see line 872) - it seems that 
b follows the experimental evaluation suggested in sect. 4.3 of the paper, 
which corresponds to a 'Munro-Paterson algorithm', while the experimental 
evaluation for the 'new' algorithm is covered in 4.5. Looking at the Table1, 
the 'new' algorithm has lower values of k, b, perhaps it is worth to reexamine 
this logic.

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,126 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each 
element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if reverse and key is None:
+      self.less_than = lambda a, b: a > b
+    elif reverse:
+      self.less_than = lambda a, b: key(a) > key(b)
+    elif key is None:
+      self.less_than = lambda a, b: a < b
+    else:
+      self.less_than = lambda a, b: key(a) < key(b)
+
+  def get_argsort_key(self, elements):
+    # type: (List) -> Any

Review comment:
       Would this hint work here ? 
   ```
   # type: (List) -> Callable[[int], Any]
   ```

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -327,27 +330,39 @@ class Globally(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each 
element
+        of input PCollection to be a batch. Provides a way to accumulate

Review comment:
       wording suggestion: s/batch_input/input_batched  or inputs_batched, 
since the parameter refers to the input rather than the result (like in case of 
reverse).

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -61,30 +58,34 @@
 K = typing.TypeVar('K')
 V = typing.TypeVar('V')
 
+try:
+  import mmh3  # pylint: disable=import-error
 
-def _get_default_hash_fn():
-  """Returns either murmurhash or md5 based on installation."""
-  try:
-    import mmh3  # pylint: disable=import-error
+  def _mmh3_hash(value):
+    # mmh3.hash64 returns two 64-bit unsigned integers
+    return mmh3.hash64(value, seed=0, signed=False)[0]
+
+  _default_hash_fn = _mmh3_hash
+  _default_hash_fn_type = 'mmh3'
+except ImportError:
 
-    def _mmh3_hash(value):
-      # mmh3.hash64 returns two 64-bit unsigned integers
-      return mmh3.hash64(value, seed=0, signed=False)[0]
+  def _md5_hash(value):
+    # md5 is a 128-bit hash, so we truncate the hexdigest (string of 32
+    # hexadecimal digits) to 16 digits and convert to int to get the 64-bit
+    # integer fingerprint.
+    return int(hashlib.md5(value).hexdigest()[:16], 16)
 
-    return _mmh3_hash
+  _default_hash_fn = _md5_hash
+  _default_hash_fn_type = 'md5'
 
-  except ImportError:
+
+def _get_default_hash_fn():
+  """Returns either murmurhash or md5 based on installation."""
+  if _default_hash_fn_type == 'md5':
     logging.warning(
         'Couldn\'t find murmurhash. Install mmh3 for a faster implementation 
of'

Review comment:
       Looks like there is already a binary version: 
https://pypi.org/project/mmh3-binary/, with a somewhat recent release (Apr 
2020, but only 3.6 wheels: https://pypi.org/project/mmh3-binary/#files).

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -61,30 +58,34 @@
 K = typing.TypeVar('K')
 V = typing.TypeVar('V')
 
+try:
+  import mmh3  # pylint: disable=import-error
 
-def _get_default_hash_fn():
-  """Returns either murmurhash or md5 based on installation."""
-  try:
-    import mmh3  # pylint: disable=import-error
+  def _mmh3_hash(value):
+    # mmh3.hash64 returns two 64-bit unsigned integers
+    return mmh3.hash64(value, seed=0, signed=False)[0]
+
+  _default_hash_fn = _mmh3_hash
+  _default_hash_fn_type = 'mmh3'
+except ImportError:
 
-    def _mmh3_hash(value):
-      # mmh3.hash64 returns two 64-bit unsigned integers
-      return mmh3.hash64(value, seed=0, signed=False)[0]
+  def _md5_hash(value):
+    # md5 is a 128-bit hash, so we truncate the hexdigest (string of 32
+    # hexadecimal digits) to 16 digits and convert to int to get the 64-bit
+    # integer fingerprint.
+    return int(hashlib.md5(value).hexdigest()[:16], 16)
 
-    return _mmh3_hash
+  _default_hash_fn = _md5_hash
+  _default_hash_fn_type = 'md5'
 
-  except ImportError:
+
+def _get_default_hash_fn():
+  """Returns either murmurhash or md5 based on installation."""
+  if _default_hash_fn_type == 'md5':
     logging.warning(
         'Couldn\'t find murmurhash. Install mmh3 for a faster implementation 
of'

Review comment:
       One downside is that mmh3 has only source release, and does not release 
wheel files. Installing mmh3 requires certain c++ compiler/headers dependencies 
be present on the machine. It appears that the project is no longer maintained. 
I tried to contact the maintainer and did not receive a response... Note that 
sklearn has also implemented a python wrapper for murmurhash: 
https://scikit-learn.org/stable/modules/generated/sklearn.utils.murmurhash3_32.html.
 We could likewise incorporate murmurhash into Beam codebase, make a 
(maintainable) fork of mmh3 and release wheel files, use sklearn's 
implementation, or try to explore a different library for our hashing needs. 

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -452,15 +511,236 @@ def __init__(self, buffer_size, num_buffers, 
unbuffered_elements, buffers):
     # into new, full buffers and then take them into account when computing the
     # final output.
     self.unbuffered_elements = unbuffered_elements
+    self.unbuffered_weights = unbuffered_weights
+
+  def __reduce__(self):

Review comment:
       For my education, why was this required? Is there some internal state 
that gets in the way of pickling? Also, could you please add a comment?

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,126 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each 
element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if reverse and key is None:
+      self.less_than = lambda a, b: a > b
+    elif reverse:
+      self.less_than = lambda a, b: key(a) > key(b)
+    elif key is None:
+      self.less_than = lambda a, b: a < b
+    else:
+      self.less_than = lambda a, b: key(a) < key(b)
+
+  def get_argsort_key(self, elements):
+    # type: (List) -> Any
+
+    """Returns a key for sorting indices of elements by element's value."""
+    if self.key is None:
+      return elements.__getitem__
+    else:
+      return lambda idx: self.key(elements[idx])
+
+  def __reduce__(self):
+    return (
+        self.__class__,
+        (
+            self.buffer_size,
+            self.num_buffers,
+            self.weighted,
+            self.key,
+            self.reverse))
 
 
-class _QuantileBuffer(Generic[T]):
+class _QuantileBuffer(object):
   """A single buffer in the sense of the referenced algorithm.
   (see http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.6.6513&rep=rep1
   &type=pdf and ApproximateQuantilesCombineFn for further information)"""
-  def __init__(self, elements, weighted, level=0, weight=1):
-    # type: (Sequence[T], bool, int, int) -> None
-    # In case of weighted quantiles, elements are tuples of values and weights.
+  def __init__(
+      self, elements, weights, weighted, level=0, min_val=None, max_val=None):

Review comment:
       Please comment that in non-weighted case weights stores a single element 
- the weight of the buffer in the sense of the algorithm. In the generalized 
(weighted) case, it stores weights of individual elements.

##########
File path: sdks/python/apache_beam/transforms/stats_test.py
##########
@@ -482,13 +482,74 @@ def test_alternate_quantiles(self):
           equal_to([["ccccc", "aaa", "b"]]),
           label='checkWithKeyAndReversed')
 
+  def test_batched_quantiles(self):

Review comment:
       I have some concerns about test coverage of  ApproximateQuantiles.
   
   1. Do any of the tests exercise merging of accumulators?
   2. Do any of the tests exercise collapsing of multiple buffers, including 
buffers with same & different weights?

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -523,29 +805,25 @@ def __init__(
       num_buffers,  # type: int
       key=None,
       reverse=False,
-      weighted=False):
-    def _comparator(a, b):
-      if key:
-        a, b = key(a), key(b)
-
-      retval = int(a > b) - int(a < b)
-
-      if reverse:
-        return -retval
-
-      return retval
-
-    self._comparator = _comparator
-
+      weighted=False,
+      batch_input=False):
     self._num_quantiles = num_quantiles
-    self._buffer_size = buffer_size
-    self._num_buffers = num_buffers
-    if weighted:
-      self._key = (lambda x: x[0]) if key is None else (lambda x: key(x[0]))
-    else:
-      self._key = key
-    self._reverse = reverse
-    self._weighted = weighted
+    self._spec = _QuantileSpec(buffer_size, num_buffers, weighted, key, 
reverse)
+    self._batch_input = batch_input
+    if self._batch_input:
+      setattr(self, 'add_input', self._add_inputs)

Review comment:
       Nit: `self.add_input = self._add_inputs` may be easier to read.
   

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -636,132 +895,33 @@ def _offset(self, new_weight):
       self._offset_jitter = 2 - self._offset_jitter
       return (new_weight + self._offset_jitter) / 2
 
-  def _collapse(self, buffers):
-    # type: (Iterable[_QuantileBuffer[T]]) -> _QuantileBuffer[T]
-    new_level = 0
-    new_weight = 0
-    for buffer_elem in buffers:
-      # As presented in the paper, there should always be at least two
-      # buffers of the same (minimal) level to collapse, but it is possible
-      # to violate this condition when combining buffers from independently
-      # computed shards.  If they differ we take the max.
-      new_level = max([new_level, buffer_elem.level + 1])
-      new_weight = new_weight + buffer_elem.weight
-    if self._weighted:
-      step = new_weight / (self._buffer_size - 1)
-      offset = new_weight / (2 * self._buffer_size)
-    else:
-      step = new_weight
-      offset = self._offset(new_weight)
-    new_elements = self._interpolate(buffers, self._buffer_size, step, offset)
-    return _QuantileBuffer(new_elements, self._weighted, new_level, new_weight)
-
-  def _collapse_if_needed(self, qs):
-    # type: (_QuantileState) -> None
-    while len(qs.buffers) > self._num_buffers:
-      to_collapse = []
-      to_collapse.append(heapq.heappop(qs.buffers))
-      to_collapse.append(heapq.heappop(qs.buffers))
-      min_level = to_collapse[1].level
-
-      while len(qs.buffers) > 0 and qs.buffers[0].level == min_level:
-        to_collapse.append(heapq.heappop(qs.buffers))
-
-      heapq.heappush(qs.buffers, self._collapse(to_collapse))
-
-  def _interpolate(self, i_buffers, count, step, offset):
-    """
-    Emulates taking the ordered union of all elements in buffers, repeated
-    according to their weight, and picking out the (k * step + offset)-th
-    elements of this list for `0 <= k < count`.
-    """
-
-    iterators = []
-    new_elements = []
-    compare_key = self._key
-    if self._key and not self._weighted:
-      compare_key = lambda x: self._key(x[0])
-    for buffer_elem in i_buffers:
-      iterators.append(buffer_elem.sized_iterator())
-
-    # Python 3 `heapq.merge` support key comparison and returns an iterator and
-    # does not pull the data into memory all at once. Python 2 does not
-    # support comparison on its `heapq.merge` api, so we use the itertools
-    # which takes the `key` function for comparison and creates an iterator
-    # from it.
-    if sys.version_info[0] < 3:
-      sorted_elem = iter(
-          sorted(
-              itertools.chain.from_iterable(iterators),
-              key=compare_key,
-              reverse=self._reverse))
-    else:
-      sorted_elem = heapq.merge(
-          *iterators, key=compare_key, reverse=self._reverse)
-
-    weighted_element = next(sorted_elem)
-    current = weighted_element[1]
-    j = 0
-    previous = 0
-    while j < count:
-      target = j * step + offset
-      j = j + 1
-      try:
-        while current <= target:
-          weighted_element = next(sorted_elem)
-          current = current + weighted_element[1]
-      except StopIteration:
-        pass
-      if self._weighted:
-        new_elements.append((weighted_element[0], current - previous))
-        previous = current
-      else:
-        new_elements.append(weighted_element[0])
-    return new_elements
-
   # TODO(BEAM-7746): Signature incompatible with supertype
   def create_accumulator(self):  # type: ignore[override]
-    # type: () -> _QuantileState[T]
+    # type: () -> _QuantileState
     self._qs = _QuantileState(
-        buffer_size=self._buffer_size,
-        num_buffers=self._num_buffers,
         unbuffered_elements=[],
-        buffers=[])
+        unbuffered_weights=[],
+        buffers=[],
+        spec=self._spec)
     return self._qs
 
   def add_input(self, quantile_state, element):
     """
     Add a new element to the collection being summarized by quantile state.
     """
-    value = element[0] if self._weighted else element
-    if quantile_state.is_empty():
-      quantile_state.min_val = quantile_state.max_val = value
-    elif self._comparator(value, quantile_state.min_val) < 0:
-      quantile_state.min_val = value
-    elif self._comparator(value, quantile_state.max_val) > 0:
-      quantile_state.max_val = value
-    self._add_unbuffered(quantile_state, elements=[element])
+    quantile_state.add_unbuffered([element], self._offset)

Review comment:
       would it make sense to make _collapse, _interpolate, _offset be methods 
of _QuantileState class ? Would that impact cythonization/performance? 
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to