gemini-code-assist[bot] commented on code in PR #38501:
URL: https://github.com/apache/beam/pull/38501#discussion_r3253426530


##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -76,25 +76,47 @@ def __init__(self, constructor, destructor):
     self._cache = {}
     self._lock = threading.Lock()
     self._counter = 0
+    self._purge_buffer = []
+    self._owner2_buffer = []
 
   def _next_id(self):
-    with self._lock:
-      self._counter += 1
-      return self._counter
+    # Caller must hold self._lock.
+    self._counter += 1
+    return self._counter
 
   def register(self):
-    owner = self._next_id()
-    self._live_owners.add(owner)
+    with self._lock:
+      owner = self._next_id()
+      self._live_owners.add(owner)
+      if owner == 2:
+        self._owner2_buffer.append({
+            'action': 'register',
+            'pid': os.getpid(),
+            'live_owners': list(self._live_owners)
+        })
     return owner
 
   def purge(self, owner):
     to_delete = []
     with self._lock:
+      self._purge_buffer.append({
+          'owner': owner,
+          'pid': os.getpid(),
+          'live_owners': ("%s" % self._live_owners),
+      })
+      if owner == 2:
+        self._owner2_buffer.append({
+            'action': 'purge',
+            'pid': os.getpid(),
+            'live_owners': list(self._live_owners)
+        })

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   These lines append to the debugging buffers inside `purge` (the `_purge_buffer` append runs on every call) and should be removed to prevent memory leaks and unnecessary overhead.
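   For reference, here is a minimal sketch of `register`/`purge` with both buffers removed. It keeps this PR's change of holding `self._lock` across `_next_id`. `OwnerCache`, `_CacheEntry` and `put` are hypothetical, simplified stand-ins rather than Beam's actual class. The early `return` after the warning and running destructors outside the lock are assumptions, because the hunk does not show the rest of `purge`.

```python
import logging
import threading

_LOGGER = logging.getLogger(__name__)


class _CacheEntry:
  """Hypothetical entry: a cached object plus the owners holding it."""
  def __init__(self, obj):
    self.obj = obj
    self.owners = set()


class OwnerCache:
  """Hypothetical, simplified stand-in for the cache in subprocess_server.py."""
  def __init__(self, destructor):
    self._destructor = destructor
    self._live_owners = set()
    self._cache = {}
    self._lock = threading.Lock()
    self._counter = 0

  def _next_id(self):
    # Caller must hold self._lock.
    self._counter += 1
    return self._counter

  def register(self):
    with self._lock:
      owner = self._next_id()
      self._live_owners.add(owner)
    return owner

  def put(self, owner, key, obj):
    # Hypothetical helper so the sketch can populate the cache.
    with self._lock:
      entry = self._cache.setdefault(key, _CacheEntry(obj))
      entry.owners.add(owner)

  def purge(self, owner):
    to_delete = []
    with self._lock:
      # No debug buffers: nothing here grows with the number of calls.
      if owner not in self._live_owners:
        _LOGGER.warning(
            "Subprocess owner %s already purged. If this occurs during atexit "
            "shutdown, the subprocess was already cleaned up earlier.",
            owner)
        return  # assumed control flow
      self._live_owners.remove(owner)
      for key, entry in list(self._cache.items()):
        entry.owners.discard(owner)
        if not entry.owners:
          to_delete.append(entry.obj)
          del self._cache[key]
    # Assumed: destructors run after the lock is released.
    for obj in to_delete:
      self._destructor(obj)
```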



##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -76,25 +76,47 @@ def __init__(self, constructor, destructor):
     self._cache = {}
     self._lock = threading.Lock()
     self._counter = 0
+    self._purge_buffer = []
+    self._owner2_buffer = []

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The `_purge_buffer` and `_owner2_buffer` lists appear to be debugging tools used to diagnose the intermittent issue mentioned in the PR description. They are never cleared, so they grow without bound and will leak memory in long-running processes. They should be removed from the production code.
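   The sketch below illustrates the growth described above. It assumes one record is appended per `purge` call, as in this diff, so the list keeps every record for the life of the process. If a trace is still wanted while debugging locally (not in merged code), a `collections.deque` with `maxlen` keeps it bounded. `purge_calls` and the `maxlen` of 100 are hypothetical.

```python
import collections


def purge_calls(buffer, n):
  """Hypothetical: append one record per simulated purge() call."""
  for owner in range(n):
    buffer.append({'owner': owner})
  return len(buffer)


# An unbounded list retains every record ever appended.
unbounded = []
print(purge_calls(unbounded, 10_000))  # 10000

# A bounded deque retains only the most recent records (hypothetical maxlen).
bounded = collections.deque(maxlen=100)
print(purge_calls(bounded, 10_000))  # 100
```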



##########
sdks/python/pytest.ini:
##########
@@ -17,6 +17,8 @@
 
 [pytest]
 junit_family = xunit2
+log_cli = true
+log_cli_level = WARNING

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Enabling `log_cli` globally can be very noisy for other developers and in CI logs. If this was only intended for debugging the intermittent issue, it should be reverted before merging.
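   If live logging is only needed while chasing the intermittent failure, it can be enabled for a single run instead of in `pytest.ini`. This is a minimal sketch that assumes pytest's `-o`/`--override-ini` and `--log-cli-level` command-line options. The `live_log_args` helper and the script wrapper are hypothetical.

```python
import sys


def live_log_args(test_path, level="WARNING"):
  """Hypothetical helper: pytest args enabling live logging for one run."""
  return [
      "-o", "log_cli=true",  # override the ini value for this invocation only
      "--log-cli-level=%s" % level,
      test_path,
  ]


if __name__ == "__main__" and len(sys.argv) > 1:
  import pytest  # imported lazily so the helper works without pytest

  # e.g. python run_live_logs.py apache_beam/transforms/external_test.py
  sys.exit(pytest.main(live_log_args(sys.argv[1])))
```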



##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -76,25 +76,47 @@ def __init__(self, constructor, destructor):
     self._cache = {}
     self._lock = threading.Lock()
     self._counter = 0
+    self._purge_buffer = []
+    self._owner2_buffer = []
 
   def _next_id(self):
-    with self._lock:
-      self._counter += 1
-      return self._counter
+    # Caller must hold self._lock.
+    self._counter += 1
+    return self._counter
 
   def register(self):
-    owner = self._next_id()
-    self._live_owners.add(owner)
+    with self._lock:
+      owner = self._next_id()
+      self._live_owners.add(owner)
+      if owner == 2:
+        self._owner2_buffer.append({
+            'action': 'register',
+            'pid': os.getpid(),
+            'live_owners': list(self._live_owners)
+        })

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   This hardcoded check for `owner == 2` is debugging logic and should be removed.
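   With the `owner == 2` branch dropped, `register` only allocates an id and records it under a single lock acquisition, as this PR's `_next_id` comment requires. The sketch uses a hypothetical, trimmed `OwnerRegistry` class (not Beam's actual cache) and a threaded check that ids stay unique without any special-casing.

```python
import threading


class OwnerRegistry:
  """Hypothetical, trimmed stand-in for the cache's owner bookkeeping."""
  def __init__(self):
    self._live_owners = set()
    self._lock = threading.Lock()
    self._counter = 0

  def _next_id(self):
    # Caller must hold self._lock.
    self._counter += 1
    return self._counter

  def register(self):
    # Id allocation and set insertion share one lock acquisition,
    # with no special case for any particular owner id.
    with self._lock:
      owner = self._next_id()
      self._live_owners.add(owner)
    return owner


registry = OwnerRegistry()
owners = []
owners_lock = threading.Lock()


def worker():
  for _ in range(1000):
    owner = registry.register()
    with owners_lock:
      owners.append(owner)


threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
  t.start()
for t in threads:
  t.join()

# Every id is unique and every registered owner is live.
print(len(owners), len(set(owners)))  # 8000 8000
```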



##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -76,25 +76,47 @@ def __init__(self, constructor, destructor):
     self._cache = {}
     self._lock = threading.Lock()
     self._counter = 0
+    self._purge_buffer = []
+    self._owner2_buffer = []
 
   def _next_id(self):
-    with self._lock:
-      self._counter += 1
-      return self._counter
+    # Caller must hold self._lock.
+    self._counter += 1
+    return self._counter
 
   def register(self):
-    owner = self._next_id()
-    self._live_owners.add(owner)
+    with self._lock:
+      owner = self._next_id()
+      self._live_owners.add(owner)
+      if owner == 2:
+        self._owner2_buffer.append({
+            'action': 'register',
+            'pid': os.getpid(),
+            'live_owners': list(self._live_owners)
+        })
     return owner
 
   def purge(self, owner):
     to_delete = []
     with self._lock:
+      self._purge_buffer.append({
+          'owner': owner,
+          'pid': os.getpid(),
+          'live_owners': ("%s" % self._live_owners),
+      })
+      if owner == 2:
+        self._owner2_buffer.append({
+            'action': 'purge',
+            'pid': os.getpid(),
+            'live_owners': list(self._live_owners)
+        })
       if owner not in self._live_owners:
         _LOGGER.warning(
             "Subprocess owner %s already purged. If this occurs during atexit "
-            "shutdown, the subprocess was already cleaned up earlier.",
-            owner)
+            "shutdown, the subprocess was already cleaned up earlier. Purge buffer: %s. Owner 2 buffer: %s",
+            owner,
+            self._purge_buffer,
+            self._owner2_buffer)

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The log message should be reverted to its original form, without the debugging buffers, which are being removed:
   
   ```python
               "shutdown, the subprocess was already cleaned up earlier.",
               owner)
   ```
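   One way to check the reverted message is sketched below with the standard library's `unittest.TestCase.assertLogs`. The logger name `subprocess_server_sketch` and the `warn_already_purged` wrapper are hypothetical stand-ins for the call inside `purge`.

```python
import logging
import unittest

# Hypothetical logger name standing in for the module's _LOGGER.
_LOGGER = logging.getLogger("subprocess_server_sketch")


def warn_already_purged(owner):
  # The reverted call: one %s placeholder, one argument, no buffers.
  _LOGGER.warning(
      "Subprocess owner %s already purged. If this occurs during atexit "
      "shutdown, the subprocess was already cleaned up earlier.",
      owner)


class RevertedMessageTest(unittest.TestCase):
  def test_message_has_no_buffer_dump(self):
    with self.assertLogs("subprocess_server_sketch", level="WARNING") as cm:
      warn_already_purged(2)
    self.assertEqual(len(cm.output), 1)
    self.assertIn("Subprocess owner 2 already purged.", cm.output[0])
    self.assertNotIn("buffer", cm.output[0])
```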



##########
sdks/python/apache_beam/transforms/external_test.py:
##########
@@ -799,7 +799,13 @@ def test_implicit_builder_with_constructor_method(self):
 
 class JavaJarExpansionServiceTest(unittest.TestCase):
   def setUp(self):
-    SubprocessServer._cache._live_owners = set()
+    SubprocessServer._cache._purge_buffer.append({
+        'owner': 'reset', 'pid': os.getpid(), 'live_owners': []
+    })

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Remove the manual manipulation of the internal `_purge_buffer`, as it relies on debugging infrastructure that should be removed from the main codebase.



##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -103,6 +125,11 @@ def purge(self, owner):
         if not entry.owners:
           to_delete.append(entry.obj)
           del self._cache[key]
+      if to_delete:
+        _LOGGER.warning(
+            "Purging subprocess. Purge buffer: %s. Owner 2 buffer: %s",
+            self._purge_buffer,
+            self._owner2_buffer)

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This additional warning appears to be for debugging purposes and should be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
