Repository: beam
Updated Branches:
  refs/heads/master 4084f71a1 -> c0d19f9f7


Adding a snippet for metrics


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f32cacb7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f32cacb7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f32cacb7

Branch: refs/heads/master
Commit: f32cacb7313b8352ddc054cd1b88d7a0462550db
Parents: 4084f71
Author: Pablo <pabl...@google.com>
Authored: Mon May 22 15:39:44 2017 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Wed May 24 15:14:41 2017 -0700

----------------------------------------------------------------------
 .../examples/snippets/snippets_test.py          | 62 ++++++++++++++++++++
 1 file changed, 62 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f32cacb7/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 6654fef..e302465 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -32,6 +32,8 @@ from apache_beam import typehints
 from apache_beam.coders.coders import ToStringCoder
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.examples.snippets import snippets
+from apache_beam.metrics import Metrics
+from apache_beam.metrics.metric import MetricsFilter
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 from apache_beam.utils.windowed_value import WindowedValue
@@ -689,6 +691,66 @@ class SnippetsTest(unittest.TestCase):
     expect = ['a; a...@example.com; x4312', 'b; b...@example.com; x8452']
     self.assertEqual(expect, self.get_output(result_path))
 
+  def test_model_use_and_query_metrics(self):
+    """Snippets for defining custom metrics and querying their values."""
+
+    import re
+
+    p = TestPipeline()  # Use TestPipeline for testing.
+    words = p | beam.Create(['albert', 'sam', 'mark', 'sarah',
+                             'swati', 'daniel', 'andrea'])
+
+    # pylint: disable=unused-variable
+    # [START metrics_usage_example]
+    class FilterTextFn(beam.DoFn):
+      """A DoFn that emits only the words that match a regex pattern."""
+
+      def __init__(self, pattern):
+        self.pattern = pattern
+        # A custom metric can track values in your pipeline as it runs. Create
+        # custom metrics to count unmatched words, and know the distribution of
+        # word lengths in the input PCollection.
+        self.word_len_dist = Metrics.distribution(self.__class__,
+                                                  'word_len_dist')
+        self.unmatched_words = Metrics.counter(self.__class__,
+                                               'unmatched_words')
+
+      def process(self, element):
+        word = element
+        self.word_len_dist.update(len(word))
+        if re.match(self.pattern, word):  # re.match anchors at the start.
+          yield element
+        else:
+          self.unmatched_words.inc()
+
+    filtered_words = (
+        words | 'FilterText' >> beam.ParDo(FilterTextFn('s.*')))
+    # [END metrics_usage_example]
+    # pylint: enable=unused-variable
+
+    # [START metrics_check_values_example]
+    result = p.run()
+    result.wait_until_finish()
+
+    custom_distribution = result.metrics().query(
+        MetricsFilter().with_name('word_len_dist'))['distributions']
+    custom_counter = result.metrics().query(
+        MetricsFilter().with_name('unmatched_words'))['counters']
+
+    if custom_distribution:
+      logging.info('The average word length was %d',
+                   custom_distribution[0].committed.mean)
+    if custom_counter:
+      logging.info('There were %d words that did not match the filter.',
+                   custom_counter[0].committed)
+    # [END metrics_check_values_example]
+
+    # Four words do not start with 's': albert, mark, daniel and andrea.
+    self.assertEqual(custom_counter[0].committed, 4)
+    # The shortest word ('sam') has 3 characters; the longest words have 6.
+    self.assertEqual(custom_distribution[0].committed.min, 3)
+    self.assertEqual(custom_distribution[0].committed.max, 6)
+
   def test_model_join_using_side_inputs(self):
     name_list = ['a', 'b']
     email_list = [['a', 'a...@example.com'], ['b', 'b...@example.com']]

Reply via email to