[ https://issues.apache.org/jira/browse/BEAM-4093?focusedWorklogId=93005&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93005 ]
ASF GitHub Bot logged work on BEAM-4093: ---------------------------------------- Author: ASF GitHub Bot Created on: 20/Apr/18 01:10 Start Date: 20/Apr/18 01:10 Worklog Time Spent: 10m Work Description: aaltay closed pull request #5147: [BEAM-4093] Support Python ValidatesRunner test in streaming URL: https://github.com/apache/beam/pull/5147 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py index d0b53f50d79..5db1878f34f 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py +++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py @@ -37,6 +37,7 @@ OUTPUT_SUB = 'wc_subscription_output' DEFAULT_INPUT_NUMBERS = 500 +WAIT_UNTIL_FINISH_DURATION = 3 * 60 * 1000 # in milliseconds class StreamingWordCountIT(unittest.TestCase): @@ -87,6 +88,7 @@ def test_streaming_wordcount_it(self): timeout=400) extra_opts = {'input_subscription': self.input_sub.full_name, 'output_topic': self.output_topic.full_name, + 'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION, 'on_success_matcher': all_of(state_verifier, pubsub_msg_verifier)} diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 7a2cd4bf1e4..b5f9d77617d 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -649,6 +649,13 @@ def _add_argparse_args(cls, parser): default=False, help=('Used in unit testing runners without submitting the ' 'actual job.')) + parser.add_argument( + '--wait_until_finish_duration', + default=None, + type=int, + help='The time to wait (in milliseconds) for test pipeline to 
finish. ' + 'If it is set to None, it will wait indefinitely until the job ' + 'is finished.') def validate(self, validator): errors = [] diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py index 765ed245785..eedfa60f9fd 100644 --- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py @@ -18,6 +18,7 @@ """Wrapper of Beam runners that's built for running and verifying e2e tests.""" from __future__ import print_function +import logging import time from apache_beam.internal import pickler @@ -37,6 +38,8 @@ def run_pipeline(self, pipeline): """Execute test pipeline and verify test matcher""" options = pipeline._options.view_as(TestOptions) on_success_matcher = options.on_success_matcher + wait_duration = options.wait_until_finish_duration + is_streaming = options.view_as(StandardOptions).streaming # [BEAM-1889] Do not send this to remote workers also, there is no need to # send this option to remote executors. @@ -49,10 +52,11 @@ def run_pipeline(self, pipeline): print('Found: %s.' 
% self.build_console_url(pipeline.options)) try: - if not options.view_as(StandardOptions).streaming: - self.result.wait_until_finish() - else: - self.wait_until_in_state(PipelineState.RUNNING) + self.wait_until_in_state(PipelineState.RUNNING) + + if is_streaming and not wait_duration: + logging.warning('Waiting indefinitely for streaming job.') + self.result.wait_until_finish(duration=wait_duration) if on_success_matcher: from hamcrest import assert_that as hc_assert_that @@ -60,7 +64,6 @@ def run_pipeline(self, pipeline): finally: if not self.result.is_in_terminal_state(): self.result.cancel() - if options.view_as(StandardOptions).streaming: self.wait_until_in_state(PipelineState.CANCELLED, timeout=300) return self.result diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py index 155190c09a7..0525945f15f 100644 --- a/sdks/python/apache_beam/testing/test_pipeline.py +++ b/sdks/python/apache_beam/testing/test_pipeline.py @@ -102,7 +102,8 @@ def run(self, test_runner_api=True): result = super(TestPipeline, self).run(test_runner_api) if self.blocking: state = result.wait_until_finish() - assert state == PipelineState.DONE, "Pipeline execution failed." + assert state in (PipelineState.DONE, PipelineState.CANCELLED), \ + "Pipeline execution failed." return result ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
-------------------

Worklog Id: (was: 93005)
Time Spent: 1h 40m (was: 1.5h)

> Support Python ValidatesRunner test against TestDataflowRunner in streaming
> ---------------------------------------------------------------------------
>
> Key: BEAM-4093
> URL: https://issues.apache.org/jira/browse/BEAM-4093
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core, testing
> Reporter: Mark Liu
> Assignee: Mark Liu
> Priority: Major
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)