scwhittle commented on code in PR #36367:
URL: https://github.com/apache/beam/pull/36367#discussion_r2405273187


##########
sdks/python/apache_beam/runners/worker/data_plane.py:
##########
@@ -502,7 +502,7 @@ def _clean_receiving_queue(self, instruction_id):
     instruction_id cannot be reused for new queue.
     """
     with self._receive_lock:
-      self._received.pop(instruction_id)
+      self._received.pop(instruction_id, None)

Review Comment:
   Can you add a comment on why the instruction_id might not be present here?
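
   For reference, a minimal sketch of what the added default changes. It uses a
plain dict as a stand-in for self._received; the id and the queued value are
made up for illustration and are not taken from the PR.

```python
# Illustrative stand-in for self._received; not the Beam implementation.
received = {"inst-1": ["queued element"]}

# The first cleanup removes the entry and returns what was stored.
first = received.pop("inst-1", None)

# A second cleanup for the same id returns the default instead of
# raising KeyError, which is what pop(instruction_id) would have done.
second = received.pop("inst-1", None)
```

   The default makes the cleanup idempotent, but it also hides the case where
the entry is unexpectedly missing, so a comment naming the expected cause would
help.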



##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -559,15 +559,18 @@ def discard(self, instruction_id, exception):
     """
     Marks the instruction id as failed shutting down the ``BundleProcessor``.
     """
+    processor = None
     with self._lock:
       self.failed_instruction_ids[instruction_id] = exception
       while len(self.failed_instruction_ids) > MAX_FAILED_INSTRUCTIONS:
         self.failed_instruction_ids.popitem(last=False)
-      processor = self.active_bundle_processors[instruction_id][1]
-      del self.active_bundle_processors[instruction_id]
+      if instruction_id in self.active_bundle_processors:
+        processor = self.active_bundle_processors.pop(instruction_id)[1]

Review Comment:
   To be more future-proof here, it seems like you should change the local 
processor variable to hold the full list, and then iterate over it to shut 
down each entry below.
   
   Alternatively, is there a reason the value in the dict is a list instead of 
a single processor, and could that be changed?
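
   A rough sketch of the first option, under stated assumptions. FakeProcessor
and FakeCache are hypothetical stand-ins, not the Beam classes. The hunk only
ever reads index 1 of the value, so the sketch assumes index 0 holds something
other than a processor and that any further elements would be processors.

```python
import threading


class FakeProcessor:
  """Hypothetical stand-in for a BundleProcessor; only records shutdown."""
  def __init__(self):
    self.shut_down = False

  def shutdown(self):
    self.shut_down = True


class FakeCache:
  """Hypothetical, trimmed-down cache; not the Beam class."""
  def __init__(self):
    self._lock = threading.Lock()
    self.active_bundle_processors = {}

  def discard(self, instruction_id):
    with self._lock:
      # Take the whole entry, or None if the id was never registered
      # or has already been removed.
      entry = self.active_bundle_processors.pop(instruction_id, None)
    # Assumption: index 0 is not a processor, everything after it is.
    processors = entry[1:] if entry else []
    # In this sketch the shutdown calls happen after the lock is released.
    for processor in processors:
      processor.shutdown()
    return len(processors)


cache = FakeCache()
proc = FakeProcessor()
cache.active_bundle_processors["inst-1"] = ["descriptor-1", proc]
shut_down_count = cache.discard("inst-1")  # entry present
missing_count = cache.discard("inst-1")    # entry already gone
```

   If the value only ever carries one processor, storing it directly would make
both the indexing and the loop unnecessary.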
   
   



##########
sdks/python/apache_beam/runners/worker/data_plane.py:
##########
@@ -854,6 +860,11 @@ def close(self):
       channel.close()
     self._data_channel_cache.clear()
 
+  def cleanup(self, instruction_id):
+    # type: (str) -> None
+    for channel in self._data_channel_cache.values():

Review Comment:
   Do we need to worry about thread safety here if channels are created concurrently?
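
   To illustrate the concern, a small sketch using a plain dict as a stand-in
for self._data_channel_cache. The keys, values, and cache_lock are made up, and
whether the factory already guards the cache with a lock is not visible in this
hunk. The insert is done on the same thread here only to make the failure
deterministic.

```python
import threading

# Hypothetical stand-in for self._data_channel_cache.
channel_cache = {"url-a": "channel-a", "url-b": "channel-b"}
cache_lock = threading.Lock()  # assumed lock; not shown in the hunk

# Growing a dict while iterating over .values() raises RuntimeError in
# CPython. A create on another thread during cleanup could change the
# dict size in the same way.
raised = False
try:
  for channel in channel_cache.values():
    channel_cache["url-c"] = "channel-c"
except RuntimeError:
  raised = True

# Taking a snapshot under a lock keeps the loop independent of later
# inserts into the cache.
with cache_lock:
  snapshot = list(channel_cache.values())
for channel in snapshot:
  channel_cache["url-d"] = "channel-d"  # does not affect the loop
```

   A snapshot like this only helps if creates take the same lock; otherwise the
copy itself can race with an insert.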



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
