gemini-code-assist[bot] commented on code in PR #38537:
URL: https://github.com/apache/beam/pull/38537#discussion_r3268327652
##########
sdks/python/apache_beam/runners/interactive/user_pipeline_tracker.py:
##########
@@ -57,45 +61,51 @@ def evict(self, pipeline: beam.Pipeline) -> None:
Removes the given pipeline and derived pipelines if a user pipeline.
Otherwise, removes the given derived pipeline.
"""
- user_pipeline = self.get_user_pipeline(pipeline)
- if user_pipeline:
- for d in self._user_pipelines[user_pipeline]:
- del self._derived_pipelines[d]
- del self._user_pipelines[user_pipeline]
- elif pipeline in self._derived_pipelines:
- del self._derived_pipelines[pipeline]
+ with self._lock:
+ user_pipeline = self.get_user_pipeline(pipeline)
+ if user_pipeline:
+ for d in self._user_pipelines[user_pipeline]:
+ self._derived_pipelines.pop(d, None)
+ self._user_pipelines.pop(user_pipeline, None)
+ elif pipeline in self._derived_pipelines:
+ self._derived_pipelines.pop(pipeline, None)
Review Comment:

The implementation of `evict` contains a logic error and does not align with
its docstring.
1. **Logic Error**: If a derived pipeline is passed,
`get_user_pipeline(pipeline)` returns its parent user pipeline. This causes the
code to enter the first `if` block (line 66), which proceeds to evict the
entire tree (the parent user pipeline and all of its derived pipelines, i.e.
the siblings of the given pipeline) instead of just the specified derived
pipeline. Consequently, the `elif` block at line 70 is unreachable for any
tracked derived pipeline.
2. **Resource Leak**: Evicted pipelines are not removed from
`_pid_to_pipelines`, which leads to a memory leak and potential stale results
from `get_pipeline`.
The suggested fix correctly distinguishes between user and derived pipelines
and ensures all internal trackers are updated.
```suggestion
if pipeline in self._user_pipelines:
for d in self._user_pipelines[pipeline]:
self._derived_pipelines.pop(d, None)
self._pid_to_pipelines.pop(self._key(d), None)
self._user_pipelines.pop(pipeline, None)
elif pipeline in self._derived_pipelines:
user_pipeline = self._derived_pipelines.pop(pipeline, None)
if user_pipeline in self._user_pipelines:
try:
self._user_pipelines[user_pipeline].remove(pipeline)
except ValueError:
pass
self._pid_to_pipelines.pop(self._key(pipeline), None)
```
##########
sdks/python/apache_beam/runners/interactive/user_pipeline_tracker.py:
##########
@@ -57,45 +61,51 @@ def evict(self, pipeline: beam.Pipeline) -> None:
Removes the given pipeline and derived pipelines if a user pipeline.
Otherwise, removes the given derived pipeline.
"""
- user_pipeline = self.get_user_pipeline(pipeline)
- if user_pipeline:
- for d in self._user_pipelines[user_pipeline]:
- del self._derived_pipelines[d]
- del self._user_pipelines[user_pipeline]
- elif pipeline in self._derived_pipelines:
- del self._derived_pipelines[pipeline]
+ with self._lock:
+ user_pipeline = self.get_user_pipeline(pipeline)
+ if user_pipeline:
+ for d in self._user_pipelines[user_pipeline]:
+ self._derived_pipelines.pop(d, None)
+ self._user_pipelines.pop(user_pipeline, None)
+ elif pipeline in self._derived_pipelines:
+ self._derived_pipelines.pop(pipeline, None)
def clear(self) -> None:
"""Clears the tracker of all user and derived pipelines."""
# Remove all local_tempdir of created pipelines.
- for p in self._pid_to_pipelines.values():
- shutil.rmtree(p.local_tempdir, ignore_errors=True)
+ with self._lock:
+ pipelines = list(self._pid_to_pipelines.values())
+ self._user_pipelines.clear()
+ self._derived_pipelines.clear()
+ self._pid_to_pipelines.clear()
- self._user_pipelines.clear()
- self._derived_pipelines.clear()
- self._pid_to_pipelines.clear()
+ for p in pipelines:
+ shutil.rmtree(p.local_tempdir, ignore_errors=True)
def get_pipeline(self, pid: str) -> Optional[beam.Pipeline]:
"""Returns the pipeline corresponding to the given pipeline id."""
- return self._pid_to_pipelines.get(pid, None)
+ with self._lock:
+ return self._pid_to_pipelines.get(pid, None)
def add_user_pipeline(self, p: beam.Pipeline) -> beam.Pipeline:
"""Adds a user pipeline with an empty set of derived pipelines."""
- self._memoize_pipieline(p)
+ with self._lock:
+ self._memoize_pipieline(p)
- # Create a new node for the user pipeline if it doesn't exist already.
- user_pipeline = self.get_user_pipeline(p)
- if not user_pipeline:
- user_pipeline = p
- self._user_pipelines[p] = []
+ # Create a new node for the user pipeline if it doesn't exist already.
+ user_pipeline = self.get_user_pipeline(p)
+ if not user_pipeline:
+ user_pipeline = p
+ self._user_pipelines[p] = []
- return user_pipeline
+ return user_pipeline
def _memoize_pipieline(self, p: beam.Pipeline) -> None:
Review Comment:

Typo in the method name: `_memoize_pipieline` should be `_memoize_pipeline`.
```suggestion
def _memoize_pipeline(self, p: beam.Pipeline) -> None:
```
##########
sdks/python/apache_beam/runners/interactive/user_pipeline_tracker.py:
##########
@@ -119,20 +129,21 @@ def add_derived_pipeline(
# Returns p.
ut.get_user_pipeline(derived2)
"""
- self._memoize_pipieline(maybe_user_pipeline)
- self._memoize_pipieline(derived_pipeline)
+ with self._lock:
+ self._memoize_pipieline(maybe_user_pipeline)
+ self._memoize_pipieline(derived_pipeline)
Review Comment:

Typo in the called method's name: `_memoize_pipieline` should be
`_memoize_pipeline` (rename the definition and all call sites together).
```suggestion
self._memoize_pipeline(maybe_user_pipeline)
self._memoize_pipeline(derived_pipeline)
```
##########
sdks/python/apache_beam/runners/interactive/user_pipeline_tracker.py:
##########
@@ -57,45 +61,51 @@ def evict(self, pipeline: beam.Pipeline) -> None:
Removes the given pipeline and derived pipelines if a user pipeline.
Otherwise, removes the given derived pipeline.
"""
- user_pipeline = self.get_user_pipeline(pipeline)
- if user_pipeline:
- for d in self._user_pipelines[user_pipeline]:
- del self._derived_pipelines[d]
- del self._user_pipelines[user_pipeline]
- elif pipeline in self._derived_pipelines:
- del self._derived_pipelines[pipeline]
+ with self._lock:
+ user_pipeline = self.get_user_pipeline(pipeline)
+ if user_pipeline:
+ for d in self._user_pipelines[user_pipeline]:
+ self._derived_pipelines.pop(d, None)
+ self._user_pipelines.pop(user_pipeline, None)
+ elif pipeline in self._derived_pipelines:
+ self._derived_pipelines.pop(pipeline, None)
def clear(self) -> None:
"""Clears the tracker of all user and derived pipelines."""
# Remove all local_tempdir of created pipelines.
- for p in self._pid_to_pipelines.values():
- shutil.rmtree(p.local_tempdir, ignore_errors=True)
+ with self._lock:
+ pipelines = list(self._pid_to_pipelines.values())
+ self._user_pipelines.clear()
+ self._derived_pipelines.clear()
+ self._pid_to_pipelines.clear()
- self._user_pipelines.clear()
- self._derived_pipelines.clear()
- self._pid_to_pipelines.clear()
+ for p in pipelines:
+ shutil.rmtree(p.local_tempdir, ignore_errors=True)
def get_pipeline(self, pid: str) -> Optional[beam.Pipeline]:
"""Returns the pipeline corresponding to the given pipeline id."""
- return self._pid_to_pipelines.get(pid, None)
+ with self._lock:
+ return self._pid_to_pipelines.get(pid, None)
def add_user_pipeline(self, p: beam.Pipeline) -> beam.Pipeline:
"""Adds a user pipeline with an empty set of derived pipelines."""
- self._memoize_pipieline(p)
+ with self._lock:
+ self._memoize_pipieline(p)
Review Comment:

Typo in the called method's name: `_memoize_pipieline` should be
`_memoize_pipeline` (rename the definition and all call sites together).
```suggestion
self._memoize_pipeline(p)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]