[ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=82606&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82606
 ]

ASF GitHub Bot logged work on BEAM-3861:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Mar/18 01:58
            Start Date: 21/Mar/18 01:58
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #4874: [BEAM-3861] Improve 
test infra in Python SDK for streaming end-to-end test
URL: https://github.com/apache/beam/pull/4874
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the diff is reproduced below for the sake of provenance:

diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py 
b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 12f73510873..7ef95d85f1a 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -36,7 +36,7 @@
 
 def split_fn(lines):
   import re
-  return re.findall(r'[A-Za-z\']+', lines)
+  return re.findall(r'[A-Za-z0-9\']+', lines)
 
 
 def run(argv=None):
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py 
b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
new file mode 100644
index 00000000000..a95e5fa8f53
--- /dev/null
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the streaming wordcount example.
+
+Important: End-to-end test infrastructure for streaming pipeine in Python SDK
+is in development and is not yet available for use.
+
+Currently, this test blocks until the job is manually terminated.
+"""
+
+import logging
+import unittest
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples import streaming_wordcount
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing import test_utils
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+INPUT_TOPIC = 'wc_topic_input'
+OUTPUT_TOPIC = 'wc_topic_output'
+INPUT_SUB = 'wc_subscription_input'
+OUTPUT_SUB = 'wc_subscription_output'
+
+DEFAULT_INPUT_NUMBERS = 500
+
+
+class StreamingWordCountIT(unittest.TestCase):
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+
+    # Set up PubSub environment.
+    from google.cloud import pubsub
+    self.pubsub_client = pubsub.Client(
+        project=self.test_pipeline.get_option('project'))
+    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC)
+    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC)
+    self.input_sub = self.input_topic.subscription(INPUT_SUB)
+    self.output_sub = self.output_topic.subscription(OUTPUT_SUB)
+
+    self._cleanup_pubsub()
+
+    self.input_topic.create()
+    self.output_topic.create()
+    test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
+    self.input_sub.create()
+    self.output_sub.create()
+
+  def _inject_numbers(self, topic, num_messages):
+    """Inject numbers as test data to PubSub."""
+    logging.debug('Injecting %d numbers to topic %s',
+                  num_messages, topic.full_name)
+    for n in range(num_messages):
+      topic.publish(str(n))
+
+  def _cleanup_pubsub(self):
+    test_utils.cleanup_subscriptions([self.input_sub, self.output_sub])
+    test_utils.cleanup_topics([self.input_topic, self.output_topic])
+
+  def tearDown(self):
+    self._cleanup_pubsub()
+
+  @attr('developing_test')
+  def test_streaming_wordcount_it(self):
+    # Set extra options to the pipeline for test purpose
+    pipeline_verifiers = [PipelineStateMatcher(PipelineState.RUNNING)]
+    extra_opts = {'input_sub': self.input_sub.full_name,
+                  'output_topic': self.output_topic.full_name,
+                  'on_success_matcher': all_of(*pipeline_verifiers)}
+
+    # Generate input data and inject to PubSub.
+    test_utils.wait_for_subscriptions_created([self.input_sub])
+    self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)
+
+    # Get pipeline options from command argument: --test-pipeline-options,
+    # and start pipeline job by calling pipeline main function.
+    streaming_wordcount.run(
+        self.test_pipeline.get_full_options_as_args(**extra_opts))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.DEBUG)
+  unittest.main()
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py 
b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index aad3fc7b88f..09a919045de 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -18,13 +18,19 @@
 """Wrapper of Beam runners that's built for running and verifying e2e tests."""
 from __future__ import print_function
 
+import time
+
 from apache_beam.internal import pickler
 from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.pipeline_options import TestOptions
 from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
+from apache_beam.runners.runner import PipelineState
 
 __all__ = ['TestDataflowRunner']
 
+WAIT_TIMEOUT = 2 * 60
+
 
 class TestDataflowRunner(DataflowRunner):
   def run_pipeline(self, pipeline):
@@ -46,10 +52,39 @@ def run_pipeline(self, pipeline):
       print (
           'Found: https://console.cloud.google.com/dataflow/jobsDetail'
           '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
-    self.result.wait_until_finish()
+
+    if not options.view_as(StandardOptions).streaming:
+      self.result.wait_until_finish()
+    else:
+      # TODO: Ideally, we want to wait until workers start successfully.
+      self.wait_until_running()
 
     if on_success_matcher:
       from hamcrest import assert_that as hc_assert_that
       hc_assert_that(self.result, pickler.loads(on_success_matcher))
 
     return self.result
+
+  def _is_in_terminate_state(self, job_state):
+    return job_state in [
+        PipelineState.STOPPED, PipelineState.DONE,
+        PipelineState.FAILED, PipelineState.CANCELLED,
+        PipelineState.UPDATED, PipelineState.DRAINED,
+    ]
+
+  def wait_until_running(self):
+    """Wait until Dataflow pipeline terminate or enter RUNNING state."""
+    if not self.result.has_job:
+      raise IOError('Failed to get the Dataflow job id.')
+
+    start_time = time.time()
+    while time.time() - start_time <= WAIT_TIMEOUT:
+      job_state = self.result.state
+      if (self._is_in_terminate_state(job_state) or
+          self.result.state == PipelineState.RUNNING):
+        return job_state
+      time.sleep(5)
+
+    raise RuntimeError('Timeout after %d seconds while waiting for job %s '
+                       'enters RUNNING or terminate state.' %
+                       (WAIT_TIMEOUT, self.result.job_id))
diff --git a/sdks/python/apache_beam/testing/test_pipeline.py 
b/sdks/python/apache_beam/testing/test_pipeline.py
index 46eeb75183d..155190c09a7 100644
--- a/sdks/python/apache_beam/testing/test_pipeline.py
+++ b/sdks/python/apache_beam/testing/test_pipeline.py
@@ -98,8 +98,8 @@ def __init__(self,
       options = PipelineOptions(self.options_list)
     super(TestPipeline, self).__init__(runner, options)
 
-  def run(self):
-    result = super(TestPipeline, self).run()
+  def run(self, test_runner_api=True):
+    result = super(TestPipeline, self).run(test_runner_api)
     if self.blocking:
       state = result.wait_until_finish()
       assert state == PipelineState.DONE, "Pipeline execution failed."
diff --git a/sdks/python/apache_beam/testing/test_utils.py 
b/sdks/python/apache_beam/testing/test_utils.py
index 5676186b752..f84b7f04b9b 100644
--- a/sdks/python/apache_beam/testing/test_utils.py
+++ b/sdks/python/apache_beam/testing/test_utils.py
@@ -22,9 +22,11 @@
 
 import hashlib
 import imp
+import logging
 import os
 import shutil
 import tempfile
+import time
 
 from mock import Mock
 from mock import patch
@@ -129,3 +131,49 @@ def delete_files(file_paths):
     raise RuntimeError('Clean up failed. Invalid file path: %s.' %
                        file_paths)
   FileSystems.delete(file_paths)
+
+
+def wait_for_subscriptions_created(subs, timeout=60):
+  """Wait for all PubSub subscriptions are created."""
+  return _wait_until_all_exist(subs, timeout)
+
+
+def wait_for_topics_created(topics, timeout=60):
+  """Wait for all PubSub topics are created."""
+  return _wait_until_all_exist(topics, timeout)
+
+
+def _wait_until_all_exist(components, timeout):
+  needs_wait = set(components)
+  start_time = time.time()
+  while time.time() - start_time <= timeout:
+    for c in components:
+      if c in needs_wait and c.exists():
+        needs_wait.remove(c)
+    if len(needs_wait) == 0:
+      return True
+    time.sleep(2)
+
+  raise RuntimeError(
+      'Timeout after %d seconds. %d of %d topics/subscriptions not exist. '
+      'They are %s.' %
+      (timeout, len(needs_wait), len(components), list(needs_wait)))
+
+
+def cleanup_subscriptions(subs):
+  """Cleanup PubSub subscriptions if exist."""
+  _cleanup_pubsub(subs)
+
+
+def cleanup_topics(topics):
+  """Cleanup PubSub topics if exist."""
+  _cleanup_pubsub(topics)
+
+
+def _cleanup_pubsub(components):
+  for c in components:
+    if c.exists():
+      c.delete()
+    else:
+      logging.debug('Cannot delete topic/subscription. %s does not exist.',
+                    c.full_name)
diff --git a/sdks/python/apache_beam/testing/test_utils_test.py 
b/sdks/python/apache_beam/testing/test_utils_test.py
index 0018c0ed154..ba0b940136e 100644
--- a/sdks/python/apache_beam/testing/test_utils_test.py
+++ b/sdks/python/apache_beam/testing/test_utils_test.py
@@ -22,6 +22,8 @@
 import tempfile
 import unittest
 
+import mock
+
 from apache_beam.io.filesystem import BeamIOError
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.testing import test_utils as utils
@@ -57,8 +59,6 @@ def test_delete_files_fails_with_invalid_arg(self):
       utils.delete_files([])
 
   def test_temp_dir_removes_files(self):
-    dir_path = ''
-    file_path = ''
     with utils.TempDir() as tempdir:
       dir_path = tempdir.get_path()
       file_path = tempdir.create_temp_file()
@@ -80,6 +80,57 @@ def test_temp_file_field_correct(self):
         self.assertEqual(f.readline(), 'line2\n')
         self.assertEqual(f.readline(), 'line3\n')
 
+  @mock.patch('time.sleep', return_value=None)
+  def test_wait_for_subscriptions_created_fails(self, patched_time_sleep):
+    sub1 = mock.MagicMock()
+    sub1.exists.return_value = True
+    sub2 = mock.MagicMock()
+    sub2.exists.return_value = False
+    with self.assertRaises(RuntimeError) as error:
+      utils.wait_for_subscriptions_created([sub1, sub2], timeout=0.1)
+    self.assertTrue(sub1.exists.called)
+    self.assertTrue(sub2.exists.called)
+    self.assertTrue(error.exception.args[0].startswith('Timeout after'))
+
+  @mock.patch('time.sleep', return_value=None)
+  def test_wait_for_topics_created_fails(self, patched_time_sleep):
+    topic1 = mock.MagicMock()
+    topic1.exists.return_value = True
+    topic2 = mock.MagicMock()
+    topic2.exists.return_value = False
+    with self.assertRaises(RuntimeError) as error:
+      utils.wait_for_subscriptions_created([topic1, topic2], timeout=0.1)
+    self.assertTrue(topic1.exists.called)
+    self.assertTrue(topic2.exists.called)
+    self.assertTrue(error.exception.args[0].startswith('Timeout after'))
+
+  @mock.patch('time.sleep', return_value=None)
+  def test_wait_for_subscriptions_created_succeeds(self, patched_time_sleep):
+    sub1 = mock.MagicMock()
+    sub1.exists.return_value = True
+    self.assertTrue(
+        utils.wait_for_subscriptions_created([sub1], timeout=0.1))
+
+  @mock.patch('time.sleep', return_value=None)
+  def test_wait_for_topics_created_succeeds(self, patched_time_sleep):
+    topic1 = mock.MagicMock()
+    topic1.exists.return_value = True
+    self.assertTrue(
+        utils.wait_for_subscriptions_created([topic1], timeout=0.1))
+    self.assertTrue(topic1.exists.called)
+
+  def test_cleanup_subscriptions(self):
+    mock_sub = mock.MagicMock()
+    mock_sub.exist.return_value = True
+    utils.cleanup_subscriptions([mock_sub])
+    self.assertTrue(mock_sub.delete.called)
+
+  def test_cleanup_topics(self):
+    mock_topics = mock.MagicMock()
+    mock_topics.exist.return_value = True
+    utils.cleanup_subscriptions([mock_topics])
+    self.assertTrue(mock_topics.delete.called)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 82606)
    Time Spent: 4h 10m  (was: 4h)

> Build test infra for end-to-end streaming test in Python SDK
> ------------------------------------------------------------
>
>                 Key: BEAM-3861
>                 URL: https://issues.apache.org/jira/browse/BEAM-3861
>             Project: Beam
>          Issue Type: Task
>          Components: testing
>            Reporter: Mark Liu
>            Assignee: Mark Liu
>            Priority: Major
>          Time Spent: 4h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to