[ 
https://issues.apache.org/jira/browse/BEAM-4037?focusedWorklogId=89253&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-89253
 ]

ASF GitHub Bot logged work on BEAM-4037:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Apr/18 04:52
            Start Date: 10/Apr/18 04:52
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #5071: [BEAM-4037] Add 
streaming wordcount snippets and test
URL: https://github.com/apache/beam/pull/5071
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not display the diff of a fork's pull request once
it has been merged):

diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py 
b/sdks/python/apache_beam/examples/snippets/snippets.py
index 9b150c8d3f0..0f9543a64aa 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -30,6 +30,8 @@
 string. The tags can contain only letters, digits and _.
 """
 
+import argparse
+
 import six
 
 import apache_beam as beam
@@ -628,6 +630,64 @@ def format_result(word_count):
     p.visit(SnippetUtils.RenameFiles(renames))
 
 
+def examples_wordcount_streaming(argv):
+  import apache_beam as beam
+  from apache_beam import window
+  from apache_beam.io import ReadFromPubSub
+  from apache_beam.io import WriteStringsToPubSub
+  from apache_beam.options.pipeline_options import PipelineOptions
+  from apache_beam.options.pipeline_options import SetupOptions
+  from apache_beam.options.pipeline_options import StandardOptions
+
+  # Parse out arguments.
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--output_topic', required=True,
+      help=('Output PubSub topic of the form '
+            '"projects/<PROJECT>/topic/<TOPIC>".'))
+  group = parser.add_mutually_exclusive_group(required=True)
+  group.add_argument(
+      '--input_topic',
+      help=('Input PubSub topic of the form '
+            '"projects/<PROJECT>/topics/<TOPIC>".'))
+  group.add_argument(
+      '--input_subscription',
+      help=('Input PubSub subscription of the form '
+            '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'))
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(StandardOptions).streaming = True
+
+  with TestPipeline(options=pipeline_options) as p:
+    # [START example_wordcount_streaming_read]
+    # Read from Pub/Sub into a PCollection.
+    if known_args.input_subscription:
+      lines = p | beam.io.ReadFromPubSub(
+          subscription=known_args.input_subscription)
+    else:
+      lines = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)
+    # [END example_wordcount_streaming_read]
+
+    output = (
+        lines
+        | 'DecodeUnicode' >> beam.FlatMap(
+            lambda encoded: encoded.decode('utf-8'))
+        | 'ExtractWords' >> beam.FlatMap(
+            lambda x: __import__('re').findall(r'[A-Za-z\']+', x))
+        | 'PairWithOnes' >> beam.Map(lambda x: (x, 1))
+        | beam.WindowInto(window.FixedWindows(15, 0))
+        | 'Group' >> beam.GroupByKey()
+        | 'Sum' >> beam.Map(lambda word_ones: (word_ones[0], 
sum(word_ones[1])))
+        | 'Format' >> beam.Map(
+            lambda word_and_count: '%s: %d' % word_and_count))
+
+    # [START example_wordcount_streaming_write]
+    # Write to Pub/Sub
+    output | beam.io.WriteStringsToPubSub(known_args.output_topic)
+    # [END example_wordcount_streaming_write]
+
+
 def examples_ptransforms_templated(renames):
   # [START examples_ptransforms_templated]
   import apache_beam as beam
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py 
b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 349d52542da..4380ce47271 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -1,3 +1,4 @@
+# coding=utf-8
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -25,6 +26,8 @@
 import unittest
 import uuid
 
+import mock
+
 import apache_beam as beam
 from apache_beam import coders
 from apache_beam import pvalue
@@ -36,8 +39,10 @@
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
+from apache_beam.transforms.window import TimestampedValue
 from apache_beam.utils.windowed_value import WindowedValue
 
 # Protect against environments where apitools library is not available.
@@ -56,6 +61,14 @@
   datastore_pb2 = None
 # pylint: enable=wrong-import-order, wrong-import-position
 
+# Protect against environments where the PubSub library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import pubsub
+except ImportError:
+  pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
 
 class ParDoTest(unittest.TestCase):
   """Tests for model/par-do."""
@@ -691,6 +704,59 @@ def test_examples_wordcount_debugging(self):
         self.get_output(result_path),
         ['Flourish: 3', 'stomach: 1'])
 
+  @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
+  @mock.patch('apache_beam.io.ReadFromPubSub')
+  @mock.patch('apache_beam.io.WriteStringsToPubSub')
+  def test_examples_wordcount_streaming(self, *unused_mocks):
+    def FakeReadFromPubSub(topic=None, subscription=None, values=None):
+      expected_topic = topic
+      expected_subscription = subscription
+
+      def _inner(topic=None, subscription=None):
+        assert topic == expected_topic
+        assert subscription == expected_subscription
+        return TestStream().add_elements(values)
+      return _inner
+
+    class AssertTransform(beam.PTransform):
+      def __init__(self, matcher):
+        self.matcher = matcher
+
+      def expand(self, pcoll):
+        assert_that(pcoll, self.matcher)
+
+    def FakeWriteStringsToPubSub(topic=None, values=None):
+      expected_topic = topic
+
+      def _inner(topic=None, subscription=None):
+        assert topic == expected_topic
+        return AssertTransform(equal_to(values))
+      return _inner
+
+    # Test basic execution.
+    input_topic = 'projects/fake-beam-test-project/topic/intopic'
+    input_values = [TimestampedValue('a a b', 1),
+                    TimestampedValue(u'🤷 ¯\\_(ツ)_/¯ b b '.encode('utf-8'), 12),
+                    TimestampedValue('a b c c c', 20)]
+    output_topic = 'projects/fake-beam-test-project/topic/outtopic'
+    output_values = ['a: 1', 'a: 2', 'b: 1', 'b: 3', 'c: 3']
+    beam.io.ReadFromPubSub = (
+        FakeReadFromPubSub(topic=input_topic, values=input_values))
+    beam.io.WriteStringsToPubSub = (
+        FakeWriteStringsToPubSub(topic=output_topic, values=output_values))
+    snippets.examples_wordcount_streaming([
+        '--input_topic', 'projects/fake-beam-test-project/topic/intopic',
+        '--output_topic', 'projects/fake-beam-test-project/topic/outtopic'])
+
+    # Test with custom subscription.
+    input_sub = 'projects/fake-beam-test-project/subscriptions/insub'
+    beam.io.ReadFromPubSub = FakeReadFromPubSub(subscription=input_sub,
+                                                values=input_values)
+    snippets.examples_wordcount_streaming([
+        '--input_subscription',
+        'projects/fake-beam-test-project/subscriptions/insub',
+        '--output_topic', 'projects/fake-beam-test-project/topic/outtopic'])
+
   def test_model_composite_transform_example(self):
     contents = ['aa bb cc', 'bb cc', 'cc']
     result_path = self.create_temp_file()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 89253)
    Time Spent: 1.5h  (was: 1h 20m)

> Add Python streaming wordcount snippets and test
> ------------------------------------------------
>
>                 Key: BEAM-4037
>                 URL: https://issues.apache.org/jira/browse/BEAM-4037
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Charles Chen
>            Assignee: Charles Chen
>            Priority: Major
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> We should add Python streaming wordcount snippets and tests.  The 
> documentation will refer to these snippets.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to