[ 
https://issues.apache.org/jira/browse/BEAM-4093?focusedWorklogId=93005&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93005
 ]

ASF GitHub Bot logged work on BEAM-4093:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Apr/18 01:10
            Start Date: 20/Apr/18 01:10
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #5147: [BEAM-4093] Support Python ValidatesRunner test in streaming
URL: https://github.com/apache/beam/pull/5147
 
 
   

This pull request was merged from a forked repository. Because GitHub does
not display the original diff of a fork's pull request once it is merged,
the diff is reproduced below for the sake of provenance:

diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
index d0b53f50d79..5db1878f34f 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -37,6 +37,7 @@
 OUTPUT_SUB = 'wc_subscription_output'
 
 DEFAULT_INPUT_NUMBERS = 500
+WAIT_UNTIL_FINISH_DURATION = 3 * 60 * 1000   # in milliseconds
 
 
 class StreamingWordCountIT(unittest.TestCase):
@@ -87,6 +88,7 @@ def test_streaming_wordcount_it(self):
                                                timeout=400)
     extra_opts = {'input_subscription': self.input_sub.full_name,
                   'output_topic': self.output_topic.full_name,
+                  'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
                   'on_success_matcher': all_of(state_verifier,
                                                pubsub_msg_verifier)}
 
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 7a2cd4bf1e4..b5f9d77617d 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -649,6 +649,13 @@ def _add_argparse_args(cls, parser):
         default=False,
         help=('Used in unit testing runners without submitting the '
               'actual job.'))
+    parser.add_argument(
+        '--wait_until_finish_duration',
+        default=None,
+        type=int,
+        help='The time to wait (in milliseconds) for test pipeline to finish. '
+             'If it is set to None, it will wait indefinitely until the job '
+             'is finished.')
 
   def validate(self, validator):
     errors = []
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index 765ed245785..eedfa60f9fd 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -18,6 +18,7 @@
 """Wrapper of Beam runners that's built for running and verifying e2e tests."""
 from __future__ import print_function
 
+import logging
 import time
 
 from apache_beam.internal import pickler
@@ -37,6 +38,8 @@ def run_pipeline(self, pipeline):
     """Execute test pipeline and verify test matcher"""
     options = pipeline._options.view_as(TestOptions)
     on_success_matcher = options.on_success_matcher
+    wait_duration = options.wait_until_finish_duration
+    is_streaming = options.view_as(StandardOptions).streaming
 
     # [BEAM-1889] Do not send this to remote workers also, there is no need to
     # send this option to remote executors.
@@ -49,10 +52,11 @@ def run_pipeline(self, pipeline):
       print('Found: %s.' % self.build_console_url(pipeline.options))
 
     try:
-      if not options.view_as(StandardOptions).streaming:
-        self.result.wait_until_finish()
-      else:
-        self.wait_until_in_state(PipelineState.RUNNING)
+      self.wait_until_in_state(PipelineState.RUNNING)
+
+      if is_streaming and not wait_duration:
+        logging.warning('Waiting indefinitely for streaming job.')
+      self.result.wait_until_finish(duration=wait_duration)
 
       if on_success_matcher:
         from hamcrest import assert_that as hc_assert_that
@@ -60,7 +64,6 @@ def run_pipeline(self, pipeline):
     finally:
       if not self.result.is_in_terminal_state():
         self.result.cancel()
-      if options.view_as(StandardOptions).streaming:
         self.wait_until_in_state(PipelineState.CANCELLED, timeout=300)
 
     return self.result
diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py
index 155190c09a7..0525945f15f 100644
--- a/sdks/python/apache_beam/testing/test_pipeline.py
+++ b/sdks/python/apache_beam/testing/test_pipeline.py
@@ -102,7 +102,8 @@ def run(self, test_runner_api=True):
     result = super(TestPipeline, self).run(test_runner_api)
     if self.blocking:
       state = result.wait_until_finish()
-      assert state == PipelineState.DONE, "Pipeline execution failed."
+      assert state in (PipelineState.DONE, PipelineState.CANCELLED), \
+          "Pipeline execution failed."
 
     return result
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 93005)
    Time Spent: 1h 40m  (was: 1.5h)

> Support Python ValidatesRunner test against TestDataflowRunner in streaming
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-4093
>                 URL: https://issues.apache.org/jira/browse/BEAM-4093
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core, testing
>            Reporter: Mark Liu
>            Assignee: Mark Liu
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to