shunping commented on code in PR #38537:
URL: https://github.com/apache/beam/pull/38537#discussion_r3267189153
##########
sdks/python/apache_beam/runners/interactive/user_pipeline_tracker.py:
##########
@@ -39,13 +40,16 @@ class UserPipelineTracker:
derived pipelines.
"""
def __init__(self):
+ self._lock = threading.RLock()
self._user_pipelines: dict[beam.Pipeline, list[beam.Pipeline]] = {}
self._derived_pipelines: dict[beam.Pipeline] = {}
self._pid_to_pipelines: dict[beam.Pipeline] = {}
Review Comment:
Done
##########
sdks/python/apache_beam/runners/interactive/user_pipeline_tracker.py:
##########
@@ -57,45 +61,52 @@ def evict(self, pipeline: beam.Pipeline) -> None:
Removes the given pipeline and derived pipelines if a user pipeline.
Otherwise, removes the given derived pipeline.
"""
- user_pipeline = self.get_user_pipeline(pipeline)
- if user_pipeline:
- for d in self._user_pipelines[user_pipeline]:
- del self._derived_pipelines[d]
- del self._user_pipelines[user_pipeline]
- elif pipeline in self._derived_pipelines:
- del self._derived_pipelines[pipeline]
+ with self._lock:
+ user_pipeline = self.get_user_pipeline(pipeline)
+ if user_pipeline:
+ for d in self._user_pipelines[user_pipeline]:
+ self._derived_pipelines.pop(d, None)
+ self._user_pipelines.pop(user_pipeline, None)
+ elif pipeline in self._derived_pipelines:
+ self._derived_pipelines.pop(pipeline, None)
def clear(self) -> None:
"""Clears the tracker of all user and derived pipelines."""
# Remove all local_tempdir of created pipelines.
- for p in self._pid_to_pipelines.values():
+ with self._lock:
+ pipelines = list(self._pid_to_pipelines.values())
+ for p in pipelines:
shutil.rmtree(p.local_tempdir, ignore_errors=True)
- self._user_pipelines.clear()
- self._derived_pipelines.clear()
- self._pid_to_pipelines.clear()
+ with self._lock:
+ self._user_pipelines.clear()
+ self._derived_pipelines.clear()
+ self._pid_to_pipelines.clear()
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]