This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f876166  [BEAM-4374] Add Beam Distribution Accumulator to use in python's counter factory.
     new 51a13f0  Merge pull request #8272 from ajamato/mean_byte_count_cy_combiner_only
f876166 is described below

commit f876166ba02a94ea66954a586810f1f15d36d98e
Author: Alex Amato <ajam...@google.com>
AuthorDate: Wed Mar 13 17:56:17 2019 -0700

    [BEAM-4374] Add Beam Distribution Accumulator to use in python's counter
    factory.
---
 .../runners/dataflow/internal/apiclient.py         | 21 ++++++++---
 .../runners/dataflow/internal/apiclient_test.py    | 44 +++++++++++++++++++++-
 .../python/apache_beam/transforms/cy_combiners.pxd | 10 +++++
 sdks/python/apache_beam/transforms/cy_combiners.py | 36 ++++++++++++++++++
 sdks/python/apache_beam/utils/counters.py          |  1 +
 5 files changed, 105 insertions(+), 7 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index e0a1a56..b0b1325 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -829,8 +829,8 @@ def translate_distribution(distribution_update, metric_update_proto):
   """Translate metrics DistributionUpdate to dataflow distribution update.
 
   Args:
-    distribution_update: Instance of DistributionData or
-    DataflowDistributionCounter.
+    distribution_update: Instance of DistributionData,
+    DistributionInt64Accumulator or DataflowDistributionCounter.
     metric_update_proto: Used for report metrics.
   """
   dist_update_proto = dataflow.DistributionUpdate()
@@ -838,7 +838,7 @@ def translate_distribution(distribution_update, metric_update_proto):
   dist_update_proto.max = to_split_int(distribution_update.max)
   dist_update_proto.count = to_split_int(distribution_update.count)
   dist_update_proto.sum = to_split_int(distribution_update.sum)
-  # DatadflowDistributionCounter needs to translate histogram
+  # DataflowDistributionCounter needs to translate histogram
   if isinstance(distribution_update, DataflowDistributionCounter):
     dist_update_proto.histogram = dataflow.Histogram()
     distribution_update.translate_to_histogram(dist_update_proto.histogram)
@@ -969,6 +969,11 @@ def _verify_interpreter_version_is_supported(pipeline_options):
 
 
 # To enable a counter on the service, add it to this dictionary.
+# This is required for the legacy python dataflow runner, as portability
+# does not communicate to the service via python code, but instead via
+# a runner harness (in C++ or Java).
+# TODO(BEAM-7050): Remove this antipattern; legacy dataflow python
+# pipelines will break whenever a new cy_combiner type is used.
 structured_counter_translations = {
     cy_combiners.CountCombineFn: (
         dataflow.CounterMetadata.KindValueValuesEnum.SUM,
@@ -1005,7 +1010,10 @@ structured_counter_translations = {
         MetricUpdateTranslators.translate_boolean),
     cy_combiners.DataflowDistributionCounterFn: (
         dataflow.CounterMetadata.KindValueValuesEnum.DISTRIBUTION,
-        translate_distribution)
+        translate_distribution),
+    cy_combiners.DistributionInt64Fn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.DISTRIBUTION,
+        translate_distribution),
 }
 
 
@@ -1045,5 +1053,8 @@ counter_translations = {
         MetricUpdateTranslators.translate_boolean),
     cy_combiners.DataflowDistributionCounterFn: (
         dataflow.NameAndKind.KindValueValuesEnum.DISTRIBUTION,
-        translate_distribution)
+        translate_distribution),
+    cy_combiners.DistributionInt64Fn: (
+        dataflow.NameAndKind.KindValueValuesEnum.DISTRIBUTION,
+        translate_distribution),
 }
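
For context, here is a minimal sketch (not part of the diff above) of how the new accumulator flows through translate_distribution; it mirrors the new unit test below and assumes the Dataflow client module paths used by the tests are importable (apitools-based protos available):

    from apache_beam.runners.dataflow.internal import apiclient
    from apache_beam.runners.dataflow.internal.clients import dataflow
    from apache_beam.transforms import cy_combiners

    # Any object exposing min/max/sum/count can be translated; here we use
    # the accumulator added by this commit.
    acc = cy_combiners.DistributionInt64Accumulator()
    for value in (1, 15):
      acc.add_input(value)

    metric_update = dataflow.CounterUpdate()
    apiclient.translate_distribution(acc, metric_update)
    # metric_update.distribution now carries min=1, max=15, sum=16, count=2,
    # each encoded as a split int64 (lowBits/highBits).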
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 77eba7c..2f65716 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -163,7 +163,24 @@ class UtilTest(unittest.TestCase):
     self.assertEqual((split_number.lowBits, split_number.highBits),
                      (0, number))
 
-  def test_translate_distribution(self):
+  def test_translate_distribution_using_accumulator(self):
+    metric_update = dataflow.CounterUpdate()
+    accumulator = mock.Mock()
+    accumulator.min = 1
+    accumulator.max = 15
+    accumulator.sum = 16
+    accumulator.count = 2
+    apiclient.translate_distribution(accumulator, metric_update)
+    self.assertEqual(metric_update.distribution.min.lowBits,
+                     accumulator.min)
+    self.assertEqual(metric_update.distribution.max.lowBits,
+                     accumulator.max)
+    self.assertEqual(metric_update.distribution.sum.lowBits,
+                     accumulator.sum)
+    self.assertEqual(metric_update.distribution.count.lowBits,
+                     accumulator.count)
+
+  def test_translate_distribution_using_distribution_data(self):
     metric_update = dataflow.CounterUpdate()
     distribution_update = DistributionData(16, 2, 1, 15)
     apiclient.translate_distribution(distribution_update, metric_update)
@@ -176,7 +193,7 @@ class UtilTest(unittest.TestCase):
     self.assertEqual(metric_update.distribution.count.lowBits,
                      distribution_update.count)
 
-  def test_translate_distribution_counter(self):
+  def test_translate_distribution_using_dataflow_distribution_counter(self):
     counter_update = DataflowDistributionCounter()
     counter_update.add_input(1)
     counter_update.add_input(3)
@@ -215,6 +232,29 @@ class UtilTest(unittest.TestCase):
     self.assertEqual(
         metric_update.floatingPointMean.count.lowBits, accumulator.count)
 
+  def test_translate_means_using_distribution_accumulator(self):
+    # This is the special case for MeanByteCount, which is reported over
+    # the FnAPI as a beam distribution and to the service as a
+    # MetricUpdate IntegerMean.
+    metric_update = dataflow.CounterUpdate()
+    accumulator = mock.Mock()
+    accumulator.min = 7
+    accumulator.max = 9
+    accumulator.sum = 16
+    accumulator.count = 2
+    apiclient.MetricUpdateTranslators.translate_scalar_mean_int(accumulator,
+                                                                metric_update)
+    self.assertEqual(metric_update.integerMean.sum.lowBits, accumulator.sum)
+    self.assertEqual(metric_update.integerMean.count.lowBits, accumulator.count)
+
+    accumulator.sum = 16.0
+    accumulator.count = 2
+    apiclient.MetricUpdateTranslators.translate_scalar_mean_float(accumulator,
+                                                                  metric_update)
+    self.assertEqual(metric_update.floatingPointMean.sum, accumulator.sum)
+    self.assertEqual(
+        metric_update.floatingPointMean.count.lowBits, accumulator.count)
+
   def test_default_ip_configuration(self):
     pipeline_options = PipelineOptions(
         ['--temp_location', 'gs://any-location/temp'])
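
The MeanByteCount special case above can also be exercised with the real accumulator instead of a mock. A hedged sketch (not part of the diff), assuming the same translator signatures the test uses; only sum and count are expected to be consumed, since the service-side form is an IntegerMean:

    from apache_beam.runners.dataflow.internal import apiclient
    from apache_beam.runners.dataflow.internal.clients import dataflow
    from apache_beam.transforms import cy_combiners

    acc = cy_combiners.DistributionInt64Accumulator()
    acc.add_input(7)
    acc.add_input(9)

    metric_update = dataflow.CounterUpdate()
    apiclient.MetricUpdateTranslators.translate_scalar_mean_int(acc, metric_update)
    # metric_update.integerMean.sum.lowBits == 16
    # metric_update.integerMean.count.lowBits == 2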
diff --git a/sdks/python/apache_beam/transforms/cy_combiners.pxd b/sdks/python/apache_beam/transforms/cy_combiners.pxd
index c0593e3..8834c5a 100644
--- a/sdks/python/apache_beam/transforms/cy_combiners.pxd
+++ b/sdks/python/apache_beam/transforms/cy_combiners.pxd
@@ -56,6 +56,16 @@ cdef class MeanInt64Accumulator(object):
   cpdef merge(self, accumulators)
 
 
+cdef class DistributionInt64Accumulator(object):
+  cdef readonly int64_t sum
+  cdef readonly int64_t count
+  cdef readonly int64_t min
+  cdef readonly int64_t max
+  cpdef add_input(self, int64_t element)
+  @cython.locals(accumulator=DistributionInt64Accumulator)
+  cpdef merge(self, accumulators)
+
+
 cdef class SumDoubleAccumulator(object):
   cdef readonly double value
   cpdef add_input(self, double element)
diff --git a/sdks/python/apache_beam/transforms/cy_combiners.py b/sdks/python/apache_beam/transforms/cy_combiners.py
index e239735..139b8a3 100644
--- a/sdks/python/apache_beam/transforms/cy_combiners.py
+++ b/sdks/python/apache_beam/transforms/cy_combiners.py
@@ -175,6 +175,38 @@ class MeanInt64Accumulator(object):
     return self.sum // self.count if self.count else _NAN
 
 
+class DistributionInt64Accumulator(object):
+  def __init__(self):
+    self.sum = 0
+    self.count = 0
+    self.min = INT64_MAX
+    self.max = INT64_MIN
+
+  def add_input(self, element):
+    element = int(element)
+    if not INT64_MIN <= element <= INT64_MAX:
+      raise OverflowError(element)
+    self.sum += element
+    self.count += 1
+    self.min = min(self.min, element)
+    self.max = max(self.max, element)
+
+  def merge(self, accumulators):
+    for accumulator in accumulators:
+      self.sum += accumulator.sum
+      self.count += accumulator.count
+      self.min = min(self.min, accumulator.min)
+      self.max = max(self.max, accumulator.max)
+
+  def extract_output(self):
+    if not INT64_MIN <= self.sum <= INT64_MAX:
+      self.sum %= 2**64
+      if self.sum >= INT64_MAX:
+        self.sum -= 2**64
+    mean = self.sum // self.count if self.count else _NAN
+    return mean, self.sum, self.count, self.min, self.max
+
+
 class CountCombineFn(AccumulatorCombineFn):
   _accumulator_type = CountAccumulator
 
@@ -195,6 +227,10 @@ class MeanInt64Fn(AccumulatorCombineFn):
   _accumulator_type = MeanInt64Accumulator
 
 
+class DistributionInt64Fn(AccumulatorCombineFn):
+  _accumulator_type = DistributionInt64Accumulator
+
+
 _POS_INF = float('inf')
 _NEG_INF = float('-inf')
 _NAN = float('nan')
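
As a quick illustration (not part of the diff) of the accumulator lifecycle added above -- add inputs, merge sibling accumulators, extract output:

    from apache_beam.transforms import cy_combiners

    acc = cy_combiners.DistributionInt64Accumulator()
    for value in (3, 7):
      acc.add_input(value)

    other = cy_combiners.DistributionInt64Accumulator()
    other.add_input(20)

    # merge() folds other accumulators into this one.
    acc.merge([other])

    # extract_output() returns (mean, sum, count, min, max);
    # here: (10, 30, 3, 3, 20).
    print(acc.extract_output())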
diff --git a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py
index f924853..dcb5683 100644
--- a/sdks/python/apache_beam/utils/counters.py
+++ b/sdks/python/apache_beam/utils/counters.py
@@ -134,6 +134,7 @@ class Counter(object):
   # Handy references to common counters.
   SUM = cy_combiners.SumInt64Fn()
   MEAN = cy_combiners.MeanInt64Fn()
+  BEAM_DISTRIBUTION = cy_combiners.DistributionInt64Fn()
 
   # Dataflow Distribution Accumulator Fn.
   # TODO(BEAM-4045): Generalize distribution counter if necessary.
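
Finally, a hedged sketch of how the new BEAM_DISTRIBUTION reference might be used from the counter factory. This is an illustration only; it assumes CounterFactory.get_counter(name, combine_fn) and Counter.update()/Counter.value() keep their existing behavior, and it passes a plain string where a CounterName would normally be used:

    from apache_beam.utils import counters

    factory = counters.CounterFactory()
    counter = factory.get_counter('example_distribution',
                                  counters.Counter.BEAM_DISTRIBUTION)
    counter.update(4)
    counter.update(6)
    # value() delegates to DistributionInt64Accumulator.extract_output(),
    # i.e. (mean, sum, count, min, max) == (5, 10, 2, 4, 6).
    print(counter.value())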
