[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82428&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82428 ]

ASF GitHub Bot logged work on BEAM-3861:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Mar/18 19:34
            Start Date: 20/Mar/18 19:34
    Worklog Time Spent: 10m 
      Work Description: markflyhigh commented on a change in pull request 
#4874: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end 
test
URL: https://github.com/apache/beam/pull/4874#discussion_r175896233
 
 

 ##########
 File path: sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
 ##########
 @@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the streaming wordcount example.
+
+Important: End-to-end test infrastructure for streaming pipelines in the Python
+SDK is in development and is not yet available for use.
+
+Currently, this test blocks until the pipeline job is manually terminated.
+"""
+
+import logging
+import unittest
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples import streaming_wordcount
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing import test_utils
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+INPUT_TOPIC = 'wc_topic_input'
+OUTPUT_TOPIC = 'wc_topic_output'
+INPUT_SUB = 'wc_subscription_input'
+OUTPUT_SUB = 'wc_subscription_output'
+
+DEFAULT_INPUT_NUMBERS = 500
+
+
+class StreamingWordCountIT(unittest.TestCase):
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+
+    # Set up PubSub environment.
+    from google.cloud import pubsub
+    self.pubsub_client = pubsub.Client(
+        project=self.test_pipeline.get_option('project'))
+    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC)
+    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC)
+    self.input_sub = self.input_topic.subscription(INPUT_SUB)
+    self.output_sub = self.output_topic.subscription(OUTPUT_SUB)
+
+    self._cleanup_pubsub()
+
+    self.input_topic.create()
+    self.output_topic.create()
+    test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
+    self.input_sub.create()
+    self.output_sub.create()
+
+  def _inject_numbers(self, topic, num_messages):
+    """Inject numbers as test data to PubSub."""
+    logging.debug('Injecting %d numbers to topic %s',
+                  num_messages, topic.full_name)
+    for n in range(num_messages):
+      topic.publish(str(n))
+
+  def _cleanup_pubsub(self):
+    test_utils.cleanup_subscription([self.input_sub, self.output_sub])
+    test_utils.cleanup_topics([self.input_topic, self.output_topic])
+
+  def tearDown(self):
+    self._cleanup_pubsub()
+
+  @attr('developing_test')
+  def test_streaming_wordcount_it(self):
+    # Set extra options on the pipeline for testing purposes.
+    pipeline_verifiers = [PipelineStateMatcher(PipelineState.RUNNING)]
+    extra_opts = {'input_sub': self.input_sub.full_name,
+                  'output_topic': self.output_topic.full_name,
+                  'on_success_matcher': all_of(*pipeline_verifiers)}
+
+    # Generate input data and inject it into PubSub.
+    test_utils.wait_for_subscriptions_created([self.input_sub])
 
 Review comment:
   Actually, I want to wait and verify that the subscription exists before injecting 
data into PubSub on the following line. The PubSub API does not seem to guarantee 
that a subscription is created successfully, or immediately, after 
`subscription.create()` returns. If the subscription is created after data injection 
starts, some messages may be missed and the verification will be incorrect.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 82428)
    Time Spent: 2h 10m  (was: 2h)

> Build test infra for end-to-end streaming test in Python SDK
> ------------------------------------------------------------
>
>                 Key: BEAM-3861
>                 URL: https://issues.apache.org/jira/browse/BEAM-3861
>             Project: Beam
>          Issue Type: Task
>          Components: testing
>            Reporter: Mark Liu
>            Assignee: Mark Liu
>            Priority: Major
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
