Repository: beam
Updated Branches:
  refs/heads/python-sdk f25c0e434 -> 36a7d3491
Make TestPipeline.run() blocking by default.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2e49f518
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2e49f518
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2e49f518

Branch: refs/heads/python-sdk
Commit: 2e49f518bcb2f0ab16e4f17f75f85eb28534a1b4
Parents: 74dda50
Author: Ahmet Altay <[email protected]>
Authored: Fri Jan 13 14:09:22 2017 -0800
Committer: Robert Bradshaw <[email protected]>
Committed: Wed Jan 18 09:55:35 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/test_pipeline.py | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/2e49f518/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index 69f4ddd..c29a879 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -58,7 +58,8 @@ class TestPipeline(Pipeline):
                runner=None,
                options=None,
                argv=None,
-               is_integration_test=False):
+               is_integration_test=False,
+               blocking=True):
     """Initialize a pipeline object for test.

     Args:
@@ -72,6 +73,7 @@ class TestPipeline(Pipeline):
         is None.
       is_integration_test: True if the test is an integration test, False
         otherwise.
+      blocking: Run method will wait until pipeline execution is completed.

     Raises:
       ValueError: if either the runner or options argument is not of the
@@ -79,10 +81,17 @@ class TestPipeline(Pipeline):
     """
     self.is_integration_test = is_integration_test
     self.options_list = self._parse_test_option_args(argv)
+    self.blocking = blocking
     if options is None:
       options = PipelineOptions(self.options_list)
     super(TestPipeline, self).__init__(runner, options)

+  def run(self):
+    result = super(TestPipeline, self).run()
+    if self.blocking:
+      result.wait_until_finish()
+    return result
+
   def _parse_test_option_args(self, argv):
     """Parse value of command line argument: --test-pipeline-options to get
     pipeline options.
