gemini-code-assist[bot] commented on code in PR #38620:
URL: https://github.com/apache/beam/pull/38620#discussion_r3304973506
##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -108,14 +108,22 @@ def purge(self, owner):
     for value in to_delete:
       self._destructor(value)

-  def get(self, *key):
+  def get(self, *key, owner=None):
     if not self._live_owners:
       raise RuntimeError("At least one owner must be registered.")
     with self._lock:
       if key not in self._cache:
         self._cache[key] = _SharedCacheEntry(self._constructor(*key), set())
-      for owner in self._live_owners:
+      if owner is not None:
+        if owner not in self._live_owners:
+          raise RuntimeError("The requesting owner must be registered.")
         self._cache[key].owners.add(owner)
+        for live_owner, is_context in self._live_owners.items():
+          if is_context:
+            self._cache[key].owners.add(live_owner)
+      else:
+        for live_owner in self._live_owners:
+          self._cache[key].owners.add(live_owner)
       return self._cache[key].obj
Review Comment:

### Potential Resource Leaks and Race Conditions in `get()`
There are two critical issues in the current implementation of `get()` that
can lead to silent resource leaks (orphaned subprocesses):
1. **Race Condition on `self._live_owners` Check**:
Checking `self._live_owners` outside of `self._lock` (lines 112-113) is
not thread-safe. If another thread purges the last owner after this check but
before the lock is acquired, `get()` will proceed to create a cache entry with
an empty `owners` set. Since the entry has no owners, it will never be cleaned
up by subsequent `purge()` calls, leaking the subprocess.
2. **Resource Leak on Invalid Owner**:
If an unregistered `owner` is passed, the cache entry is created
and added to `self._cache` *before* the owner validation check is performed
(lines 118-119). When the `RuntimeError` is raised, the newly created cache
entry is left in `self._cache` with an empty `owners` set, permanently leaking
the subprocess.
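The second issue can be illustrated with a minimal, self-contained model of the ordering problem. Several names here are hypothetical and stand in for the PR's cache: `ToyCache`, its `validate_first` flag, the `"expansion-service"` key and `orphaned_entries()`. The constructor returns a plain `object()` where the real cache would start a subprocess. The model reports which cache keys are left with no owners after a rejected request.

```python
import threading


class ToyCache:
  """Hypothetical stand-in for the PR's shared cache (not Beam's class)."""

  def __init__(self, constructor):
    self._constructor = constructor
    self._live_owners = {"registered-owner": False}  # owner -> is_context
    self._cache = {}  # key -> (obj, set of owners)
    self._lock = threading.Lock()

  def get(self, *key, owner, validate_first):
    with self._lock:
      if validate_first and owner not in self._live_owners:
        # Suggested ordering: reject before anything is constructed.
        raise RuntimeError("The requesting owner must be registered.")
      if key not in self._cache:
        # Stands in for starting an expensive subprocess.
        self._cache[key] = (self._constructor(*key), set())
      if not validate_first and owner not in self._live_owners:
        # Ordering under review: the entry above stays cached with no owners.
        raise RuntimeError("The requesting owner must be registered.")
      self._cache[key][1].add(owner)
      return self._cache[key][0]


def orphaned_entries(validate_first):
  """Returns cache keys that no owner references after a rejected get()."""
  cache = ToyCache(lambda *key: object())
  try:
    cache.get(
        "expansion-service", owner="unregistered", validate_first=validate_first)
  except RuntimeError:
    pass
  return [key for key, (_, owners) in cache._cache.items() if not owners]


print(orphaned_entries(validate_first=False))  # [('expansion-service',)]
print(orphaned_entries(validate_first=True))   # []
```

With the validation placed after construction, the rejected call still leaves an ownerless entry behind. Validating first leaves the cache untouched.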
### Solution:
- Move the `self._live_owners` check inside the lock.
- Validate the `owner` registration *before* creating the cache entry.
```suggestion
  def get(self, *key, owner=None):
    with self._lock:
      if not self._live_owners:
        raise RuntimeError("At least one owner must be registered.")
      if owner is not None and owner not in self._live_owners:
        raise RuntimeError("The requesting owner must be registered.")
      if key not in self._cache:
        self._cache[key] = _SharedCacheEntry(self._constructor(*key), set())
      if owner is not None:
        self._cache[key].owners.add(owner)
        for live_owner, is_context in self._live_owners.items():
          if is_context:
            self._cache[key].owners.add(live_owner)
      else:
        for live_owner in self._live_owners:
          self._cache[key].owners.add(live_owner)
      return self._cache[key].obj
```
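One way to sanity-check the suggestion is to drop its `get()` body into a small harness. The sketch below is hypothetical: `SharedCacheSketch`, `register(owner, is_context=...)` and this `purge()` are assumptions modeled on names in the diff, not Beam's actual `_SharedCache` API. It also assumes that `register()` and `purge()` mutate `self._live_owners` while holding the same `self._lock`. Moving the emptiness check inside the lock only closes the race if the writers take that lock too.

```python
import threading
from dataclasses import dataclass, field


@dataclass
class _SharedCacheEntry:
  # Stand-in for the entry type referenced in the diff.
  obj: object
  owners: set = field(default_factory=set)


class SharedCacheSketch:
  """Hypothetical harness around the suggested get(); not Beam's _SharedCache."""

  def __init__(self, constructor, destructor):
    self._constructor = constructor
    self._destructor = destructor
    self._live_owners = {}  # owner -> is_context (assumed from the .items() loop)
    self._cache = {}
    self._lock = threading.Lock()

  def register(self, owner, is_context=False):
    # Assumed: writers take the same lock, so get()'s checks cannot go stale.
    with self._lock:
      self._live_owners[owner] = is_context

  def purge(self, owner):
    to_delete = []
    with self._lock:
      self._live_owners.pop(owner, None)
      for key, entry in list(self._cache.items()):
        entry.owners.discard(owner)
        if not entry.owners:
          to_delete.append(entry.obj)
          del self._cache[key]
    # Run destructors (e.g. stopping a subprocess) outside the lock.
    for value in to_delete:
      self._destructor(value)

  def get(self, *key, owner=None):
    # Same body as the suggestion above.
    with self._lock:
      if not self._live_owners:
        raise RuntimeError("At least one owner must be registered.")
      if owner is not None and owner not in self._live_owners:
        raise RuntimeError("The requesting owner must be registered.")
      if key not in self._cache:
        self._cache[key] = _SharedCacheEntry(self._constructor(*key), set())
      if owner is not None:
        self._cache[key].owners.add(owner)
        for live_owner, is_context in self._live_owners.items():
          if is_context:
            self._cache[key].owners.add(live_owner)
      else:
        for live_owner in self._live_owners:
          self._cache[key].owners.add(live_owner)
      return self._cache[key].obj


stopped = []
cache = SharedCacheSketch(lambda *key: key, stopped.append)
cache.register("ctx", is_context=True)
cache.register("a")
cache.register("b")

try:
  cache.get("svc", owner="unknown")
except RuntimeError:
  pass
print(cache._cache)  # {}  (nothing was constructed for the bad owner)

cache.get("svc", owner="a")
print(sorted(cache._cache[("svc",)].owners))  # ['a', 'ctx']
cache.purge("a")
cache.purge("ctx")
print(stopped)  # [('svc',)]
```

In this sketch, a context owner (`"ctx"`) is attached to every entry requested by a specific owner, while a non-context owner such as `"b"` is not. The entry is therefore released, and its destructor called, once both `"a"` and `"ctx"` have been purged.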
--
This is an automated message from the Apache Git Service.