gemini-code-assist[bot] commented on code in PR #38537:
URL: https://github.com/apache/beam/pull/38537#discussion_r3267074024


##########
sdks/python/apache_beam/runners/interactive/user_pipeline_tracker.py:
##########
@@ -39,13 +40,16 @@ class UserPipelineTracker:
   derived pipelines.
   """
   def __init__(self):
+    self._lock = threading.RLock()
     self._user_pipelines: dict[beam.Pipeline, list[beam.Pipeline]] = {}
     self._derived_pipelines: dict[beam.Pipeline] = {}
     self._pid_to_pipelines: dict[beam.Pipeline] = {}

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The type hints for `_derived_pipelines` and `_pid_to_pipelines` are 
incomplete: `dict[beam.Pipeline]` supplies only one type argument, whereas a 
parameterized `dict` hint takes two (key and value), which is what static type 
checkers expect. Based on their usage in the class, `_derived_pipelines` maps 
pipelines to pipelines, and `_pid_to_pipelines` maps string IDs to pipelines.
   
   ```suggestion
       self._derived_pipelines: dict[beam.Pipeline, beam.Pipeline] = {}
       self._pid_to_pipelines: dict[str, beam.Pipeline] = {}
   ```
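
A one-argument `dict` hint is easy to miss because Python (3.9+) accepts it at runtime without complaint. The minimal sketch below shows this, using `str` and `int` as hypothetical stand-ins for the real key and value types so that it runs without Beam installed:

```python
from typing import get_args

# Stand-in types: `str` and `int` are placeholders for the real key/value
# types (e.g. beam.Pipeline); they are assumptions made for illustration.

# Subscripting dict with a single argument raises no error at runtime,
# so the incomplete hint only surfaces under a static type checker.
incomplete = dict[str]
complete = dict[str, int]

# get_args exposes the type arguments each alias actually carries.
incomplete_args = get_args(incomplete)
complete_args = get_args(complete)

print(incomplete_args)
print(complete_args)
```

The incomplete alias carries a single type argument, so nothing states what the values are; the complete alias carries both key and value types.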



##########
sdks/python/apache_beam/runners/interactive/user_pipeline_tracker.py:
##########
@@ -57,45 +61,52 @@ def evict(self, pipeline: beam.Pipeline) -> None:
     Removes the given pipeline and derived pipelines if a user pipeline.
     Otherwise, removes the given derived pipeline.
     """
-    user_pipeline = self.get_user_pipeline(pipeline)
-    if user_pipeline:
-      for d in self._user_pipelines[user_pipeline]:
-        del self._derived_pipelines[d]
-      del self._user_pipelines[user_pipeline]
-    elif pipeline in self._derived_pipelines:
-      del self._derived_pipelines[pipeline]
+    with self._lock:
+      user_pipeline = self.get_user_pipeline(pipeline)
+      if user_pipeline:
+        for d in self._user_pipelines[user_pipeline]:
+          self._derived_pipelines.pop(d, None)
+        self._user_pipelines.pop(user_pipeline, None)
+      elif pipeline in self._derived_pipelines:
+        self._derived_pipelines.pop(pipeline, None)
 
   def clear(self) -> None:
     """Clears the tracker of all user and derived pipelines."""
     # Remove all local_tempdir of created pipelines.
-    for p in self._pid_to_pipelines.values():
+    with self._lock:
+      pipelines = list(self._pid_to_pipelines.values())
+    for p in pipelines:
       shutil.rmtree(p.local_tempdir, ignore_errors=True)
 
-    self._user_pipelines.clear()
-    self._derived_pipelines.clear()
-    self._pid_to_pipelines.clear()
+    with self._lock:
+      self._user_pipelines.clear()
+      self._derived_pipelines.clear()
+      self._pid_to_pipelines.clear()

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The current implementation of `clear()` contains a race condition that can 
lead to a resource leak. If a new pipeline is added between the two separate 
lock acquisitions (lines 76 and 81), it will be cleared from the internal 
dictionaries by the second lock block, but its temporary directory will not be 
removed because it was not included in the `pipelines` list captured at line 77.
   
   To resolve this, you should clear the dictionaries within the same lock 
block where you capture the list of pipelines for deletion. This ensures that 
any pipeline removed from the tracker is also scheduled for directory cleanup, 
while any pipeline added after the lock is released remains safely in the 
tracker.
   
   ```suggestion
       with self._lock:
         pipelines = list(self._pid_to_pipelines.values())
         self._user_pipelines.clear()
         self._derived_pipelines.clear()
         self._pid_to_pipelines.clear()
   
       for p in pipelines:
         shutil.rmtree(p.local_tempdir, ignore_errors=True)
   ```
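
The snapshot-and-clear pattern suggested above can be sketched in isolation. `MiniTracker`, `add`, and `tracked` are hypothetical names invented for this illustration and are not part of Beam; the sketch only assumes that each tracked entry owns a temporary directory:

```python
import os
import shutil
import tempfile
import threading


class MiniTracker:
  """Hypothetical stand-in for the tracker; maps an id to a temp dir."""
  def __init__(self):
    self._lock = threading.RLock()
    self._pid_to_tempdir: dict[str, str] = {}

  def add(self, pid: str) -> str:
    tempdir = tempfile.mkdtemp()
    with self._lock:
      self._pid_to_tempdir[pid] = tempdir
    return tempdir

  def clear(self) -> None:
    # Snapshot and clear under ONE lock acquisition, so every entry removed
    # from the dict is also in the list scheduled for deletion.
    with self._lock:
      tempdirs = list(self._pid_to_tempdir.values())
      self._pid_to_tempdir.clear()
    # Filesystem work runs outside the lock so other threads are not blocked.
    for d in tempdirs:
      shutil.rmtree(d, ignore_errors=True)

  def tracked(self) -> list[str]:
    with self._lock:
      return list(self._pid_to_tempdir)


tracker = MiniTracker()
first = tracker.add("p1")
tracker.clear()
second = tracker.add("p2")  # added after clear(): remains tracked
```

Because the dictionary is emptied in the same critical section that captures the snapshot, an entry added by another thread either lands before the snapshot (and is cleaned up) or after the clear (and stays tracked); it cannot be dropped from the dictionary while its directory is left behind.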



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

