Repository: beam
Updated Branches:
  refs/heads/python-sdk c57c66ed4 -> 82599a241


Make TestPipeline.run fail when the underlying execution fails.

Also, DataflowRunner will log the last error from its wait_until_finish method.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aa3a2cb3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aa3a2cb3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aa3a2cb3

Branch: refs/heads/python-sdk
Commit: aa3a2cb326a5f761eba9fe87fe7d57da9ce78555
Parents: c57c66e
Author: Ahmet Altay <[email protected]>
Authored: Thu Jan 19 16:13:07 2017 -0800
Committer: Robert Bradshaw <[email protected]>
Committed: Fri Jan 20 16:46:20 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/dataflow_runner.py    | 12 ++++--------
 sdks/python/apache_beam/test_pipeline.py              |  5 ++++-
 sdks/python/apache_beam/transforms/aggregator_test.py |  2 +-
 3 files changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/aa3a2cb3/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 330472b..fd22753 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -704,6 +704,10 @@ class DataflowPipelineResult(PipelineResult):
       thread.start()
       while thread.isAlive():
         time.sleep(5.0)
+      if self.state != PipelineState.DONE:
+        logging.error(
+            'Dataflow pipeline failed. State: %s, Error:\n%s',
+            self.state, getattr(self._runner, 'last_error_msg', None))
     return self.state
 
   def __str__(self):
@@ -714,11 +718,3 @@ class DataflowPipelineResult(PipelineResult):
 
   def __repr__(self):
     return '<%s %s at %s>' % (self.__class__.__name__, self._job, hex(id(self)))
-
-
-class DataflowRuntimeException(Exception):
-  """Indicates an error has occurred in running this pipeline."""
-
-  def __init__(self, msg, result):
-    super(DataflowRuntimeException, self).__init__(msg)
-    self.result = result

http://git-wip-us.apache.org/repos/asf/beam/blob/aa3a2cb3/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index c29a879..7d85af9 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -22,6 +22,7 @@ import shlex
 
 from apache_beam.internal import pickler
 from apache_beam.pipeline import Pipeline
+from apache_beam.runners.runner import PipelineState
 from apache_beam.utils.pipeline_options import PipelineOptions
 from nose.plugins.skip import SkipTest
 
@@ -89,7 +90,9 @@ class TestPipeline(Pipeline):
   def run(self):
     result = super(TestPipeline, self).run()
     if self.blocking:
-      result.wait_until_finish()
+      state = result.wait_until_finish()
+      assert state == PipelineState.DONE, "Pipeline execution failed."
+
     return result
 
   def _parse_test_option_args(self, argv):

http://git-wip-us.apache.org/repos/asf/beam/blob/aa3a2cb3/sdks/python/apache_beam/transforms/aggregator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/aggregator_test.py b/sdks/python/apache_beam/transforms/aggregator_test.py
index d493c46..a2a4144 100644
--- a/sdks/python/apache_beam/transforms/aggregator_test.py
+++ b/sdks/python/apache_beam/transforms/aggregator_test.py
@@ -20,9 +20,9 @@
 import unittest
 
 import apache_beam as beam
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import combiners
 from apache_beam.transforms.aggregator import Aggregator
-from apache_beam.test_pipeline import TestPipeline
 
 
 class AggregatorTest(unittest.TestCase):
