Implement the wait_until_finish() method for existing runners. Also define the not-yet-implemented cancel() method and update existing usages to call wait_until_finish() instead of relying on blocking runners.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/74dda50e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/74dda50e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/74dda50e Branch: refs/heads/python-sdk Commit: 74dda50e64a93ab3c147ac24f7436ef04467aa27 Parents: f25c0e4 Author: Ahmet Altay <[email protected]> Authored: Mon Jan 9 18:23:20 2017 -0800 Committer: Robert Bradshaw <[email protected]> Committed: Wed Jan 18 09:55:35 2017 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/pipeline.py | 2 +- .../apache_beam/runners/dataflow_runner.py | 60 +++++++++++++------- .../apache_beam/runners/direct/direct_runner.py | 15 ++--- sdks/python/apache_beam/runners/runner.py | 35 +++++++++++- sdks/python/apache_beam/runners/runner_test.py | 1 + .../apache_beam/runners/template_runner_test.py | 4 +- .../runners/test/test_dataflow_runner.py | 4 +- .../apache_beam/utils/pipeline_options.py | 3 +- sdks/python/run_postcommit.sh | 2 +- 9 files changed, 90 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/pipeline.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 6517960..7db39a9 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -167,7 +167,7 @@ class Pipeline(object): def __exit__(self, exc_type, exc_val, exc_tb): if not exc_type: - self.run() + self.run().wait_until_finish() def visit(self, visitor): """Visits depth-first every node of a pipeline's DAG. 
http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index 3505acc..330472b 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -151,7 +151,7 @@ class DataflowRunner(PipelineRunner): if not page_token: break - runner.result = DataflowPipelineResult(response) + runner.result = DataflowPipelineResult(response, runner) runner.last_error_msg = last_error_msg def run(self, pipeline): @@ -176,23 +176,11 @@ class DataflowRunner(PipelineRunner): # Create the job self.result = DataflowPipelineResult( - self.dataflow_client.create_job(self.job)) + self.dataflow_client.create_job(self.job), self) if self.result.has_job and self.blocking: - thread = threading.Thread( - target=DataflowRunner.poll_for_job_completion, - args=(self, self.result.job_id())) - # Mark the thread as a daemon thread so a keyboard interrupt on the main - # thread will terminate everything. This is also the reason we will not - # use thread.join() to wait for the polling thread. 
- thread.daemon = True - thread.start() - while thread.isAlive(): - time.sleep(5.0) - if self.result.current_state() != PipelineState.DONE: - raise DataflowRuntimeException( - 'Dataflow pipeline failed:\n%s' - % getattr(self, 'last_error_msg', None), self.result) + self.result.wait_until_finish() + return self.result def _get_typehint_based_encoding(self, typehint, window_coder): @@ -651,9 +639,10 @@ class DataflowRunner(PipelineRunner): class DataflowPipelineResult(PipelineResult): """Represents the state of a pipeline run on the Dataflow service.""" - def __init__(self, job): + def __init__(self, job, runner): """Job is a Job message from the Dataflow API.""" self._job = job + self._runner = runner def job_id(self): return self._job.id @@ -662,12 +651,16 @@ class DataflowPipelineResult(PipelineResult): def has_job(self): return self._job is not None - def current_state(self): + @property + def state(self): """Return the current state of the remote job. Returns: A PipelineState object. 
""" + if not self.has_job: + return PipelineState.UNKNOWN + values_enum = dataflow_api.Job.CurrentStateValueValuesEnum api_jobstate_map = { values_enum.JOB_STATE_UNKNOWN: PipelineState.UNKNOWN, @@ -684,11 +677,40 @@ class DataflowPipelineResult(PipelineResult): return (api_jobstate_map[self._job.currentState] if self._job.currentState else PipelineState.UNKNOWN) + def _is_in_terminal_state(self): + if not self.has_job: + return True + + return self.state in [ + PipelineState.STOPPED, PipelineState.DONE, PipelineState.FAILED, + PipelineState.CANCELLED, PipelineState.DRAINED] + + def wait_until_finish(self, duration=None): + if not self._is_in_terminal_state(): + if not self.has_job: + raise IOError('Failed to get the Dataflow job id.') + if duration: + raise NotImplementedError( + 'DataflowRunner does not support duration argument.') + + thread = threading.Thread( + target=DataflowRunner.poll_for_job_completion, + args=(self._runner, self.job_id())) + + # Mark the thread as a daemon thread so a keyboard interrupt on the main + # thread will terminate everything. This is also the reason we will not + # use thread.join() to wait for the polling thread. 
+ thread.daemon = True + thread.start() + while thread.isAlive(): + time.sleep(5.0) + return self.state + def __str__(self): return '<%s %s %s>' % ( self.__class__.__name__, self.job_id(), - self.current_state()) + self.state) def __repr__(self): return '<%s %s at %s>' % (self.__class__.__name__, self._job, hex(id(self))) http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/direct/direct_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index a5c616b..dc2668d 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -43,7 +43,7 @@ class DirectRunner(PipelineRunner): def run(self, pipeline): """Execute the entire pipeline and returns an DirectPipelineResult.""" - # TODO: Move imports to top. Pipeline <-> Runner dependecy cause problems + # TODO: Move imports to top. Pipeline <-> Runner dependency cause problems # with resolving imports when they are at top. # pylint: disable=wrong-import-position from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import \ @@ -76,12 +76,10 @@ class DirectRunner(PipelineRunner): executor.start(self.visitor.root_transforms) result = DirectPipelineResult(executor, evaluation_context) - # TODO(altay): If blocking: - # Block until the pipeline completes. This call will return after the - # pipeline was fully terminated (successfully or with a failure). - result.await_completion() - if self._cache: + # We are running in eager mode, block until the pipeline execution + # completes in order to have full results in the cache. 
+ result.wait_until_finish() self._cache.finalize() return result @@ -141,8 +139,11 @@ class DirectPipelineResult(PipelineResult): def _is_in_terminal_state(self): return self._state is not PipelineState.RUNNING - def await_completion(self): + def wait_until_finish(self, duration=None): if not self._is_in_terminal_state(): + if duration: + raise NotImplementedError( + 'DirectRunner does not support duration argument.') try: self._executor.await_completion() self._state = PipelineState.DONE http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index 3dc4d28..1a50df4 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -287,7 +287,7 @@ class PValueCache(object): class PipelineState(object): - """State of the Pipeline, as returned by PipelineResult.current_state(). + """State of the Pipeline, as returned by PipelineResult.state. This is meant to be the union of all the states any runner can put a pipeline in. Currently, it represents the values of the dataflow @@ -310,10 +310,39 @@ class PipelineResult(object): def __init__(self, state): self._state = state - def current_state(self): - """Return the current state of running the pipeline.""" + @property + def state(self): + """Return the current state of the pipeline execution.""" return self._state + def wait_until_finish(self, duration=None): + """Waits until the pipeline finishes and returns the final status. + + Args: + duration: The time to wait (in milliseconds) for job to finish. If it is + set to None, it will wait indefinitely until the job is finished. + + Raises: + IOError: If there is a persistent problem getting job information. + NotImplementedError: If the runner does not support this operation. 
+ + Returns: + The final state of the pipeline, or None on timeout. + """ + raise NotImplementedError + + def cancel(self): + """Cancels the pipeline execution. + + Raises: + IOError: If there is a persistent problem getting job information. + NotImplementedError: If the runner does not support this operation. + + Returns: + The final state of the pipeline. + """ + raise NotImplementedError + # pylint: disable=unused-argument def aggregated_values(self, aggregator_or_name): """Return a dict of step names to values of the Aggregator.""" http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py index ea86061..2b6c316 100644 --- a/sdks/python/apache_beam/runners/runner_test.py +++ b/sdks/python/apache_beam/runners/runner_test.py @@ -161,6 +161,7 @@ class RunnerTest(unittest.TestCase): (p | 'create' >> ptransform.Create([1, 2, 3, 4, 5]) | 'do' >> beam.ParDo(MyDoFn())) result = p.run() + result.wait_until_finish() metrics = result.metrics().query() namespace = '{}.{}'.format(MyDoFn.__module__, MyDoFn.__name__) http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/template_runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/template_runner_test.py b/sdks/python/apache_beam/runners/template_runner_test.py index 457022d..af7f2c6 100644 --- a/sdks/python/apache_beam/runners/template_runner_test.py +++ b/sdks/python/apache_beam/runners/template_runner_test.py @@ -55,7 +55,7 @@ class TemplatingDataflowRunnerTest(unittest.TestCase): '--no_auth=True'])) pipeline | beam.Create([1, 2, 3]) | beam.Map(lambda x: x) # pylint: disable=expression-not-assigned - pipeline.run() + pipeline.run().wait_until_finish() with open(dummy_file_name) as 
template_file: saved_job_dict = json.load(template_file) self.assertEqual( @@ -81,7 +81,7 @@ class TemplatingDataflowRunnerTest(unittest.TestCase): remote_runner.job = apiclient.Job(pipeline.options) with self.assertRaises(IOError): - pipeline.run() + pipeline.run().wait_until_finish() if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/runners/test/test_dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py index 77655bd..823e534 100644 --- a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py +++ b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py @@ -25,14 +25,16 @@ from apache_beam.utils.pipeline_options import TestOptions class TestDataflowRunner(DataflowRunner): def __init__(self): - super(TestDataflowRunner, self).__init__(blocking=True) + super(TestDataflowRunner, self).__init__() def run(self, pipeline): """Execute test pipeline and verify test matcher""" self.result = super(TestDataflowRunner, self).run(pipeline) + self.result.wait_until_finish() options = pipeline.options.view_as(TestOptions) if options.on_success_matcher: from hamcrest import assert_that as hc_assert_that hc_assert_that(self.result, pickler.loads(options.on_success_matcher)) + return self.result http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/apache_beam/utils/pipeline_options.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/pipeline_options.py b/sdks/python/apache_beam/utils/pipeline_options.py index 9f57ee7..16b1640 100644 --- a/sdks/python/apache_beam/utils/pipeline_options.py +++ b/sdks/python/apache_beam/utils/pipeline_options.py @@ -182,8 +182,7 @@ class StandardOptions(PipelineOptions): parser.add_argument( '--runner', help=('Pipeline runner used 
to execute the workflow. Valid values are ' - 'DirectRunner, DataflowRunner, ' - 'and BlockingDataflowRunner.')) + 'DirectRunner, DataflowRunner.')) # Whether to enable streaming mode. parser.add_argument('--streaming', default=False, http://git-wip-us.apache.org/repos/asf/beam/blob/74dda50e/sdks/python/run_postcommit.sh ---------------------------------------------------------------------- diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh index 67a257e..2e419a5 100755 --- a/sdks/python/run_postcommit.sh +++ b/sdks/python/run_postcommit.sh @@ -74,7 +74,7 @@ SDK_LOCATION=$(find dist/apache-beam-sdk-*.tar.gz) echo ">>> RUNNING DATAFLOW RUNNER VALIDATESRUNNER TESTS" python setup.py nosetests \ -a ValidatesRunner --test-pipeline-options=" \ - --runner=BlockingDataflowRunner \ + --runner=TestDataflowRunner \ --project=$PROJECT \ --staging_location=$GCS_LOCATION/staging-validatesrunner-test \ --temp_location=$GCS_LOCATION/temp-validatesrunner-test \
