Repository: beam Updated Branches: refs/heads/master 77a0a2afc -> 9071c5516
Actually wait for exector service to shutdown Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2dea491c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2dea491c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2dea491c Branch: refs/heads/master Commit: 2dea491cab131b830e884bd408e82e97690259d9 Parents: 77a0a2a Author: Sergiy Byelozyorov <[email protected]> Authored: Wed Aug 23 19:04:31 2017 +0200 Committer: Ahmet Altay <[email protected]> Committed: Fri Aug 25 15:07:33 2017 -0700 ---------------------------------------------------------------------- .../runners/direct/direct_runner_test.py | 41 ++++++++++++++++++++ .../apache_beam/runners/direct/executor.py | 1 + 2 files changed, 42 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2dea491c/sdks/python/apache_beam/runners/direct/direct_runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/direct_runner_test.py b/sdks/python/apache_beam/runners/direct/direct_runner_test.py new file mode 100644 index 0000000..1c8b785 --- /dev/null +++ b/sdks/python/apache_beam/runners/direct/direct_runner_test.py @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import threading +import unittest + +import apache_beam as beam +from apache_beam.testing import test_pipeline + + +class DirectPipelineResultTest(unittest.TestCase): + + def test_waiting_on_result_stops_executor_threads(self): + pre_test_threads = set(t.ident for t in threading.enumerate()) + + pipeline = test_pipeline.TestPipeline() + _ = (pipeline | beam.Create([{'foo': 'bar'}])) + result = pipeline.run() + result.wait_until_finish() + + post_test_threads = set(t.ident for t in threading.enumerate()) + new_threads = post_test_threads - pre_test_threads + self.assertEqual(len(new_threads), 0) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/2dea491c/sdks/python/apache_beam/runners/direct/executor.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index d465068..3e08b52 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -415,6 +415,7 @@ class _ExecutorServiceParallelExecutor(object): raise t, v, tb finally: self.executor_service.shutdown() + self.executor_service.await_completion() def schedule_consumers(self, committed_bundle): if committed_bundle.pcollection in self.value_to_consumers:
