This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 66a8a4c Adding performance improvements to ApproximateQuantiles. (#13175)
66a8a4c is described below
commit 66a8a4c5e97f674f594c2194da736ad63a842dfe
Author: Ihor Indyk <[email protected]>
AuthorDate: Fri Feb 19 01:50:04 2021 -0500
Adding performance improvements to ApproximateQuantiles. (#13175)
---
sdks/python/apache_beam/transforms/stats.pxd | 60 ++
sdks/python/apache_beam/transforms/stats.py | 719 ++++++++++++++---------
sdks/python/apache_beam/transforms/stats_test.py | 114 +++-
sdks/python/setup.py | 1 +
4 files changed, 610 insertions(+), 284 deletions(-)
diff --git a/sdks/python/apache_beam/transforms/stats.pxd b/sdks/python/apache_beam/transforms/stats.pxd
new file mode 100644
index 0000000..e67c012
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/stats.pxd
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cimport cython
+from libc.stdint cimport int64_t
+
+cdef class _QuantileSpec(object):
+ cdef readonly int64_t buffer_size
+ cdef readonly int64_t num_buffers
+ cdef readonly bint weighted
+ cdef readonly key
+ cdef readonly bint reverse
+ cdef readonly weighted_key
+ cdef readonly less_than
+
+cdef class _QuantileBuffer(object):
+ cdef readonly elements
+ cdef readonly weights
+ cdef readonly bint weighted
+ cdef readonly int64_t level
+ cdef readonly min_val
+ cdef readonly max_val
+ cdef readonly _iter
+
+cdef class _QuantileState(object):
+ cdef readonly _QuantileSpec spec
+ cdef public buffers
+ cdef public unbuffered_elements
+ cdef public unbuffered_weights
+ cdef public add_unbuffered
+ cpdef bint is_empty(self)
+ @cython.locals(num_new_buffers=int64_t, idx=int64_t)
+ cpdef _add_unbuffered(self, elements, offset_fn)
+ @cython.locals(num_new_buffers=int64_t, idx=int64_t)
+ cpdef _add_unbuffered_weighted(self, elements, offset_fn)
+ cpdef finalize(self)
+ @cython.locals(min_level=int64_t)
+ cpdef collapse_if_needed(self, offset_fn)
+
+
+@cython.locals(new_level=int64_t, new_weight=double, step=double,
+               offset=double)
+cdef _QuantileBuffer _collapse(buffers, offset_fn, _QuantileSpec spec)
+
+@cython.locals(j=int64_t)
+cdef _interpolate(buffers, int64_t count, double step, double offset,
+ _QuantileSpec spec)
\ No newline at end of file
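
The new stats.pxd is a Cython augmentation file: because stats.py is also added
to the list of cythonized modules in setup.py (last hunk of this diff), the
cdef/cpdef declarations above attach C-level types to the otherwise pure-Python
classes and functions in stats.py, while installs without the compiled
extension keep using stats.py unchanged. A minimal sketch of the mechanism,
using hypothetical module and function names that are not part of this commit:

    # mymodule.py -- plain Python, runs with or without Cython
    def dot(xs, ys):
      total = 0.0
      for i in range(len(xs)):
        total += xs[i] * ys[i]
      return total

    # mymodule.pxd -- consulted only when mymodule.py is cythonized;
    # it types the function and its locals without touching the .py source.
    cimport cython

    @cython.locals(i=int, total=double)
    cpdef dot(xs, ys)
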
diff --git a/sdks/python/apache_beam/transforms/stats.py b/sdks/python/apache_beam/transforms/stats.py
index 9d19bc4..bd6dc72 100644
--- a/sdks/python/apache_beam/transforms/stats.py
+++ b/sdks/python/apache_beam/transforms/stats.py
@@ -36,14 +36,12 @@ import heapq
import itertools
import logging
import math
-import sys
import typing
from builtins import round
from typing import Any
-from typing import Generic
-from typing import Iterable
+from typing import Callable
from typing import List
-from typing import Sequence
+from typing import Tuple
from apache_beam import coders
from apache_beam import typehints
@@ -61,30 +59,34 @@ T = typing.TypeVar('T')
K = typing.TypeVar('K')
V = typing.TypeVar('V')
+try:
+ import mmh3 # pylint: disable=import-error
-def _get_default_hash_fn():
- """Returns either murmurhash or md5 based on installation."""
- try:
- import mmh3 # pylint: disable=import-error
+ def _mmh3_hash(value):
+ # mmh3.hash64 returns two 64-bit unsigned integers
+ return mmh3.hash64(value, seed=0, signed=False)[0]
+
+ _default_hash_fn = _mmh3_hash
+ _default_hash_fn_type = 'mmh3'
+except ImportError:
+
+ def _md5_hash(value):
+ # md5 is a 128-bit hash, so we truncate the hexdigest (string of 32
+ # hexadecimal digits) to 16 digits and convert to int to get the 64-bit
+ # integer fingerprint.
+ return int(hashlib.md5(value).hexdigest()[:16], 16)
- def _mmh3_hash(value):
- # mmh3.hash64 returns two 64-bit unsigned integers
- return mmh3.hash64(value, seed=0, signed=False)[0]
+ _default_hash_fn = _md5_hash
+ _default_hash_fn_type = 'md5'
- return _mmh3_hash
- except ImportError:
+def _get_default_hash_fn():
+ """Returns either murmurhash or md5 based on installation."""
+ if _default_hash_fn_type == 'md5':
logging.warning(
'Couldn\'t find murmurhash. Install mmh3 for a faster implementation of'
'ApproximateUnique.')
-
- def _md5_hash(value):
- # md5 is a 128-bit hash, so we truncate the hexdigest (string of 32
- # hexadecimal digits) to 16 digits and convert to int to get the 64-bit
- # integer fingerprint.
- return int(hashlib.md5(value).hexdigest()[:16], 16)
-
- return _md5_hash
+ return _default_hash_fn
class ApproximateUnique(object):
@@ -297,9 +299,20 @@ class ApproximateQuantiles(object):
weighted=True
out: [0, 2, 5, 7, 100]
+
+ in: [list(range(10)), ..., list(range(90, 101))], num_quantiles=5,
+ input_batched=True
+
+ out: [0, 25, 50, 75, 100]
+
+ in: [(list(range(10)), [1]*10), (list(range(10)), [0]*10), ...,
+ (list(range(90, 101)), [0]*11)], num_quantiles=5, input_batched=True,
+ weighted=True
+
+ out: [0, 2, 5, 7, 100]
"""
@staticmethod
- def _display_data(num_quantiles, key, reverse, weighted):
+ def _display_data(num_quantiles, key, reverse, weighted, input_batched):
return {
'num_quantiles': DisplayDataItem(num_quantiles, label='Quantile Count'),
'key': DisplayDataItem(
@@ -307,7 +320,9 @@ class ApproximateQuantiles(object):
if hasattr(key, '__name__') else key.__class__.__name__,
label='Record Comparer Key'),
'reverse': DisplayDataItem(str(reverse), label='Is Reversed'),
- 'weighted': DisplayDataItem(str(weighted), label='Is Weighted')
+ 'weighted': DisplayDataItem(str(weighted), label='Is Weighted'),
+ 'input_batched': DisplayDataItem(
+ str(input_batched), label='Is Input Batched'),
}
@typehints.with_input_types(
@@ -327,12 +342,24 @@ class ApproximateQuantiles(object):
weighted: (optional) if set to True, the transform returns weighted
quantiles. The input PCollection is then expected to contain tuples of
input values with the corresponding weight.
+ input_batched: (optional) if set to True, the transform expects each
+ element of the input PCollection to be a batch: a list of elements in the
+ non-weighted case, or a tuple of a list of elements and a list of weights
+ in the weighted case. This allows multiple elements to be accumulated at a
+ time, which is more efficient.
"""
- def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+ def __init__(
+ self,
+ num_quantiles,
+ key=None,
+ reverse=False,
+ weighted=False,
+ input_batched=False):
self._num_quantiles = num_quantiles
self._key = key
self._reverse = reverse
self._weighted = weighted
+ self._input_batched = input_batched
def expand(self, pcoll):
return pcoll | CombineGlobally(
@@ -340,14 +367,16 @@ class ApproximateQuantiles(object):
num_quantiles=self._num_quantiles,
key=self._key,
reverse=self._reverse,
- weighted=self._weighted))
+ weighted=self._weighted,
+ input_batched=self._input_batched))
def display_data(self):
return ApproximateQuantiles._display_data(
num_quantiles=self._num_quantiles,
key=self._key,
reverse=self._reverse,
- weighted=self._weighted)
+ weighted=self._weighted,
+ input_batched=self._input_batched)
@typehints.with_input_types(
typehints.Union[typing.Tuple[K, V],
@@ -368,12 +397,24 @@ class ApproximateQuantiles(object):
weighted: (optional) if set to True, the transform returns weighted
quantiles. The input PCollection is then expected to contain tuples of
input values with the corresponding weight.
+ input_batched: (optional) if set to True, the transform expects each
+ element of the input PCollection to be a batch: a list of elements in the
+ non-weighted case, or a tuple of a list of elements and a list of weights
+ in the weighted case. This allows multiple elements to be accumulated at a
+ time, which is more efficient.
"""
- def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+ def __init__(
+ self,
+ num_quantiles,
+ key=None,
+ reverse=False,
+ weighted=False,
+ input_batched=False):
self._num_quantiles = num_quantiles
self._key = key
self._reverse = reverse
self._weighted = weighted
+ self._input_batched = input_batched
def expand(self, pcoll):
return pcoll | CombinePerKey(
@@ -381,69 +422,106 @@ class ApproximateQuantiles(object):
num_quantiles=self._num_quantiles,
key=self._key,
reverse=self._reverse,
- weighted=self._weighted))
+ weighted=self._weighted,
+ input_batched=self._input_batched))
def display_data(self):
return ApproximateQuantiles._display_data(
num_quantiles=self._num_quantiles,
key=self._key,
reverse=self._reverse,
- weighted=self._weighted)
+ weighted=self._weighted,
+ input_batched=self._input_batched)
-class _QuantileBuffer(Generic[T]):
+class _QuantileSpec(object):
+ """Quantiles computation specifications."""
+ def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+ # type: (int, int, bool, Any, bool) -> None
+ self.buffer_size = buffer_size
+ self.num_buffers = num_buffers
+ self.weighted = weighted
+ self.key = key
+ self.reverse = reverse
+
+ # Used to sort tuples of values and weights.
+ self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+ # Used to compare values.
+ if reverse and key is None:
+ self.less_than = lambda a, b: a > b
+ elif reverse:
+ self.less_than = lambda a, b: key(a) > key(b)
+ elif key is None:
+ self.less_than = lambda a, b: a < b
+ else:
+ self.less_than = lambda a, b: key(a) < key(b)
+
+ def get_argsort_key(self, elements):
+ # type: (List) -> Callable[[int], Any]
+
+ """Returns a key for sorting indices of elements by element's value."""
+ if self.key is None:
+ return elements.__getitem__
+ else:
+ return lambda idx: self.key(elements[idx])
+
+ def __reduce__(self):
+ return (
+ self.__class__,
+ (
+ self.buffer_size,
+ self.num_buffers,
+ self.weighted,
+ self.key,
+ self.reverse))
+
+
+class _QuantileBuffer(object):
"""A single buffer in the sense of the referenced algorithm.
(see http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.6.6513&rep=rep1
&type=pdf and ApproximateQuantilesCombineFn for further information)"""
- def __init__(self, elements, weighted, level=0, weight=1):
- # type: (Sequence[T], bool, int, int) -> None
- # In case of weighted quantiles, elements are tuples of values and weights.
+ def __init__(
+ self, elements, weights, weighted, level=0, min_val=None, max_val=None):
+ # type: (List, List, bool, int, Any, Any) -> None
self.elements = elements
+    # In non-weighted case weights contains a single element representing weight
+ # of the buffer in the sense of the original algorithm. In weighted case,
+ # it stores weights of individual elements.
+ self.weights = weights
self.weighted = weighted
self.level = level
- self.weight = weight
-
- def __lt__(self, other):
- if self.weighted:
- return [element[0] for element in self.elements
- ] < [element[0] for element in other.elements]
+ if min_val is None or max_val is None:
+ # Buffer is always initialized with sorted elements.
+ self.min_val = elements[0]
+ self.max_val = elements[-1]
else:
- return self.elements < other.elements
-
- def sized_iterator(self):
- class QuantileBufferIterator(object):
- def __init__(self, elem, weighted, weight):
- self._iter = iter(elem)
- self.weighted = weighted
- self.weight = weight
-
- def __iter__(self):
- return self
+ # Note that collapsed buffer may not contain min and max in the list of
+ # elements.
+ self.min_val = min_val
+ self.max_val = max_val
- def __next__(self):
- if self.weighted:
- return next(self._iter)
- else:
- value = next(self._iter)
- return (value, self.weight)
+ def __iter__(self):
+ return zip(
+ self.elements,
+ self.weights if self.weighted else itertools.repeat(self.weights[0]))
- next = __next__ # For Python 2
-
- return QuantileBufferIterator(self.elements, self.weighted, self.weight)
+ def __lt__(self, other):
+ return self.level < other.level
-class _QuantileState(Generic[T]):
+class _QuantileState(object):
"""
Compact summarization of a collection on which quantiles can be estimated.
"""
- min_val = None # type: Any # Holds smallest item in the list
- max_val = None # type: Any # Holds largest item in the list
-
- def __init__(self, buffer_size, num_buffers, unbuffered_elements, buffers):
- # type: (int, int, List[Any], List[_QuantileBuffer[T]]) -> None
- self.buffer_size = buffer_size
- self.num_buffers = num_buffers
+ def __init__(self, unbuffered_elements, unbuffered_weights, buffers, spec):
+ # type: (List, List, List[_QuantileBuffer], _QuantileSpec) -> None
self.buffers = buffers
+ self.spec = spec
+ if spec.weighted:
+ self.add_unbuffered = self._add_unbuffered_weighted
+ else:
+ self.add_unbuffered = self._add_unbuffered
# The algorithm requires that the manipulated buffers always be filled to
# capacity to perform the collapse operation. This operation can be extended
@@ -452,6 +530,17 @@ class _QuantileState(Generic[T]):
# into new, full buffers and then take them into account when computing the
# final output.
self.unbuffered_elements = unbuffered_elements
+ self.unbuffered_weights = unbuffered_weights
+
+ # This is needed for pickling to work when Cythonization is enabled.
+ def __reduce__(self):
+ return (
+ self.__class__,
+ (
+ self.unbuffered_elements,
+ self.unbuffered_weights,
+ self.buffers,
+ self.spec))
def is_empty(self):
# type: () -> bool
@@ -459,8 +548,219 @@ class _QuantileState(Generic[T]):
"""Check if the buffered & unbuffered elements are empty or not."""
return not self.unbuffered_elements and not self.buffers
+ def _add_unbuffered(self, elements, offset_fn):
+ # type: (List, Any) -> None
+
+ """
+ Add elements to the unbuffered list, creating new buffers and
+ collapsing if needed.
+ """
+ self.unbuffered_elements.extend(elements)
+ num_new_buffers = len(self.unbuffered_elements) // self.spec.buffer_size
+ for idx in range(num_new_buffers):
+ to_buffer = sorted(
+ self.unbuffered_elements[idx * self.spec.buffer_size:(idx + 1) *
+ self.spec.buffer_size],
+ key=self.spec.key,
+ reverse=self.spec.reverse)
+ heapq.heappush(
+ self.buffers,
+ _QuantileBuffer(elements=to_buffer, weights=[1], weighted=False))
+
+ if num_new_buffers > 0:
+ self.unbuffered_elements = self.unbuffered_elements[num_new_buffers *
+ self.spec.
+ buffer_size:]
+
+ self.collapse_if_needed(offset_fn)
+
+ def _add_unbuffered_weighted(self, elements, offset_fn):
+ # type: (List, Any) -> None
+
+ """
+ Add elements with weights to the unbuffered list, creating new buffers and
+ collapsing if needed.
+ """
+ if len(elements) == 1:
+ self.unbuffered_elements.append(elements[0][0])
+ self.unbuffered_weights.append(elements[0][1])
+ else:
+ self.unbuffered_elements.extend(elements[0])
+ self.unbuffered_weights.extend(elements[1])
+ num_new_buffers = len(self.unbuffered_elements) // self.spec.buffer_size
+ argsort_key = self.spec.get_argsort_key(self.unbuffered_elements)
+ for idx in range(num_new_buffers):
+ argsort = sorted(
+          range(idx * self.spec.buffer_size, (idx + 1) * self.spec.buffer_size),
+ key=argsort_key,
+ reverse=self.spec.reverse)
+ elements_to_buffer = [self.unbuffered_elements[idx] for idx in argsort]
+ weights_to_buffer = [self.unbuffered_weights[idx] for idx in argsort]
+ heapq.heappush(
+ self.buffers,
+ _QuantileBuffer(
+ elements=elements_to_buffer,
+ weights=weights_to_buffer,
+ weighted=True))
+
+ if num_new_buffers > 0:
+ self.unbuffered_elements = self.unbuffered_elements[num_new_buffers *
+ self.spec.
+ buffer_size:]
+ self.unbuffered_weights = self.unbuffered_weights[num_new_buffers *
+ self.spec.buffer_size:]
+
+ self.collapse_if_needed(offset_fn)
+
+ def finalize(self):
+ # type: () -> None
+
+ """
+ Creates a new buffer using all unbuffered elements. Called before
+ extracting an output. Note that the buffer doesn't have to be put in a
+ proper position since _collapse is not going to be called after.
+ """
+ if self.unbuffered_elements and self.spec.weighted:
+ argsort_key = self.spec.get_argsort_key(self.unbuffered_elements)
+ argsort = sorted(
+ range(len(self.unbuffered_elements)),
+ key=argsort_key,
+ reverse=self.spec.reverse)
+ self.unbuffered_elements = [
+ self.unbuffered_elements[idx] for idx in argsort
+ ]
+ self.unbuffered_weights = [
+ self.unbuffered_weights[idx] for idx in argsort
+ ]
+ self.buffers.append(
+ _QuantileBuffer(
+            self.unbuffered_elements, self.unbuffered_weights, weighted=True))
+ self.unbuffered_weights = []
+ elif self.unbuffered_elements:
+ self.unbuffered_elements.sort(
+ key=self.spec.key, reverse=self.spec.reverse)
+ self.buffers.append(
+ _QuantileBuffer(
+ self.unbuffered_elements, weights=[1], weighted=False))
+ self.unbuffered_elements = []
+
+ def collapse_if_needed(self, offset_fn):
+ # type: (Any) -> None
+
+ """
+ Checks if summary has too many buffers and collapses some of them until the
+ limit is restored.
+ """
+ while len(self.buffers) > self.spec.num_buffers:
+ to_collapse = [heapq.heappop(self.buffers), heapq.heappop(self.buffers)]
+ min_level = to_collapse[1].level
+
+ while self.buffers and self.buffers[0].level <= min_level:
+ to_collapse.append(heapq.heappop(self.buffers))
+
+      heapq.heappush(self.buffers, _collapse(to_collapse, offset_fn, self.spec))
+
+
+def _collapse(buffers, offset_fn, spec):
+ # type: (List[_QuantileBuffer], Any, _QuantileSpec) -> _QuantileBuffer
+
+ """
+ Approximates elements from multiple buffers and produces a single buffer.
+ """
+ new_level = 0
+ new_weight = 0
+ for buffer in buffers:
+ # As presented in the paper, there should always be at least two
+ # buffers of the same (minimal) level to collapse, but it is possible
+ # to violate this condition when combining buffers from independently
+ # computed shards. If they differ we take the max.
+ new_level = max([new_level, buffer.level + 1])
+ new_weight = new_weight + sum(buffer.weights)
+ if spec.weighted:
+ step = new_weight / (spec.buffer_size - 1)
+ offset = new_weight / (2 * spec.buffer_size)
+ else:
+ step = new_weight
+ offset = offset_fn(new_weight)
+ new_elements, new_weights, min_val, max_val = \
+ _interpolate(buffers, spec.buffer_size, step, offset, spec)
+ if not spec.weighted:
+ new_weights = [new_weight]
+ return _QuantileBuffer(
+ new_elements, new_weights, spec.weighted, new_level, min_val, max_val)
+
+
+def _interpolate(buffers, count, step, offset, spec):
+  # type: (List[_QuantileBuffer], int, float, float, _QuantileSpec) -> Tuple[List, List, Any, Any]
+
+ """
+ Emulates taking the ordered union of all elements in buffers, repeated
+  according to their weight, and picking out the (k * step + offset)-th elements
+ of this list for `0 <= k < count`.
+ """
+ buffer_iterators = []
+ min_val = buffers[0].min_val
+ max_val = buffers[0].max_val
+ for buffer in buffers:
+ # Calculate extreme values for the union of buffers.
+ min_val = buffer.min_val if spec.less_than(
+ buffer.min_val, min_val) else min_val
+ max_val = buffer.max_val if spec.less_than(
+ max_val, buffer.max_val) else max_val
+ buffer_iterators.append(iter(buffer))
+
+ # Note that `heapq.merge` can also be used here since the buffers are sorted.
+ # In practice, however, `sorted` uses natural order in the union and
+ # significantly outperforms `heapq.merge`.
+ sorted_elements = sorted(
+ itertools.chain.from_iterable(buffer_iterators),
+ key=spec.weighted_key,
+ reverse=spec.reverse)
+
+ if not spec.weighted:
+ # If all buffers have the same weight, then quantiles' indices are evenly
+ # distributed over a range [0, len(sorted_elements)].
+ buffers_have_same_weight = True
+ weight = buffers[0].weights[0]
+ for buffer in buffers:
+ if buffer.weights[0] != weight:
+ buffers_have_same_weight = False
+ break
+ if buffers_have_same_weight:
+ offset = offset / weight
+ step = step / weight
+ max_idx = len(sorted_elements) - 1
+ result = [
+ sorted_elements[min(int(j * step + offset), max_idx)][0]
+ for j in range(count)
+ ]
+ return result, [], min_val, max_val
+
+ sorted_elements_iter = iter(sorted_elements)
+ weighted_element = next(sorted_elements_iter)
+ new_elements = []
+ new_weights = []
+ j = 0
+ current_weight = weighted_element[1]
+ previous_weight = 0
+ while j < count:
+ target_weight = j * step + offset
+ j += 1
+ try:
+ while current_weight <= target_weight:
+ weighted_element = next(sorted_elements_iter)
+ current_weight += weighted_element[1]
+ except StopIteration:
+ pass
+ new_elements.append(weighted_element[0])
+ if spec.weighted:
+ new_weights.append(current_weight - previous_weight)
+ previous_weight = current_weight
+
+ return new_elements, new_weights, min_val, max_val
+
-class ApproximateQuantilesCombineFn(CombineFn, Generic[T]):
+class ApproximateQuantilesCombineFn(CombineFn):
"""
This combiner gives an idea of the distribution of a collection of values
using approximate N-tiles. The output of this combiner is the list of size of
@@ -483,9 +783,12 @@ class ApproximateQuantilesCombineFn(CombineFn, Generic[T]):
http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.6.6513&rep=rep1
&type=pdf
- The default error bound is (1 / N) for uniformly distributed data and
- min(1e-2, 1 / N) for weighted case, though in practice the accuracy tends to
- be much better.
+ Note that the weighted quantiles are evaluated using a generalized version of
+ the algorithm referenced in the paper.
+
+ The default error bound is (1 / num_quantiles) for uniformly distributed data
+ and min(1e-2, 1 / num_quantiles) for weighted case, though in practice the
+ accuracy tends to be much better.
Args:
num_quantiles: Number of quantiles to produce. It is the size of the final
@@ -501,6 +804,8 @@ class ApproximateQuantilesCombineFn(CombineFn, Generic[T]):
weighted: (optional) if set to True, the combiner produces weighted
quantiles. The input elements are then expected to be tuples of input
values with the corresponding weight.
+ input_batched: (optional) if set to True, inputs are expected to be batches
+ of elements.
"""
# For alternating between biasing up and down in the above even weight
@@ -514,7 +819,7 @@ class ApproximateQuantilesCombineFn(CombineFn, Generic[T]):
# non-optimal. The impact is logarithmic with respect to this value, so this
# default should be fine for most uses.
_MAX_NUM_ELEMENTS = 1e9
- _qs = None # type: _QuantileState[T]
+ _qs = None # type: _QuantileState
def __init__(
self,
@@ -523,29 +828,25 @@ class ApproximateQuantilesCombineFn(CombineFn, Generic[T]):
num_buffers, # type: int
key=None,
reverse=False,
- weighted=False):
- def _comparator(a, b):
- if key:
- a, b = key(a), key(b)
-
- retval = int(a > b) - int(a < b)
-
- if reverse:
- return -retval
-
- return retval
-
- self._comparator = _comparator
-
+ weighted=False,
+ input_batched=False):
self._num_quantiles = num_quantiles
- self._buffer_size = buffer_size
- self._num_buffers = num_buffers
- if weighted:
- self._key = (lambda x: x[0]) if key is None else (lambda x: key(x[0]))
- else:
- self._key = key
- self._reverse = reverse
- self._weighted = weighted
+    self._spec = _QuantileSpec(buffer_size, num_buffers, weighted, key, reverse)
+ self._input_batched = input_batched
+ if self._input_batched:
+ setattr(self, 'add_input', self._add_inputs)
+
+ def __reduce__(self):
+ return (
+ self.__class__,
+ (
+ self._num_quantiles,
+ self._spec.buffer_size,
+ self._spec.num_buffers,
+ self._spec.key,
+ self._spec.reverse,
+ self._spec.weighted,
+ self._input_batched))
@classmethod
def create(
@@ -555,7 +856,8 @@ class ApproximateQuantilesCombineFn(CombineFn, Generic[T]):
max_num_elements=None,
key=None,
reverse=False,
- weighted=False):
+ weighted=False,
+ input_batched=False):
# type: (...) -> ApproximateQuantilesCombineFn
"""
@@ -582,11 +884,17 @@ class ApproximateQuantilesCombineFn(CombineFn, Generic[T]):
weighted: (optional) if set to True, the combiner produces weighted
quantiles. The input elements are then expected to be tuples of values
with the corresponding weight.
+ input_batched: (optional) if set to True, inputs are expected to be
+ batches of elements.
"""
max_num_elements = max_num_elements or cls._MAX_NUM_ELEMENTS
if not epsilon:
epsilon = min(1e-2, 1.0 / num_quantiles) \
if weighted else (1.0 / num_quantiles)
+ # Note that calculation of the buffer size and the number of buffers here
+ # is based on technique used in the Munro-Paterson algorithm. Switching to
+ # the logic used in the "New Algorithm" may result in memory savings since
+ # it results in lower values for b and k in practice.
b = 2
while (b - 2) * (1 << (b - 2)) < epsilon * max_num_elements:
b = b + 1
@@ -598,30 +906,8 @@ class ApproximateQuantilesCombineFn(CombineFn, Generic[T]):
num_buffers=b,
key=key,
reverse=reverse,
- weighted=weighted)
-
- def _add_unbuffered(self, qs, elements):
- # type: (_QuantileState[T], Iterable[T]) -> None
-
- """
- Add a new buffer to the unbuffered list, creating a new buffer and
- collapsing if needed.
- """
- qs.unbuffered_elements.extend(elements)
- if len(qs.unbuffered_elements) >= qs.buffer_size:
- qs.unbuffered_elements.sort(key=self._key, reverse=self._reverse)
-
- while len(qs.unbuffered_elements) >= qs.buffer_size:
- to_buffer = qs.unbuffered_elements[:qs.buffer_size]
- heapq.heappush(
- qs.buffers,
- _QuantileBuffer(
- elements=to_buffer,
- weighted=self._weighted,
- weight=sum([element[1] for element in to_buffer])
- if self._weighted else 1))
- qs.unbuffered_elements = qs.unbuffered_elements[qs.buffer_size:]
- self._collapse_if_needed(qs)
+ weighted=weighted,
+ input_batched=input_batched)
def _offset(self, new_weight):
# type: (int) -> float
@@ -636,132 +922,33 @@ class ApproximateQuantilesCombineFn(CombineFn, Generic[T]):
self._offset_jitter = 2 - self._offset_jitter
return (new_weight + self._offset_jitter) / 2
- def _collapse(self, buffers):
- # type: (Iterable[_QuantileBuffer[T]]) -> _QuantileBuffer[T]
- new_level = 0
- new_weight = 0
- for buffer_elem in buffers:
- # As presented in the paper, there should always be at least two
- # buffers of the same (minimal) level to collapse, but it is possible
- # to violate this condition when combining buffers from independently
- # computed shards. If they differ we take the max.
- new_level = max([new_level, buffer_elem.level + 1])
- new_weight = new_weight + buffer_elem.weight
- if self._weighted:
- step = new_weight / (self._buffer_size - 1)
- offset = new_weight / (2 * self._buffer_size)
- else:
- step = new_weight
- offset = self._offset(new_weight)
- new_elements = self._interpolate(buffers, self._buffer_size, step, offset)
- return _QuantileBuffer(new_elements, self._weighted, new_level, new_weight)
-
- def _collapse_if_needed(self, qs):
- # type: (_QuantileState) -> None
- while len(qs.buffers) > self._num_buffers:
- to_collapse = []
- to_collapse.append(heapq.heappop(qs.buffers))
- to_collapse.append(heapq.heappop(qs.buffers))
- min_level = to_collapse[1].level
-
- while len(qs.buffers) > 0 and qs.buffers[0].level == min_level:
- to_collapse.append(heapq.heappop(qs.buffers))
-
- heapq.heappush(qs.buffers, self._collapse(to_collapse))
-
- def _interpolate(self, i_buffers, count, step, offset):
- """
- Emulates taking the ordered union of all elements in buffers, repeated
- according to their weight, and picking out the (k * step + offset)-th
- elements of this list for `0 <= k < count`.
- """
-
- iterators = []
- new_elements = []
- compare_key = self._key
- if self._key and not self._weighted:
- compare_key = lambda x: self._key(x[0])
- for buffer_elem in i_buffers:
- iterators.append(buffer_elem.sized_iterator())
-
- # Python 3 `heapq.merge` support key comparison and returns an iterator and
- # does not pull the data into memory all at once. Python 2 does not
- # support comparison on its `heapq.merge` api, so we use the itertools
- # which takes the `key` function for comparison and creates an iterator
- # from it.
- if sys.version_info[0] < 3:
- sorted_elem = iter(
- sorted(
- itertools.chain.from_iterable(iterators),
- key=compare_key,
- reverse=self._reverse))
- else:
- sorted_elem = heapq.merge(
- *iterators, key=compare_key, reverse=self._reverse)
-
- weighted_element = next(sorted_elem)
- current = weighted_element[1]
- j = 0
- previous = 0
- while j < count:
- target = j * step + offset
- j = j + 1
- try:
- while current <= target:
- weighted_element = next(sorted_elem)
- current = current + weighted_element[1]
- except StopIteration:
- pass
- if self._weighted:
- new_elements.append((weighted_element[0], current - previous))
- previous = current
- else:
- new_elements.append(weighted_element[0])
- return new_elements
-
# TODO(BEAM-7746): Signature incompatible with supertype
def create_accumulator(self): # type: ignore[override]
- # type: () -> _QuantileState[T]
+ # type: () -> _QuantileState
self._qs = _QuantileState(
- buffer_size=self._buffer_size,
- num_buffers=self._num_buffers,
unbuffered_elements=[],
- buffers=[])
+ unbuffered_weights=[],
+ buffers=[],
+ spec=self._spec)
return self._qs
def add_input(self, quantile_state, element):
"""
Add a new element to the collection being summarized by quantile state.
"""
- value = element[0] if self._weighted else element
- if quantile_state.is_empty():
- quantile_state.min_val = quantile_state.max_val = value
- elif self._comparator(value, quantile_state.min_val) < 0:
- quantile_state.min_val = value
- elif self._comparator(value, quantile_state.max_val) > 0:
- quantile_state.max_val = value
- self._add_unbuffered(quantile_state, elements=[element])
+ quantile_state.add_unbuffered([element], self._offset)
return quantile_state
- def add_inputs(self, quantile_state, elements):
- """Add new elements to the collection being summarized by quantile state.
+ def _add_inputs(self, quantile_state, elements):
+ # type: (_QuantileState, List) -> _QuantileState
+
"""
- if not elements:
+ Add a batch of elements to the collection being summarized by quantile
+ state.
+ """
+ if len(elements) == 0:
return quantile_state
-
- values = [
- element[0] for element in elements
- ] if self._weighted else elements
- min_val = min(values)
- max_val = max(values)
- if quantile_state.is_empty():
- quantile_state.min_val = min_val
- quantile_state.max_val = max_val
- elif self._comparator(min_val, quantile_state.min_val) < 0:
- quantile_state.min_val = min_val
- elif self._comparator(max_val, quantile_state.max_val) > 0:
- quantile_state.max_val = max_val
- self._add_unbuffered(quantile_state, elements=elements)
+ quantile_state.add_unbuffered(elements, self._offset)
return quantile_state
def merge_accumulators(self, accumulators):
@@ -770,17 +957,16 @@ class ApproximateQuantilesCombineFn(CombineFn, Generic[T]):
for accumulator in accumulators:
if accumulator.is_empty():
continue
- if not qs.min_val or self._comparator(accumulator.min_val,
- qs.min_val) < 0:
- qs.min_val = accumulator.min_val
- if not qs.max_val or self._comparator(accumulator.max_val,
- qs.max_val) > 0:
- qs.max_val = accumulator.max_val
-
- self._add_unbuffered(qs, accumulator.unbuffered_elements)
+ if self._spec.weighted:
+ qs.add_unbuffered(
+ [accumulator.unbuffered_elements, accumulator.unbuffered_weights],
+ self._offset)
+ else:
+ qs.add_unbuffered(accumulator.unbuffered_elements, self._offset)
qs.buffers.extend(accumulator.buffers)
- self._collapse_if_needed(qs)
+ heapq.heapify(qs.buffers)
+ qs.collapse_if_needed(self._offset)
return qs
def extract_output(self, accumulator):
@@ -791,46 +977,21 @@ class ApproximateQuantilesCombineFn(CombineFn, Generic[T]):
"""
if accumulator.is_empty():
return []
-
+ accumulator.finalize()
all_elems = accumulator.buffers
- if self._weighted:
- unbuffered_weight = sum(
- [element[1] for element in accumulator.unbuffered_elements])
- total_weight = unbuffered_weight
+ total_weight = 0
+ if self._spec.weighted:
for buffer_elem in all_elems:
- total_weight += sum([element[1] for element in buffer_elem.elements])
- if accumulator.unbuffered_elements:
- accumulator.unbuffered_elements.sort(
- key=self._key, reverse=self._reverse)
- all_elems.append(
- _QuantileBuffer(
- accumulator.unbuffered_elements,
- weighted=True,
- weight=unbuffered_weight))
-
- step = 1.0 * total_weight / (self._num_quantiles - 1)
- offset = (1.0 * total_weight) / (self._num_quantiles - 1)
- mid_quantiles = [
- element[0] for element in self._interpolate(
- all_elems, self._num_quantiles - 2, step, offset)
- ]
+ total_weight += sum(buffer_elem.weights)
else:
- total_weight = len(accumulator.unbuffered_elements)
for buffer_elem in all_elems:
- total_weight += accumulator.buffer_size * buffer_elem.weight
-
- if accumulator.unbuffered_elements:
- accumulator.unbuffered_elements.sort(
- key=self._key, reverse=self._reverse)
- all_elems.append(
- _QuantileBuffer(accumulator.unbuffered_elements, weighted=False))
-
- step = 1.0 * total_weight / (self._num_quantiles - 1)
- offset = (1.0 * total_weight - 1) / (self._num_quantiles - 1)
- mid_quantiles = self._interpolate(
- all_elems, self._num_quantiles - 2, step, offset)
-
- quantiles = [accumulator.min_val]
- quantiles.extend(mid_quantiles)
- quantiles.append(accumulator.max_val)
- return quantiles
+ total_weight += len(buffer_elem.elements) * buffer_elem.weights[0]
+
+ step = total_weight / (self._num_quantiles - 1)
+ offset = (total_weight - 1) / (self._num_quantiles - 1)
+
+ quantiles, _, min_val, max_val = \
+ _interpolate(all_elems, self._num_quantiles - 2, step, offset,
+ self._spec)
+
+ return [min_val] + quantiles + [max_val]
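
For reference, a minimal pipeline sketch of the new batched mode; the input and
the expected result mirror the in/out examples added to the ApproximateQuantiles
docstring earlier in this diff, so the printed value is illustrative rather than
guaranteed:

    import apache_beam as beam

    with beam.Pipeline() as p:
      # Each element of the input PCollection is a whole batch of values.
      batches = ([list(range(i, i + 10)) for i in range(0, 90, 10)] +
                 [list(range(90, 101))])
      _ = (
          p
          | beam.Create(batches)
          | beam.ApproximateQuantiles.Globally(5, input_batched=True)
          | beam.Map(print))  # [0, 25, 50, 75, 100] per the docstring example

With weighted=True each batch is instead a tuple of a list of values and a list
of weights, as in the second batched example of the docstring.
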
diff --git a/sdks/python/apache_beam/transforms/stats_test.py b/sdks/python/apache_beam/transforms/stats_test.py
index 860594f..1cd8c8f 100644
--- a/sdks/python/apache_beam/transforms/stats_test.py
+++ b/sdks/python/apache_beam/transforms/stats_test.py
@@ -482,13 +482,116 @@ class ApproximateQuantilesTest(unittest.TestCase):
equal_to([["ccccc", "aaa", "b"]]),
label='checkWithKeyAndReversed')
+ def test_batched_quantiles(self):
+ with TestPipeline() as p:
+ data = []
+ for i in range(100):
+ data.append([(j / 10, abs(j - 500))
+ for j in range(i * 10, (i + 1) * 10)])
+ pc = p | Create(data)
+ globally = (
+ pc | 'Globally' >> beam.ApproximateQuantiles.Globally(
+ 3, input_batched=True))
+ with_key = (
+ pc | 'Globally with key' >> beam.ApproximateQuantiles.Globally(
+ 3, key=sum, input_batched=True))
+ key_with_reversed = (
+ pc | 'Globally with key and reversed' >>
+ beam.ApproximateQuantiles.Globally(
+ 3, key=sum, reverse=True, input_batched=True))
+ assert_that(
+ globally,
+ equal_to([[(0.0, 500), (49.9, 1), (99.9, 499)]]),
+ label='checkGlobally')
+ assert_that(
+ with_key,
+ equal_to([[(50.0, 0), (72.5, 225), (99.9, 499)]]),
+ label='checkGloballyWithKey')
+ assert_that(
+ key_with_reversed,
+ equal_to([[(99.9, 499), (72.5, 225), (50.0, 0)]]),
+ label='checkGloballyWithKeyAndReversed')
+
+ def test_batched_weighted_quantiles(self):
+ with TestPipeline() as p:
+ data = []
+ for i in range(100):
+ data.append([[(i / 10, abs(i - 500))
+ for i in range(i * 10, (i + 1) * 10)], [i] * 10])
+ pc = p | Create(data)
+ globally = (
+ pc | 'Globally' >> beam.ApproximateQuantiles.Globally(
+ 3, weighted=True, input_batched=True))
+ with_key = (
+ pc | 'Globally with key' >> beam.ApproximateQuantiles.Globally(
+ 3, key=sum, weighted=True, input_batched=True))
+ key_with_reversed = (
+ pc | 'Globally with key and reversed' >>
+ beam.ApproximateQuantiles.Globally(
+ 3, key=sum, reverse=True, weighted=True, input_batched=True))
+ assert_that(
+ globally,
+ equal_to([[(0.0, 500), (70.8, 208), (99.9, 499)]]),
+ label='checkGlobally')
+ assert_that(
+ with_key,
+ equal_to([[(50.0, 0), (21.0, 290), (99.9, 499)]]),
+ label='checkGloballyWithKey')
+ assert_that(
+ key_with_reversed,
+ equal_to([[(99.9, 499), (21.0, 290), (50.0, 0)]]),
+ label='checkGloballyWithKeyAndReversed')
+
+ def test_quantiles_merge_accumulators(self):
+ # This test exercises merging multiple buffers and approximation accuracy.
+ # The max_num_elements is set to a small value to trigger buffers collapse
+ # and interpolation. Under the conditions below, buffer_size=125 and
+ # num_buffers=4, so we're only allowed to keep half of the input values.
+ num_accumulators = 100
+ num_quantiles = 5
+ eps = 0.01
+ max_num_elements = 1000
+ combine_fn = ApproximateQuantilesCombineFn.create(
+ num_quantiles, eps, max_num_elements)
+ combine_fn_weighted = ApproximateQuantilesCombineFn.create(
+ num_quantiles, eps, max_num_elements, weighted=True)
+ data = list(range(1000))
+ weights = list(reversed(range(1000)))
+ step = math.ceil(len(data) / num_accumulators)
+ accumulators = []
+ accumulators_weighted = []
+ for i in range(num_accumulators):
+ accumulator = combine_fn.create_accumulator()
+ accumulator_weighted = combine_fn_weighted.create_accumulator()
+ for element, weight in zip(data[i*step:(i+1)*step],
+ weights[i*step:(i+1)*step]):
+ accumulator = combine_fn.add_input(accumulator, element)
+ accumulator_weighted = combine_fn_weighted.add_input(
+ accumulator_weighted, (element, weight))
+ accumulators.append(accumulator)
+ accumulators_weighted.append(accumulator_weighted)
+ accumulator = combine_fn.merge_accumulators(accumulators)
+ accumulator_weighted = combine_fn_weighted.merge_accumulators(
+ accumulators_weighted)
+ quantiles = combine_fn.extract_output(accumulator)
+ quantiles_weighted = combine_fn_weighted.extract_output(
+ accumulator_weighted)
+
+ # In fact, the final accuracy is much higher than eps, but we test for a
+ # minimal accuracy here.
+ for q, actual_q in zip(quantiles, [0, 249, 499, 749, 999]):
+ self.assertAlmostEqual(q, actual_q, delta=max_num_elements * eps)
+ for q, actual_q in zip(quantiles_weighted, [0, 133, 292, 499, 999]):
+ self.assertAlmostEqual(q, actual_q, delta=max_num_elements * eps)
+
@staticmethod
def _display_data_matcher(instance):
expected_items = [
DisplayDataItemMatcher('num_quantiles', instance._num_quantiles),
DisplayDataItemMatcher('weighted', str(instance._weighted)),
DisplayDataItemMatcher('key', str(instance._key.__name__)),
- DisplayDataItemMatcher('reverse', str(instance._reverse))
+ DisplayDataItemMatcher('reverse', str(instance._reverse)),
+ DisplayDataItemMatcher('input_batched', str(instance._input_batched)),
]
return expected_items
@@ -551,8 +654,9 @@ class ApproximateQuantilesBufferTest(unittest.TestCase):
combine_fn = ApproximateQuantilesCombineFn.create(
num_quantiles=10, max_num_elements=maxInputSize, epsilon=epsilon)
self.assertEqual(
- expectedNumBuffers, combine_fn._num_buffers, "Number of buffers")
-    self.assertEqual(expectedBufferSize, combine_fn._buffer_size, "Buffer size")
+ expectedNumBuffers, combine_fn._spec.num_buffers, "Number of buffers")
+ self.assertEqual(
+ expectedBufferSize, combine_fn._spec.buffer_size, "Buffer size")
@parameterized.expand(_build_quantilebuffer_test_data)
def test_correctness(self, epsilon, maxInputSize, *args):
@@ -561,8 +665,8 @@ class ApproximateQuantilesBufferTest(unittest.TestCase):
"""
combine_fn = ApproximateQuantilesCombineFn.create(
num_quantiles=10, max_num_elements=maxInputSize, epsilon=epsilon)
- b = combine_fn._num_buffers
- k = combine_fn._buffer_size
+ b = combine_fn._spec.num_buffers
+ k = combine_fn._spec.buffer_size
n = maxInputSize
self.assertLessEqual((b - 2) * (1 << (b - 2)) + 0.5, (epsilon * n),
'(b-2)2^(b-2) + 1/2 <= eN')
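
The merge test above exercises ApproximateQuantilesCombineFn through its full
lifecycle by hand; a stripped-down sketch of the same flow outside a pipeline,
following the unweighted case of test_quantiles_merge_accumulators (the result
is only guaranteed to within epsilon * max_num_elements of the true quantiles):

    from apache_beam.transforms.stats import ApproximateQuantilesCombineFn

    combine_fn = ApproximateQuantilesCombineFn.create(5, 0.01, 1000)
    accumulators = []
    for shard in range(10):
      acc = combine_fn.create_accumulator()
      for value in range(shard * 100, (shard + 1) * 100):
        acc = combine_fn.add_input(acc, value)
      accumulators.append(acc)
    merged = combine_fn.merge_accumulators(accumulators)
    print(combine_fn.extract_output(merged))  # roughly [0, 249, 499, 749, 999]
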
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index fdc9c5b..624beb3 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -277,6 +277,7 @@ setuptools.setup(
'apache_beam/runners/worker/opcounters.py',
'apache_beam/runners/worker/operations.py',
'apache_beam/transforms/cy_combiners.py',
+ 'apache_beam/transforms/stats.py',
'apache_beam/utils/counters.py',
'apache_beam/utils/windowed_value.py',
]),
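
The one-line setup.py change is what activates the new stats.pxd: listing
apache_beam/transforms/stats.py among the cythonized modules makes builds with
Cython installed compile the module (picking up the .pxd declarations), while
source-only installs fall back to the interpreted stats.py. A simplified sketch
of the pattern; the actual setup.py guards this behind a check that Cython is
available:

    import setuptools
    from Cython.Build import cythonize

    setuptools.setup(
        name='example-package',  # hypothetical name, for illustration only
        ext_modules=cythonize([
            'apache_beam/transforms/stats.py',  # compiled with stats.pxd
        ]),
    )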