gemini-code-assist[bot] commented on code in PR #38501:
URL: https://github.com/apache/beam/pull/38501#discussion_r3253426530
##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -76,25 +76,47 @@ def __init__(self, constructor, destructor):
self._cache = {}
self._lock = threading.Lock()
self._counter = 0
+ self._purge_buffer = []
+ self._owner2_buffer = []
def _next_id(self):
- with self._lock:
- self._counter += 1
- return self._counter
+ # Caller must hold self._lock.
+ self._counter += 1
+ return self._counter
def register(self):
- owner = self._next_id()
- self._live_owners.add(owner)
+ with self._lock:
+ owner = self._next_id()
+ self._live_owners.add(owner)
+ if owner == 2:
+ self._owner2_buffer.append({
+ 'action': 'register',
+ 'pid': os.getpid(),
+ 'live_owners': list(self._live_owners)
+ })
return owner
def purge(self, owner):
to_delete = []
with self._lock:
+ self._purge_buffer.append({
+ 'owner': owner,
+ 'pid': os.getpid(),
+ 'live_owners': ("%s" % self._live_owners),
+ })
+ if owner == 2:
+ self._owner2_buffer.append({
+ 'action': 'purge',
+ 'pid': os.getpid(),
+ 'live_owners': list(self._live_owners)
+ })
Review Comment:

These lines append to the debugging buffers on every `purge` call. They
should be removed to prevent unbounded memory growth and unnecessary overhead.
##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -76,25 +76,47 @@ def __init__(self, constructor, destructor):
self._cache = {}
self._lock = threading.Lock()
self._counter = 0
+ self._purge_buffer = []
+ self._owner2_buffer = []
Review Comment:

The `_purge_buffer` and `_owner2_buffer` appear to be debugging tools used
to diagnose the intermittent issue mentioned in the PR description. Since they
are never cleared, they will cause a memory leak in long-running processes as
they grow indefinitely. These should be removed from the production code.
##########
sdks/python/pytest.ini:
##########
@@ -17,6 +17,8 @@
[pytest]
junit_family = xunit2
+log_cli = true
+log_cli_level = WARNING
Review Comment:

Enabling `log_cli` globally can be very noisy for other developers and in CI
logs. If this was only intended for debugging the intermittent issue, it should
be reverted before merging.
##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -76,25 +76,47 @@ def __init__(self, constructor, destructor):
self._cache = {}
self._lock = threading.Lock()
self._counter = 0
+ self._purge_buffer = []
+ self._owner2_buffer = []
def _next_id(self):
- with self._lock:
- self._counter += 1
- return self._counter
+ # Caller must hold self._lock.
+ self._counter += 1
+ return self._counter
def register(self):
- owner = self._next_id()
- self._live_owners.add(owner)
+ with self._lock:
+ owner = self._next_id()
+ self._live_owners.add(owner)
+ if owner == 2:
+ self._owner2_buffer.append({
+ 'action': 'register',
+ 'pid': os.getpid(),
+ 'live_owners': list(self._live_owners)
+ })
Review Comment:

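This hardcoded check for `owner == 2` is debugging logic and should be
removed. The surrounding change, which performs `_next_id()` and
`self._live_owners.add(owner)` under a single `self._lock` acquisition, can be
kept: it makes ID allocation and registration atomic.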
##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -76,25 +76,47 @@ def __init__(self, constructor, destructor):
self._cache = {}
self._lock = threading.Lock()
self._counter = 0
+ self._purge_buffer = []
+ self._owner2_buffer = []
def _next_id(self):
- with self._lock:
- self._counter += 1
- return self._counter
+ # Caller must hold self._lock.
+ self._counter += 1
+ return self._counter
def register(self):
- owner = self._next_id()
- self._live_owners.add(owner)
+ with self._lock:
+ owner = self._next_id()
+ self._live_owners.add(owner)
+ if owner == 2:
+ self._owner2_buffer.append({
+ 'action': 'register',
+ 'pid': os.getpid(),
+ 'live_owners': list(self._live_owners)
+ })
return owner
def purge(self, owner):
to_delete = []
with self._lock:
+ self._purge_buffer.append({
+ 'owner': owner,
+ 'pid': os.getpid(),
+ 'live_owners': ("%s" % self._live_owners),
+ })
+ if owner == 2:
+ self._owner2_buffer.append({
+ 'action': 'purge',
+ 'pid': os.getpid(),
+ 'live_owners': list(self._live_owners)
+ })
if owner not in self._live_owners:
_LOGGER.warning(
"Subprocess owner %s already purged. If this occurs during atexit "
- "shutdown, the subprocess was already cleaned up earlier.",
- owner)
+ "shutdown, the subprocess was already cleaned up earlier. Purge
buffer: %s. Owner 2 buffer: %s",
+ owner,
+ self._purge_buffer,
+ self._owner2_buffer)
Review Comment:

The log message should be reverted to its original form, excluding the
debugging buffers which are being removed.
```python
"shutdown, the subprocess was already cleaned up earlier.",
owner)
```
##########
sdks/python/apache_beam/transforms/external_test.py:
##########
@@ -799,7 +799,13 @@ def test_implicit_builder_with_constructor_method(self):
class JavaJarExpansionServiceTest(unittest.TestCase):
def setUp(self):
- SubprocessServer._cache._live_owners = set()
+ SubprocessServer._cache._purge_buffer.append({
+ 'owner': 'reset', 'pid': os.getpid(), 'live_owners': []
+ })
Review Comment:

Remove the manual manipulation of the internal `_purge_buffer` as it relies
on debugging infrastructure that should be removed from the main codebase.
##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -103,6 +125,11 @@ def purge(self, owner):
if not entry.owners:
to_delete.append(entry.obj)
del self._cache[key]
+ if to_delete:
+ _LOGGER.warning(
+ "Purging subprocess. Purge buffer: %s. Owner 2 buffer: %s",
+ self._purge_buffer,
+ self._owner2_buffer)
Review Comment:

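This additional warning appears to be for debugging purposes and should be
removed. As written, it emits a WARNING-level log containing the full contents
of both debugging buffers every time a purge deletes a cached subprocess.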
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]