This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c4d2d7d  Cythonize DistributionAccumulator
     new 9e3e9c4  This closes #5028
c4d2d7d is described below

commit c4d2d7d717767b803864d788396e4d3dbd419cff
Author: Boyuan Zhang <boyu...@google.com>
AuthorDate: Wed Apr 4 17:36:04 2018 -0700

    Cythonize DistributionAccumulator
---
 .../runners/dataflow/internal/apiclient.py         |  19 ++-
 .../runners/dataflow/internal/apiclient_test.py    |  24 +++-
 .../tools/distribution_counter_microbenchmark.py   |  12 +-
 sdks/python/apache_beam/transforms/__init__.py     |   6 +
 .../python/apache_beam/transforms/cy_combiners.pxd |  20 ---
 sdks/python/apache_beam/transforms/cy_combiners.py | 105 ++--------------
 .../cy_dataflow_distribution_counter.pxd           |  44 +++++++
 .../cy_dataflow_distribution_counter.pyx           | 135 +++++++++++++++++++++
 ...st.py => dataflow_distribution_counter_test.py} |  49 +++++---
 .../transforms/py_dataflow_distribution_counter.py | 114 +++++++++++++++++
 sdks/python/apache_beam/utils/counters.py          |   5 +-
 sdks/python/generate_pydoc.sh                      |   2 +
 12 files changed, 397 insertions(+), 138 deletions(-)
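
In short, this commit replaces the pure-Python DistributionAccumulator with a
DataflowDistributionCounter that is compiled with Cython when available and
falls back to pure Python otherwise. A minimal usage sketch of the new API,
based on the add_input/translate_to_histogram methods introduced below (the
_Histogram stand-in is hypothetical; the real target is the apitools-generated
dataflow.Histogram proto):

    from apache_beam.transforms import DataflowDistributionCounter

    class _Histogram(object):
      # Hypothetical stand-in for dataflow.Histogram.
      firstBucketOffset = None
      bucketCounts = None

    counter = DataflowDistributionCounter()
    for value in (1, 500, 2, 3, 1000, 4):
      counter.add_input(value)

    histogram = _Histogram()
    counter.translate_to_histogram(histogram)
    # Per the unit test below: histogram.firstBucketOffset == 1 and
    # histogram.bucketCounts == [1, 3, 0, 0, 0, 0, 0, 0, 1, 1].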

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 9db1cab..54eba06 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -47,6 +47,7 @@ from apache_beam.runners.dataflow.internal.clients import dataflow
 from apache_beam.runners.dataflow.internal.dependency import get_sdk_name_and_version
 from apache_beam.runners.dataflow.internal.names import PropertyNames
 from apache_beam.transforms import cy_combiners
+from apache_beam.transforms import DataflowDistributionCounter
 from apache_beam.transforms.display import DisplayData
 from apache_beam.utils import retry
 
@@ -738,12 +739,22 @@ def to_split_int(n):
 
 
 def translate_distribution(distribution_update, metric_update_proto):
-  """Translate metrics DistributionUpdate to dataflow distribution update."""
+  """Translate metrics DistributionUpdate to dataflow distribution update.
+
+  Args:
    distribution_update: Instance of DistributionData or
      DataflowDistributionCounter.
    metric_update_proto: The proto used to report metrics.
+  """
   dist_update_proto = dataflow.DistributionUpdate()
   dist_update_proto.min = to_split_int(distribution_update.min)
   dist_update_proto.max = to_split_int(distribution_update.max)
   dist_update_proto.count = to_split_int(distribution_update.count)
   dist_update_proto.sum = to_split_int(distribution_update.sum)
+  # DataflowDistributionCounter additionally needs to translate its histogram.
+  if isinstance(distribution_update, DataflowDistributionCounter):
+    dist_update_proto.histogram = dataflow.Histogram()
+    distribution_update.translate_to_histogram(dist_update_proto.histogram)
   metric_update_proto.distribution = dist_update_proto
 
 
@@ -804,6 +815,9 @@ structured_counter_translations = {
     cy_combiners.AnyCombineFn: (
         dataflow.CounterMetadata.KindValueValuesEnum.OR,
         MetricUpdateTranslators.translate_boolean),
+    cy_combiners.DataflowDistributionCounterFn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.DISTRIBUTION,
+        translate_distribution)
 }
 
 
@@ -841,4 +855,7 @@ counter_translations = {
     cy_combiners.AnyCombineFn: (
         dataflow.NameAndKind.KindValueValuesEnum.OR,
         MetricUpdateTranslators.translate_boolean),
+    cy_combiners.DataflowDistributionCounterFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.DISTRIBUTION,
+        translate_distribution)
 }
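
For context, translate_distribution now accepts either a DistributionData or a
DataflowDistributionCounter, and only the latter carries a histogram. A hedged
sketch of the difference, assuming the apitools client is importable (this
mirrors the unit test in the next file):

    from apache_beam.runners.dataflow.internal import apiclient
    from apache_beam.runners.dataflow.internal.clients import dataflow
    from apache_beam.transforms import DataflowDistributionCounter

    counter = DataflowDistributionCounter()
    counter.add_input(1)
    counter.add_input(3)

    proto = dataflow.CounterUpdate()
    apiclient.translate_distribution(counter, proto)
    # proto.distribution now carries min/max/count/sum plus a histogram;
    # a plain DistributionData input would leave the histogram unset.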
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index fc5c21c..35ece3e 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -23,9 +23,10 @@ from apache_beam.metrics.cells import DistributionData
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.runners.dataflow.internal import dependency
 from apache_beam.runners.dataflow.internal.clients import dataflow
+from apache_beam.transforms import DataflowDistributionCounter
 
 # Protect against environments where apitools library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
 try:
   from apache_beam.runners.dataflow.internal import apiclient
 except ImportError:
@@ -133,6 +134,27 @@ class UtilTest(unittest.TestCase):
     self.assertEqual(metric_update.distribution.count.lowBits,
                      distribution_update.count)
 
+  def test_translate_distribution_counter(self):
+    counter_update = DataflowDistributionCounter()
+    counter_update.add_input(1)
+    counter_update.add_input(3)
+    metric_proto = dataflow.CounterUpdate()
+    apiclient.translate_distribution(counter_update, metric_proto)
+    histogram = mock.Mock(firstBucketOffset=None, bucketCounts=None)
+    counter_update.translate_to_histogram(histogram)
+    self.assertEqual(metric_proto.distribution.min.lowBits,
+                     counter_update.min)
+    self.assertEqual(metric_proto.distribution.max.lowBits,
+                     counter_update.max)
+    self.assertEqual(metric_proto.distribution.sum.lowBits,
+                     counter_update.sum)
+    self.assertEqual(metric_proto.distribution.count.lowBits,
+                     counter_update.count)
+    self.assertEqual(metric_proto.distribution.histogram.bucketCounts,
+                     histogram.bucketCounts)
+    self.assertEqual(metric_proto.distribution.histogram.firstBucketOffset,
+                     histogram.firstBucketOffset)
+
   def test_translate_means(self):
     metric_update = dataflow.CounterUpdate()
     accumulator = mock.Mock()
diff --git a/sdks/python/apache_beam/tools/distribution_counter_microbenchmark.py b/sdks/python/apache_beam/tools/distribution_counter_microbenchmark.py
index 2426bc5..3e301eb 100644
--- a/sdks/python/apache_beam/tools/distribution_counter_microbenchmark.py
+++ b/sdks/python/apache_beam/tools/distribution_counter_microbenchmark.py
@@ -30,7 +30,6 @@ import sys
 import time
 
 from apache_beam.tools import utils
-from apache_beam.transforms.cy_combiners import DistributionAccumulator
 
 
 def generate_input_values(num_input, lower_bound, upper_bound):
@@ -47,15 +46,15 @@ def run_benchmark(num_runs=100, num_input=10000, seed=time.time()):
   lower_bound = 0
   upper_bound = sys.maxint
   inputs = generate_input_values(num_input, lower_bound, upper_bound)
-  print ("Number of runs:", num_runs)
+  from apache_beam.transforms import DataflowDistributionCounter
+  print("Number of runs:", num_runs)
   print("Input size:", num_input)
   print("Input sequence from %d to %d" % (lower_bound, upper_bound))
   print("Random seed:", seed)
   for i in range(num_runs):
-    counter = DistributionAccumulator()
+    counter = DataflowDistributionCounter()
     start = time.time()
-    for value in inputs:
-      counter.add_input(value)
+    counter.add_inputs_for_test(inputs)
     time_cost = time.time() - start
     print("Run %d: Total time cost %g sec" % (i+1, time_cost))
     total_time += time_cost/num_input
@@ -63,5 +62,6 @@ def run_benchmark(num_runs=100, num_input=10000, seed=time.time()):
 
 
 if __name__ == '__main__':
-  utils.check_compiled('apache_beam.transforms.cy_combiners')
+  utils.check_compiled(
+      'apache_beam.transforms.cy_dataflow_distribution_counter')
   run_benchmark()
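
A hedged invocation sketch for the benchmark above (parameter values are
illustrative; the module assumes Python 2, where sys.maxint exists):

    from apache_beam.tools import distribution_counter_microbenchmark as bench

    # Defaults are num_runs=100, num_input=10000, seed=time.time().
    bench.run_benchmark(num_runs=5, num_input=1000)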
diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py
index b77b0f6..3c04b37 100644
--- a/sdks/python/apache_beam/transforms/__init__.py
+++ b/sdks/python/apache_beam/transforms/__init__.py
@@ -23,3 +23,9 @@ from apache_beam.transforms.core import *
 from apache_beam.transforms.ptransform import *
 from apache_beam.transforms.timeutil import TimeDomain
 from apache_beam.transforms.util import *
+
+# No backwards compatibility guarantees.
+try:
+  from apache_beam.transforms.cy_dataflow_distribution_counter import DataflowDistributionCounter
+except ImportError:
+  from apache_beam.transforms.py_dataflow_distribution_counter import DataflowDistributionCounter
diff --git a/sdks/python/apache_beam/transforms/cy_combiners.pxd b/sdks/python/apache_beam/transforms/cy_combiners.pxd
index 0e6fe0c..4fc03a7 100644
--- a/sdks/python/apache_beam/transforms/cy_combiners.pxd
+++ b/sdks/python/apache_beam/transforms/cy_combiners.pxd
@@ -90,23 +90,3 @@ cdef class AnyAccumulator(object):
   cpdef add_input(self, bint element)
   @cython.locals(accumulator=AnyAccumulator)
   cpdef merge(self, accumulators)
-
-cdef bint compare_to(int64_t x, int64_t y)
-
-@cython.locals(number_of_leading_zeros=int64_t, y=int64_t)
-cdef int64_t get_log10_round_to_floor(int64_t element)
-
-cdef class DistributionAccumulator(object):
-  cdef public int64_t min
-  cdef public int64_t max
-  cdef public int64_t count
-  cdef public int64_t sum
-  cdef public int64_t first_bucket_offset
-  cdef public list buckets
-  cdef public int64_t buckets_per_10
-  @cython.locals(bucket_index = int64_t, size_of_bucket=int64_t)
-  cpdef add_input(self, int64_t element)
-  @cython.locals(log10_floor=int64_t, power_of_ten=int64_t,
-                 bucket_offset=int64_t)
-  cpdef int64_t calculate_bucket_index(self, int64_t element)
-  cdef void increment_bucket(self, int64_t bucket_index)
diff --git a/sdks/python/apache_beam/transforms/cy_combiners.py b/sdks/python/apache_beam/transforms/cy_combiners.py
index b0f7e7b..53a440e 100644
--- a/sdks/python/apache_beam/transforms/cy_combiners.py
+++ b/sdks/python/apache_beam/transforms/cy_combiners.py
@@ -22,10 +22,13 @@ For internal use only; no backwards-compatibility guarantees.
 
 from __future__ import absolute_import
 
-import math
-
 from apache_beam.transforms import core
 
+try:
+  from apache_beam.transforms.cy_dataflow_distribution_counter import DataflowDistributionCounter
+except ImportError:
+  from apache_beam.transforms.py_dataflow_distribution_counter import DataflowDistributionCounter
+
 
 class AccumulatorCombineFn(core.CombineFn):
   # singleton?
@@ -312,96 +315,14 @@ class AllCombineFn(AccumulatorCombineFn):
   _accumulator_type = AllAccumulator
 
 
-MAX_LONG_10_FOR_LEADING_ZEROS = [19, 18, 18, 18, 18, 17, 17, 17, 16, 16, 16, 15,
-                                 15, 15, 15, 14, 14, 14, 13, 13, 13, 12, 12, 12,
-                                 12, 11, 11, 11, 10, 10, 10, 9, 9, 9, 9, 8, 8,
-                                 8, 7, 7, 7, 6, 6, 6, 6, 5, 5, 5, 4, 4, 4, 3, 3,
-                                 3, 3, 2, 2, 2, 1, 1, 1, 0, 0, 0]
-
-LONG_SIZE = 64
-
-
-def compare_to(x, y):
-  """return the sign bit of x-y"""
-  if x < y:
-    return 1
-  return 0
-
-
-def get_log10_round_to_floor(element):
-  number_of_leading_zeros = LONG_SIZE - element.bit_length()
-  y = MAX_LONG_10_FOR_LEADING_ZEROS[number_of_leading_zeros]
-  return y - compare_to(element, math.pow(10, y))
+class DataflowDistributionCounterFn(AccumulatorCombineFn):
+  """A subclass of cy_combiners.AccumulatorCombineFn.
 
+  Makes DataflowDistributionCounter able to report to the Dataflow service
+  via CounterFactory.
 
-class DistributionAccumulator(object):
-  """Distribution Counter:
-  contains value distribution statistics and methods for incrementing
+  When the cythonized DataflowDistributionCounter is available, the
+  CounterFn combines with the cythonized module; otherwise it combines
+  with the pure Python version.
   """
-  def __init__(self):
-    global INT64_MAX # pylint: disable=global-variable-not-assigned
-    self.min = INT64_MAX
-    self.max = 0
-    self.count = 0
-    self.sum = 0
-    """Histogram buckets of value counts for a distribution(1,2,5 bucketing)"""
-    self.buckets = []
-    """Starting index of the first stored bucket"""
-    self.first_bucket_offset = 0
-    """There are 3 buckets for every power of ten: 1, 2, 5"""
-    self.buckets_per_10 = 3
-
-  def add_input(self, element):
-    if element < 0:
-      raise ValueError('Distribution counters support only non-negative value')
-    self.min = min(self.min, element)
-    self.max = max(self.max, element)
-    self.count += 1
-    self.sum += element
-    bucket_index = self.calculate_bucket_index(element)
-    size_of_bucket = len(self.buckets)
-    self.increment_bucket(bucket_index)
-    if size_of_bucket == 0:
-      self.first_bucket_offset = bucket_index
-    else:
-      self.first_bucket_offset = min(self.first_bucket_offset, bucket_index)
-
-  def calculate_bucket_index(self, element):
-    """Calculate the bucket index for the given element"""
-    if element == 0:
-      return 0
-    log10_floor = get_log10_round_to_floor(element)
-    power_of_ten = math.pow(10, log10_floor)
-    if element < 2 * power_of_ten:
-      bucket_offset = 0  # [0, 2)
-    elif element < 5 * power_of_ten:
-      bucket_offset = 1  # [2, 5)
-    else:
-      bucket_offset = 2  # [5, 10)
-    return 1 + (log10_floor * self.buckets_per_10) + bucket_offset
-
-  def increment_bucket(self, bucket_index):
-    """Increment the bucket for the given index
-    If the bucket at the given index is already in the list,
-    this will increment the existing value.
-    If the specified index is outside of the current bucket range,
-    the bucket list will be extended to incorporate the new bucket
-    """
-    if not self.buckets:
-      self.buckets.append(1)
-    elif bucket_index < self.first_bucket_offset:
-      new_buckets = []
-      new_buckets.append(1)
-      new_buckets.extend(
-          [0] * (self.first_bucket_offset - bucket_index - 1))
-      self.buckets = new_buckets + self.buckets
-    elif bucket_index >= self.first_bucket_offset + len(self.buckets):
-      self.buckets.extend(
-          [0] * (bucket_index - self.first_bucket_offset - len(self.buckets)))
-      self.buckets.append(1)
-    else:
-      self.buckets[bucket_index - self.first_bucket_offset] += 1
-
-
-class DistributionCounterFn(AccumulatorCombineFn):
-  _accumulator_type = DistributionAccumulator
+  _accumulator_type = DataflowDistributionCounter
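
The one-liner above works because AccumulatorCombineFn delegates everything to
its _accumulator_type. A simplified, hypothetical sketch of that pattern (not
the actual Beam implementation):

    class AccumulatorCombineFnSketch(object):
      # Hypothetical illustration: subclasses set _accumulator_type to a
      # class exposing add_input(), as DataflowDistributionCounterFn does.
      _accumulator_type = None

      def create_accumulator(self):
        return self._accumulator_type()

      def add_input(self, accumulator, element):
        accumulator.add_input(element)
        return accumulator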
diff --git a/sdks/python/apache_beam/transforms/cy_dataflow_distribution_counter.pxd b/sdks/python/apache_beam/transforms/cy_dataflow_distribution_counter.pxd
new file mode 100644
index 0000000..d4711bd
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/cy_dataflow_distribution_counter.pxd
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# cython: profile=True
+
+""" For internal use only. No backwards compatibility guarantees."""
+
+cimport cython
+from libc.stdint cimport int64_t
+
+
+# 3 buckets for every power of ten -> 1, 2, 5
+cdef enum:
+  BUCKET_PER_TEN = 3
+
+# Assume the max input is max(int64_t), then the possible max bucket size is 59
+cdef enum:
+  MAX_BUCKET_SIZE = 59
+
+cdef class DataflowDistributionCounter(object):
+  cdef public int64_t min
+  cdef public int64_t max
+  cdef public int64_t count
+  cdef public int64_t sum
+  cdef int64_t* buckets
+  cdef public bint is_cythonized
+  cpdef bint add_input(self, int64_t element) except -1
+  cdef int64_t _fast_calculate_bucket_index(self, int64_t element)
+  cpdef void translate_to_histogram(self, histogram)
+  cpdef bint add_inputs_for_test(self, elements) except -1
+  cpdef int64_t calculate_bucket_index(self, int64_t element)
diff --git a/sdks/python/apache_beam/transforms/cy_dataflow_distribution_counter.pyx b/sdks/python/apache_beam/transforms/cy_dataflow_distribution_counter.pyx
new file mode 100644
index 0000000..a3622aa
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/cy_dataflow_distribution_counter.pyx
@@ -0,0 +1,135 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# cython: profile=True
+
+""" For internal use only. No backwards compatibility guarantees."""
+
+cimport cython
+from libc.stdint cimport int64_t, INT64_MAX
+from libc.stdlib cimport calloc, free
+
+
+cdef unsigned long long* POWER_TEN = [10e-1, 10e0, 10e1, 10e2, 10e3, 10e4, 10e5,
+                                      10e6, 10e7, 10e8, 10e9, 10e10, 10e11,
+                                      10e12, 10e13, 10e14, 10e15, 10e16, 10e17,
+                                      10e18]
+
+
+cdef int64_t get_log10_round_to_floor(int64_t element):
+  cdef int power = 0
+  while element >= POWER_TEN[power]:
+    power += 1
+  return power - 1
+
+
+cdef class DataflowDistributionCounter(object):
+  """Distribution Counter:
+
+  Contains value distribution statistics and methods for incrementing.
+
+  Currently uses a special bucketing strategy suitable for Dataflow.
+
+  Attributes:
+    min: minimum value of all inputs.
+    max: maximum value of all inputs.
+    count: total count of all inputs.
+    sum: sum of all inputs.
+    buckets: histogram buckets of value counts for a
+      distribution (1, 2, 5 bucketing). Max bucket_index is 58 (sys.maxint as input).
+    is_cythonized: marks whether DataflowDistributionCounter is cythonized.
+  """
+  def __init__(self):
+    self.min = INT64_MAX
+    self.max = 0
+    self.count = 0
+    self.sum = 0
+    self.buckets = <int64_t*> calloc(MAX_BUCKET_SIZE, sizeof(int64_t))
+    self.is_cythonized = True
+
+  def __dealloc__(self):
+    """free allocated memory"""
+    free(self.buckets)
+
+  cpdef bint add_input(self, int64_t element) except -1:
+    if element < 0:
+      raise ValueError('Distribution counters support only non-negative values')
+    self.min = min(self.min, element)
+    self.max = max(self.max, element)
+    self.count += 1
+    self.sum += element
+    cdef int64_t bucket_index = self._fast_calculate_bucket_index(element)
+    self.buckets[bucket_index] += 1
+
+  cdef int64_t _fast_calculate_bucket_index(self, int64_t element):
+    """Calculate the bucket index for the given element.
+    
+    Declare calculate_bucket_index as cdef in order to improve performance,
+    since cpdef has significant overhead.
+    """
+    if element == 0:
+      return 0
+    cdef int64_t log10_floor = get_log10_round_to_floor(element)
+    cdef int64_t power_of_ten = POWER_TEN[log10_floor]
+    cdef int64_t bucket_offset = 0
+    if element < power_of_ten * 2:
+      bucket_offset = 0
+    elif element < power_of_ten * 5:
+      bucket_offset = 1
+    else:
+      bucket_offset = 2
+    return 1 + log10_floor * BUCKET_PER_TEN + bucket_offset
+
+  cpdef void translate_to_histogram(self, histogram):
+    """Translate buckets into Histogram.
+    
+    Args:
+      histogram: apache_beam.runners.dataflow.internal.clients.dataflow.Histogram.
+      Ideally, only call this function when reporting the counter to the
+      Dataflow service.
+    """
+    cdef int first_bucket_offset = 0
+    cdef int last_bucket_offset = 0
+    cdef int index = 0
+    for index in range(0, MAX_BUCKET_SIZE):
+      if self.buckets[index] != 0:
+        first_bucket_offset = index
+        break
+    for index in range(MAX_BUCKET_SIZE - 1, -1, -1):
+      if self.buckets[index] != 0:
+        last_bucket_offset = index
+        break
+    histogram.firstBucketOffset = first_bucket_offset
+    histogram.bucketCounts = []
+    for index in range(first_bucket_offset, last_bucket_offset + 1):
+      histogram.bucketCounts.append(self.buckets[index])
+
+  cpdef bint add_inputs_for_test(self, elements) except -1:
+    """Used for performance microbenchmark.
+    
+    During runtime, add_input will be called through a c-call, so we want to
+    have the same calling routine when running the microbenchmark as at
+    application runtime. Directly calling a cpdef from def causes significant
+    overhead.
+    """
+    for element in elements:
+      self.add_input(element)
+
+  cpdef int64_t calculate_bucket_index(self, int64_t element):
+    """Used for unit tests.
+    
+    The cdef _fast_calculate_bucket_index cannot be called directly from def.
+    """
+    return self._fast_calculate_bucket_index(element)
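
To make the 1-2-5 bucketing concrete, here is a pure-Python mirror of
_fast_calculate_bucket_index (for illustration only; bucket_index and the
floor-log10 shortcut are local helpers, not part of the commit):

    BUCKET_PER_TEN = 3

    def bucket_index(element):
      # Mirrors _fast_calculate_bucket_index above.
      if element == 0:
        return 0
      log10_floor = len(str(element)) - 1  # floor(log10) for positive ints
      power_of_ten = 10 ** log10_floor
      if element < 2 * power_of_ten:
        offset = 0   # [1, 2) * 10^k
      elif element < 5 * power_of_ten:
        offset = 1   # [2, 5) * 10^k
      else:
        offset = 2   # [5, 10) * 10^k
      return 1 + log10_floor * BUCKET_PER_TEN + offset

    assert bucket_index(500) == 9    # 500 is in [5, 10) * 10^2
    assert bucket_index(1000) == 10  # 1000 is in [1, 2) * 10^3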
diff --git a/sdks/python/apache_beam/transforms/cy_combiners_test.py b/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py
similarity index 50%
rename from sdks/python/apache_beam/transforms/cy_combiners_test.py
rename to sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py
index f9ccfb0..e19eee6 100644
--- a/sdks/python/apache_beam/transforms/cy_combiners_test.py
+++ b/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py
@@ -9,24 +9,28 @@
 # limitations under the License.
 #
 
-'''
-Unit tests for the Distribution Counter
-'''
+"""Unit tests for DataflowDistributionCounter
+When Cython is available, unit tests will test on cythonized module,
+otherwise, test on pure python module
+"""
+
 import math
+import sys
 import unittest
 
-from apache_beam.transforms.cy_combiners import DistributionAccumulator
+from mock import Mock
 
+from apache_beam.transforms import DataflowDistributionCounter
 
-class DistributionAccumulatorTest(unittest.TestCase):
 
+class DataflowDistributionAccumulatorTest(unittest.TestCase):
   def test_calculate_bucket_index_with_input_0(self):
-    counter = DistributionAccumulator()
+    counter = DataflowDistributionCounter()
     index = counter.calculate_bucket_index(0)
     self.assertEquals(index, 0)
 
   def test_calculate_bucket_index_within_max_long(self):
-    counter = DistributionAccumulator()
+    counter = DataflowDistributionCounter()
     bucket = 1
     power_of_ten = 1
     INT64_MAX = math.pow(2, 63) - 1
@@ -39,26 +43,39 @@ class DistributionAccumulatorTest(unittest.TestCase):
       power_of_ten *= 10
 
   def test_add_input(self):
-    counter = DistributionAccumulator()
+    counter = DataflowDistributionCounter()
     expected_buckets = [1, 3, 0, 0, 0, 0, 0, 0, 1, 1]
     expected_sum = 1510
     expected_first_bucket_index = 1
     expected_count = 6
     expected_min = 1
     expected_max = 1000
-    for value in [1, 500, 2, 3, 1000, 4]:
-      counter.add_input(value)
-    self.assertEquals(counter.buckets, expected_buckets)
+    for element in [1, 500, 2, 3, 1000, 4]:
+      counter.add_input(element)
+    histogram = Mock(firstBucketOffset=None, bucketCounts=None)
+    counter.translate_to_histogram(histogram)
     self.assertEquals(counter.sum, expected_sum)
-    self.assertEquals(counter.first_bucket_offset, expected_first_bucket_index)
     self.assertEquals(counter.count, expected_count)
     self.assertEquals(counter.min, expected_min)
     self.assertEquals(counter.max, expected_max)
+    self.assertEquals(histogram.firstBucketOffset, expected_first_bucket_index)
+    self.assertEquals(histogram.bucketCounts, expected_buckets)
+
+  def test_translate_to_histogram_with_input_0(self):
+    counter = DataflowDistributionCounter()
+    counter.add_input(0)
+    histogram = Mock(firstBucketOffset=None, bucketCounts=None)
+    counter.translate_to_histogram(histogram)
+    self.assertEquals(histogram.firstBucketOffset, 0)
+    self.assertEquals(histogram.bucketCounts, [1])
 
-  def test_add_input_with_invalid_input(self):
-    counter = DistributionAccumulator()
-    with self.assertRaises(ValueError):
-      counter.add_input(-1)
+  def test_translate_to_histogram_with_max_input(self):
+    counter = DataflowDistributionCounter()
+    counter.add_input(sys.maxint)
+    histogram = Mock(firstBucketOffset=None, bucketCounts=None)
+    counter.translate_to_histogram(histogram)
+    self.assertEquals(histogram.firstBucketOffset, 57)
+    self.assertEquals(histogram.bucketCounts, [1])
 
 
 if __name__ == '__main__':
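
Since these tests exercise whichever implementation imports first, a quick
parity check between the two modules can be done by hand; a sketch assuming
the cythonized module has been built:

    from apache_beam.transforms import py_dataflow_distribution_counter as py_counter
    try:
      from apache_beam.transforms import cy_dataflow_distribution_counter as cy_counter
    except ImportError:
      cy_counter = None  # Cython module not built; nothing to compare.

    if cy_counter is not None:
      for value in (0, 1, 7, 42, 999, 12345):
        assert (py_counter.DataflowDistributionCounter().calculate_bucket_index(value) ==
                cy_counter.DataflowDistributionCounter().calculate_bucket_index(value))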
diff --git a/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py b/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py
new file mode 100644
index 0000000..ee5a099
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""For internal use only; no backwards-compatibility guarantees."""
+
+
+globals()['INT64_MAX'] = 2**63 - 1
+globals()['INT64_MIN'] = -2**63
+
+POWER_TEN = [10e-1, 10e0, 10e1, 10e2, 10e3, 10e4, 10e5,
+             10e6, 10e7, 10e8, 10e9, 10e10, 10e11,
+             10e12, 10e13, 10e14, 10e15, 10e16, 10e17,
+             10e18]
+
+
+def get_log10_round_to_floor(element):
+  power = 0
+  while element >= POWER_TEN[power]:
+    power += 1
+  return power - 1
+
+
+class DataflowDistributionCounter(object):
+  """Pure python DataflowDistributionCounter in case Cython not available.
+
+
+  Please avoid using python mode if possible, since it's super slow
+  Cythonized DatadflowDistributionCounter defined in
+  apache_beam.transforms.cy_dataflow_distribution_counter.
+
+  Currently using special bucketing strategy suitable for Dataflow
+
+  Attributes:
+    min: minimum value of all inputs.
+    max: maximum value of all inputs.
+    count: total count of all inputs.
+    sum: sum of all inputs.
+    buckets: histogram buckets of value counts for a
+      distribution (1, 2, 5 bucketing). Max bucket_index is 58 (sys.maxint as input).
+    is_cythonized: marks whether DataflowDistributionCounter is cythonized.
+  """
+  # Assume the max input is sys.maxint, then the possible max bucket size is 59
+  MAX_BUCKET_SIZE = 59
+
+  # 3 buckets for every power of ten -> 1, 2, 5
+  BUCKET_PER_TEN = 3
+
+  def __init__(self):
+    self.min = INT64_MAX
+    self.max = 0
+    self.count = 0
+    self.sum = 0
+    self.buckets = [0] * self.MAX_BUCKET_SIZE
+    self.is_cythonized = False
+
+  def add_input(self, element):
+    if element < 0:
+      raise ValueError('Distribution counters support only non-negative values')
+    self.min = min(self.min, element)
+    self.max = max(self.max, element)
+    self.count += 1
+    self.sum += element
+    bucket_index = self.calculate_bucket_index(element)
+    self.buckets[bucket_index] += 1
+
+  def calculate_bucket_index(self, element):
+    """Calculate the bucket index for the given element."""
+    if element == 0:
+      return 0
+    log10_floor = get_log10_round_to_floor(element)
+    power_of_ten = POWER_TEN[log10_floor]
+    if element < power_of_ten * 2:
+      bucket_offset = 0
+    elif element < power_of_ten * 5:
+      bucket_offset = 1
+    else:
+      bucket_offset = 2
+    return 1 + log10_floor * self.BUCKET_PER_TEN + bucket_offset
+
+  def translate_to_histogram(self, histogram):
+    """Translate buckets into Histogram.
+
+    Args:
+      histogram: apache_beam.runners.dataflow.internal.clients.dataflow.Histogram.
+      Ideally, only call this function when reporting the counter to the
+      Dataflow service.
+    """
+    first_bucket_offset = 0
+    last_bucket_offset = 0
+    for index in range(0, self.MAX_BUCKET_SIZE):
+      if self.buckets[index] != 0:
+        first_bucket_offset = index
+        break
+    for index in range(self.MAX_BUCKET_SIZE - 1, -1, -1):
+      if self.buckets[index] != 0:
+        last_bucket_offset = index
+        break
+    histogram.firstBucketOffset = first_bucket_offset
+    histogram.bucketCounts = (
+        self.buckets[first_bucket_offset:last_bucket_offset + 1])
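
Note that translate_to_histogram ships only the occupied bucket range. A small
sketch of the resulting shape, using the pure-Python module and a hypothetical
_Histogram stand-in:

    from apache_beam.transforms.py_dataflow_distribution_counter import (
        DataflowDistributionCounter)

    class _Histogram(object):
      firstBucketOffset = None
      bucketCounts = None

    counter = DataflowDistributionCounter()
    counter.add_input(0)     # lands in bucket 0
    counter.add_input(1000)  # lands in bucket 10

    histogram = _Histogram()
    counter.translate_to_histogram(histogram)
    # histogram.firstBucketOffset == 0 and histogram.bucketCounts spans
    # buckets 0..10: [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1].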
diff --git a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py
index 2475ef1..46ac8ff 100644
--- a/sdks/python/apache_beam/utils/counters.py
+++ b/sdks/python/apache_beam/utils/counters.py
@@ -131,8 +131,9 @@ class Counter(object):
   SUM = cy_combiners.SumInt64Fn()
   MEAN = cy_combiners.MeanInt64Fn()
 
-  # Distribution Accumulator Fn
-  DISTRIBUTION = cy_combiners.DistributionCounterFn()
+  # Dataflow Distribution Accumulator Fn.
+  # TODO(BEAM-4045): Generalize distribution counter if necessary.
+  DATAFLOW_DISTRIBUTION = cy_combiners.DataflowDistributionCounterFn()
 
   def __init__(self, name, combine_fn):
     """Creates a Counter object.
diff --git a/sdks/python/generate_pydoc.sh b/sdks/python/generate_pydoc.sh
index 54795e2..ae8d043 100755
--- a/sdks/python/generate_pydoc.sh
+++ b/sdks/python/generate_pydoc.sh
@@ -54,6 +54,8 @@ excluded_patterns=(
     apache_beam/runners/worker/
     apache_beam/tools/map_fn_microbenchmark.*
     apache_beam/transforms/cy_combiners.*
+    apache_beam/transforms/cy_dataflow_distribution_counter.*
+    apache_beam/transforms/py_dataflow_distribution_counter.*
     apache_beam/utils/counters.*
     apache_beam/utils/windowed_value.*
     *_pb2.py

-- 
To stop receiving notification emails like this one, please contact
al...@apache.org.
