[ 
https://issues.apache.org/jira/browse/BEAM-6694?focusedWorklogId=295324&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-295324
 ]

ASF GitHub Bot logged work on BEAM-6694:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Aug/19 08:49
            Start Date: 15/Aug/19 08:49
    Worklog Time Spent: 10m 
      Work Description: mszb commented on pull request #9153: [BEAM-6694] Added Approximate Quantile Transform on Python SDK
URL: https://github.com/apache/beam/pull/9153#discussion_r314223878
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/stats.py
 ##########
 @@ -234,3 +239,419 @@ def extract_output(accumulator):
 
   def display_data(self):
     return {'sample_size': self._sample_size}
+
+
+class ApproximateQuantiles(object):
+  """
+  PTransform for getting an idea of the data distribution using approximate
+  N-tiles (e.g. quartiles, percentiles etc.) either globally or per-key.
+  """
+
+  @staticmethod
+  def _display_data(num_quantiles, compare, key, reverse):
+    return {
+        'num_quantiles': DisplayDataItem(num_quantiles, label="Quantile Count"),
+        'compare': DisplayDataItem(compare.__class__,
+                                   label='Record Comparer FN'),
+        'key': DisplayDataItem(key.__class__, label='Record Comparer Key'),
+        'reverse': DisplayDataItem(reverse.__class__, label='Is reversed')
+    }
+
+  @typehints.with_input_types(T)
+  @typehints.with_output_types(typing.List[T])
+  class Globally(PTransform):
+    """
+    PTransform takes PCollection and returns a list whose single value is
+    approximate N-tiles of the input collection globally.
+
+    Args:
+      num_quantiles: number of elements in the resulting quantiles values list.
+      compare: (optional) Comparator function which is an implementation
+        of "a < b" taking at least two arguments (a and b). Which is later
+        converted to key function as Python 3 does not support cmp.
+      key: (optional) Key is a mapping of elements to a comparable key, similar
+        to the key argument of Python's sorting methods.
+      reverse: (optional) whether to order things smallest to largest, rather
+        than largest to smallest
+    """
+
+    def __init__(self, num_quantiles, compare=None, key=None, reverse=False):
+      self._num_quantiles = num_quantiles
+      self._compare = compare
+      self._key = key
+      self._reverse = reverse
+
+    def expand(self, pcoll):
+      return pcoll | CombineGlobally(ApproximateQuantilesCombineFn.create(
+          num_quantiles=self._num_quantiles, compare=self._compare,
+          key=self._key, reverse=self._reverse))
+
+    def display_data(self):
+      return ApproximateQuantiles._display_data(
+          num_quantiles=self._num_quantiles, compare=self._compare,
+          key=self._key, reverse=self._reverse)
+
+  @typehints.with_input_types(typing.Tuple[K, V])
+  @typehints.with_output_types(typing.List[typing.Tuple[K, V]])
+  class PerKey(PTransform):
+    """
+    PTransform takes PCollection of KV and returns a list based on each key
+    whose single value is list of approximate N-tiles of the input element of
+    the key.
+
+    Args:
+      num_quantiles: number of elements in the resulting quantiles values list.
+      compare: (optional) Comparator function which is an implementation
+        of "a < b" taking at least two arguments (a and b). Which is later
+        converted to key function as Python 3 does not support cmp.
+      key: (optional) Key is a mapping of elements to a comparable key, similar
+        to the key argument of Python's sorting methods.
+      reverse: (optional) whether to order things smallest to largest, rather
+        than largest to smallest
+    """
+
+    def __init__(self, num_quantiles, compare=None, key=None, reverse=False):
+      self._num_quantiles = num_quantiles
+      self._compare = compare
+      self._key = key
+      self._reverse = reverse
+
+    def expand(self, pcoll):
+      return pcoll | CombinePerKey(ApproximateQuantilesCombineFn.create(
+          num_quantiles=self._num_quantiles, compare=self._compare,
+          key=self._key, reverse=self._reverse))
+
+    def display_data(self):
+      return ApproximateQuantiles._display_data(
+          num_quantiles=self._num_quantiles, compare=self._compare,
+          key=self._key, reverse=self._reverse)
+
+
+class _QuantileBuffer(object):
+  """A single buffer in the sense of the referenced algorithm."""
+
+  def __init__(self, elements, level=0, weight=1):
+    self.elements = elements
+    self.level = level
+    self.weight = weight
+
+  def __lt__(self, other):
+    self.elements < other.elements
+
+  def sized_iterator(self):
+
+    class QuantileBufferIterator(object):
+      def __init__(self, elem, weight):
+        self._iter = iter(elem)
+        self.weight = weight
+
+      def __iter__(self):
+        return self
+
+      def __next__(self):
+        value = next(self._iter)
+        return {'value': value, 'weight': self.weight}
+
+      next = __next__  # For Python 2
+
+    return QuantileBufferIterator(self.elements, self.weight)
+
+
+class _QuantileState(object):
+  """
+  Compact summarization of a collection on which quantiles can be estimated.
+  """
+  min_val = None  # Holds smallest item in the list
+  max_val = None  # Holds largest item in the list
+
+  def __init__(self, buffer_size, num_buffers, unbuffered_elements, buffers):
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.buffers = buffers
+
+    # The algorithm requires that the manipulated buffers always be filled to
+    # capacity to perform the collapse operation. This operation can be extended
+    # to buffers of varying sizes by introducing the notion of fractional
+    # weights, but it's easier to simply combine the remainders from all shards
+    # into new, full buffers and then take them into account when computing the
+    # final output.
+    self.unbuffered_elements = unbuffered_elements
+
+  def is_empty(self):
+    """Check if the buffered & unbuffered elements are empty or not."""
+    return not self.unbuffered_elements and not self.buffers
+
+
+class ApproximateQuantilesCombineFn(CombineFn):
+  """
+  This combiner gives an idea of the distribution of a collection of values
+  using approximate N-tiles. The output of this combiner is the list of size of
+  the number of quantiles (num_quantiles), containing the input values of the
+  minimum value item of the list, the intermediate values (n-tiles) and the
+  maximum value item of the list, in the sort order provided via comparator
+  (compare or key).
+
+  If there are fewer values to combine than the number of quantile
+  (num_quantiles), then the resulting list will contain all the values being
+  combined, in sorted order.
+
+  If no comparator (compare or key) is provided, then the results are sorted
+  in the natural order.
+
+  To evaluate the quantiles, we use the "New Algorithm" described here:
+
+  [MRL98] Manku, Rajagopalan & Lindsay, "Approximate Medians and other
+  Quantiles in One Pass and with Limited Memory", Proc. 1998 ACM SIGMOD,
+  Vol 27, No 2, p 426-435, June 1998.
+  http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.6.6513&rep=rep1
+  &type=pdf
+
+  The default error bound is (1 / N), though in practice the accuracy
+  tends to be much better.
+
+  Args:
+    num_quantiles: Number of quantiles to produce. It is the size of the final
+      output list, including the minimum and maximum value items.
+    buffer_size: The size of the buffers, corresponding to k in the referenced
+      paper.
+    num_buffers: The number of buffers, corresponding to b in the referenced
+      paper.
+    compare: (optional) Comparator function which is an implementation
+        of "a < b" taking two arguments (a and b). Which is later
+        converted to key function as Python 3 does not support `cmp`.
+    key: (optional) Key is a mapping of elements to a comparable key, similar
+      to the key argument of Python's sorting methods.
+    reverse: (optional) whether to order things smallest to largest, rather
+        than largest to smallest
+  """
+
+  # For alternating between biasing up and down in the above even weight
+  # collapse operation.
+  _offset_jitter = 0
+
+  # The cost (in time and space) to compute quantiles to a given accuracy is a
+  # function of the total number of elements in the data set. If an estimate is
+  # not known or specified, we use this as an upper bound. If this is too low,
+  # errors may exceed the requested tolerance; if too high, efficiency may be
+  # non-optimal. The impact is logarithmic with respect to this value, so this
+  # default should be fine for most uses.
+  _MAX_NUM_ELEMENTS = 1e9
+  _qs = None  # Refers to the _QuantileState
+
+  # TODO(mszb): For Python 3, remove compare and only keep key.
 
 Review comment:
   Done!
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 295324)
    Time Spent: 8h 20m  (was: 8h 10m)

> ApproximateQuantiles transform for Python SDK
> ---------------------------------------------
>
>                 Key: BEAM-6694
>                 URL: https://issues.apache.org/jira/browse/BEAM-6694
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Ahmet Altay
>            Assignee: Shehzaad Nakhoda
>            Priority: Minor
>          Time Spent: 8h 20m
>  Remaining Estimate: 0h
>
> Add PTransforms for getting an idea of a PCollection's data distribution 
> using approximate N-tiles (e.g. quartiles, percentiles, etc.), either 
> globally or per-key.
> It should offer the same API as its Java counterpart: 
> https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to