This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f876166 [BEAM-4374] Add Beam Distribution Accumulator to use in
python's counter factory.
new 51a13f0 Merge pull request #8272 from
ajamato/mean_byte_count_cy_combiner_only
f876166 is described below
commit f876166ba02a94ea66954a586810f1f15d36d98e
Author: Alex Amato <[email protected]>
AuthorDate: Wed Mar 13 17:56:17 2019 -0700
[BEAM-4374] Add Beam Distribution Accumulator to use in python's counter
factory.
---
.../runners/dataflow/internal/apiclient.py | 21 ++++++++---
.../runners/dataflow/internal/apiclient_test.py | 44 +++++++++++++++++++++-
.../python/apache_beam/transforms/cy_combiners.pxd | 10 +++++
sdks/python/apache_beam/transforms/cy_combiners.py | 36 ++++++++++++++++++
sdks/python/apache_beam/utils/counters.py | 1 +
5 files changed, 105 insertions(+), 7 deletions(-)
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index e0a1a56..b0b1325 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -829,8 +829,8 @@ def translate_distribution(distribution_update,
metric_update_proto):
"""Translate metrics DistributionUpdate to dataflow distribution update.
Args:
- distribution_update: Instance of DistributionData or
- DataflowDistributionCounter.
+ distribution_update: Instance of DistributionData,
+ DistributionInt64Accumulator or DataflowDistributionCounter.
metric_update_proto: Used for report metrics.
"""
dist_update_proto = dataflow.DistributionUpdate()
@@ -838,7 +838,7 @@ def translate_distribution(distribution_update,
metric_update_proto):
dist_update_proto.max = to_split_int(distribution_update.max)
dist_update_proto.count = to_split_int(distribution_update.count)
dist_update_proto.sum = to_split_int(distribution_update.sum)
- # DatadflowDistributionCounter needs to translate histogram
+ # DataflowDistributionCounter needs to translate histogram
if isinstance(distribution_update, DataflowDistributionCounter):
dist_update_proto.histogram = dataflow.Histogram()
distribution_update.translate_to_histogram(dist_update_proto.histogram)
@@ -969,6 +969,11 @@ def
_verify_interpreter_version_is_supported(pipeline_options):
# To enable a counter on the service, add it to this dictionary.
+# This is required for the legacy python dataflow runner, as portability
+# does not communicate to the service via python code, but instead via
+# a runner harness (in C++ or Java).
+# TODO(BEAM-7050): Remove this antipattern, legacy dataflow python
+# pipelines will break whenever a new cy_combiner type is used.
structured_counter_translations = {
cy_combiners.CountCombineFn: (
dataflow.CounterMetadata.KindValueValuesEnum.SUM,
@@ -1005,7 +1010,10 @@ structured_counter_translations = {
MetricUpdateTranslators.translate_boolean),
cy_combiners.DataflowDistributionCounterFn: (
dataflow.CounterMetadata.KindValueValuesEnum.DISTRIBUTION,
- translate_distribution)
+ translate_distribution),
+ cy_combiners.DistributionInt64Fn: (
+ dataflow.CounterMetadata.KindValueValuesEnum.DISTRIBUTION,
+ translate_distribution),
}
@@ -1045,5 +1053,8 @@ counter_translations = {
MetricUpdateTranslators.translate_boolean),
cy_combiners.DataflowDistributionCounterFn: (
dataflow.NameAndKind.KindValueValuesEnum.DISTRIBUTION,
- translate_distribution)
+ translate_distribution),
+ cy_combiners.DistributionInt64Fn: (
+ dataflow.NameAndKind.KindValueValuesEnum.DISTRIBUTION,
+ translate_distribution),
}
diff --git
a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 77eba7c..2f65716 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -163,7 +163,24 @@ class UtilTest(unittest.TestCase):
self.assertEqual((split_number.lowBits, split_number.highBits),
(0, number))
- def test_translate_distribution(self):
+ def test_translate_distribution_using_accumulator(self):
+ metric_update = dataflow.CounterUpdate()
+ accumulator = mock.Mock()
+ accumulator.min = 1
+ accumulator.max = 15
+ accumulator.sum = 16
+ accumulator.count = 2
+ apiclient.translate_distribution(accumulator, metric_update)
+ self.assertEqual(metric_update.distribution.min.lowBits,
+ accumulator.min)
+ self.assertEqual(metric_update.distribution.max.lowBits,
+ accumulator.max)
+ self.assertEqual(metric_update.distribution.sum.lowBits,
+ accumulator.sum)
+ self.assertEqual(metric_update.distribution.count.lowBits,
+ accumulator.count)
+
+ def test_translate_distribution_using_distribution_data(self):
metric_update = dataflow.CounterUpdate()
distribution_update = DistributionData(16, 2, 1, 15)
apiclient.translate_distribution(distribution_update, metric_update)
@@ -176,7 +193,7 @@ class UtilTest(unittest.TestCase):
self.assertEqual(metric_update.distribution.count.lowBits,
distribution_update.count)
- def test_translate_distribution_counter(self):
+ def test_translate_distribution_using_dataflow_distribution_counter(self):
counter_update = DataflowDistributionCounter()
counter_update.add_input(1)
counter_update.add_input(3)
@@ -215,6 +232,29 @@ class UtilTest(unittest.TestCase):
self.assertEqual(
metric_update.floatingPointMean.count.lowBits, accumulator.count)
+ def test_translate_means_using_distribution_accumulator(self):
+ # This is the special case for MeanByteCount.
+ # Which is reported over the FnAPI as a beam distribution,
+ # and to the service as a MetricUpdate IntegerMean.
+ metric_update = dataflow.CounterUpdate()
+ accumulator = mock.Mock()
+ accumulator.min = 7
+ accumulator.max = 9
+ accumulator.sum = 16
+ accumulator.count = 2
+ apiclient.MetricUpdateTranslators.translate_scalar_mean_int(accumulator,
+ metric_update)
+ self.assertEqual(metric_update.integerMean.sum.lowBits, accumulator.sum)
+ self.assertEqual(metric_update.integerMean.count.lowBits,
accumulator.count)
+
+ accumulator.sum = 16.0
+ accumulator.count = 2
+ apiclient.MetricUpdateTranslators.translate_scalar_mean_float(accumulator,
+
metric_update)
+ self.assertEqual(metric_update.floatingPointMean.sum, accumulator.sum)
+ self.assertEqual(
+ metric_update.floatingPointMean.count.lowBits, accumulator.count)
+
def test_default_ip_configuration(self):
pipeline_options = PipelineOptions(
['--temp_location', 'gs://any-location/temp'])
diff --git a/sdks/python/apache_beam/transforms/cy_combiners.pxd
b/sdks/python/apache_beam/transforms/cy_combiners.pxd
index c0593e3..8834c5a 100644
--- a/sdks/python/apache_beam/transforms/cy_combiners.pxd
+++ b/sdks/python/apache_beam/transforms/cy_combiners.pxd
@@ -56,6 +56,16 @@ cdef class MeanInt64Accumulator(object):
cpdef merge(self, accumulators)
+cdef class DistributionInt64Accumulator(object):
+ cdef readonly int64_t sum
+ cdef readonly int64_t count
+ cdef readonly int64_t min
+ cdef readonly int64_t max
+ cpdef add_input(self, int64_t element)
+ @cython.locals(accumulator=DistributionInt64Accumulator)
+ cpdef merge(self, accumulators)
+
+
cdef class SumDoubleAccumulator(object):
cdef readonly double value
cpdef add_input(self, double element)
diff --git a/sdks/python/apache_beam/transforms/cy_combiners.py
b/sdks/python/apache_beam/transforms/cy_combiners.py
index e239735..139b8a3 100644
--- a/sdks/python/apache_beam/transforms/cy_combiners.py
+++ b/sdks/python/apache_beam/transforms/cy_combiners.py
@@ -175,6 +175,38 @@ class MeanInt64Accumulator(object):
return self.sum // self.count if self.count else _NAN
+class DistributionInt64Accumulator(object):
+ def __init__(self):
+ self.sum = 0
+ self.count = 0
+ self.min = INT64_MAX
+ self.max = INT64_MIN
+
+ def add_input(self, element):
+ element = int(element)
+ if not INT64_MIN <= element <= INT64_MAX:
+ raise OverflowError(element)
+ self.sum += element
+ self.count += 1
+ self.min = min(self.min, element)
+ self.max = max(self.max, element)
+
+ def merge(self, accumulators):
+ for accumulator in accumulators:
+ self.sum += accumulator.sum
+ self.count += accumulator.count
+ self.min = min(self.min, accumulator.min)
+ self.max = max(self.max, accumulator.max)
+
+ def extract_output(self):
+ if not INT64_MIN <= self.sum <= INT64_MAX:
+ self.sum %= 2**64
+ if self.sum >= INT64_MAX:
+ self.sum -= 2**64
+ mean = self.sum // self.count if self.count else _NAN
+ return mean, self.sum, self.count, self.min, self.max
+
+
class CountCombineFn(AccumulatorCombineFn):
_accumulator_type = CountAccumulator
@@ -195,6 +227,10 @@ class MeanInt64Fn(AccumulatorCombineFn):
_accumulator_type = MeanInt64Accumulator
+class DistributionInt64Fn(AccumulatorCombineFn):
+ _accumulator_type = DistributionInt64Accumulator
+
+
_POS_INF = float('inf')
_NEG_INF = float('-inf')
_NAN = float('nan')
diff --git a/sdks/python/apache_beam/utils/counters.py
b/sdks/python/apache_beam/utils/counters.py
index f924853..dcb5683 100644
--- a/sdks/python/apache_beam/utils/counters.py
+++ b/sdks/python/apache_beam/utils/counters.py
@@ -134,6 +134,7 @@ class Counter(object):
# Handy references to common counters.
SUM = cy_combiners.SumInt64Fn()
MEAN = cy_combiners.MeanInt64Fn()
+ BEAM_DISTRIBUTION = cy_combiners.DistributionInt64Fn()
# Dataflow Distribution Accumulator Fn.
# TODO(BEAM-4045): Generalize distribution counter if necessary.