[ 
https://issues.apache.org/jira/browse/BEAM-6694?focusedWorklogId=295567&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-295567
 ]

ASF GitHub Bot logged work on BEAM-6694:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Aug/19 17:00
            Start Date: 15/Aug/19 17:00
    Worklog Time Spent: 10m 
      Work Description: aaltay commented on pull request #9153: [BEAM-6694] Added Approximate Quantile Transform on Python SDK
URL: https://github.com/apache/beam/pull/9153#discussion_r314401166
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/stats.py
 ##########
 @@ -234,3 +237,401 @@ def extract_output(accumulator):
 
   def display_data(self):
     return {'sample_size': self._sample_size}
+
+
+class ApproximateQuantiles(object):
+  """
+  PTransform for getting an idea of the data distribution using approximate
+  N-tiles (e.g. quartiles, percentiles etc.), either globally or per-key.
+  """
+
+  @staticmethod
+  def _display_data(num_quantiles, key, reverse):
+    return {
+        'num_quantiles': DisplayDataItem(num_quantiles, label="Quantile Count"),
+        'key': DisplayDataItem(key.__name__ if hasattr(key, '__name__')
+                               else key.__class__.__name__,
+                               label='Record Comparer Key'),
+        'reverse': DisplayDataItem(str(reverse), label='Is reversed')
+    }
+
+  @typehints.with_input_types(T)
+  @typehints.with_output_types(typing.List[T])
+  class Globally(PTransform):
+    """
+    PTransform that takes a PCollection and returns a PCollection whose single
+    element is the list of approximate N-tiles of the input, computed globally.
+
+    Args:
+      num_quantiles: number of elements in the resulting quantiles values list.
+      key: (optional) a mapping of elements to a comparable key, similar to the
+        key argument of Python's sorting methods.
+      reverse: (optional) whether to order things largest to smallest, rather
+        than smallest to largest.
+    """
+
+    def __init__(self, num_quantiles, key=None, reverse=False):
+      self._num_quantiles = num_quantiles
+      self._key = key
+      self._reverse = reverse
+
+    def expand(self, pcoll):
+      return pcoll | CombineGlobally(ApproximateQuantilesCombineFn.create(
+          num_quantiles=self._num_quantiles, key=self._key,
+          reverse=self._reverse))
+
+    def display_data(self):
+      return ApproximateQuantiles._display_data(
+          num_quantiles=self._num_quantiles, key=self._key,
+          reverse=self._reverse)
+
+  @typehints.with_input_types(typing.Tuple[K, V])
+  @typehints.with_output_types(typing.Tuple[K, typing.List[V]])
+  class PerKey(PTransform):
+    """
+    PTransform that takes a PCollection of key-value pairs and returns, for
+    each key, the list of approximate N-tiles of the values for that key.
+
+    Args:
+      num_quantiles: number of elements in the resulting quantiles values list.
+      key: (optional) a mapping of elements to a comparable key, similar to the
+        key argument of Python's sorting methods.
+      reverse: (optional) whether to order things largest to smallest, rather
+        than smallest to largest.
+    """
+
+    def __init__(self, num_quantiles, key=None, reverse=False):
+      self._num_quantiles = num_quantiles
+      self._key = key
+      self._reverse = reverse
+
+    def expand(self, pcoll):
+      return pcoll | CombinePerKey(ApproximateQuantilesCombineFn.create(
+          num_quantiles=self._num_quantiles, key=self._key,
+          reverse=self._reverse))
+
+    def display_data(self):
+      return ApproximateQuantiles._display_data(
+          num_quantiles=self._num_quantiles, key=self._key,
+          reverse=self._reverse)
+
+
+class _QuantileBuffer(object):
+  """A single buffer in the sense of the referenced algorithm.
+  (see http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.6.6513&rep=rep1
+  &type=pdf and ApproximateQuantilesCombineFn for further information)"""
+
+  def __init__(self, elements, level=0, weight=1):
+    self.elements = elements
+    self.level = level
+    self.weight = weight
+
+  def __lt__(self, other):
+    return self.elements < other.elements
+
+  def sized_iterator(self):
+
+    class QuantileBufferIterator(object):
+      def __init__(self, elem, weight):
+        self._iter = iter(elem)
+        self.weight = weight
+
+      def __iter__(self):
+        return self
+
+      def __next__(self):
+        value = next(self._iter)
+        return (value, self.weight)
+
+      next = __next__  # For Python 2
+
+    return QuantileBufferIterator(self.elements, self.weight)
+
+
+class _QuantileState(object):
+  """
+  Compact summarization of a collection on which quantiles can be estimated.
+  """
+  min_val = None  # Holds smallest item in the list
+  max_val = None  # Holds largest item in the list
+
+  def __init__(self, buffer_size, num_buffers, unbuffered_elements, buffers):
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.buffers = buffers
+
+    # The algorithm requires that the manipulated buffers always be filled to
+    # capacity to perform the collapse operation. This operation can be extended
+    # to buffers of varying sizes by introducing the notion of fractional
+    # weights, but it's easier to simply combine the remainders from all shards
+    # into new, full buffers and then take them into account when computing the
+    # final output.
+    self.unbuffered_elements = unbuffered_elements
+
+  def is_empty(self):
+    """Check if the buffered & unbuffered elements are empty or not."""
+    return not self.unbuffered_elements and not self.buffers
+
+
+class ApproximateQuantilesCombineFn(CombineFn):
+  """
+  This combiner gives an idea of the distribution of a collection of values
+  using approximate N-tiles. Its output is a list of num_quantiles elements:
+  the minimum input value, the intermediate values (N-tiles) and the maximum
+  input value, in the sort order provided via key (similar to the key argument
+  of Python's sorting methods).
+
+  If there are fewer values to combine than the number of quantiles
+  (num_quantiles), then the resulting list will contain all the values being
+  combined, in sorted order.
+
+  If no `key` is provided, then the results are sorted in the natural order.
+
+  To evaluate the quantiles, we use the "New Algorithm" described here:
+
+  [MRL98] Manku, Rajagopalan & Lindsay, "Approximate Medians and other
+  Quantiles in One Pass and with Limited Memory", Proc. 1998 ACM SIGMOD,
+  Vol 27, No 2, p 426-435, June 1998.
+  http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.6.6513&rep=rep1
+  &type=pdf
+
+  The default error bound is (1 / num_quantiles), though in practice the
+  accuracy tends to be much better.
+
+  Args:
+    num_quantiles: Number of quantiles to produce. It is the size of the final
+      output list, including the minimum and maximum value items.
+    buffer_size: The size of the buffers, corresponding to k in the referenced
+      paper.
+    num_buffers: The number of buffers, corresponding to b in the referenced
+      paper.
+    key: (optional) Key is a mapping of elements to a comparable key, similar
+      to the key argument of Python's sorting methods.
+    reverse: (optional) whether to order things largest to smallest, rather
+      than smallest to largest.
+  """
+
+  # For alternating between biasing up and down in the above even weight
+  # collapse operation.
+  _offset_jitter = 0
+
+  # The cost (in time and space) to compute quantiles to a given accuracy is a
+  # function of the total number of elements in the data set. If an estimate is
+  # not known or specified, we use this as an upper bound. If this is too low,
+  # errors may exceed the requested tolerance; if too high, efficiency may be
+  # non-optimal. The impact is logarithmic with respect to this value, so this
+  # default should be fine for most uses.
+  _MAX_NUM_ELEMENTS = 1e9
+  _qs = None  # Refers to the _QuantileState
+
+  def __init__(self, num_quantiles, buffer_size, num_buffers, key=None,
+               reverse=False):
+    if key:
+      self._comparator = lambda a, b: (key(a) < key(b)) - (key(a) > key(b)) \
+        if reverse else (key(a) > key(b)) - (key(a) < key(b))
+    else:
+      self._comparator = lambda a, b: (a < b) - (a > b) if reverse \
+        else (a > b) - (a < b)
+
+    self._num_quantiles = num_quantiles
+    self._buffer_size = buffer_size
+    self._num_buffers = num_buffers
+    self._key = key
+    self._reverse = reverse
+
+  @classmethod
+  def create(cls, num_quantiles, epsilon=None, max_num_elements=None, key=None,
+             reverse=False):
+    """
+    Creates an approximate quantiles combiner with the given key and desired
+    number of quantiles.
+
+    Args:
+      num_quantiles: Number of quantiles to produce. It is the size of the
+        final output list, including the minimum and maximum value items.
+      epsilon: (optional) The default error bound is `epsilon`, which holds as
+        long as the number of elements is less than `_MAX_NUM_ELEMENTS`.
+        Specifically, if one considers the input as a sorted list x_1, ...,
+        x_N, then the distance between each exact quantile x_c and its
+        approximation x_c' is bounded by `|c - c'| < epsilon * N`. Note that
+        these errors are worst-case scenarios. In practice the accuracy tends
+        to be much better.
+      max_num_elements: (optional) The cost (in time and space) to compute
+        quantiles to a given accuracy is a function of the total number of
+        elements in the data set.
+      key: (optional) Key is a mapping of elements to a comparable key, similar
+        to the key argument of Python's sorting methods.
+      reverse: (optional) whether to order things largest to smallest, rather
+        than smallest to largest.
+    """
+    max_num_elements = max_num_elements or cls._MAX_NUM_ELEMENTS
+    if not epsilon:
+      epsilon = 1.0 / num_quantiles
+    b = 2
+    while (b - 2) * (1 << (b - 2)) < epsilon * max_num_elements:
+      b = b + 1
+    b = b - 1
+    k = max(2, math.ceil(max_num_elements / float(1 << (b - 1))))
+    return cls(num_quantiles=num_quantiles, buffer_size=k, num_buffers=b,
+               key=key, reverse=reverse)
+
+  def _add_unbuffered(self, qs, elem):
+    """
+    Add a new element to the unbuffered list, creating a new buffer and
+    collapsing if needed.
+    """
+    qs.unbuffered_elements.append(elem)
+    if len(qs.unbuffered_elements) == qs.buffer_size:
+      qs.unbuffered_elements.sort(key=self._key, reverse=self._reverse)
+      heapq.heappush(qs.buffers,
+                     _QuantileBuffer(elements=qs.unbuffered_elements))
+      qs.unbuffered_elements = []
+      self._collapse_if_needed(qs)
+    return qs
+
+  def _offset(self, newWeight):
+    """
+    If the weight is even, we must round up or down. Alternate between these
+    two options to avoid a bias.
+    """
+    if newWeight % 2 == 1:
+      return (newWeight + 1) / 2
+    else:
+      self._offset_jitter = 2 - self._offset_jitter
+      return (newWeight + self._offset_jitter) / 2
+
+  def _collapse(self, buffers):
+    new_level = 0
+    new_weight = 0
+    for buffer_elem in buffers:
+      # As presented in the paper, there should always be at least two
+      # buffers of the same (minimal) level to collapse, but it is possible
+      # to violate this condition when combining buffers from independently
+      # computed shards.  If they differ we take the max.
+      new_level = max([new_level, buffer_elem.level + 1])
+      new_weight = new_weight + buffer_elem.weight
+    new_elements = self._interpolate(buffers, self._buffer_size, new_weight,
+                                     self._offset(new_weight))
+    return _QuantileBuffer(new_elements, new_level, new_weight)
+
+  def _collapse_if_needed(self, qs):
+    while len(qs.buffers) > self._num_buffers:
+      toCollapse = []
+      toCollapse.append(heapq.heappop(qs.buffers))
+      toCollapse.append(heapq.heappop(qs.buffers))
+      minLevel = toCollapse[1].level
+
+      while len(qs.buffers) > 0 and qs.buffers[0].level == minLevel:
+        toCollapse.append(heapq.heappop(qs.buffers))
+
+      heapq.heappush(qs.buffers, self._collapse(toCollapse))
+
+  def _interpolate(self, i_buffers, count, step, offset):
+    """
+    Emulates taking the ordered union of all elements in buffers, repeated
+    according to their weight, and picking out the (k * step + offset)-th
+    elements of this list for `0 <= k < count`.
+    """
+
+    iterators = []
+    new_elements = []
+    compare_key = None
+    if self._key:
+      compare_key = lambda x: self._key(x[0])
+    for buffer_elem in i_buffers:
+      iterators.append(buffer_elem.sized_iterator())
+
+    # Python 3's `heapq.merge` supports a `key` function and returns an
+    # iterator, so it does not pull all the data into memory at once.
+    # Python 2's `heapq.merge` does not support `key`, so there we chain the
+    # iterators with itertools and sort the result using the `key` function.
+    if sys.version_info[0] < 3:
+      sorted_elem = iter(
+          sorted(itertools.chain.from_iterable(iterators), key=compare_key,
+                 reverse=self._reverse))
+    else:
+      sorted_elem = heapq.merge(*iterators, key=compare_key,
+                                reverse=self._reverse)
+
+    weighted_element = next(sorted_elem)
+    current = weighted_element[1]
+    j = 0
+    while j < count:
+      target = j * step + offset
+      j = j + 1
+      try:
+        while current <= target:
+          weighted_element = next(sorted_elem)
+          current = current + weighted_element[1]
+      except StopIteration:
+        pass
+      new_elements.append(weighted_element[0])
+    return new_elements
+
+  def create_accumulator(self):
+    self._qs = _QuantileState(buffer_size=self._buffer_size,
+                              num_buffers=self._num_buffers,
+                              unbuffered_elements=[], buffers=[])
+    return self._qs
+
+  def add_input(self, quantile_state, element):
+    """
+    Add a new element to the collection being summarized by quantile state.
+    """
+    if quantile_state.is_empty():
+      quantile_state.min_val = quantile_state.max_val = element
+    elif self._comparator(element, quantile_state.min_val) < 0:
+      quantile_state.min_val = element
+    elif self._comparator(element, quantile_state.max_val) > 0:
+      quantile_state.max_val = element
+    qs = self._add_unbuffered(quantile_state, elem=element)
 
 Review comment:
   `_add_unbuffered` modifies `quantile_state` in place, right? If `qs` is
   exactly the same object as `quantile_state` at this point, it would be
   cleaner to change `_add_unbuffered` to return nothing and change the code
   here to:
   
   ```
   self._add_unbuffered(quantile_state, elem=element)
   return quantile_state
   ```
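
The pattern suggested above can be sketched in isolation. This is a minimal, standalone illustration and not code from the PR: `State`, `add_unbuffered` and `add_input` are hypothetical, simplified stand-ins for `_QuantileState`, `_add_unbuffered` and `add_input`, and the collapse step is left out. The helper mutates the accumulator and returns nothing, so the caller returns the accumulator it was handed.

```python
class State(object):
  """Hypothetical stand-in for _QuantileState; not the PR's class."""

  def __init__(self, buffer_size):
    self.buffer_size = buffer_size
    self.unbuffered_elements = []
    self.buffers = []


def add_unbuffered(state, elem):
  """Mutates `state` in place and deliberately returns nothing."""
  state.unbuffered_elements.append(elem)
  if len(state.unbuffered_elements) == state.buffer_size:
    # Seal the full chunk as a sorted buffer and start a fresh chunk.
    state.buffers.append(sorted(state.unbuffered_elements))
    state.unbuffered_elements = []


def add_input(state, element):
  add_unbuffered(state, element)
  return state  # The same object that was passed in.


state = State(buffer_size=2)
result = add_input(add_input(add_input(state, 3), 1), 2)
```

Because `add_unbuffered` returns `None`, a caller cannot mistake its result for a new accumulator, which makes the in-place contract explicit.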
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 295567)
    Time Spent: 8h 50m  (was: 8h 40m)

> ApproximateQuantiles transform for Python SDK
> ---------------------------------------------
>
>                 Key: BEAM-6694
>                 URL: https://issues.apache.org/jira/browse/BEAM-6694
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Ahmet Altay
>            Assignee: Shehzaad Nakhoda
>            Priority: Minor
>          Time Spent: 8h 50m
>  Remaining Estimate: 0h
>
> Add PTransforms for getting an idea of a PCollection's data distribution 
> using approximate N-tiles (e.g. quartiles, percentiles, etc.), either 
> globally or per-key.
> It should offer the same API as its Java counterpart: 
> https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
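
For orientation, the shape of an N-tile result can be illustrated with an exact, in-memory reference. This is only a sketch under stated assumptions: `exact_quantiles` is a hypothetical helper, not part of Beam or of the PR, and its evenly spaced rank rule is chosen for illustration rather than taken from the approximation algorithm under review.

```python
def exact_quantiles(values, num_quantiles, key=None, reverse=False):
  """Exact reference for illustration only: sorts everything in memory."""
  if num_quantiles < 2:
    raise ValueError('num_quantiles must be at least 2 (min and max).')
  ordered = sorted(values, key=key, reverse=reverse)
  if len(ordered) <= num_quantiles:
    # Too few values: return all of them, in sorted order.
    return ordered
  last = len(ordered) - 1
  # Evenly spaced ranks from the first element to the last one
  # (an assumed rule, picked to keep the example simple).
  return [ordered[(i * last) // (num_quantiles - 1)]
          for i in range(num_quantiles)]


quartiles = exact_quantiles(range(1, 102), num_quantiles=5)
# quartiles == [1, 26, 51, 76, 101]
```

With `num_quantiles=5` the result holds the minimum, three intermediate values and the maximum, which matches the output shape the combiner docstring describes.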



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
