udim commented on a change in pull request #12934:
URL: https://github.com/apache/beam/pull/12934#discussion_r495185774



##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -376,35 +385,76 @@ def get(self, instruction_id, bundle_descriptor_id):
 
     Moves the ``BundleProcessor`` from the inactive to the active cache.
     """
-    try:
-      # pop() is threadsafe
-      processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
-    except IndexError:
-      processor = bundle_processor.BundleProcessor(
-          self.fns[bundle_descriptor_id],
-          self.state_handler_factory.create_state_handler(
-              self.fns[bundle_descriptor_id].state_api_service_descriptor),
-          self.data_channel_factory)
-    self.active_bundle_processors[
+    with self._lock:
+      try:
+        # pop() is threadsafe
+        processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
+        self.active_bundle_processors[
+          instruction_id] = bundle_descriptor_id, processor
+        try:
+          del self.known_not_running_instruction_ids[instruction_id]
+        except KeyError:
+          # The instruction may have not been pre-registered before execution
+          # since activate() may have never been invoked
+          pass
+        return processor
+      except IndexError:
+        pass
+
+    # Make sure we instantiate the processor while not holding the lock.
+    processor = bundle_processor.BundleProcessor(
+        self.fns[bundle_descriptor_id],
+        self.state_handler_factory.create_state_handler(
+            self.fns[bundle_descriptor_id].state_api_service_descriptor),
+        self.data_channel_factory)
+    with self._lock:
+      self.active_bundle_processors[
         instruction_id] = bundle_descriptor_id, processor
+      try:
+        del self.known_not_running_instruction_ids[instruction_id]
+      except KeyError:
+        # The instruction may have not been pre-registered before execution
+        # since activate() may have never been invoked
+        pass
     return processor
 
   def lookup(self, instruction_id):
     # type: (str) -> Optional[bundle_processor.BundleProcessor]
 
     """
     Return the requested ``BundleProcessor`` from the cache.
+
+    Will return ``None`` if the BundleProcessor is known but not yet ready. Will
+    raise an error if the ``instruction_id`` is not known or has been discarded.
     """
-    return self.active_bundle_processors.get(instruction_id, (None, None))[-1]
+    with self._lock:
+      if instruction_id in self.failed_instruction_ids:
+        raise RuntimeError(
+            'Bundle processing associated with %s has failed. '
+            'Check prior failing response for details.' % instruction_id)
+      processor = self.active_bundle_processors.get(
+          instruction_id, (None, None))[-1]
+      if processor:
+        return processor
+      if instruction_id in self.known_not_running_instruction_ids:
+        return None
+      raise RuntimeError('Unknown process bundle id %s.' % instruction_id)
 
   def discard(self, instruction_id):
     # type: (str) -> None
 
     """
-    Remove the ``BundleProcessor`` from the cache.
+    Marks the instruction id as failed shutting down the ``BundleProcessor``.
     """
-    self.active_bundle_processors[instruction_id][1].shutdown()
-    del self.active_bundle_processors[instruction_id]
+    with self._lock:
+      self.failed_instruction_ids[instruction_id] = True
+      while len(self.failed_instruction_ids) > MAX_FAILED_INSTRUCTIONS:
+        self.failed_instruction_ids.popitem()

Review comment:
       Good catch, we probably want FIFO eviction here, so that the oldest
       failed instruction id is the one dropped first.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to