This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f80a96e [BEAM-4093] Support Python ValidatesRunner test in streaming (#5147)
f80a96e is described below
commit f80a96e5f94c6227226305e28d56912d2c92289d
Author: Mark Liu <[email protected]>
AuthorDate: Thu Apr 19 18:10:36 2018 -0700
[BEAM-4093] Support Python ValidatesRunner test in streaming (#5147)
* [BEAM-4093] Support Python ValidatesRunner test in streaming
* fixit! Remove unnecessary option reset
---
.../apache_beam/examples/streaming_wordcount_it_test.py | 2 ++
sdks/python/apache_beam/options/pipeline_options.py | 7 +++++++
.../apache_beam/runners/dataflow/test_dataflow_runner.py | 13 ++++++++-----
sdks/python/apache_beam/testing/test_pipeline.py | 3 ++-
4 files changed, 19 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
index d0b53f5..5db1878 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -37,6 +37,7 @@ INPUT_SUB = 'wc_subscription_input'
OUTPUT_SUB = 'wc_subscription_output'
DEFAULT_INPUT_NUMBERS = 500
+WAIT_UNTIL_FINISH_DURATION = 3 * 60 * 1000 # in milliseconds
class StreamingWordCountIT(unittest.TestCase):
@@ -87,6 +88,7 @@ class StreamingWordCountIT(unittest.TestCase):
timeout=400)
extra_opts = {'input_subscription': self.input_sub.full_name,
'output_topic': self.output_topic.full_name,
+ 'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
'on_success_matcher': all_of(state_verifier,
pubsub_msg_verifier)}
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 7a2cd4b..b5f9d77 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -649,6 +649,13 @@ class TestOptions(PipelineOptions):
default=False,
help=('Used in unit testing runners without submitting the '
'actual job.'))
+ parser.add_argument(
+ '--wait_until_finish_duration',
+ default=None,
+ type=int,
+ help='The time to wait (in milliseconds) for test pipeline to finish. '
+ 'If it is set to None, it will wait indefinitely until the job '
+ 'is finished.')
def validate(self, validator):
errors = []
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index 765ed24..eedfa60 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -18,6 +18,7 @@
"""Wrapper of Beam runners that's built for running and verifying e2e tests."""
from __future__ import print_function
+import logging
import time
from apache_beam.internal import pickler
@@ -37,6 +38,8 @@ class TestDataflowRunner(DataflowRunner):
"""Execute test pipeline and verify test matcher"""
options = pipeline._options.view_as(TestOptions)
on_success_matcher = options.on_success_matcher
+ wait_duration = options.wait_until_finish_duration
+ is_streaming = options.view_as(StandardOptions).streaming
# [BEAM-1889] Do not send this to remote workers also, there is no need to
# send this option to remote executors.
@@ -49,10 +52,11 @@ class TestDataflowRunner(DataflowRunner):
print('Found: %s.' % self.build_console_url(pipeline.options))
try:
- if not options.view_as(StandardOptions).streaming:
- self.result.wait_until_finish()
- else:
- self.wait_until_in_state(PipelineState.RUNNING)
+ self.wait_until_in_state(PipelineState.RUNNING)
+
+ if is_streaming and not wait_duration:
+ logging.warning('Waiting indefinitely for streaming job.')
+ self.result.wait_until_finish(duration=wait_duration)
if on_success_matcher:
from hamcrest import assert_that as hc_assert_that
@@ -60,7 +64,6 @@ class TestDataflowRunner(DataflowRunner):
finally:
if not self.result.is_in_terminal_state():
self.result.cancel()
- if options.view_as(StandardOptions).streaming:
self.wait_until_in_state(PipelineState.CANCELLED, timeout=300)
return self.result
diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py
index 155190c..0525945 100644
--- a/sdks/python/apache_beam/testing/test_pipeline.py
+++ b/sdks/python/apache_beam/testing/test_pipeline.py
@@ -102,7 +102,8 @@ class TestPipeline(Pipeline):
result = super(TestPipeline, self).run(test_runner_api)
if self.blocking:
state = result.wait_until_finish()
- assert state == PipelineState.DONE, "Pipeline execution failed."
+ assert state in (PipelineState.DONE, PipelineState.CANCELLED), \
+ "Pipeline execution failed."
return result
--
To stop receiving notification emails like this one, please contact
[email protected].