This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c4d2d7d Cythonize DistributionAccumulator
new 9e3e9c4 This closes #5028
c4d2d7d is described below
commit c4d2d7d717767b803864d788396e4d3dbd419cff
Author: Boyuan Zhang <[email protected]>
AuthorDate: Wed Apr 4 17:36:04 2018 -0700
Cythonize DistributionAccumulator
---
.../runners/dataflow/internal/apiclient.py | 19 ++-
.../runners/dataflow/internal/apiclient_test.py | 24 +++-
.../tools/distribution_counter_microbenchmark.py | 12 +-
sdks/python/apache_beam/transforms/__init__.py | 6 +
.../python/apache_beam/transforms/cy_combiners.pxd | 20 ---
sdks/python/apache_beam/transforms/cy_combiners.py | 105 ++--------------
.../cy_dataflow_distribution_counter.pxd | 44 +++++++
.../cy_dataflow_distribution_counter.pyx | 135 +++++++++++++++++++++
...st.py => dataflow_distribution_counter_test.py} | 49 +++++---
.../transforms/py_dataflow_distribution_counter.py | 114 +++++++++++++++++
sdks/python/apache_beam/utils/counters.py | 5 +-
sdks/python/generate_pydoc.sh | 2 +
12 files changed, 397 insertions(+), 138 deletions(-)
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 9db1cab..54eba06 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -47,6 +47,7 @@ from apache_beam.runners.dataflow.internal.clients import
dataflow
from apache_beam.runners.dataflow.internal.dependency import
get_sdk_name_and_version
from apache_beam.runners.dataflow.internal.names import PropertyNames
from apache_beam.transforms import cy_combiners
+from apache_beam.transforms import DataflowDistributionCounter
from apache_beam.transforms.display import DisplayData
from apache_beam.utils import retry
@@ -738,12 +739,22 @@ def to_split_int(n):
def translate_distribution(distribution_update, metric_update_proto):
- """Translate metrics DistributionUpdate to dataflow distribution update."""
+ """Translate metrics DistributionUpdate to dataflow distribution update.
+
+ Args:
+ distribution_update: Instance of DistributionData or
+ DataflowDistributionCounter.
+ metric_update_proto: Used to report metrics.
+ """
dist_update_proto = dataflow.DistributionUpdate()
dist_update_proto.min = to_split_int(distribution_update.min)
dist_update_proto.max = to_split_int(distribution_update.max)
dist_update_proto.count = to_split_int(distribution_update.count)
dist_update_proto.sum = to_split_int(distribution_update.sum)
+ # DataflowDistributionCounter needs to translate histogram
+ if isinstance(distribution_update, DataflowDistributionCounter):
+ dist_update_proto.histogram = dataflow.Histogram()
+ distribution_update.translate_to_histogram(dist_update_proto.histogram)
metric_update_proto.distribution = dist_update_proto
@@ -804,6 +815,9 @@ structured_counter_translations = {
cy_combiners.AnyCombineFn: (
dataflow.CounterMetadata.KindValueValuesEnum.OR,
MetricUpdateTranslators.translate_boolean),
+ cy_combiners.DataflowDistributionCounterFn: (
+ dataflow.CounterMetadata.KindValueValuesEnum.DISTRIBUTION,
+ translate_distribution)
}
@@ -841,4 +855,7 @@ counter_translations = {
cy_combiners.AnyCombineFn: (
dataflow.NameAndKind.KindValueValuesEnum.OR,
MetricUpdateTranslators.translate_boolean),
+ cy_combiners.DataflowDistributionCounterFn: (
+ dataflow.NameAndKind.KindValueValuesEnum.DISTRIBUTION,
+ translate_distribution)
}
diff --git
a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index fc5c21c..35ece3e 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -23,9 +23,10 @@ from apache_beam.metrics.cells import DistributionData
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.dataflow.internal import dependency
from apache_beam.runners.dataflow.internal.clients import dataflow
+from apache_beam.transforms import DataflowDistributionCounter
# Protect against environments where apitools library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
from apache_beam.runners.dataflow.internal import apiclient
except ImportError:
@@ -133,6 +134,27 @@ class UtilTest(unittest.TestCase):
self.assertEqual(metric_update.distribution.count.lowBits,
distribution_update.count)
+ def test_translate_distribution_counter(self):
+ counter_update = DataflowDistributionCounter()
+ counter_update.add_input(1)
+ counter_update.add_input(3)
+ metric_proto = dataflow.CounterUpdate()
+ apiclient.translate_distribution(counter_update, metric_proto)
+ histogram = mock.Mock(firstBucketOffset=None, bucketCounts=None)
+ counter_update.translate_to_histogram(histogram)
+ self.assertEqual(metric_proto.distribution.min.lowBits,
+ counter_update.min)
+ self.assertEqual(metric_proto.distribution.max.lowBits,
+ counter_update.max)
+ self.assertEqual(metric_proto.distribution.sum.lowBits,
+ counter_update.sum)
+ self.assertEqual(metric_proto.distribution.count.lowBits,
+ counter_update.count)
+ self.assertEqual(metric_proto.distribution.histogram.bucketCounts,
+ histogram.bucketCounts)
+ self.assertEqual(metric_proto.distribution.histogram.firstBucketOffset,
+ histogram.firstBucketOffset)
+
def test_translate_means(self):
metric_update = dataflow.CounterUpdate()
accumulator = mock.Mock()
diff --git
a/sdks/python/apache_beam/tools/distribution_counter_microbenchmark.py
b/sdks/python/apache_beam/tools/distribution_counter_microbenchmark.py
index 2426bc5..3e301eb 100644
--- a/sdks/python/apache_beam/tools/distribution_counter_microbenchmark.py
+++ b/sdks/python/apache_beam/tools/distribution_counter_microbenchmark.py
@@ -30,7 +30,6 @@ import sys
import time
from apache_beam.tools import utils
-from apache_beam.transforms.cy_combiners import DistributionAccumulator
def generate_input_values(num_input, lower_bound, upper_bound):
@@ -47,15 +46,15 @@ def run_benchmark(num_runs=100, num_input=10000,
seed=time.time()):
lower_bound = 0
upper_bound = sys.maxint
inputs = generate_input_values(num_input, lower_bound, upper_bound)
- print ("Number of runs:", num_runs)
+ from apache_beam.transforms import DataflowDistributionCounter
+ print("Number of runs:", num_runs)
print("Input size:", num_input)
print("Input sequence from %d to %d" % (lower_bound, upper_bound))
print("Random seed:", seed)
for i in range(num_runs):
- counter = DistributionAccumulator()
+ counter = DataflowDistributionCounter()
start = time.time()
- for value in inputs:
- counter.add_input(value)
+ counter.add_inputs_for_test(inputs)
time_cost = time.time() - start
print("Run %d: Total time cost %g sec" % (i+1, time_cost))
total_time += time_cost/num_input
@@ -63,5 +62,6 @@ def run_benchmark(num_runs=100, num_input=10000,
seed=time.time()):
if __name__ == '__main__':
- utils.check_compiled('apache_beam.transforms.cy_combiners')
+ utils.check_compiled(
+ 'apache_beam.transforms.cy_dataflow_distribution_counter')
run_benchmark()
diff --git a/sdks/python/apache_beam/transforms/__init__.py
b/sdks/python/apache_beam/transforms/__init__.py
index b77b0f6..3c04b37 100644
--- a/sdks/python/apache_beam/transforms/__init__.py
+++ b/sdks/python/apache_beam/transforms/__init__.py
@@ -23,3 +23,9 @@ from apache_beam.transforms.core import *
from apache_beam.transforms.ptransform import *
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.util import *
+
+# No backwards compatibility guarantees.
+try:
+ from apache_beam.transforms.cy_dataflow_distribution_counter import
DataflowDistributionCounter
+except ImportError:
+ from apache_beam.transforms.py_dataflow_distribution_counter import
DataflowDistributionCounter
diff --git a/sdks/python/apache_beam/transforms/cy_combiners.pxd
b/sdks/python/apache_beam/transforms/cy_combiners.pxd
index 0e6fe0c..4fc03a7 100644
--- a/sdks/python/apache_beam/transforms/cy_combiners.pxd
+++ b/sdks/python/apache_beam/transforms/cy_combiners.pxd
@@ -90,23 +90,3 @@ cdef class AnyAccumulator(object):
cpdef add_input(self, bint element)
@cython.locals(accumulator=AnyAccumulator)
cpdef merge(self, accumulators)
-
-cdef bint compare_to(int64_t x, int64_t y)
-
[email protected](number_of_leading_zeros=int64_t, y=int64_t)
-cdef int64_t get_log10_round_to_floor(int64_t element)
-
-cdef class DistributionAccumulator(object):
- cdef public int64_t min
- cdef public int64_t max
- cdef public int64_t count
- cdef public int64_t sum
- cdef public int64_t first_bucket_offset
- cdef public list buckets
- cdef public int64_t buckets_per_10
- @cython.locals(bucket_index = int64_t, size_of_bucket=int64_t)
- cpdef add_input(self, int64_t element)
- @cython.locals(log10_floor=int64_t, power_of_ten=int64_t,
- bucket_offset=int64_t)
- cpdef int64_t calculate_bucket_index(self, int64_t element)
- cdef void increment_bucket(self, int64_t bucket_index)
diff --git a/sdks/python/apache_beam/transforms/cy_combiners.py
b/sdks/python/apache_beam/transforms/cy_combiners.py
index b0f7e7b..53a440e 100644
--- a/sdks/python/apache_beam/transforms/cy_combiners.py
+++ b/sdks/python/apache_beam/transforms/cy_combiners.py
@@ -22,10 +22,13 @@ For internal use only; no backwards-compatibility
guarantees.
from __future__ import absolute_import
-import math
-
from apache_beam.transforms import core
+try:
+ from apache_beam.transforms.cy_dataflow_distribution_counter import
DataflowDistributionCounter
+except ImportError:
+ from apache_beam.transforms.py_dataflow_distribution_counter import
DataflowDistributionCounter
+
class AccumulatorCombineFn(core.CombineFn):
# singleton?
@@ -312,96 +315,14 @@ class AllCombineFn(AccumulatorCombineFn):
_accumulator_type = AllAccumulator
-MAX_LONG_10_FOR_LEADING_ZEROS = [19, 18, 18, 18, 18, 17, 17, 17, 16, 16, 16,
15,
- 15, 15, 15, 14, 14, 14, 13, 13, 13, 12, 12,
12,
- 12, 11, 11, 11, 10, 10, 10, 9, 9, 9, 9, 8, 8,
- 8, 7, 7, 7, 6, 6, 6, 6, 5, 5, 5, 4, 4, 4, 3,
3,
- 3, 3, 2, 2, 2, 1, 1, 1, 0, 0, 0]
-
-LONG_SIZE = 64
-
-
-def compare_to(x, y):
- """return the sign bit of x-y"""
- if x < y:
- return 1
- return 0
-
-
-def get_log10_round_to_floor(element):
- number_of_leading_zeros = LONG_SIZE - element.bit_length()
- y = MAX_LONG_10_FOR_LEADING_ZEROS[number_of_leading_zeros]
- return y - compare_to(element, math.pow(10, y))
+class DataflowDistributionCounterFn(AccumulatorCombineFn):
+ """A subclass of cy_combiners.AccumulatorCombineFn.
+ Make DataflowDistributionCounter able to report to Dataflow service via
+ CounterFactory.
-class DistributionAccumulator(object):
- """Distribution Counter:
- contains value distribution statistics and methods for incrementing
+ When the cythonized DataflowDistributionCounter is available, the
+ CombineFn combines with the cythonized module; otherwise it combines
+ with the pure Python version.
"""
- def __init__(self):
- global INT64_MAX # pylint: disable=global-variable-not-assigned
- self.min = INT64_MAX
- self.max = 0
- self.count = 0
- self.sum = 0
- """Histogram buckets of value counts for a distribution(1,2,5 bucketing)"""
- self.buckets = []
- """Starting index of the first stored bucket"""
- self.first_bucket_offset = 0
- """There are 3 buckets for every power of ten: 1, 2, 5"""
- self.buckets_per_10 = 3
-
- def add_input(self, element):
- if element < 0:
- raise ValueError('Distribution counters support only non-negative value')
- self.min = min(self.min, element)
- self.max = max(self.max, element)
- self.count += 1
- self.sum += element
- bucket_index = self.calculate_bucket_index(element)
- size_of_bucket = len(self.buckets)
- self.increment_bucket(bucket_index)
- if size_of_bucket == 0:
- self.first_bucket_offset = bucket_index
- else:
- self.first_bucket_offset = min(self.first_bucket_offset, bucket_index)
-
- def calculate_bucket_index(self, element):
- """Calculate the bucket index for the given element"""
- if element == 0:
- return 0
- log10_floor = get_log10_round_to_floor(element)
- power_of_ten = math.pow(10, log10_floor)
- if element < 2 * power_of_ten:
- bucket_offset = 0 # [0, 2)
- elif element < 5 * power_of_ten:
- bucket_offset = 1 # [2, 5)
- else:
- bucket_offset = 2 # [5, 10)
- return 1 + (log10_floor * self.buckets_per_10) + bucket_offset
-
- def increment_bucket(self, bucket_index):
- """Increment the bucket for the given index
- If the bucket at the given index is already in the list,
- this will increment the existing value.
- If the specified index is outside of the current bucket range,
- the bucket list will be extended to incorporate the new bucket
- """
- if not self.buckets:
- self.buckets.append(1)
- elif bucket_index < self.first_bucket_offset:
- new_buckets = []
- new_buckets.append(1)
- new_buckets.extend(
- [0] * (self.first_bucket_offset - bucket_index - 1))
- self.buckets = new_buckets + self.buckets
- elif bucket_index >= self.first_bucket_offset + len(self.buckets):
- self.buckets.extend(
- [0] * (bucket_index - self.first_bucket_offset - len(self.buckets)))
- self.buckets.append(1)
- else:
- self.buckets[bucket_index - self.first_bucket_offset] += 1
-
-
-class DistributionCounterFn(AccumulatorCombineFn):
- _accumulator_type = DistributionAccumulator
+ _accumulator_type = DataflowDistributionCounter
diff --git
a/sdks/python/apache_beam/transforms/cy_dataflow_distribution_counter.pxd
b/sdks/python/apache_beam/transforms/cy_dataflow_distribution_counter.pxd
new file mode 100644
index 0000000..d4711bd
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/cy_dataflow_distribution_counter.pxd
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# cython: profile=True
+
+""" For internal use only. No backwards compatibility guarantees."""
+
+cimport cython
+from libc.stdint cimport int64_t
+
+
+# 3 buckets for every power of ten -> 1, 2, 5
+cdef enum:
+ BUCKET_PER_TEN = 3
+
+# Assume the max input is max(int64_t), then the possible max bucket size is 59
+cdef enum:
+ MAX_BUCKET_SIZE = 59
+
+cdef class DataflowDistributionCounter(object):
+ cdef public int64_t min
+ cdef public int64_t max
+ cdef public int64_t count
+ cdef public int64_t sum
+ cdef int64_t* buckets
+ cdef public bint is_cythonized
+ cpdef bint add_input(self, int64_t element) except -1
+ cdef int64_t _fast_calculate_bucket_index(self, int64_t element)
+ cpdef void translate_to_histogram(self, histogram)
+ cpdef bint add_inputs_for_test(self, elements) except -1
+ cpdef int64_t calculate_bucket_index(self, int64_t element)
diff --git
a/sdks/python/apache_beam/transforms/cy_dataflow_distribution_counter.pyx
b/sdks/python/apache_beam/transforms/cy_dataflow_distribution_counter.pyx
new file mode 100644
index 0000000..a3622aa
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/cy_dataflow_distribution_counter.pyx
@@ -0,0 +1,135 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# cython: profile=True
+
+""" For internal use only. No backwards compatibility guarantees."""
+
+cimport cython
+from libc.stdint cimport int64_t, INT64_MAX
+from libc.stdlib cimport calloc, free
+
+
+cdef unsigned long long* POWER_TEN = [10e-1, 10e0, 10e1, 10e2, 10e3, 10e4,
10e5,
+ 10e6, 10e7, 10e8, 10e9, 10e10, 10e11,
+ 10e12, 10e13, 10e14, 10e15, 10e16, 10e17,
+ 10e18]
+
+
+cdef int64_t get_log10_round_to_floor(int64_t element):
+ cdef int power = 0
+ while element >= POWER_TEN[power]:
+ power += 1
+ return power - 1
+
+
+cdef class DataflowDistributionCounter(object):
+ """Distribution Counter:
+
+ Contains value distribution statistics and methods for incrementing.
+
+ Currently using special bucketing strategy suitable for Dataflow
+
+ Attributes:
+ min: minimum value of all inputs.
+ max: maximum value of all inputs.
+ count: total count of all inputs.
+ sum: sum of all inputs.
+ buckets: histogram buckets of value counts for a
+ distribution(1,2,5 bucketing). Max bucket_index is 58 (sys.maxint as
input).
+ is_cythonized: mark whether DataflowDistributionCounter cythonized.
+ """
+ def __init__(self):
+ self.min = INT64_MAX
+ self.max = 0
+ self.count = 0
+ self.sum = 0
+ self.buckets = <int64_t*> calloc(MAX_BUCKET_SIZE, sizeof(int64_t))
+ self.is_cythonized = True
+
+ def __dealloc__(self):
+ """free allocated memory"""
+ free(self.buckets)
+
+ cpdef bint add_input(self, int64_t element) except -1:
+ if element < 0:
+ raise ValueError('Distribution counters support only non-negative value')
+ self.min = min(self.min, element)
+ self.max = max(self.max, element)
+ self.count += 1
+ self.sum += element
+ cdef int64_t bucket_index = self._fast_calculate_bucket_index(element)
+ self.buckets[bucket_index] += 1
+
+ cdef int64_t _fast_calculate_bucket_index(self, int64_t element):
+ """Calculate the bucket index for the given element.
+
+ Declare calculate_bucket_index as cdef in order to improve performance,
+ since cpdef will have significant overhead.
+ """
+ if element == 0:
+ return 0
+ cdef int64_t log10_floor = get_log10_round_to_floor(element)
+ cdef int64_t power_of_ten = POWER_TEN[log10_floor]
+ cdef int64_t bucket_offset = 0
+ if element < power_of_ten * 2:
+ bucket_offset = 0
+ elif element < power_of_ten * 5:
+ bucket_offset = 1
+ else:
+ bucket_offset = 2
+ return 1 + log10_floor * BUCKET_PER_TEN + bucket_offset
+
+ cpdef void translate_to_histogram(self, histogram):
+ """Translate buckets into Histogram.
+
+ Args:
+ histogram:
apache_beam.runners.dataflow.internal.clients.dataflow.Histogram
+ Ideally, only call this function when reporting counter to
+ dataflow service.
+ """
+ cdef int first_bucket_offset = 0
+ cdef int last_bucket_offset = 0
+ cdef int index = 0
+ for index in range(0, MAX_BUCKET_SIZE):
+ if self.buckets[index] != 0:
+ first_bucket_offset = index
+ break
+ for index in range(MAX_BUCKET_SIZE - 1, -1, -1):
+ if self.buckets[index] != 0:
+ last_bucket_offset = index
+ break
+ histogram.firstBucketOffset = first_bucket_offset
+ histogram.bucketCounts = []
+ for index in range(first_bucket_offset, last_bucket_offset + 1):
+ histogram.bucketCounts.append(self.buckets[index])
+
+ cpdef bint add_inputs_for_test(self, elements) except -1:
+ """Used for performance microbenchmark.
+
+ During runtime, add_input will be called through c-call, so we want to have
+ the same calling routine when running microbenchmark as application
runtime.
+ Directly calling cpdef from def will cause significant overhead.
+ """
+ for element in elements:
+ self.add_input(element)
+
+ cpdef int64_t calculate_bucket_index(self, int64_t element):
+ """Used for unit tests.
+
+ cdef calculate_bucket_index cannot be called directly from def.
+ """
+ return self._fast_calculate_bucket_index(element)
diff --git a/sdks/python/apache_beam/transforms/cy_combiners_test.py
b/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py
similarity index 50%
rename from sdks/python/apache_beam/transforms/cy_combiners_test.py
rename to
sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py
index f9ccfb0..e19eee6 100644
--- a/sdks/python/apache_beam/transforms/cy_combiners_test.py
+++ b/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py
@@ -9,24 +9,28 @@
# limitations under the License.
#
-'''
-Unit tests for the Distribution Counter
-'''
+"""Unit tests for DataflowDistributionCounter
+When Cython is available, the unit tests run against the cythonized module;
+otherwise they run against the pure Python module.
+"""
+
import math
+import sys
import unittest
-from apache_beam.transforms.cy_combiners import DistributionAccumulator
+from mock import Mock
+from apache_beam.transforms import DataflowDistributionCounter
-class DistributionAccumulatorTest(unittest.TestCase):
+class DataflowDistributionAccumulatorTest(unittest.TestCase):
def test_calculate_bucket_index_with_input_0(self):
- counter = DistributionAccumulator()
+ counter = DataflowDistributionCounter()
index = counter.calculate_bucket_index(0)
self.assertEquals(index, 0)
def test_calculate_bucket_index_within_max_long(self):
- counter = DistributionAccumulator()
+ counter = DataflowDistributionCounter()
bucket = 1
power_of_ten = 1
INT64_MAX = math.pow(2, 63) - 1
@@ -39,26 +43,39 @@ class DistributionAccumulatorTest(unittest.TestCase):
power_of_ten *= 10
def test_add_input(self):
- counter = DistributionAccumulator()
+ counter = DataflowDistributionCounter()
expected_buckets = [1, 3, 0, 0, 0, 0, 0, 0, 1, 1]
expected_sum = 1510
expected_first_bucket_index = 1
expected_count = 6
expected_min = 1
expected_max = 1000
- for value in [1, 500, 2, 3, 1000, 4]:
- counter.add_input(value)
- self.assertEquals(counter.buckets, expected_buckets)
+ for element in [1, 500, 2, 3, 1000, 4]:
+ counter.add_input(element)
+ histogram = Mock(firstBucketOffset=None, bucketCounts=None)
+ counter.translate_to_histogram(histogram)
self.assertEquals(counter.sum, expected_sum)
- self.assertEquals(counter.first_bucket_offset, expected_first_bucket_index)
self.assertEquals(counter.count, expected_count)
self.assertEquals(counter.min, expected_min)
self.assertEquals(counter.max, expected_max)
+ self.assertEquals(histogram.firstBucketOffset, expected_first_bucket_index)
+ self.assertEquals(histogram.bucketCounts, expected_buckets)
+
+ def test_translate_to_histogram_with_input_0(self):
+ counter = DataflowDistributionCounter()
+ counter.add_input(0)
+ histogram = Mock(firstBucketOffset=None, bucketCounts=None)
+ counter.translate_to_histogram(histogram)
+ self.assertEquals(histogram.firstBucketOffset, 0)
+ self.assertEquals(histogram.bucketCounts, [1])
- def test_add_input_with_invalid_input(self):
- counter = DistributionAccumulator()
- with self.assertRaises(ValueError):
- counter.add_input(-1)
+ def test_translate_to_histogram_with_max_input(self):
+ counter = DataflowDistributionCounter()
+ counter.add_input(sys.maxint)
+ histogram = Mock(firstBucketOffset=None, bucketCounts=None)
+ counter.translate_to_histogram(histogram)
+ self.assertEquals(histogram.firstBucketOffset, 57)
+ self.assertEquals(histogram.bucketCounts, [1])
if __name__ == '__main__':
diff --git
a/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py
b/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py
new file mode 100644
index 0000000..ee5a099
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""For internal use only; no backwards-compatibility guarantees."""
+
+
+globals()['INT64_MAX'] = 2**63 - 1
+globals()['INT64_MIN'] = -2**63
+
+POWER_TEN = [10e-1, 10e0, 10e1, 10e2, 10e3, 10e4, 10e5,
+ 10e6, 10e7, 10e8, 10e9, 10e10, 10e11,
+ 10e12, 10e13, 10e14, 10e15, 10e16, 10e17,
+ 10e18]
+
+
+def get_log10_round_to_floor(element):
+ power = 0
+ while element >= POWER_TEN[power]:
+ power += 1
+ return power - 1
+
+
+class DataflowDistributionCounter(object):
+ """Pure python DataflowDistributionCounter in case Cython not available.
+
+
+ Please avoid using python mode if possible, since it's super slow
+ Cythonized DataflowDistributionCounter defined in
+ apache_beam.transforms.cy_dataflow_distribution_counter.
+
+ Currently using special bucketing strategy suitable for Dataflow
+
+ Attributes:
+ min: minimum value of all inputs.
+ max: maximum value of all inputs.
+ count: total count of all inputs.
+ sum: sum of all inputs.
+ buckets: histogram buckets of value counts for a
+ distribution(1,2,5 bucketing). Max bucket_index is 58 (sys.maxint as
input).
+ is_cythonized: mark whether DataflowDistributionCounter cythonized.
+ """
+ # Assume the max input is sys.maxint, then the possible max bucket size is 59
+ MAX_BUCKET_SIZE = 59
+
+ # 3 buckets for every power of ten -> 1, 2, 5
+ BUCKET_PER_TEN = 3
+
+ def __init__(self):
+ self.min = INT64_MAX
+ self.max = 0
+ self.count = 0
+ self.sum = 0
+ self.buckets = [0] * self.MAX_BUCKET_SIZE
+ self.is_cythonized = False
+
+ def add_input(self, element):
+ if element < 0:
+ raise ValueError('Distribution counters support only non-negative value')
+ self.min = min(self.min, element)
+ self.max = max(self.max, element)
+ self.count += 1
+ self.sum += element
+ bucket_index = self.calculate_bucket_index(element)
+ self.buckets[bucket_index] += 1
+
+ def calculate_bucket_index(self, element):
+ """Calculate the bucket index for the given element."""
+ if element == 0:
+ return 0
+ log10_floor = get_log10_round_to_floor(element)
+ power_of_ten = POWER_TEN[log10_floor]
+ if element < power_of_ten * 2:
+ bucket_offset = 0
+ elif element < power_of_ten * 5:
+ bucket_offset = 1
+ else:
+ bucket_offset = 2
+ return 1 + log10_floor * self.BUCKET_PER_TEN + bucket_offset
+
+ def translate_to_histogram(self, histogram):
+ """Translate buckets into Histogram.
+
+ Args:
+ histogram:
apache_beam.runners.dataflow.internal.clients.dataflow.Histogram
+ Ideally, only call this function when reporting counter to
+ dataflow service.
+ """
+ first_bucket_offset = 0
+ last_bucket_offset = 0
+ for index in range(0, self.MAX_BUCKET_SIZE):
+ if self.buckets[index] != 0:
+ first_bucket_offset = index
+ break
+ for index in range(self.MAX_BUCKET_SIZE - 1, -1, -1):
+ if self.buckets[index] != 0:
+ last_bucket_offset = index
+ break
+ histogram.firstBucketOffset = first_bucket_offset
+ histogram.bucketCounts = (
+ self.buckets[first_bucket_offset:last_bucket_offset + 1])
diff --git a/sdks/python/apache_beam/utils/counters.py
b/sdks/python/apache_beam/utils/counters.py
index 2475ef1..46ac8ff 100644
--- a/sdks/python/apache_beam/utils/counters.py
+++ b/sdks/python/apache_beam/utils/counters.py
@@ -131,8 +131,9 @@ class Counter(object):
SUM = cy_combiners.SumInt64Fn()
MEAN = cy_combiners.MeanInt64Fn()
- # Distribution Accumulator Fn
- DISTRIBUTION = cy_combiners.DistributionCounterFn()
+ # Dataflow Distribution Accumulator Fn.
+ # TODO(BEAM-4045): Generalize distribution counter if necessary.
+ DATAFLOW_DISTRIBUTION = cy_combiners.DataflowDistributionCounterFn()
def __init__(self, name, combine_fn):
"""Creates a Counter object.
diff --git a/sdks/python/generate_pydoc.sh b/sdks/python/generate_pydoc.sh
index 54795e2..ae8d043 100755
--- a/sdks/python/generate_pydoc.sh
+++ b/sdks/python/generate_pydoc.sh
@@ -54,6 +54,8 @@ excluded_patterns=(
apache_beam/runners/worker/
apache_beam/tools/map_fn_microbenchmark.*
apache_beam/transforms/cy_combiners.*
+ apache_beam/transforms/cy_dataflow_distribution_counter.*
+ apache_beam/transforms/py_dataflow_distribution_counter.*
apache_beam/utils/counters.*
apache_beam/utils/windowed_value.*
*_pb2.py
--
To stop receiving notification emails like this one, please contact
[email protected].