Repository: beam
Updated Branches:
  refs/heads/master 0e6b3794c -> 860ac1d1f


Removing aggregators from Python SDK


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/93832aa1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/93832aa1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/93832aa1

Branch: refs/heads/master
Commit: 93832aa16254175d094e9eabc469318316ddf858
Parents: 0e6b379
Author: Pablo <pabl...@google.com>
Authored: Tue Jan 17 11:26:22 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Feb 1 16:23:18 2017 -0800

----------------------------------------------------------------------
 .../examples/cookbook/datastore_wordcount.py    |  22 ++--
 .../apache_beam/examples/snippets/snippets.py   |  18 +--
 sdks/python/apache_beam/examples/wordcount.py   |  26 ++--
 .../apache_beam/examples/wordcount_debugging.py |  21 ++--
 sdks/python/apache_beam/metrics/metric_test.py  |   2 +-
 sdks/python/apache_beam/runners/common.py       |   5 -
 sdks/python/apache_beam/transforms/__init__.py  |   1 -
 .../python/apache_beam/transforms/aggregator.py | 120 -------------------
 .../apache_beam/transforms/aggregator_test.py   |  77 ------------
 sdks/python/apache_beam/transforms/core.py      |  12 +-
 10 files changed, 43 insertions(+), 261 deletions(-)
----------------------------------------------------------------------
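
In short, the change replaces the Aggregator API with the Metrics API. A
minimal before/after sketch of the migration pattern applied throughout
(the DoFn and names are illustrative, not taken from any one file below):

    import apache_beam as beam
    from apache_beam.metrics import Metrics

    class MyDoFn(beam.DoFn):
      def __init__(self):
        super(MyDoFn, self).__init__()
        # Before: self.my_counter = beam.Aggregator('my_counter')
        # After: a counter handle keyed by a namespace (here the DoFn
        # class) and a name.
        self.my_counter = Metrics.counter(self.__class__, 'my_counter')

      def process(self, context):
        # Before: context.aggregate_to(self.my_counter, 1)
        self.my_counter.inc()
        yield context.element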


http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 067cb80..4d00b74 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -74,14 +74,14 @@ import apache_beam as beam
 from apache_beam.io import ReadFromText
 from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
 from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
+from apache_beam.metrics import Metrics
 from apache_beam.utils.pipeline_options import GoogleCloudOptions
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
-empty_line_aggregator = beam.Aggregator('emptyLines')
-average_word_size_aggregator = beam.Aggregator('averageWordLength',
-                                               beam.combiners.MeanCombineFn(),
-                                               float)
+empty_line_counter = Metrics.counter('main', 'empty_lines')
+word_length_counter = Metrics.counter('main', 'word_lengths')
+word_counter = Metrics.counter('main', 'total_words')
 
 
 class WordExtractingDoFn(beam.DoFn):
@@ -91,7 +91,7 @@ class WordExtractingDoFn(beam.DoFn):
     """Returns an iterator over words in contents of Cloud Datastore entity.
     The element is a line of text.  If the line is blank, note that, too.
     Args:
-      context: the call-specific context: data and aggregator.
+      context: the call-specific context with input data.
     Returns:
       The processed element.
     """
@@ -101,10 +101,11 @@ class WordExtractingDoFn(beam.DoFn):
       text_line = content_value.string_value
 
     if not text_line:
-      context.aggregate_to(empty_line_aggregator, 1)
+      empty_line_counter.inc()
     words = re.findall(r'[A-Za-z\']+', text_line)
     for w in words:
-      context.aggregate_to(average_word_size_aggregator, len(w))
+      word_length_counter.inc(len(w))
+      word_counter.inc()
     return words
 
 
@@ -246,10 +247,9 @@ def run(argv=None):
   result = read_from_datastore(gcloud_options.project, known_args,
                                pipeline_options)
 
-  empty_line_values = result.aggregated_values(empty_line_aggregator)
-  logging.info('number of empty lines: %d', sum(empty_line_values.values()))
-  word_length_values = result.aggregated_values(average_word_size_aggregator)
-  logging.info('average word lengths: %s', word_length_values.values())
+  result.metrics().query()
+  # TODO(pabloem)(BEAM-1366): Fix these once metrics are 100% queryable.
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
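
Until BEAM-1366 lands, the bare result.metrics().query() call above stands
in for the logging it removes. A sketch of what that logging might look
like once querying works end to end; the 'counters' result shape and the
key/committed fields are assumptions about this SDK's metrics API, not
something the diff shows:

    import logging

    # 'result' is the PipelineResult returned by read_from_datastore above.
    query_result = result.metrics().query()
    for counter_result in query_result['counters']:
      # Each result carries the metric key plus its committed value.
      logging.info('%s: %s', counter_result.key, counter_result.committed)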

http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 631ab2d..9b775e2 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -32,6 +32,7 @@ string. The tags can contain only letters, digits and _.
 
 import apache_beam as beam
 from apache_beam.test_pipeline import TestPipeline
+from apache_beam.metrics import Metrics
 
 # Quiet some pylint warnings that happen because of the somewhat special
 # format for the code snippets.
@@ -516,13 +517,12 @@ def examples_wordcount_debugging(renames):
   class FilterTextFn(beam.DoFn):
     """A DoFn that filters for a specific key based on a regular expression."""
 
-    # A custom aggregator can track values in your pipeline as it runs. Create
-    # custom aggregators matched_word and unmatched_words.
-    matched_words = beam.Aggregator('matched_words')
-    umatched_words = beam.Aggregator('umatched_words')
-
     def __init__(self, pattern):
       self.pattern = pattern
+      # A custom metric can track values in your pipeline as it runs. Create
+      # custom metrics matched_words and unmatched_words.
+      self.matched_words = Metrics.counter(self.__class__, 'matched_words')
+      self.unmatched_words = Metrics.counter(self.__class__, 'unmatched_words')
 
     def process(self, context):
       word, _ = context.element
@@ -532,8 +532,8 @@ def examples_wordcount_debugging(renames):
         # Logging UI.
         logging.info('Matched %s', word)
 
-        # Add 1 to the custom aggregator matched_words
-        context.aggregate_to(self.matched_words, 1)
+        # Add 1 to the custom metric counter matched_words
+        self.matched_words.inc()
         yield context.element
       else:
         # Log at the "DEBUG" level each element that is not matched. Different
@@ -543,8 +543,8 @@ def examples_wordcount_debugging(renames):
         # Logger. This log message will not be visible in the Cloud Logger.
         logging.debug('Did not match %s', word)
 
-        # Add 1 to the custom aggregator umatched_words
-        context.aggregate_to(self.umatched_words, 1)
+        # Add 1 to the custom metric counter unmatched_words
+        self.unmatched_words.inc()
   # [END example_wordcount_debugging_logging]
   # [END example_wordcount_debugging_aggregators]
 

http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 92929af..3a120fd 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -26,36 +26,38 @@ import re
 import apache_beam as beam
 from apache_beam.io import ReadFromText
 from apache_beam.io import WriteToText
+from apache_beam.metrics import Metrics
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
 
-empty_line_aggregator = beam.Aggregator('emptyLines')
-average_word_size_aggregator = beam.Aggregator('averageWordLength',
-                                               beam.combiners.MeanCombineFn(),
-                                               float)
-
-
 class WordExtractingDoFn(beam.DoFn):
   """Parse each line of input text into words."""
 
+  def __init__(self):
+    super(WordExtractingDoFn, self).__init__()
+    self.words_counter = Metrics.counter(self.__class__, 'words')
+    self.word_lengths_counter = Metrics.counter(self.__class__, 'word_lengths')
+    self.empty_line_counter = Metrics.counter(self.__class__, 'empty_lines')
+
   def process(self, context):
     """Returns an iterator over the words of this element.
 
     The element is a line of text.  If the line is blank, note that, too.
 
     Args:
-      context: the call-specific context: data and aggregator.
+      context: the call-specific context.
 
     Returns:
       The processed element.
     """
     text_line = context.element.strip()
     if not text_line:
-      context.aggregate_to(empty_line_aggregator, 1)
+      self.empty_line_counter.inc(1)
     words = re.findall(r'[A-Za-z\']+', text_line)
     for w in words:
-      context.aggregate_to(average_word_size_aggregator, len(w))
+      self.words_counter.inc()
+      self.word_lengths_counter.inc(len(w))
     return words
 
 
@@ -98,11 +100,7 @@ def run(argv=None):
   # Actually run the pipeline (all operations above are deferred).
   result = p.run()
   result.wait_until_finish()
-  empty_line_values = result.aggregated_values(empty_line_aggregator)
-  logging.info('number of empty lines: %d', sum(empty_line_values.values()))
-  word_length_values = result.aggregated_values(average_word_size_aggregator)
-  logging.info('average word lengths: %s', word_length_values.values())
-
+  # TODO(pabloem)(BEAM-1366): Add querying of metrics once they are queryable.
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
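
The removed MeanCombineFn aggregator has a closer analogue in a
distribution metric, which tracks the sum, count, minimum, and maximum of
the values reported to it; the two counters above recover the mean as
word_lengths / words instead. A sketch of the distribution alternative
(names illustrative; Metrics.distribution is the same call exercised in
metric_test.py below):

    from apache_beam.metrics import Metrics

    word_length_dist = Metrics.distribution('main', 'word_len_dist')

    # Inside process(), report each observed length:
    #   word_length_dist.update(len(w))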

http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index ac13f35..ac24660 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -48,25 +48,22 @@ import re
 import apache_beam as beam
 from apache_beam.io import ReadFromText
 from apache_beam.io import WriteToText
+from apache_beam.metrics import Metrics
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
 
 class FilterTextFn(beam.DoFn):
   """A DoFn that filters for a specific key based on a regular expression."""
-
-  # A custom aggregator can track values in your pipeline as it runs. Those
-  # values will be displayed in the Dataflow Monitoring UI when this pipeline is
-  # run using the Dataflow service. These aggregators below track the number of
-  # matched and unmatched words. Learn more at
-  # https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf about
-  # the Dataflow Monitoring UI.
-  matched_words = beam.Aggregator('matched_words')
-  umatched_words = beam.Aggregator('umatched_words')
-
   def __init__(self, pattern):
     super(FilterTextFn, self).__init__()
     self.pattern = pattern
+    # A custom metric can track values in your pipeline as it runs. Those
+    # values will be available in the monitoring system of the runner used
+    # to run the pipeline. The metrics below track the number of
+    # matched and unmatched words.
+    self.matched_words = Metrics.counter(self.__class__, 'matched_words')
+    self.unmatched_words = Metrics.counter(self.__class__, 'unmatched_words')
 
   def process(self, context):
     word, _ = context.element
@@ -75,7 +72,7 @@ class FilterTextFn(beam.DoFn):
       # using the Dataflow service, these log lines will appear in the Cloud
       # Logging UI.
       logging.info('Matched %s', word)
-      context.aggregate_to(self.matched_words, 1)
+      self.matched_words.inc()
       yield context.element
     else:
      # Log at the "DEBUG" level each element that is not matched. Different log
@@ -84,7 +81,7 @@ class FilterTextFn(beam.DoFn):
       # Note currently only "INFO" and higher level logs are emitted to the
       # Cloud Logger. This log message will not be visible in the Cloud Logger.
       logging.debug('Did not match %s', word)
-      context.aggregate_to(self.umatched_words, 1)
+      self.unmatched_words.inc()
 
 
 class CountWords(beam.PTransform):
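
Note that the counters moved from class attributes into __init__ here.
Metrics.counter returns a lightweight handle keyed by (namespace, name),
so creating one per DoFn instance is cheap, and handles sharing a
namespace and name feed the same underlying metric. A sketch of the
handle semantics (names illustrative):

    from apache_beam.metrics import Metrics

    a = Metrics.counter('ns', 'hits')
    b = Metrics.counter('ns', 'hits')
    a.inc()
    b.inc()  # both increments accumulate under ('ns', 'hits')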

http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/metrics/metric_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py
index c478a85..4860edf 100644
--- a/sdks/python/apache_beam/metrics/metric_test.py
+++ b/sdks/python/apache_beam/metrics/metric_test.py
@@ -59,7 +59,7 @@ class MetricsTest(unittest.TestCase):
     MetricsEnvironment.set_current_container(MetricsContainer('mystep'))
     counter_ns = 'aCounterNamespace'
     distro_ns = 'aDistributionNamespace'
-    name = 'aName'
+    name = 'a_name'
     counter = Metrics.counter(counter_ns, name)
     distro = Metrics.distribution(distro_ns, name)
     counter.inc(10)
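
The two metric types exercised by this test differ in what they record: a
counter keeps one running total, while a distribution folds every value
passed to update() into a sum, count, minimum, and maximum. A sketch
(namespace and names illustrative; the import paths for the container
setup are assumed from this SDK's metrics package):

    from apache_beam.metrics import Metrics
    from apache_beam.metrics.execution import MetricsContainer
    from apache_beam.metrics.execution import MetricsEnvironment

    # Outside a running pipeline a container must be installed first,
    # as the test above does:
    MetricsEnvironment.set_current_container(MetricsContainer('mystep'))

    items = Metrics.counter('my_ns', 'items')
    sizes = Metrics.distribution('my_ns', 'sizes')

    items.inc()       # total += 1
    items.inc(10)     # total += 10
    sizes.update(10)  # folds 10 into the sum/count/min/max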

http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index cb47513..dbbd9ba 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -402,13 +402,8 @@ class DoFnContext(object):
     else:
       return self.windowed_value.windows
 
-  def aggregate_to(self, aggregator, input_value):
-    self.state.counter_for(aggregator).update(input_value)
-
 
 # TODO(robertwb): Remove all these adapters once service is updated out.
-
-
 class _LoggingContextAdapter(LoggingContext):
 
   def __init__(self, underlying):

http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/transforms/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py
index db8e193..847fb8f 100644
--- a/sdks/python/apache_beam/transforms/__init__.py
+++ b/sdks/python/apache_beam/transforms/__init__.py
@@ -19,7 +19,6 @@
 
 # pylint: disable=wildcard-import
 from apache_beam.transforms import combiners
-from apache_beam.transforms.aggregator import *
 from apache_beam.transforms.core import *
 from apache_beam.transforms.ptransform import *
 from apache_beam.transforms.timeutil import *

http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/transforms/aggregator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/aggregator.py b/sdks/python/apache_beam/transforms/aggregator.py
deleted file mode 100644
index 05ef635..0000000
--- a/sdks/python/apache_beam/transforms/aggregator.py
+++ /dev/null
@@ -1,120 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Support for user-defined Aggregators.
-
-Aggregators allow steps in a pipeline to perform custom aggregation of
-statistics about the data processed across all workers.  To update an
-aggregator's value, call aggregate_to() on the context passed to a DoFn.
-
-Example:
-import apache_beam as beam
-
-class ExampleDoFn(beam.DoFn):
-  def __init__(self):
-    super(ExampleDoFn, self).__init__()
-    self.simple_counter = beam.Aggregator('example-counter')
-
-  def process(self, context):
-    context.aggregate_to(self.simple_counter, 1)
-    ...
-
-These aggregators may be used by runners to collect and present statistics of
-a pipeline.  For example, in the Google Cloud Dataflow console, aggregators and
-their values show up in the UI under "Custom counters."
-
-You can also query the combined value(s) of an aggregator by calling
-aggregated_value() or aggregated_values() on the result object returned after
-running a pipeline.
-"""
-
-from __future__ import absolute_import
-
-from apache_beam.transforms import core
-
-
-class Aggregator(object):
-  """A user-specified aggregator of statistics about a pipeline step.
-
-  Args:
-    combine_fn: how to combine values input to the aggregation.
-      It must be one of these arithmetic functions:
-
-       - Python's built-in sum, min, max, any, and all.
-       - beam.combiners.MeanCombineFn()
-
-      The default is sum of 64-bit ints.
-
-    type: describes the type that will be accepted as input
-      for aggregation; by default types appropriate to the combine_fn
-      are accepted.
-
-  Example uses::
-
-    import apache_beam as beam
-
-    class ExampleDoFn(beam.DoFn):
-      def __init__(self):
-        super(ExampleDoFn, self).__init__()
-        self.simple_counter = beam.Aggregator('example-counter')
-        self.complex_counter = beam.Aggregator('other-counter', beam.Mean(),
-                                               float)
-
-      def process(self, context):
-        context.aggregate_to(self.simple_counter, 1)
-        context.aggregate_to(self.complex_counter, float(len(context.element))
-  """
-
-  def __init__(self, name, combine_fn=sum, input_type=int):
-    combine_fn = core.CombineFn.maybe_from_callable(combine_fn).for_input_type(
-        input_type)
-    if not _is_supported_kind(combine_fn):
-      raise ValueError(
-          'combine_fn %r (class %r) '
-          'does not map to a supported aggregation kind'
-          % (combine_fn, combine_fn.__class__))
-    self.name = name
-    self.combine_fn = combine_fn
-    self.input_type = input_type
-
-  def __str__(self):
-    return '<%s>' % self._str_internal()
-
-  def __repr__(self):
-    return '<%s at %s>' % (self._str_internal(), hex(id(self)))
-
-  def _str_internal(self):
-    """Internal helper function for both __str__ and __repr__."""
-    def get_name(thing):
-      try:
-        return thing.__name__
-      except AttributeError:
-        return thing.__class__.__name__
-
-    combine_fn_str = get_name(self.combine_fn)
-    input_arg = '(%s)' % get_name(self.input_type) if self.input_type else ''
-    if combine_fn_str == 'sum' and not input_arg:
-      combine_call = ''
-    else:
-      combine_call = ' %s%s' % (combine_fn_str, input_arg)
-    return 'Aggregator %s%s' % (self.name, combine_call)
-
-
-def _is_supported_kind(combine_fn):
-  # pylint: disable=wrong-import-order, wrong-import-position
-  from apache_beam.internal.apiclient import metric_translations
-  return combine_fn.__class__ in metric_translations
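
For comparison, the example in the deleted module docstring maps onto the
replacement API roughly as follows (a sketch; 'example_counter' mirrors
the deleted 'example-counter' name, renamed to the underscore style that
metric_test.py above now uses):

    import apache_beam as beam
    from apache_beam.metrics import Metrics

    class ExampleDoFn(beam.DoFn):
      def __init__(self):
        super(ExampleDoFn, self).__init__()
        # Was: self.simple_counter = beam.Aggregator('example-counter')
        self.simple_counter = Metrics.counter(self.__class__,
                                              'example_counter')

      def process(self, context):
        # Was: context.aggregate_to(self.simple_counter, 1)
        self.simple_counter.inc()
        # ... process context.element here ...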

http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/transforms/aggregator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/aggregator_test.py b/sdks/python/apache_beam/transforms/aggregator_test.py
deleted file mode 100644
index a2a4144..0000000
--- a/sdks/python/apache_beam/transforms/aggregator_test.py
+++ /dev/null
@@ -1,77 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for Aggregator class."""
-
-import unittest
-
-import apache_beam as beam
-from apache_beam.test_pipeline import TestPipeline
-from apache_beam.transforms import combiners
-from apache_beam.transforms.aggregator import Aggregator
-
-
-class AggregatorTest(unittest.TestCase):
-
-  def test_str(self):
-    basic = Aggregator('a-name')
-    self.assertEqual('<Aggregator a-name SumInt64Fn(int)>', str(basic))
-
-    for_max = Aggregator('max-name', max)
-    self.assertEqual('<Aggregator max-name MaxInt64Fn(int)>', str(for_max))
-
-    for_float = Aggregator('f-name', sum, float)
-    self.assertEqual('<Aggregator f-name SumFloatFn(float)>', str(for_float))
-
-    for_mean = Aggregator('m-name', combiners.MeanCombineFn(), float)
-    self.assertEqual('<Aggregator m-name MeanFloatFn(float)>', str(for_mean))
-
-  def test_aggregation(self):
-
-    mean = combiners.MeanCombineFn()
-    mean.__name__ = 'mean'
-    counter_types = [
-        (sum, int, 6),
-        (min, int, 0),
-        (max, int, 3),
-        (mean, int, 1),
-        (sum, float, 6.0),
-        (min, float, 0.0),
-        (max, float, 3.0),
-        (mean, float, 1.5),
-        (any, int, True),
-        (all, float, False),
-    ]
-    aggregators = [Aggregator('%s_%s' % (f.__name__, t.__name__), f, t)
-                   for f, t, _ in counter_types]
-
-    class UpdateAggregators(beam.DoFn):
-      def process(self, context):
-        for a in aggregators:
-          context.aggregate_to(a, context.element)
-
-    p = TestPipeline()
-    p | beam.Create([0, 1, 2, 3]) | beam.ParDo(UpdateAggregators())  # pylint: disable=expression-not-assigned
-    res = p.run()
-    for (_, _, expected), a in zip(counter_types, aggregators):
-      actual = res.aggregated_values(a).values()[0]
-      self.assertEqual(expected, actual)
-      self.assertEqual(type(expected), type(actual))
-
-
-if __name__ == '__main__':
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 70a03ae..20126d3 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -74,7 +74,7 @@ class DoFnProcessContext(DoFnContext):
     windows: windows of the element
       (in process method only; always None in start_bundle and finish_bundle)
     state: a DoFnState object, which holds the runner's internal state
-      for this element.  For example, aggregator state is here.
+      for this element.
       Not used by the pipeline code.
   """
 
@@ -109,16 +109,6 @@ class DoFnProcessContext(DoFnContext):
       self.timestamp = windowed_value.timestamp
       self.windows = windowed_value.windows
 
-  # TODO(sourabhbajaj): Move as we're trying to deprecate the use of context
-  def aggregate_to(self, aggregator, input_value):
-    """Provide a new input value for the aggregator.
-
-    Args:
-      aggregator: the aggregator to update
-      input_value: the new value to input to the combine_fn of this aggregator.
-    """
-    self.state.counter_for(aggregator).update(input_value)
-
 
 class NewDoFn(WithTypeHints, HasDisplayData):
   """A function object used by a transform with custom processing.
