[ 
https://issues.apache.org/jira/browse/BEAM-6693?focusedWorklogId=242848&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242848
 ]

ASF GitHub Bot logged work on BEAM-6693:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/May/19 20:49
            Start Date: 15/May/19 20:49
    Worklog Time Spent: 10m 
      Work Description: Hannah-Jiang commented on pull request #8535: 
[BEAM-6693] ApproximateUnique transform for Python SDK
URL: https://github.com/apache/beam/pull/8535#discussion_r284444852
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/stats.py
 ##########
 @@ -0,0 +1,206 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Core PTransform subclasses, such as FlatMap, GroupByKey, and Map."""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import heapq
+import logging
+import math
+import sys
+from builtins import round
+
+from apache_beam.transforms.core import *
+from apache_beam.transforms.ptransform import PTransform
+
+try:
+  import mmh3
+except ImportError:
+  logging.info('Python version >=3.0 uses buildin hash function.')
+
+__all__ = [
+    'ApproximateUniqueGlobally',
+    'ApproximateUniquePerKey',
+]
+
+
+class ApproximateUniqueGlobally(PTransform):
+  """
+  Hashes input elements and uses those to extrapolate the size of the entire
+  set of hash values by assuming the rest of the hash values are as densely
+  distributed as the sample space.
+
+  Args:
+    **kwargs: Accepts a single named argument "size" or "error".
+    size: an int not smaller than 16, which we would use to estimate
+      number of unique values.
+    error: max estimation error, which is a float between 0.01
+      and 0.50. If error is given, size will be calculated from error with
+      _get_sample_size_from_est_error function.
+  """
+
+  _NO_VALUE_ERR_MSG = 'Either size or error should be set. Received {}.'
+  _MULTI_VALUE_ERR_MSG = 'Either size or error should be set. ' \
+                         'Received {size = %s, error = %s}.'
+  _INPUT_SIZE_ERR_MSG = 'ApproximateUnique needs a size >= 16 for an error ' \
+                        '<= 0.50. In general, the estimation error is about ' \
+                        '2 / sqrt(sample_size). Received {size = %s}.'
+  _INPUT_ERROR_ERR_MSG = 'ApproximateUnique needs an estimation error ' \
+                         'between 0.01 and 0.50. Received {error = %s}.'
+
+  def __init__(self, **kwargs):
+    input_size = kwargs.pop('size', None)
+    input_err = kwargs.pop('error', None)
+
+    if None not in (input_size, input_err):
+      raise ValueError(self._MULTI_VALUE_ERR_MSG % (input_size, input_err))
+    elif input_size is None and input_err is None:
+      raise ValueError(self._NO_VALUE_ERR_MSG)
+    elif input_size is not None:
+      if not isinstance(input_size, int) or input_size < 16:
+        raise ValueError(self._INPUT_SIZE_ERR_MSG % (input_size))
+      else:
+        self._sample_size = input_size
+        self._max_est_err = None
+    else:
+      if input_err < 0.01 or input_err > 0.5:
+        raise ValueError(self._INPUT_ERROR_ERR_MSG % (input_err))
+      else:
+        self._sample_size = self._get_sample_size_from_est_error(input_err)
+        self._max_est_err = input_err
+
+  def expand(self, pcoll):
+    return pcoll \
+           | 'CountGlobalUniqueValues' \
+           >> 
(CombineGlobally(ApproximateUniqueCombineDoFn(self._sample_size)))
+
+  @staticmethod
+  def _get_sample_size_from_est_error(est_err):
+    """
+    :return: sample size
+
+    Calculate sample size from estimation error
+    """
+    return int(math.ceil(4.0 / math.pow(est_err, 2.0)))
+
+
+class ApproximateUniquePerKey(ApproximateUniqueGlobally):
+
+  def expand(self, pcoll):
+    return pcoll \
+           | 'CountPerKeyUniqueValues' \
+           >> (CombinePerKey(ApproximateUniqueCombineDoFn(self._sample_size)))
+
+
+class _LargestUnique(object):
+  """
+  An object to keep samples and calculate sample hash space. It is an
+  accumulator of a combine function.
+  """
+  _HASH_SPACE_SIZE = 2.0 * sys.maxsize
+
+  def __init__(self, sample_size):
+    self._sample_size = sample_size
+    self._min_hash = sys.maxsize
+    self._sample_heap = []
+    self._sample_set = set()
+
+  def add(self, element):
+    """
+    :param an element from pcoll.
+    :return: boolean type whether the value is in the heap
+
+    Adds a value to the heap, returning whether the value is (large enough to
+    be) in the heap.
+    """
+    if len(self._sample_heap) >= self._sample_size and element < 
self._min_hash:
+      return False
+
+    if element not in self._sample_set:
+      self._sample_set.add(element)
+      heapq.heappush(self._sample_heap, element)
+
+      if len(self._sample_heap) > self._sample_size:
+        temp = heapq.heappop(self._sample_heap)
+        self._sample_set.remove(temp)
+        self._min_hash = self._sample_heap[0]
+      elif element < self._min_hash:
+        self._min_hash = element
+
+    return True
+
+  def get_estimate(self):
+    """
+    :return: estimation of unique values
+
+    If heap size is smaller than sample size, just return heap size.
+    Otherwise, takes into account the possibility of hash collisions,
+    which become more likely than not for 2^32 distinct elements.
+    Note that log(1+x) ~ x for small x, so for sampleSize << maxHash
+    log(1 - sampleSize/sampleSpace) / log(1 - 1/sampleSpace) ~ sampleSize
+    and hence estimate ~ sampleSize * HASH_SPACE_SIZE / sampleSpace
+    as one would expect.
+    """
+
+    if len(self._sample_heap) < self._sample_size:
+      return len(self._sample_heap)
+    else:
+      sample_space_size = sys.maxsize - 1.0 * self._min_hash
+      est = math.log1p(-self._sample_size/ sample_space_size) \
 
 Review comment:
   I was not able to find a document, instead I added more explanation about 
the estimate equation.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 242848)
    Time Spent: 5.5h  (was: 5h 20m)

> ApproximateUnique transform for Python SDK
> ------------------------------------------
>
>                 Key: BEAM-6693
>                 URL: https://issues.apache.org/jira/browse/BEAM-6693
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Ahmet Altay
>            Assignee: Hannah Jiang
>            Priority: Minor
>          Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> Add a PTransform for estimating the number of distinct elements in a 
> PCollection and the number of distinct values associated with each key in a 
> PCollection KVs.
> it should offer the same API as its Java counterpart: 
> https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to