Repository: beam
Updated Branches:
  refs/heads/master 4084f71a1 -> c0d19f9f7
Adding a snippet for metrics Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f32cacb7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f32cacb7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f32cacb7 Branch: refs/heads/master Commit: f32cacb7313b8352ddc054cd1b88d7a0462550db Parents: 4084f71 Author: Pablo <pabl...@google.com> Authored: Mon May 22 15:39:44 2017 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Wed May 24 15:14:41 2017 -0700 ---------------------------------------------------------------------- .../examples/snippets/snippets_test.py | 62 ++++++++++++++++++++ 1 file changed, 62 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f32cacb7/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 6654fef..e302465 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -32,6 +32,8 @@ from apache_beam import typehints from apache_beam.coders.coders import ToStringCoder from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.examples.snippets import snippets +from apache_beam.metrics import Metrics +from apache_beam.metrics.metric import MetricsFilter from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.utils.windowed_value import WindowedValue @@ -689,6 +691,66 @@ class SnippetsTest(unittest.TestCase): expect = ['a; a...@example.com; x4312', 'b; b...@example.com; x8452'] self.assertEqual(expect, self.get_output(result_path)) + def test_model_use_and_query_metrics(self): + """DebuggingWordCount example 
snippets.""" + + import re + + p = TestPipeline() # Use TestPipeline for testing. + words = p | beam.Create(['albert', 'sam', 'mark', 'sarah', + 'swati', 'daniel', 'andrea']) + + # pylint: disable=unused-variable + # [START metrics_usage_example] + class FilterTextFn(beam.DoFn): + """A DoFn that filters for a specific key based on a regex.""" + + def __init__(self, pattern): + self.pattern = pattern + # A custom metric can track values in your pipeline as it runs. Create + # custom metrics to count unmatched words, and know the distribution of + # word lengths in the input PCollection. + self.word_len_dist = Metrics.distribution(self.__class__, + 'word_len_dist') + self.unmatched_words = Metrics.counter(self.__class__, + 'unmatched_words') + + def process(self, element): + word = element + self.word_len_dist.update(len(word)) + if re.match(self.pattern, word): + yield element + else: + self.unmatched_words.inc() + + filtered_words = ( + words | 'FilterText' >> beam.ParDo(FilterTextFn('s.*'))) + # [END metrics_usage_example] + # pylint: enable=unused-variable + + # [START metrics_check_values_example] + result = p.run() + result.wait_until_finish() + + custom_distribution = result.metrics().query( + MetricsFilter().with_name('word_len_dist'))['distributions'] + custom_counter = result.metrics().query( + MetricsFilter().with_name('unmatched_words'))['counters'] + + if custom_distribution: + logging.info('The average word length was %d', + custom_distribution[0].committed.mean) + if custom_counter: + logging.info('There were %d words that did not match the filter.', + custom_counter[0].committed) + # [END metrics_check_values_example] + + # There should be 4 words that did not match + self.assertEqual(custom_counter[0].committed, 4) + # The shortest word is 3 characters, the longest is 6 + self.assertEqual(custom_distribution[0].committed.min, 3) + self.assertEqual(custom_distribution[0].committed.max, 6) + def test_model_join_using_side_inputs(self): name_list = 
['a', 'b'] email_list = [['a', 'a...@example.com'], ['b', 'b...@example.com']]