This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new c4d2d7d Cythonize DistributionAccumulator new 9e3e9c4 This closes #5028 c4d2d7d is described below commit c4d2d7d717767b803864d788396e4d3dbd419cff Author: Boyuan Zhang <boyu...@google.com> AuthorDate: Wed Apr 4 17:36:04 2018 -0700 Cythonize DistributionAccumulator --- .../runners/dataflow/internal/apiclient.py | 19 ++- .../runners/dataflow/internal/apiclient_test.py | 24 +++- .../tools/distribution_counter_microbenchmark.py | 12 +- sdks/python/apache_beam/transforms/__init__.py | 6 + .../python/apache_beam/transforms/cy_combiners.pxd | 20 --- sdks/python/apache_beam/transforms/cy_combiners.py | 105 ++-------------- .../cy_dataflow_distribution_counter.pxd | 44 +++++++ .../cy_dataflow_distribution_counter.pyx | 135 +++++++++++++++++++++ ...st.py => dataflow_distribution_counter_test.py} | 49 +++++--- .../transforms/py_dataflow_distribution_counter.py | 114 +++++++++++++++++ sdks/python/apache_beam/utils/counters.py | 5 +- sdks/python/generate_pydoc.sh | 2 + 12 files changed, 397 insertions(+), 138 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 9db1cab..54eba06 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -47,6 +47,7 @@ from apache_beam.runners.dataflow.internal.clients import dataflow from apache_beam.runners.dataflow.internal.dependency import get_sdk_name_and_version from apache_beam.runners.dataflow.internal.names import PropertyNames from apache_beam.transforms import cy_combiners +from apache_beam.transforms import DataflowDistributionCounter from apache_beam.transforms.display import DisplayData from apache_beam.utils import retry @@ -738,12 +739,22 @@ def to_split_int(n): def translate_distribution(distribution_update, metric_update_proto): - """Translate metrics DistributionUpdate to dataflow distribution update.""" + """Translate metrics DistributionUpdate to dataflow distribution update. + + Args: + distribution_update: Instance of DistributionData or + DataflowDistributionCounter. + metric_update_proto: Used for report metrics. 
+ """ dist_update_proto = dataflow.DistributionUpdate() dist_update_proto.min = to_split_int(distribution_update.min) dist_update_proto.max = to_split_int(distribution_update.max) dist_update_proto.count = to_split_int(distribution_update.count) dist_update_proto.sum = to_split_int(distribution_update.sum) + # DatadflowDistributionCounter needs to translate histogram + if isinstance(distribution_update, DataflowDistributionCounter): + dist_update_proto.histogram = dataflow.Histogram() + distribution_update.translate_to_histogram(dist_update_proto.histogram) metric_update_proto.distribution = dist_update_proto @@ -804,6 +815,9 @@ structured_counter_translations = { cy_combiners.AnyCombineFn: ( dataflow.CounterMetadata.KindValueValuesEnum.OR, MetricUpdateTranslators.translate_boolean), + cy_combiners.DataflowDistributionCounterFn: ( + dataflow.CounterMetadata.KindValueValuesEnum.DISTRIBUTION, + translate_distribution) } @@ -841,4 +855,7 @@ counter_translations = { cy_combiners.AnyCombineFn: ( dataflow.NameAndKind.KindValueValuesEnum.OR, MetricUpdateTranslators.translate_boolean), + cy_combiners.DataflowDistributionCounterFn: ( + dataflow.NameAndKind.KindValueValuesEnum.DISTRIBUTION, + translate_distribution) } diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index fc5c21c..35ece3e 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -23,9 +23,10 @@ from apache_beam.metrics.cells import DistributionData from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.runners.dataflow.internal import dependency from apache_beam.runners.dataflow.internal.clients import dataflow +from apache_beam.transforms import DataflowDistributionCounter # Protect against environments where apitools library is not available. 
-# pylint: disable=wrong-import-order, wrong-import-position +# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: from apache_beam.runners.dataflow.internal import apiclient except ImportError: @@ -133,6 +134,27 @@ class UtilTest(unittest.TestCase): self.assertEqual(metric_update.distribution.count.lowBits, distribution_update.count) + def test_translate_distribution_counter(self): + counter_update = DataflowDistributionCounter() + counter_update.add_input(1) + counter_update.add_input(3) + metric_proto = dataflow.CounterUpdate() + apiclient.translate_distribution(counter_update, metric_proto) + histogram = mock.Mock(firstBucketOffset=None, bucketCounts=None) + counter_update.translate_to_histogram(histogram) + self.assertEqual(metric_proto.distribution.min.lowBits, + counter_update.min) + self.assertEqual(metric_proto.distribution.max.lowBits, + counter_update.max) + self.assertEqual(metric_proto.distribution.sum.lowBits, + counter_update.sum) + self.assertEqual(metric_proto.distribution.count.lowBits, + counter_update.count) + self.assertEqual(metric_proto.distribution.histogram.bucketCounts, + histogram.bucketCounts) + self.assertEqual(metric_proto.distribution.histogram.firstBucketOffset, + histogram.firstBucketOffset) + def test_translate_means(self): metric_update = dataflow.CounterUpdate() accumulator = mock.Mock() diff --git a/sdks/python/apache_beam/tools/distribution_counter_microbenchmark.py b/sdks/python/apache_beam/tools/distribution_counter_microbenchmark.py index 2426bc5..3e301eb 100644 --- a/sdks/python/apache_beam/tools/distribution_counter_microbenchmark.py +++ b/sdks/python/apache_beam/tools/distribution_counter_microbenchmark.py @@ -30,7 +30,6 @@ import sys import time from apache_beam.tools import utils -from apache_beam.transforms.cy_combiners import DistributionAccumulator def generate_input_values(num_input, lower_bound, upper_bound): @@ -47,15 +46,15 @@ def run_benchmark(num_runs=100, num_input=10000, seed=time.time()): lower_bound = 0 upper_bound = sys.maxint inputs = generate_input_values(num_input, lower_bound, upper_bound) - print ("Number of runs:", num_runs) + from apache_beam.transforms import DataflowDistributionCounter + print("Number of runs:", num_runs) print("Input size:", num_input) print("Input sequence from %d to %d" % (lower_bound, upper_bound)) print("Random seed:", seed) for i in range(num_runs): - counter = DistributionAccumulator() + counter = DataflowDistributionCounter() start = time.time() - for value in inputs: - counter.add_input(value) + counter.add_inputs_for_test(inputs) time_cost = time.time() - start print("Run %d: Total time cost %g sec" % (i+1, time_cost)) total_time += time_cost/num_input @@ -63,5 +62,6 @@ def run_benchmark(num_runs=100, num_input=10000, seed=time.time()): if __name__ == '__main__': - utils.check_compiled('apache_beam.transforms.cy_combiners') + utils.check_compiled( + 'apache_beam.transforms.cy_dataflow_distribution_counter') run_benchmark() diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py index b77b0f6..3c04b37 100644 --- a/sdks/python/apache_beam/transforms/__init__.py +++ b/sdks/python/apache_beam/transforms/__init__.py @@ -23,3 +23,9 @@ from apache_beam.transforms.core import * from apache_beam.transforms.ptransform import * from apache_beam.transforms.timeutil import TimeDomain from apache_beam.transforms.util import * + +# No backwards compatibility guarantees. 
+try: + from apache_beam.transforms.cy_dataflow_distribution_counter import DataflowDistributionCounter +except ImportError: + from apache_beam.transforms.py_dataflow_distribution_counter import DataflowDistributionCounter diff --git a/sdks/python/apache_beam/transforms/cy_combiners.pxd b/sdks/python/apache_beam/transforms/cy_combiners.pxd index 0e6fe0c..4fc03a7 100644 --- a/sdks/python/apache_beam/transforms/cy_combiners.pxd +++ b/sdks/python/apache_beam/transforms/cy_combiners.pxd @@ -90,23 +90,3 @@ cdef class AnyAccumulator(object): cpdef add_input(self, bint element) @cython.locals(accumulator=AnyAccumulator) cpdef merge(self, accumulators) - -cdef bint compare_to(int64_t x, int64_t y) - -@cython.locals(number_of_leading_zeros=int64_t, y=int64_t) -cdef int64_t get_log10_round_to_floor(int64_t element) - -cdef class DistributionAccumulator(object): - cdef public int64_t min - cdef public int64_t max - cdef public int64_t count - cdef public int64_t sum - cdef public int64_t first_bucket_offset - cdef public list buckets - cdef public int64_t buckets_per_10 - @cython.locals(bucket_index = int64_t, size_of_bucket=int64_t) - cpdef add_input(self, int64_t element) - @cython.locals(log10_floor=int64_t, power_of_ten=int64_t, - bucket_offset=int64_t) - cpdef int64_t calculate_bucket_index(self, int64_t element) - cdef void increment_bucket(self, int64_t bucket_index) diff --git a/sdks/python/apache_beam/transforms/cy_combiners.py b/sdks/python/apache_beam/transforms/cy_combiners.py index b0f7e7b..53a440e 100644 --- a/sdks/python/apache_beam/transforms/cy_combiners.py +++ b/sdks/python/apache_beam/transforms/cy_combiners.py @@ -22,10 +22,13 @@ For internal use only; no backwards-compatibility guarantees. from __future__ import absolute_import -import math - from apache_beam.transforms import core +try: + from apache_beam.transforms.cy_dataflow_distribution_counter import DataflowDistributionCounter +except ImportError: + from apache_beam.transforms.py_dataflow_distribution_counter import DataflowDistributionCounter + class AccumulatorCombineFn(core.CombineFn): # singleton? @@ -312,96 +315,14 @@ class AllCombineFn(AccumulatorCombineFn): _accumulator_type = AllAccumulator -MAX_LONG_10_FOR_LEADING_ZEROS = [19, 18, 18, 18, 18, 17, 17, 17, 16, 16, 16, 15, - 15, 15, 15, 14, 14, 14, 13, 13, 13, 12, 12, 12, - 12, 11, 11, 11, 10, 10, 10, 9, 9, 9, 9, 8, 8, - 8, 7, 7, 7, 6, 6, 6, 6, 5, 5, 5, 4, 4, 4, 3, 3, - 3, 3, 2, 2, 2, 1, 1, 1, 0, 0, 0] - -LONG_SIZE = 64 - - -def compare_to(x, y): - """return the sign bit of x-y""" - if x < y: - return 1 - return 0 - - -def get_log10_round_to_floor(element): - number_of_leading_zeros = LONG_SIZE - element.bit_length() - y = MAX_LONG_10_FOR_LEADING_ZEROS[number_of_leading_zeros] - return y - compare_to(element, math.pow(10, y)) +class DataflowDistributionCounterFn(AccumulatorCombineFn): + """A subclass of cy_combiners.AccumulatorCombineFn. + Make DataflowDistributionCounter able to report to Dataflow service via + CounterFactory. -class DistributionAccumulator(object): - """Distribution Counter: - contains value distribution statistics and methods for incrementing + When cythonized DataflowDistributinoCounter available, make + CounterFn combine with cythonized module, otherwise, combine with python + version. 
""" - def __init__(self): - global INT64_MAX # pylint: disable=global-variable-not-assigned - self.min = INT64_MAX - self.max = 0 - self.count = 0 - self.sum = 0 - """Histogram buckets of value counts for a distribution(1,2,5 bucketing)""" - self.buckets = [] - """Starting index of the first stored bucket""" - self.first_bucket_offset = 0 - """There are 3 buckets for every power of ten: 1, 2, 5""" - self.buckets_per_10 = 3 - - def add_input(self, element): - if element < 0: - raise ValueError('Distribution counters support only non-negative value') - self.min = min(self.min, element) - self.max = max(self.max, element) - self.count += 1 - self.sum += element - bucket_index = self.calculate_bucket_index(element) - size_of_bucket = len(self.buckets) - self.increment_bucket(bucket_index) - if size_of_bucket == 0: - self.first_bucket_offset = bucket_index - else: - self.first_bucket_offset = min(self.first_bucket_offset, bucket_index) - - def calculate_bucket_index(self, element): - """Calculate the bucket index for the given element""" - if element == 0: - return 0 - log10_floor = get_log10_round_to_floor(element) - power_of_ten = math.pow(10, log10_floor) - if element < 2 * power_of_ten: - bucket_offset = 0 # [0, 2) - elif element < 5 * power_of_ten: - bucket_offset = 1 # [2, 5) - else: - bucket_offset = 2 # [5, 10) - return 1 + (log10_floor * self.buckets_per_10) + bucket_offset - - def increment_bucket(self, bucket_index): - """Increment the bucket for the given index - If the bucket at the given index is already in the list, - this will increment the existing value. - If the specified index is outside of the current bucket range, - the bucket list will be extended to incorporate the new bucket - """ - if not self.buckets: - self.buckets.append(1) - elif bucket_index < self.first_bucket_offset: - new_buckets = [] - new_buckets.append(1) - new_buckets.extend( - [0] * (self.first_bucket_offset - bucket_index - 1)) - self.buckets = new_buckets + self.buckets - elif bucket_index >= self.first_bucket_offset + len(self.buckets): - self.buckets.extend( - [0] * (bucket_index - self.first_bucket_offset - len(self.buckets))) - self.buckets.append(1) - else: - self.buckets[bucket_index - self.first_bucket_offset] += 1 - - -class DistributionCounterFn(AccumulatorCombineFn): - _accumulator_type = DistributionAccumulator + _accumulator_type = DataflowDistributionCounter diff --git a/sdks/python/apache_beam/transforms/cy_dataflow_distribution_counter.pxd b/sdks/python/apache_beam/transforms/cy_dataflow_distribution_counter.pxd new file mode 100644 index 0000000..d4711bd --- /dev/null +++ b/sdks/python/apache_beam/transforms/cy_dataflow_distribution_counter.pxd @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# cython: profile=True + +""" For internal use only. No backwards compatibility guarantees.""" + +cimport cython +from libc.stdint cimport int64_t + + +# 3 buckets for every power of ten -> 1, 2, 5 +cdef enum: + BUCKET_PER_TEN = 3 + +# Assume the max input is max(int64_t), then the possible max bucket size is 59 +cdef enum: + MAX_BUCKET_SIZE = 59 + +cdef class DataflowDistributionCounter(object): + cdef public int64_t min + cdef public int64_t max + cdef public int64_t count + cdef public int64_t sum + cdef int64_t* buckets + cdef public bint is_cythonized + cpdef bint add_input(self, int64_t element) except -1 + cdef int64_t _fast_calculate_bucket_index(self, int64_t element) + cpdef void translate_to_histogram(self, histogram) + cpdef bint add_inputs_for_test(self, elements) except -1 + cpdef int64_t calculate_bucket_index(self, int64_t element) diff --git a/sdks/python/apache_beam/transforms/cy_dataflow_distribution_counter.pyx b/sdks/python/apache_beam/transforms/cy_dataflow_distribution_counter.pyx new file mode 100644 index 0000000..a3622aa --- /dev/null +++ b/sdks/python/apache_beam/transforms/cy_dataflow_distribution_counter.pyx @@ -0,0 +1,135 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# cython: profile=True + +""" For internal use only. No backwards compatibility guarantees.""" + +cimport cython +from libc.stdint cimport int64_t, INT64_MAX +from libc.stdlib cimport calloc, free + + +cdef unsigned long long* POWER_TEN = [10e-1, 10e0, 10e1, 10e2, 10e3, 10e4, 10e5, + 10e6, 10e7, 10e8, 10e9, 10e10, 10e11, + 10e12, 10e13, 10e14, 10e15, 10e16, 10e17, + 10e18] + + +cdef int64_t get_log10_round_to_floor(int64_t element): + cdef int power = 0 + while element >= POWER_TEN[power]: + power += 1 + return power - 1 + + +cdef class DataflowDistributionCounter(object): + """Distribution Counter: + + Contains value distribution statistics and methods for incrementing. + + Currently using special bucketing strategy suitable for Dataflow + + Attributes: + min: minimum value of all inputs. + max: maximum value of all inputs. + count: total count of all inputs. + sum: sum of all inputs. + buckets: histogram buckets of value counts for a + distribution(1,2,5 bucketing). Max bucket_index is 58( sys.maxint as input). + is_cythonized: mark whether DataflowDistributionCounter cythonized. 
+ """ + def __init__(self): + self.min = INT64_MAX + self.max = 0 + self.count = 0 + self.sum = 0 + self.buckets = <int64_t*> calloc(MAX_BUCKET_SIZE, sizeof(int64_t)) + self.is_cythonized = True + + def __dealloc__(self): + """free allocated memory""" + free(self.buckets) + + cpdef bint add_input(self, int64_t element) except -1: + if element < 0: + raise ValueError('Distribution counters support only non-negative value') + self.min = min(self.min, element) + self.max = max(self.max, element) + self.count += 1 + self.sum += element + cdef int64_t bucket_index = self._fast_calculate_bucket_index(element) + self.buckets[bucket_index] += 1 + + cdef int64_t _fast_calculate_bucket_index(self, int64_t element): + """Calculate the bucket index for the given element. + + Declare calculate_bucket_index as cdef in order to improve performance, + since cpdef will have significant overhead. + """ + if element == 0: + return 0 + cdef int64_t log10_floor = get_log10_round_to_floor(element) + cdef int64_t power_of_ten = POWER_TEN[log10_floor] + cdef int64_t bucket_offset = 0 + if element < power_of_ten * 2: + bucket_offset = 0 + elif element < power_of_ten * 5: + bucket_offset = 1 + else: + bucket_offset = 2 + return 1 + log10_floor * BUCKET_PER_TEN + bucket_offset + + cpdef void translate_to_histogram(self, histogram): + """Translate buckets into Histogram. + + Args: + histogram: apache_beam.runners.dataflow.internal.clents.dataflow.Histogram + Ideally, only call this function when reporting counter to + dataflow service. + """ + cdef int first_bucket_offset = 0 + cdef int last_bucket_offset = 0 + cdef int index = 0 + for index in range(0, MAX_BUCKET_SIZE): + if self.buckets[index] != 0: + first_bucket_offset = index + break + for index in range(MAX_BUCKET_SIZE - 1, -1, -1): + if self.buckets[index] != 0: + last_bucket_offset = index + break + histogram.firstBucketOffset = first_bucket_offset + histogram.bucketCounts = [] + for index in range(first_bucket_offset, last_bucket_offset + 1): + histogram.bucketCounts.append(self.buckets[index]) + + cpdef bint add_inputs_for_test(self, elements) except -1: + """Used for performance microbenchmark. + + During runtime, add_input will be called through c-call, so we want to have + the same calling routine when running microbenchmark as application runtime. + Directly calling cpdef from def will cause significant overhead. + """ + for element in elements: + self.add_input(element) + + cpdef int64_t calculate_bucket_index(self, int64_t element): + """Used for unit tests. + + cdef calculate_bucket_index cannot be called directly from def. + """ + return self._fast_calculate_bucket_index(element) diff --git a/sdks/python/apache_beam/transforms/cy_combiners_test.py b/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py similarity index 50% rename from sdks/python/apache_beam/transforms/cy_combiners_test.py rename to sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py index f9ccfb0..e19eee6 100644 --- a/sdks/python/apache_beam/transforms/cy_combiners_test.py +++ b/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py @@ -9,24 +9,28 @@ # limitations under the License. 
# -''' -Unit tests for the Distribution Counter -''' +"""Unit tests for DataflowDistributionCounter +When Cython is available, unit tests will test on cythonized module, +otherwise, test on pure python module +""" + import math +import sys import unittest -from apache_beam.transforms.cy_combiners import DistributionAccumulator +from mock import Mock +from apache_beam.transforms import DataflowDistributionCounter -class DistributionAccumulatorTest(unittest.TestCase): +class DataflowDistributionAccumulatorTest(unittest.TestCase): def test_calculate_bucket_index_with_input_0(self): - counter = DistributionAccumulator() + counter = DataflowDistributionCounter() index = counter.calculate_bucket_index(0) self.assertEquals(index, 0) def test_calculate_bucket_index_within_max_long(self): - counter = DistributionAccumulator() + counter = DataflowDistributionCounter() bucket = 1 power_of_ten = 1 INT64_MAX = math.pow(2, 63) - 1 @@ -39,26 +43,39 @@ class DistributionAccumulatorTest(unittest.TestCase): power_of_ten *= 10 def test_add_input(self): - counter = DistributionAccumulator() + counter = DataflowDistributionCounter() expected_buckets = [1, 3, 0, 0, 0, 0, 0, 0, 1, 1] expected_sum = 1510 expected_first_bucket_index = 1 expected_count = 6 expected_min = 1 expected_max = 1000 - for value in [1, 500, 2, 3, 1000, 4]: - counter.add_input(value) - self.assertEquals(counter.buckets, expected_buckets) + for element in [1, 500, 2, 3, 1000, 4]: + counter.add_input(element) + histogram = Mock(firstBucketOffset=None, bucketCounts=None) + counter.translate_to_histogram(histogram) self.assertEquals(counter.sum, expected_sum) - self.assertEquals(counter.first_bucket_offset, expected_first_bucket_index) self.assertEquals(counter.count, expected_count) self.assertEquals(counter.min, expected_min) self.assertEquals(counter.max, expected_max) + self.assertEquals(histogram.firstBucketOffset, expected_first_bucket_index) + self.assertEquals(histogram.bucketCounts, expected_buckets) + + def test_translate_to_histogram_with_input_0(self): + counter = DataflowDistributionCounter() + counter.add_input(0) + histogram = Mock(firstBucketOffset=None, bucketCounts=None) + counter.translate_to_histogram(histogram) + self.assertEquals(histogram.firstBucketOffset, 0) + self.assertEquals(histogram.bucketCounts, [1]) - def test_add_input_with_invalid_input(self): - counter = DistributionAccumulator() - with self.assertRaises(ValueError): - counter.add_input(-1) + def test_translate_to_histogram_with_max_input(self): + counter = DataflowDistributionCounter() + counter.add_input(sys.maxint) + histogram = Mock(firstBucketOffset=None, bucketCounts=None) + counter.translate_to_histogram(histogram) + self.assertEquals(histogram.firstBucketOffset, 57) + self.assertEquals(histogram.bucketCounts, [1]) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py b/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py new file mode 100644 index 0000000..ee5a099 --- /dev/null +++ b/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py @@ -0,0 +1,114 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""For internal use only; no backwards-compatibility guarantees.""" + + +globals()['INT64_MAX'] = 2**63 - 1 +globals()['INT64_MIN'] = -2**63 + +POWER_TEN = [10e-1, 10e0, 10e1, 10e2, 10e3, 10e4, 10e5, + 10e6, 10e7, 10e8, 10e9, 10e10, 10e11, + 10e12, 10e13, 10e14, 10e15, 10e16, 10e17, + 10e18] + + +def get_log10_round_to_floor(element): + power = 0 + while element >= POWER_TEN[power]: + power += 1 + return power - 1 + + +class DataflowDistributionCounter(object): + """Pure python DataflowDistributionCounter in case Cython not available. + + + Please avoid using python mode if possible, since it's super slow + Cythonized DatadflowDistributionCounter defined in + apache_beam.transforms.cy_dataflow_distribution_counter. + + Currently using special bucketing strategy suitable for Dataflow + + Attributes: + min: minimum value of all inputs. + max: maximum value of all inputs. + count: total count of all inputs. + sum: sum of all inputs. + buckets: histogram buckets of value counts for a + distribution(1,2,5 bucketing). Max bucket_index is 58( sys.maxint as input). + is_cythonized: mark whether DataflowDistributionCounter cythonized. + """ + # Assume the max input is sys.maxint, then the possible max bucket size is 59 + MAX_BUCKET_SIZE = 59 + + # 3 buckets for every power of ten -> 1, 2, 5 + BUCKET_PER_TEN = 3 + + def __init__(self): + self.min = INT64_MAX + self.max = 0 + self.count = 0 + self.sum = 0 + self.buckets = [0] * self.MAX_BUCKET_SIZE + self.is_cythonized = False + + def add_input(self, element): + if element < 0: + raise ValueError('Distribution counters support only non-negative value') + self.min = min(self.min, element) + self.max = max(self.max, element) + self.count += 1 + self.sum += element + bucket_index = self.calculate_bucket_index(element) + self.buckets[bucket_index] += 1 + + def calculate_bucket_index(self, element): + """Calculate the bucket index for the given element.""" + if element == 0: + return 0 + log10_floor = get_log10_round_to_floor(element) + power_of_ten = POWER_TEN[log10_floor] + if element < power_of_ten * 2: + bucket_offset = 0 + elif element < power_of_ten * 5: + bucket_offset = 1 + else: + bucket_offset = 2 + return 1 + log10_floor * self.BUCKET_PER_TEN + bucket_offset + + def translate_to_histogram(self, histogram): + """Translate buckets into Histogram. + + Args: + histogram: apache_beam.runners.dataflow.internal.clents.dataflow.Histogram + Ideally, only call this function when reporting counter to + dataflow service. 
+ """ + first_bucket_offset = 0 + last_bucket_offset = 0 + for index in range(0, self.MAX_BUCKET_SIZE): + if self.buckets[index] != 0: + first_bucket_offset = index + break + for index in range(self.MAX_BUCKET_SIZE - 1, -1, -1): + if self.buckets[index] != 0: + last_bucket_offset = index + break + histogram.firstBucketOffset = first_bucket_offset + histogram.bucketCounts = ( + self.buckets[first_bucket_offset:last_bucket_offset + 1]) diff --git a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py index 2475ef1..46ac8ff 100644 --- a/sdks/python/apache_beam/utils/counters.py +++ b/sdks/python/apache_beam/utils/counters.py @@ -131,8 +131,9 @@ class Counter(object): SUM = cy_combiners.SumInt64Fn() MEAN = cy_combiners.MeanInt64Fn() - # Distribution Accumulator Fn - DISTRIBUTION = cy_combiners.DistributionCounterFn() + # Dataflow Distribution Accumulator Fn. + # TODO(BEAM-4045): Generalize distribution counter if necessary. + DATAFLOW_DISTRIBUTION = cy_combiners.DataflowDistributionCounterFn() def __init__(self, name, combine_fn): """Creates a Counter object. diff --git a/sdks/python/generate_pydoc.sh b/sdks/python/generate_pydoc.sh index 54795e2..ae8d043 100755 --- a/sdks/python/generate_pydoc.sh +++ b/sdks/python/generate_pydoc.sh @@ -54,6 +54,8 @@ excluded_patterns=( apache_beam/runners/worker/ apache_beam/tools/map_fn_microbenchmark.* apache_beam/transforms/cy_combiners.* + apache_beam/transforms/cy_dataflow_distribution_counter.* + apache_beam/transforms/py_dataflow_distribution_counter.* apache_beam/utils/counters.* apache_beam/utils/windowed_value.* *_pb2.py -- To stop receiving notification emails like this one, please contact al...@apache.org.