[ https://issues.apache.org/jira/browse/BEAM-4037?focusedWorklogId=89253&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-89253 ]
ASF GitHub Bot logged work on BEAM-4037: ---------------------------------------- Author: ASF GitHub Bot Created on: 10/Apr/18 04:52 Start Date: 10/Apr/18 04:52 Worklog Time Spent: 10m Work Description: aaltay closed pull request #5071: [BEAM-4037] Add streaming wordcount snippets and test URL: https://github.com/apache/beam/pull/5071 This is a PR merged from a forked repository. Because GitHub hides the original diff of a pull request from a fork once it has been merged, the diff is reproduced below for the sake of provenance: diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 9b150c8d3f0..0f9543a64aa 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -30,6 +30,8 @@ string. The tags can contain only letters, digits and _. """ +import argparse + import six import apache_beam as beam @@ -628,6 +630,64 @@ def format_result(word_count): p.visit(SnippetUtils.RenameFiles(renames)) +def examples_wordcount_streaming(argv): + import apache_beam as beam + from apache_beam import window + from apache_beam.io import ReadFromPubSub + from apache_beam.io import WriteStringsToPubSub + from apache_beam.options.pipeline_options import PipelineOptions + from apache_beam.options.pipeline_options import SetupOptions + from apache_beam.options.pipeline_options import StandardOptions + + # Parse out arguments. 
+ parser = argparse.ArgumentParser() + parser.add_argument( + '--output_topic', required=True, + help=('Output PubSub topic of the form ' + '"projects/<PROJECT>/topic/<TOPIC>".')) + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument( + '--input_topic', + help=('Input PubSub topic of the form ' + '"projects/<PROJECT>/topics/<TOPIC>".')) + group.add_argument( + '--input_subscription', + help=('Input PubSub subscription of the form ' + '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."')) + known_args, pipeline_args = parser.parse_known_args(argv) + + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(StandardOptions).streaming = True + + with TestPipeline(options=pipeline_options) as p: + # [START example_wordcount_streaming_read] + # Read from Pub/Sub into a PCollection. + if known_args.input_subscription: + lines = p | beam.io.ReadFromPubSub( + subscription=known_args.input_subscription) + else: + lines = p | beam.io.ReadFromPubSub(topic=known_args.input_topic) + # [END example_wordcount_streaming_read] + + output = ( + lines + | 'DecodeUnicode' >> beam.FlatMap( + lambda encoded: encoded.decode('utf-8')) + | 'ExtractWords' >> beam.FlatMap( + lambda x: __import__('re').findall(r'[A-Za-z\']+', x)) + | 'PairWithOnes' >> beam.Map(lambda x: (x, 1)) + | beam.WindowInto(window.FixedWindows(15, 0)) + | 'Group' >> beam.GroupByKey() + | 'Sum' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1]))) + | 'Format' >> beam.Map( + lambda word_and_count: '%s: %d' % word_and_count)) + + # [START example_wordcount_streaming_write] + # Write to Pub/Sub + output | beam.io.WriteStringsToPubSub(known_args.output_topic) + # [END example_wordcount_streaming_write] + + def examples_ptransforms_templated(renames): # [START examples_ptransforms_templated] import apache_beam as beam diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 
349d52542da..4380ce47271 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -1,3 +1,4 @@ +# coding=utf-8 # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with @@ -25,6 +26,8 @@ import unittest import uuid +import mock + import apache_beam as beam from apache_beam import coders from apache_beam import pvalue @@ -36,8 +39,10 @@ from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.test_stream import TestStream from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.transforms.window import TimestampedValue from apache_beam.utils.windowed_value import WindowedValue # Protect against environments where apitools library is not available. @@ -56,6 +61,14 @@ datastore_pb2 = None # pylint: enable=wrong-import-order, wrong-import-position +# Protect against environments where the PubSub library is not available. 
+# pylint: disable=wrong-import-order, wrong-import-position +try: + from google.cloud import pubsub +except ImportError: + pubsub = None +# pylint: enable=wrong-import-order, wrong-import-position + class ParDoTest(unittest.TestCase): """Tests for model/par-do.""" @@ -691,6 +704,59 @@ def test_examples_wordcount_debugging(self): self.get_output(result_path), ['Flourish: 3', 'stomach: 1']) + @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed') + @mock.patch('apache_beam.io.ReadFromPubSub') + @mock.patch('apache_beam.io.WriteStringsToPubSub') + def test_examples_wordcount_streaming(self, *unused_mocks): + def FakeReadFromPubSub(topic=None, subscription=None, values=None): + expected_topic = topic + expected_subscription = subscription + + def _inner(topic=None, subscription=None): + assert topic == expected_topic + assert subscription == expected_subscription + return TestStream().add_elements(values) + return _inner + + class AssertTransform(beam.PTransform): + def __init__(self, matcher): + self.matcher = matcher + + def expand(self, pcoll): + assert_that(pcoll, self.matcher) + + def FakeWriteStringsToPubSub(topic=None, values=None): + expected_topic = topic + + def _inner(topic=None, subscription=None): + assert topic == expected_topic + return AssertTransform(equal_to(values)) + return _inner + + # Test basic execution. 
+ input_topic = 'projects/fake-beam-test-project/topic/intopic' + input_values = [TimestampedValue('a a b', 1), + TimestampedValue(u'🤷 ¯\\_(ツ)_/¯ b b '.encode('utf-8'), 12), + TimestampedValue('a b c c c', 20)] + output_topic = 'projects/fake-beam-test-project/topic/outtopic' + output_values = ['a: 1', 'a: 2', 'b: 1', 'b: 3', 'c: 3'] + beam.io.ReadFromPubSub = ( + FakeReadFromPubSub(topic=input_topic, values=input_values)) + beam.io.WriteStringsToPubSub = ( + FakeWriteStringsToPubSub(topic=output_topic, values=output_values)) + snippets.examples_wordcount_streaming([ + '--input_topic', 'projects/fake-beam-test-project/topic/intopic', + '--output_topic', 'projects/fake-beam-test-project/topic/outtopic']) + + # Test with custom subscription. + input_sub = 'projects/fake-beam-test-project/subscriptions/insub' + beam.io.ReadFromPubSub = FakeReadFromPubSub(subscription=input_sub, + values=input_values) + snippets.examples_wordcount_streaming([ + '--input_subscription', + 'projects/fake-beam-test-project/subscriptions/insub', + '--output_topic', 'projects/fake-beam-test-project/topic/outtopic']) + def test_model_composite_transform_example(self): contents = ['aa bb cc', 'bb cc', 'cc'] result_path = self.create_temp_file() ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 89253) Time Spent: 1.5h (was: 1h 20m) > Add Python streaming wordcount snippets and test > ------------------------------------------------ > > Key: BEAM-4037 > URL: https://issues.apache.org/jira/browse/BEAM-4037 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core > Reporter: Charles Chen > Assignee: Charles Chen > Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > > We should add Python streaming wordcount snippets and tests. The > documentation will refer to these snippets. -- This message was sent by Atlassian JIRA (v7.6.3#76005)