This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b011a22  [BEAM-10860] avoid dictionary size change when shutting down 
BundleProcessorCache
     new 1377299  Merge pull request #12783 from 
lazylynx/avoid_dictionary_size_change_when_BundleProcessorCache_shutdown
b011a22 is described below

commit b011a229e9a0134bcc94b9b75465afa6350e697c
Author: yoshiki.obata <[email protected]>
AuthorDate: Tue Sep 8 23:16:54 2020 +0900

    [BEAM-10860] avoid dictionary size change when shutting down 
BundleProcessorCache
---
 sdks/python/apache_beam/runners/worker/sdk_worker.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py 
b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index cf35c61..1a8e093 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -429,9 +429,8 @@ class BundleProcessorCache(object):
       self.periodic_shutdown.join()
       self.periodic_shutdown = None
 
-    for instruction_id in self.active_bundle_processors:
-      self.active_bundle_processors[instruction_id][1].shutdown()
-      del self.active_bundle_processors[instruction_id]
+    for instruction_id in list(self.active_bundle_processors.keys()):
+      self.discard(instruction_id)
     for cached_bundle_processors in self.cached_bundle_processors.values():
       BundleProcessorCache._shutdown_cached_bundle_processors(
           cached_bundle_processors)

Reply via email to