[
https://issues.apache.org/jira/browse/BEAM-10959?focusedWorklogId=491385&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-491385
]
ASF GitHub Bot logged work on BEAM-10959:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 25/Sep/20 19:43
Start Date: 25/Sep/20 19:43
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #12934:
URL: https://github.com/apache/beam/pull/12934#discussion_r495196209
##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -376,35 +385,76 @@ def get(self, instruction_id, bundle_descriptor_id):
Moves the ``BundleProcessor`` from the inactive to the active cache.
"""
- try:
- # pop() is threadsafe
- processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
- except IndexError:
- processor = bundle_processor.BundleProcessor(
- self.fns[bundle_descriptor_id],
- self.state_handler_factory.create_state_handler(
- self.fns[bundle_descriptor_id].state_api_service_descriptor),
- self.data_channel_factory)
- self.active_bundle_processors[
+ with self._lock:
+ try:
+ # pop() is threadsafe
+ processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
+ self.active_bundle_processors[
+ instruction_id] = bundle_descriptor_id, processor
+ try:
+ del self.known_not_running_instruction_ids[instruction_id]
+ except KeyError:
+ # The instruction may have not been pre-registered before execution
+ # since activate() may have never been invoked
+ pass
+ return processor
+ except IndexError:
+ pass
+
+ # Make sure we instantiate the processor while not holding the lock.
+ processor = bundle_processor.BundleProcessor(
+ self.fns[bundle_descriptor_id],
+ self.state_handler_factory.create_state_handler(
+ self.fns[bundle_descriptor_id].state_api_service_descriptor),
+ self.data_channel_factory)
+ with self._lock:
+ self.active_bundle_processors[
instruction_id] = bundle_descriptor_id, processor
+ try:
+ del self.known_not_running_instruction_ids[instruction_id]
+ except KeyError:
+ # The instruction may have not been pre-registered before execution
+ # since activate() may have never been invoked
+ pass
return processor
def lookup(self, instruction_id):
# type: (str) -> Optional[bundle_processor.BundleProcessor]
"""
Return the requested ``BundleProcessor`` from the cache.
+
+ Will return ``None`` if the BundleProcessor is known but not yet ready. Will
+ raise an error if the ``instruction_id`` is not known or has been discarded.
"""
- return self.active_bundle_processors.get(instruction_id, (None, None))[-1]
+ with self._lock:
+ if instruction_id in self.failed_instruction_ids:
+ raise RuntimeError(
+ 'Bundle processing associated with %s has failed. '
+ 'Check prior failing response for details.' % instruction_id)
+ processor = self.active_bundle_processors.get(
+ instruction_id, (None, None))[-1]
+ if processor:
+ return processor
+ if instruction_id in self.known_not_running_instruction_ids:
+ return None
+ raise RuntimeError('Unknown process bundle id %s.' % instruction_id)
def discard(self, instruction_id):
# type: (str) -> None
"""
- Remove the ``BundleProcessor`` from the cache.
+ Marks the instruction id as failed shutting down the ``BundleProcessor``.
"""
- self.active_bundle_processors[instruction_id][1].shutdown()
- del self.active_bundle_processors[instruction_id]
+ with self._lock:
+ self.failed_instruction_ids[instruction_id] = True
+ while len(self.failed_instruction_ids) > MAX_FAILED_INSTRUCTIONS:
+ self.failed_instruction_ids.popitem()
Review comment:
Fixed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 491385)
Time Spent: 4h 50m (was: 4h 40m)
> Fix race where split/progress calls return "Unknown process bundle
> instruction"
> -------------------------------------------------------------------------------
>
> Key: BEAM-10959
> URL: https://issues.apache.org/jira/browse/BEAM-10959
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-harness
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: P2
> Time Spent: 4h 50m
> Remaining Estimate: 0h
>
> Currently there is a race where a BundleProcessor doesn't exist until another
> thread picks up the task and inserts it into the active set. This allows
> split/progress calls to arrive in the meantime and error out with "Unknown
> process bundle instruction X".
> Since the control stream is ordered, we can guarantee that an uninitialized
> BundleProcessor exists that can respond to these very early split/progress
> calls.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)