Repository: beam
Updated Branches:
  refs/heads/python-sdk f25c0e434 -> 36a7d3491


Make TestPipeline.run() blocking by default.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2e49f518
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2e49f518
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2e49f518

Branch: refs/heads/python-sdk
Commit: 2e49f518bcb2f0ab16e4f17f75f85eb28534a1b4
Parents: 74dda50
Author: Ahmet Altay <[email protected]>
Authored: Fri Jan 13 14:09:22 2017 -0800
Committer: Robert Bradshaw <[email protected]>
Committed: Wed Jan 18 09:55:35 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/test_pipeline.py | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2e49f518/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index 69f4ddd..c29a879 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -58,7 +58,8 @@ class TestPipeline(Pipeline):
                runner=None,
                options=None,
                argv=None,
-               is_integration_test=False):
+               is_integration_test=False,
+               blocking=True):
     """Initialize a pipeline object for test.
 
     Args:
@@ -72,6 +73,7 @@ class TestPipeline(Pipeline):
         is None.
       is_integration_test: True if the test is an integration test, False
         otherwise.
+      blocking: Run method will wait until pipeline execution is completed.
 
     Raises:
       ValueError: if either the runner or options argument is not of the
@@ -79,10 +81,17 @@ class TestPipeline(Pipeline):
     """
     self.is_integration_test = is_integration_test
     self.options_list = self._parse_test_option_args(argv)
+    self.blocking = blocking
     if options is None:
       options = PipelineOptions(self.options_list)
     super(TestPipeline, self).__init__(runner, options)
 
+  def run(self):
+    result = super(TestPipeline, self).run()
+    if self.blocking:
+      result.wait_until_finish()
+    return result
+
   def _parse_test_option_args(self, argv):
     """Parse value of command line argument: --test-pipeline-options to get
     pipeline options.

Reply via email to