y1chi commented on a change in pull request #12934:
URL: https://github.com/apache/beam/pull/12934#discussion_r495165024
##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -376,35 +385,76 @@ def get(self, instruction_id, bundle_descriptor_id):
Moves the ``BundleProcessor`` from the inactive to the active cache.
"""
- try:
- # pop() is threadsafe
- processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
- except IndexError:
- processor = bundle_processor.BundleProcessor(
- self.fns[bundle_descriptor_id],
- self.state_handler_factory.create_state_handler(
- self.fns[bundle_descriptor_id].state_api_service_descriptor),
- self.data_channel_factory)
- self.active_bundle_processors[
+ with self._lock:
+ try:
+ # pop() is threadsafe
+ processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
+ self.active_bundle_processors[
+ instruction_id] = bundle_descriptor_id, processor
+ try:
+ del self.known_not_running_instruction_ids[instruction_id]
+ except KeyError:
+ # The instruction may have not been pre-registered before execution
+ # since activate() may have never been invoked
+ pass
+ return processor
+ except IndexError:
+ pass
+
+ # Make sure we instantiate the processor while not holding the lock.
+ processor = bundle_processor.BundleProcessor(
+ self.fns[bundle_descriptor_id],
+ self.state_handler_factory.create_state_handler(
+ self.fns[bundle_descriptor_id].state_api_service_descriptor),
+ self.data_channel_factory)
+ with self._lock:
+ self.active_bundle_processors[
instruction_id] = bundle_descriptor_id, processor
+ try:
+ del self.known_not_running_instruction_ids[instruction_id]
+ except KeyError:
+ # The instruction may have not been pre-registered before execution
+ # since activate() may have never been invoked
+ pass
return processor
def lookup(self, instruction_id):
# type: (str) -> Optional[bundle_processor.BundleProcessor]
"""
Return the requested ``BundleProcessor`` from the cache.
+
+    Will return ``None`` if the BundleProcessor is known but not yet ready.
+    Will raise an error if the ``instruction_id`` is not known or has been
+    discarded.
"""
- return self.active_bundle_processors.get(instruction_id, (None, None))[-1]
+ with self._lock:
+ if instruction_id in self.failed_instruction_ids:
+ raise RuntimeError(
+ 'Bundle processing associated with %s has failed. '
+ 'Check prior failing response for details.' % instruction_id)
+ processor = self.active_bundle_processors.get(
+ instruction_id, (None, None))[-1]
+ if processor:
+ return processor
+ if instruction_id in self.known_not_running_instruction_ids:
+ return None
+ raise RuntimeError('Unknown process bundle id %s.' % instruction_id)
def discard(self, instruction_id):
# type: (str) -> None
"""
- Remove the ``BundleProcessor`` from the cache.
+ Marks the instruction id as failed shutting down the ``BundleProcessor``.
"""
- self.active_bundle_processors[instruction_id][1].shutdown()
- del self.active_bundle_processors[instruction_id]
+ with self._lock:
+ self.failed_instruction_ids[instruction_id] = True
+ while len(self.failed_instruction_ids) > MAX_FAILED_INSTRUCTIONS:
+ self.failed_instruction_ids.popitem()
Review comment:
Is it by design that we pop the items in LIFO order?
##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -376,35 +385,76 @@ def get(self, instruction_id, bundle_descriptor_id):
Moves the ``BundleProcessor`` from the inactive to the active cache.
"""
- try:
- # pop() is threadsafe
- processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
- except IndexError:
- processor = bundle_processor.BundleProcessor(
- self.fns[bundle_descriptor_id],
- self.state_handler_factory.create_state_handler(
- self.fns[bundle_descriptor_id].state_api_service_descriptor),
- self.data_channel_factory)
- self.active_bundle_processors[
Review comment:
It seems that if we lock here, we won't have to duplicate the code.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]