iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r570716476
##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -636,132 +895,33 @@ def _offset(self, new_weight):
self._offset_jitter = 2 - self._offset_jitter
return (new_weight + self._offset_jitter) / 2
- def _collapse(self, buffers):
- # type: (Iterable[_QuantileBuffer[T]]) -> _QuantileBuffer[T]
- new_level = 0
- new_weight = 0
- for buffer_elem in buffers:
- # As presented in the paper, there should always be at least two
- # buffers of the same (minimal) level to collapse, but it is possible
- # to violate this condition when combining buffers from independently
- # computed shards. If they differ we take the max.
- new_level = max([new_level, buffer_elem.level + 1])
- new_weight = new_weight + buffer_elem.weight
- if self._weighted:
- step = new_weight / (self._buffer_size - 1)
- offset = new_weight / (2 * self._buffer_size)
- else:
- step = new_weight
- offset = self._offset(new_weight)
- new_elements = self._interpolate(buffers, self._buffer_size, step, offset)
- return _QuantileBuffer(new_elements, self._weighted, new_level, new_weight)
-
- def _collapse_if_needed(self, qs):
- # type: (_QuantileState) -> None
- while len(qs.buffers) > self._num_buffers:
- to_collapse = []
- to_collapse.append(heapq.heappop(qs.buffers))
- to_collapse.append(heapq.heappop(qs.buffers))
- min_level = to_collapse[1].level
-
- while len(qs.buffers) > 0 and qs.buffers[0].level == min_level:
- to_collapse.append(heapq.heappop(qs.buffers))
-
- heapq.heappush(qs.buffers, self._collapse(to_collapse))
-
- def _interpolate(self, i_buffers, count, step, offset):
- """
- Emulates taking the ordered union of all elements in buffers, repeated
- according to their weight, and picking out the (k * step + offset)-th
- elements of this list for `0 <= k < count`.
- """
-
- iterators = []
- new_elements = []
- compare_key = self._key
- if self._key and not self._weighted:
- compare_key = lambda x: self._key(x[0])
- for buffer_elem in i_buffers:
- iterators.append(buffer_elem.sized_iterator())
-
- # Python 3 `heapq.merge` support key comparison and returns an iterator and
- # does not pull the data into memory all at once. Python 2 does not
- # support comparison on its `heapq.merge` api, so we use the itertools
- # which takes the `key` function for comparison and creates an iterator
- # from it.
- if sys.version_info[0] < 3:
- sorted_elem = iter(
- sorted(
- itertools.chain.from_iterable(iterators),
- key=compare_key,
- reverse=self._reverse))
- else:
- sorted_elem = heapq.merge(
- *iterators, key=compare_key, reverse=self._reverse)
-
- weighted_element = next(sorted_elem)
- current = weighted_element[1]
- j = 0
- previous = 0
- while j < count:
- target = j * step + offset
- j = j + 1
- try:
- while current <= target:
- weighted_element = next(sorted_elem)
- current = current + weighted_element[1]
- except StopIteration:
- pass
- if self._weighted:
- new_elements.append((weighted_element[0], current - previous))
- previous = current
- else:
- new_elements.append(weighted_element[0])
- return new_elements
-
# TODO(BEAM-7746): Signature incompatible with supertype
def create_accumulator(self): # type: ignore[override]
- # type: () -> _QuantileState[T]
+ # type: () -> _QuantileState
self._qs = _QuantileState(
- buffer_size=self._buffer_size,
- num_buffers=self._num_buffers,
unbuffered_elements=[],
- buffers=[])
+ unbuffered_weights=[],
+ buffers=[],
+ spec=self._spec)
return self._qs
def add_input(self, quantile_state, element):
"""
Add a new element to the collection being summarized by quantile state.
"""
- value = element[0] if self._weighted else element
- if quantile_state.is_empty():
- quantile_state.min_val = quantile_state.max_val = value
- elif self._comparator(value, quantile_state.min_val) < 0:
- quantile_state.min_val = value
- elif self._comparator(value, quantile_state.max_val) > 0:
- quantile_state.max_val = value
- self._add_unbuffered(quantile_state, elements=[element])
+ quantile_state.add_unbuffered([element], self._offset)
Review comment:
I don't think that this may cause any problems with cythonization or
performance. They will be static methods though, so the only difference is the
namespace and neither of them deals with _QuantileState objects. But I don't
have any strong preference, WDYT?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]