This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b011a22 [BEAM-10860] avoid dictionary size change when shutting down
BundleProcessorCache
new 1377299 Merge pull request #12783 from
lazylynx/avoid_dictionary_size_change_when_BundleProcessorCache_shutdown
b011a22 is described below
commit b011a229e9a0134bcc94b9b75465afa6350e697c
Author: yoshiki.obata <[email protected]>
AuthorDate: Tue Sep 8 23:16:54 2020 +0900
[BEAM-10860] avoid dictionary size change when shutting down
BundleProcessorCache
---
sdks/python/apache_beam/runners/worker/sdk_worker.py | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py
b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index cf35c61..1a8e093 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -429,9 +429,8 @@ class BundleProcessorCache(object):
self.periodic_shutdown.join()
self.periodic_shutdown = None
- for instruction_id in self.active_bundle_processors:
- self.active_bundle_processors[instruction_id][1].shutdown()
- del self.active_bundle_processors[instruction_id]
+ for instruction_id in list(self.active_bundle_processors.keys()):
+ self.discard(instruction_id)
for cached_bundle_processors in self.cached_bundle_processors.values():
BundleProcessorCache._shutdown_cached_bundle_processors(
cached_bundle_processors)