iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r570715768
##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -501,6 +781,8 @@ class ApproximateQuantilesCombineFn(CombineFn, Generic[T]):
weighted: (optional) if set to True, the combiner produces weighted
quantiles. The input elements are then expected to be tuples of input
values with the corresponding weight.
+ batch_input: (optional) if set to True, inputs are expected to be batches
of
Review comment:
Done.
##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -327,27 +330,39 @@ class Globally(PTransform):
weighted: (optional) if set to True, the transform returns weighted
quantiles. The input PCollection is then expected to contain tuples of
input values with the corresponding weight.
+ batch_input: (optional) if set to True, the transform expects each
element
+ of input PCollection to be a batch. Provides a way to accumulate
Review comment:
1. Done, also added examples.
2. I think the tuple (element, weight) generalizes the same way to (elements,
weights) as it does to [(element1, weight1), ...], so I don't see any strong
advantage of either from a usability perspective (for instance, TFT's quantiles
take them as separate tensors), but there is a benefit in taking (elements,
weights) from a code simplicity perspective — it allows the weighted and unweighted
cases to share a lot of code.
##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,126 @@ class PerKey(PTransform):
weighted: (optional) if set to True, the transform returns weighted
quantiles. The input PCollection is then expected to contain tuples of
input values with the corresponding weight.
+ batch_input: (optional) if set to True, the transform expects each
element
+ of input PCollection to be a batch. Provides a way to accumulate
+ multiple elements at a time more efficiently.
"""
- def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+ def __init__(
+ self,
+ num_quantiles,
+ key=None,
+ reverse=False,
+ weighted=False,
+ batch_input=False):
self._num_quantiles = num_quantiles
self._key = key
self._reverse = reverse
self._weighted = weighted
+ self._batch_input = batch_input
def expand(self, pcoll):
return pcoll | CombinePerKey(
ApproximateQuantilesCombineFn.create(
num_quantiles=self._num_quantiles,
key=self._key,
reverse=self._reverse,
- weighted=self._weighted))
+ weighted=self._weighted,
+ batch_input=self._batch_input))
def display_data(self):
return ApproximateQuantiles._display_data(
num_quantiles=self._num_quantiles,
key=self._key,
reverse=self._reverse,
- weighted=self._weighted)
+ weighted=self._weighted,
+ batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+ """Quantiles computation specifications."""
+ def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+ # type: (int, int, bool, Any, bool) -> None
+ self.buffer_size = buffer_size
+ self.num_buffers = num_buffers
+ self.weighted = weighted
+ self.key = key
+ self.reverse = reverse
+
+ # Used to sort tuples of values and weights.
+ self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+ # Used to compare values.
+ if reverse and key is None:
+ self.less_than = lambda a, b: a > b
+ elif reverse:
+ self.less_than = lambda a, b: key(a) > key(b)
+ elif key is None:
+ self.less_than = lambda a, b: a < b
+ else:
+ self.less_than = lambda a, b: key(a) < key(b)
+
+ def get_argsort_key(self, elements):
+ # type: (List) -> Any
+
+ """Returns a key for sorting indices of elements by element's value."""
+ if self.key is None:
+ return elements.__getitem__
+ else:
+ return lambda idx: self.key(elements[idx])
+
+ def __reduce__(self):
+ return (
+ self.__class__,
+ (
+ self.buffer_size,
+ self.num_buffers,
+ self.weighted,
+ self.key,
+ self.reverse))
-class _QuantileBuffer(Generic[T]):
+class _QuantileBuffer(object):
"""A single buffer in the sense of the referenced algorithm.
(see http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.6.6513&rep=rep1
&type=pdf and ApproximateQuantilesCombineFn for further information)"""
- def __init__(self, elements, weighted, level=0, weight=1):
- # type: (Sequence[T], bool, int, int) -> None
- # In case of weighted quantiles, elements are tuples of values and weights.
+ def __init__(
+ self, elements, weights, weighted, level=0, min_val=None, max_val=None):
Review comment:
Done.
##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -523,29 +805,25 @@ def __init__(
num_buffers, # type: int
key=None,
reverse=False,
- weighted=False):
- def _comparator(a, b):
- if key:
- a, b = key(a), key(b)
-
- retval = int(a > b) - int(a < b)
-
- if reverse:
- return -retval
-
- return retval
-
- self._comparator = _comparator
-
+ weighted=False,
+ batch_input=False):
self._num_quantiles = num_quantiles
- self._buffer_size = buffer_size
- self._num_buffers = num_buffers
- if weighted:
- self._key = (lambda x: x[0]) if key is None else (lambda x: key(x[0]))
- else:
- self._key = key
- self._reverse = reverse
- self._weighted = weighted
+ self._spec = _QuantileSpec(buffer_size, num_buffers, weighted, key,
reverse)
+ self._batch_input = batch_input
+ if self._batch_input:
+ setattr(self, 'add_input', self._add_inputs)
Review comment:
Done.
##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -582,6 +861,8 @@ def create(
weighted: (optional) if set to True, the combiner produces weighted
quantiles. The input elements are then expected to be tuples of values
with the corresponding weight.
+ batch_input: (optional) if set to True, inputs are expected to be batches
+ of elements.
Review comment:
Yes, here the logic of the Munro-Paterson algorithm is used. Switching
to the calculation from section 4.5 would make it possible to reduce the size of
the (full) accumulator, but that is probably out of the scope of this PR —
should I leave a TODO?
##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,126 @@ class PerKey(PTransform):
weighted: (optional) if set to True, the transform returns weighted
quantiles. The input PCollection is then expected to contain tuples of
input values with the corresponding weight.
+ batch_input: (optional) if set to True, the transform expects each
element
+ of input PCollection to be a batch. Provides a way to accumulate
+ multiple elements at a time more efficiently.
"""
- def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+ def __init__(
+ self,
+ num_quantiles,
+ key=None,
+ reverse=False,
+ weighted=False,
+ batch_input=False):
self._num_quantiles = num_quantiles
self._key = key
self._reverse = reverse
self._weighted = weighted
+ self._batch_input = batch_input
def expand(self, pcoll):
return pcoll | CombinePerKey(
ApproximateQuantilesCombineFn.create(
num_quantiles=self._num_quantiles,
key=self._key,
reverse=self._reverse,
- weighted=self._weighted))
+ weighted=self._weighted,
+ batch_input=self._batch_input))
def display_data(self):
return ApproximateQuantiles._display_data(
num_quantiles=self._num_quantiles,
key=self._key,
reverse=self._reverse,
- weighted=self._weighted)
+ weighted=self._weighted,
+ batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+ """Quantiles computation specifications."""
+ def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+ # type: (int, int, bool, Any, bool) -> None
+ self.buffer_size = buffer_size
+ self.num_buffers = num_buffers
+ self.weighted = weighted
+ self.key = key
+ self.reverse = reverse
+
+ # Used to sort tuples of values and weights.
+ self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+ # Used to compare values.
+ if reverse and key is None:
+ self.less_than = lambda a, b: a > b
+ elif reverse:
+ self.less_than = lambda a, b: key(a) > key(b)
+ elif key is None:
+ self.less_than = lambda a, b: a < b
+ else:
+ self.less_than = lambda a, b: key(a) < key(b)
+
+ def get_argsort_key(self, elements):
+ # type: (List) -> Any
Review comment:
Done.
##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -636,132 +895,33 @@ def _offset(self, new_weight):
self._offset_jitter = 2 - self._offset_jitter
return (new_weight + self._offset_jitter) / 2
- def _collapse(self, buffers):
- # type: (Iterable[_QuantileBuffer[T]]) -> _QuantileBuffer[T]
- new_level = 0
- new_weight = 0
- for buffer_elem in buffers:
- # As presented in the paper, there should always be at least two
- # buffers of the same (minimal) level to collapse, but it is possible
- # to violate this condition when combining buffers from independently
- # computed shards. If they differ we take the max.
- new_level = max([new_level, buffer_elem.level + 1])
- new_weight = new_weight + buffer_elem.weight
- if self._weighted:
- step = new_weight / (self._buffer_size - 1)
- offset = new_weight / (2 * self._buffer_size)
- else:
- step = new_weight
- offset = self._offset(new_weight)
- new_elements = self._interpolate(buffers, self._buffer_size, step, offset)
- return _QuantileBuffer(new_elements, self._weighted, new_level, new_weight)
-
- def _collapse_if_needed(self, qs):
- # type: (_QuantileState) -> None
- while len(qs.buffers) > self._num_buffers:
- to_collapse = []
- to_collapse.append(heapq.heappop(qs.buffers))
- to_collapse.append(heapq.heappop(qs.buffers))
- min_level = to_collapse[1].level
-
- while len(qs.buffers) > 0 and qs.buffers[0].level == min_level:
- to_collapse.append(heapq.heappop(qs.buffers))
-
- heapq.heappush(qs.buffers, self._collapse(to_collapse))
-
- def _interpolate(self, i_buffers, count, step, offset):
- """
- Emulates taking the ordered union of all elements in buffers, repeated
- according to their weight, and picking out the (k * step + offset)-th
- elements of this list for `0 <= k < count`.
- """
-
- iterators = []
- new_elements = []
- compare_key = self._key
- if self._key and not self._weighted:
- compare_key = lambda x: self._key(x[0])
- for buffer_elem in i_buffers:
- iterators.append(buffer_elem.sized_iterator())
-
- # Python 3 `heapq.merge` support key comparison and returns an iterator and
- # does not pull the data into memory all at once. Python 2 does not
- # support comparison on its `heapq.merge` api, so we use the itertools
- # which takes the `key` function for comparison and creates an iterator
- # from it.
- if sys.version_info[0] < 3:
- sorted_elem = iter(
- sorted(
- itertools.chain.from_iterable(iterators),
- key=compare_key,
- reverse=self._reverse))
- else:
- sorted_elem = heapq.merge(
- *iterators, key=compare_key, reverse=self._reverse)
-
- weighted_element = next(sorted_elem)
- current = weighted_element[1]
- j = 0
- previous = 0
- while j < count:
- target = j * step + offset
- j = j + 1
- try:
- while current <= target:
- weighted_element = next(sorted_elem)
- current = current + weighted_element[1]
- except StopIteration:
- pass
- if self._weighted:
- new_elements.append((weighted_element[0], current - previous))
- previous = current
- else:
- new_elements.append(weighted_element[0])
- return new_elements
-
# TODO(BEAM-7746): Signature incompatible with supertype
def create_accumulator(self): # type: ignore[override]
- # type: () -> _QuantileState[T]
+ # type: () -> _QuantileState
self._qs = _QuantileState(
- buffer_size=self._buffer_size,
- num_buffers=self._num_buffers,
unbuffered_elements=[],
- buffers=[])
+ unbuffered_weights=[],
+ buffers=[],
+ spec=self._spec)
return self._qs
def add_input(self, quantile_state, element):
"""
Add a new element to the collection being summarized by quantile state.
"""
- value = element[0] if self._weighted else element
- if quantile_state.is_empty():
- quantile_state.min_val = quantile_state.max_val = value
- elif self._comparator(value, quantile_state.min_val) < 0:
- quantile_state.min_val = value
- elif self._comparator(value, quantile_state.max_val) > 0:
- quantile_state.max_val = value
- self._add_unbuffered(quantile_state, elements=[element])
+ quantile_state.add_unbuffered([element], self._offset)
Review comment:
I don't think this will cause any problems with Cythonization or
performance. They will be static methods though, so the only difference is the
namespace, and neither of them deals with _QuantileState objects. But I don't
have a strong preference — WDYT?
##########
File path: sdks/python/apache_beam/transforms/stats_test.py
##########
@@ -482,13 +482,74 @@ def test_alternate_quantiles(self):
equal_to([["ccccc", "aaa", "b"]]),
label='checkWithKeyAndReversed')
+ def test_batched_quantiles(self):
Review comment:
1. I think the tests use DirectRunner, so probably not.
2. The approximation will be properly tested only if either the number of
inputs is large with default settings, or max_num_elements and epsilon
are set to extremely low and large values, respectively. I tested the
approximation with a large number of inputs and FlumeCppRunner during
development, but it took some time to complete, so it's probably not suitable
for continuous testing. It might make sense for me to initialize the CombineFn
with the extreme values and test add_input, merge_accumulators and
extract_output directly — WDYT?
##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -452,15 +511,236 @@ def __init__(self, buffer_size, num_buffers,
unbuffered_elements, buffers):
# into new, full buffers and then take them into account when computing the
# final output.
self.unbuffered_elements = unbuffered_elements
+ self.unbuffered_weights = unbuffered_weights
+
+ def __reduce__(self):
Review comment:
When Cythonization is enabled, pickling fails without it. I can look up
the error description if you're interested. Added a comment.
##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -61,30 +58,34 @@
K = typing.TypeVar('K')
V = typing.TypeVar('V')
+try:
+ import mmh3 # pylint: disable=import-error
-def _get_default_hash_fn():
- """Returns either murmurhash or md5 based on installation."""
- try:
- import mmh3 # pylint: disable=import-error
+ def _mmh3_hash(value):
+ # mmh3.hash64 returns two 64-bit unsigned integers
+ return mmh3.hash64(value, seed=0, signed=False)[0]
+
+ _default_hash_fn = _mmh3_hash
+ _default_hash_fn_type = 'mmh3'
+except ImportError:
- def _mmh3_hash(value):
- # mmh3.hash64 returns two 64-bit unsigned integers
- return mmh3.hash64(value, seed=0, signed=False)[0]
+ def _md5_hash(value):
+ # md5 is a 128-bit hash, so we truncate the hexdigest (string of 32
+ # hexadecimal digits) to 16 digits and convert to int to get the 64-bit
+ # integer fingerprint.
+ return int(hashlib.md5(value).hexdigest()[:16], 16)
- return _mmh3_hash
+ _default_hash_fn = _md5_hash
+ _default_hash_fn_type = 'md5'
- except ImportError:
+
+def _get_default_hash_fn():
+ """Returns either murmurhash or md5 based on installation."""
+ if _default_hash_fn_type == 'md5':
logging.warning(
'Couldn\'t find murmurhash. Install mmh3 for a faster implementation
of'
Review comment:
Should I make it a dependency then?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]