[ 
https://issues.apache.org/jira/browse/BEAM-6694?focusedWorklogId=285429&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-285429
 ]

ASF GitHub Bot logged work on BEAM-6694:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/Jul/19 01:29
            Start Date: 31/Jul/19 01:29
    Worklog Time Spent: 10m 
      Work Description: aaltay commented on pull request #9153: [BEAM-6694] 
Added Approximate Quantile Transfrom on Python SDK
URL: https://github.com/apache/beam/pull/9153#discussion_r309004035
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/stats.py
 ##########
 @@ -234,3 +239,419 @@ def extract_output(accumulator):
 
   def display_data(self):
     return {'sample_size': self._sample_size}
+
+
+class ApproximateQuantiles(object):
+  """
+  PTransform for getting an idea of the data distribution using approximate
+  N-tiles (e.g. quartiles, percentiles, etc.), either globally or per-key.
+  """
+
+  @staticmethod
+  def _display_data(num_quantiles, compare, key, reverse):
+    return {
+        'num_quantiles': DisplayDataItem(num_quantiles, label="Quantile Count"),
+        'compare': DisplayDataItem(compare.__class__,
+                                   label='Record Comparer FN'),
+        'key': DisplayDataItem(key.__class__, label='Record Comparer Key'),
+        'reverse': DisplayDataItem(reverse.__class__, label='Is reversed')
+    }
+
+  @typehints.with_input_types(T)
+  @typehints.with_output_types(typing.List[T])
+  class Globally(PTransform):
+    """
+    PTransform that takes a PCollection and returns a PCollection whose single
+    element is a list of approximate N-tiles of the input collection, computed
+    globally.
+
+    Args:
+      num_quantiles: number of elements in the resulting quantiles values list.
+      compare: (optional) Comparator function that implements "a < b" and
+        takes at least two arguments (a and b). It is later converted to a
+        key function, as Python 3 does not support cmp.
+      key: (optional) A mapping of elements to a comparable key, similar to
+        the key argument of Python's sorting methods.
+      reverse: (optional) whether to order things largest to smallest, rather
+        than smallest to largest.
+    """
+
+    def __init__(self, num_quantiles, compare=None, key=None, reverse=False):
+      self._num_quantiles = num_quantiles
+      self._compare = compare
+      self._key = key
+      self._reverse = reverse
+
+    def expand(self, pcoll):
+      return pcoll | CombineGlobally(ApproximateQuantilesCombineFn.create(
+          num_quantiles=self._num_quantiles, compare=self._compare,
+          key=self._key, reverse=self._reverse))
+
+    def display_data(self):
+      return ApproximateQuantiles._display_data(
+          num_quantiles=self._num_quantiles, compare=self._compare,
+          key=self._key, reverse=self._reverse)
+
+  @typehints.with_input_types(typing.Tuple[K, V])
+  @typehints.with_output_types(typing.Tuple[K, typing.List[V]])
+  class PerKey(PTransform):
+    """
+    PTransform that takes a PCollection of key-value pairs and returns, for
+    each key, a list of approximate N-tiles of the input elements for that
+    key.
+
+    Args:
+      num_quantiles: number of elements in the resulting quantiles values list.
+      compare: (optional) Comparator function that implements "a < b" and
+        takes at least two arguments (a and b). It is later converted to a
+        key function, as Python 3 does not support cmp.
+      key: (optional) A mapping of elements to a comparable key, similar to
+        the key argument of Python's sorting methods.
+      reverse: (optional) whether to order things largest to smallest, rather
+        than smallest to largest.
+    """
+
+    def __init__(self, num_quantiles, compare=None, key=None, reverse=False):
+      self._num_quantiles = num_quantiles
+      self._compare = compare
+      self._key = key
+      self._reverse = reverse
+
+    def expand(self, pcoll):
+      return pcoll | CombinePerKey(ApproximateQuantilesCombineFn.create(
+          num_quantiles=self._num_quantiles, compare=self._compare,
+          key=self._key, reverse=self._reverse))
+
+    def display_data(self):
+      return ApproximateQuantiles._display_data(
+          num_quantiles=self._num_quantiles, compare=self._compare,
+          key=self._key, reverse=self._reverse)
+
+
+class _QuantileBuffer(object):
+  """A single buffer in the sense of the referenced algorithm."""
+
+  def __init__(self, elements, level=0, weight=1):
+    self.elements = elements
+    self.level = level
+    self.weight = weight
+
+  def __lt__(self, other):
+    return self.elements < other.elements
+
+  def sized_iterator(self):
+
+    class QuantileBufferIterator(object):
+      def __init__(self, elem, weight):
+        self._iter = iter(elem)
+        self.weight = weight
+
+      def __iter__(self):
+        return self
+
+      def __next__(self):
+        value = next(self._iter)
+        return {'value': value, 'weight': self.weight}
+
+      next = __next__  # For Python 2
+
+    return QuantileBufferIterator(self.elements, self.weight)
+
+
+class _QuantileState(object):
+  """
+  Compact summarization of a collection on which quantiles can be estimated.
+  """
+  min_val = None  # Holds smallest item in the list
+  max_val = None  # Holds largest item in the list
+
+  def __init__(self, buffer_size, num_buffers, unbuffered_elements, buffers):
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.buffers = buffers
+
+    # The algorithm requires that the manipulated buffers always be filled to
+    # capacity to perform the collapse operation. This operation can be extended
+    # to buffers of varying sizes by introducing the notion of fractional
+    # weights, but it's easier to simply combine the remainders from all shards
+    # into new, full buffers and then take them into account when computing the
+    # final output.
+    self.unbuffered_elements = unbuffered_elements
+
+  def is_empty(self):
+    """Check if the buffered & unbuffered elements are empty or not."""
+    return not self.unbuffered_elements and not self.buffers
+
+
+class ApproximateQuantilesCombineFn(CombineFn):
 
 Review comment:
   Should this start with an underscore? Do you expect people to use it directly?
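
   For context on the `compare` argument described in the docstrings above,
   here is a minimal sketch of how an "a < b" predicate could be turned into
   a key function with the standard library's functools.cmp_to_key. The
   helper name `less_than_to_key` and the sample comparator are hypothetical
   and are not taken from the pull request; the PR's own conversion may
   differ.

```python
import functools


def less_than_to_key(less_than):
    """Wrap an "a < b" predicate as a key function usable by sorted().

    Hypothetical helper for illustration; not part of the Beam API.
    """
    def three_way(a, b):
        # Derive a cmp-style result (-1, 0, 1) from the boolean predicate.
        if less_than(a, b):
            return -1
        if less_than(b, a):
            return 1
        return 0

    return functools.cmp_to_key(three_way)


# Hypothetical comparator: order strings by length.
def shorter(a, b):
    return len(a) < len(b)


words = ['pear', 'fig', 'banana']
by_length = sorted(words, key=less_than_to_key(shorter))
by_length_reversed = sorted(words, key=less_than_to_key(shorter), reverse=True)
```

   Deriving a three-way result from the predicate keeps equal elements
   comparing as equal, which is what sorted() needs for a consistent order.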
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 285429)
    Time Spent: 4h 20m  (was: 4h 10m)

> ApproximateQuantiles transform for Python SDK
> ---------------------------------------------
>
>                 Key: BEAM-6694
>                 URL: https://issues.apache.org/jira/browse/BEAM-6694
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Ahmet Altay
>            Assignee: Shehzaad Nakhoda
>            Priority: Minor
>          Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> Add PTransforms for getting an idea of a PCollection's data distribution 
> using approximate N-tiles (e.g. quartiles, percentiles, etc.), either 
> globally or per-key.
> It should offer the same API as its Java counterpart: 
> https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
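
To make the notion of N-tiles concrete, here is a small pure-Python sketch
that computes exact quantiles by sorting everything in memory. It is not the
approximate, buffer-based algorithm from the pull request. The function name
`exact_quantiles` is hypothetical, and the choice to return `num_quantiles`
evenly spaced values including the minimum and maximum is an assumption about
the intended output shape.

```python
def exact_quantiles(values, num_quantiles, key=None, reverse=False):
    """Return num_quantiles evenly spaced elements of the sorted input.

    Illustration only: sorts the whole input, so it does not scale the way
    an approximate, mergeable summary would.
    """
    if num_quantiles < 2:
        raise ValueError('num_quantiles must be at least 2')
    ordered = sorted(values, key=key, reverse=reverse)
    if not ordered:
        return []
    last = len(ordered) - 1
    # Pick indices 0, last/(n-1), 2*last/(n-1), ..., last (integer division).
    return [ordered[(i * last) // (num_quantiles - 1)]
            for i in range(num_quantiles)]


# Quartile boundaries of the integers 1..101.
quartiles = exact_quantiles(range(1, 102), 5)
```

With `reverse=True` the same call returns the values from largest to smallest.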



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)