tvalentyn commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r573234373



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -636,132 +895,33 @@ def _offset(self, new_weight):
       self._offset_jitter = 2 - self._offset_jitter
       return (new_weight + self._offset_jitter) / 2
 
-  def _collapse(self, buffers):
-    # type: (Iterable[_QuantileBuffer[T]]) -> _QuantileBuffer[T]
-    new_level = 0
-    new_weight = 0
-    for buffer_elem in buffers:
-      # As presented in the paper, there should always be at least two
-      # buffers of the same (minimal) level to collapse, but it is possible
-      # to violate this condition when combining buffers from independently
-      # computed shards.  If they differ we take the max.
-      new_level = max([new_level, buffer_elem.level + 1])
-      new_weight = new_weight + buffer_elem.weight
-    if self._weighted:
-      step = new_weight / (self._buffer_size - 1)
-      offset = new_weight / (2 * self._buffer_size)
-    else:
-      step = new_weight
-      offset = self._offset(new_weight)
-    new_elements = self._interpolate(buffers, self._buffer_size, step, offset)
-    return _QuantileBuffer(new_elements, self._weighted, new_level, new_weight)
-
-  def _collapse_if_needed(self, qs):
-    # type: (_QuantileState) -> None
-    while len(qs.buffers) > self._num_buffers:
-      to_collapse = []
-      to_collapse.append(heapq.heappop(qs.buffers))
-      to_collapse.append(heapq.heappop(qs.buffers))
-      min_level = to_collapse[1].level
-
-      while len(qs.buffers) > 0 and qs.buffers[0].level == min_level:
-        to_collapse.append(heapq.heappop(qs.buffers))
-
-      heapq.heappush(qs.buffers, self._collapse(to_collapse))
-
-  def _interpolate(self, i_buffers, count, step, offset):
-    """
-    Emulates taking the ordered union of all elements in buffers, repeated
-    according to their weight, and picking out the (k * step + offset)-th
-    elements of this list for `0 <= k < count`.
-    """
-
-    iterators = []
-    new_elements = []
-    compare_key = self._key
-    if self._key and not self._weighted:
-      compare_key = lambda x: self._key(x[0])
-    for buffer_elem in i_buffers:
-      iterators.append(buffer_elem.sized_iterator())
-
-    # Python 3 `heapq.merge` support key comparison and returns an iterator and
-    # does not pull the data into memory all at once. Python 2 does not
-    # support comparison on its `heapq.merge` api, so we use the itertools
-    # which takes the `key` function for comparison and creates an iterator
-    # from it.
-    if sys.version_info[0] < 3:
-      sorted_elem = iter(
-          sorted(
-              itertools.chain.from_iterable(iterators),
-              key=compare_key,
-              reverse=self._reverse))
-    else:
-      sorted_elem = heapq.merge(
-          *iterators, key=compare_key, reverse=self._reverse)
-
-    weighted_element = next(sorted_elem)
-    current = weighted_element[1]
-    j = 0
-    previous = 0
-    while j < count:
-      target = j * step + offset
-      j = j + 1
-      try:
-        while current <= target:
-          weighted_element = next(sorted_elem)
-          current = current + weighted_element[1]
-      except StopIteration:
-        pass
-      if self._weighted:
-        new_elements.append((weighted_element[0], current - previous))
-        previous = current
-      else:
-        new_elements.append(weighted_element[0])
-    return new_elements
-
   # TODO(BEAM-7746): Signature incompatible with supertype
   def create_accumulator(self):  # type: ignore[override]
-    # type: () -> _QuantileState[T]
+    # type: () -> _QuantileState
     self._qs = _QuantileState(
-        buffer_size=self._buffer_size,
-        num_buffers=self._num_buffers,
         unbuffered_elements=[],
-        buffers=[])
+        unbuffered_weights=[],
+        buffers=[],
+        spec=self._spec)
     return self._qs
 
   def add_input(self, quantile_state, element):
     """
     Add a new element to the collection being summarized by quantile state.
     """
-    value = element[0] if self._weighted else element
-    if quantile_state.is_empty():
-      quantile_state.min_val = quantile_state.max_val = value
-    elif self._comparator(value, quantile_state.min_val) < 0:
-      quantile_state.min_val = value
-    elif self._comparator(value, quantile_state.max_val) > 0:
-      quantile_state.max_val = value
-    self._add_unbuffered(quantile_state, elements=[element])
+    quantile_state.add_unbuffered([element], self._offset)

Review comment:
       There is no big difference if they are static, but I thought that if 
they were instance methods, perhaps you wouldn't have to pass spec or 
offset_fn. No strong opinion; feel free to leave as is.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to