gemini-code-assist[bot] commented on code in PR #38432:
URL: https://github.com/apache/beam/pull/38432#discussion_r3220843478


##########
sdks/python/apache_beam/runners/worker/worker_pool_main.py:
##########
@@ -166,6 +167,7 @@ def StartWorker(
             worker_id=start_worker_request.worker_id,
             state_cache_size=self._state_cache_size,
             data_buffer_time_limit_ms=self._data_buffer_time_limit_ms)
+        self._worker_threads[start_worker_request.worker_id] = worker

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   If `StartWorker` is called with a `worker_id` that is already present in 
`_worker_threads`, the existing `SdkHarness` instance will be overwritten 
without being stopped. This results in a thread leak for the previous worker, 
which is the exact issue this PR aims to solve. Consider checking for the 
existence of the ID and stopping the old worker before proceeding.
   
   ```suggestion
           if start_worker_request.worker_id in self._worker_threads:
             self.StopWorker(
                 beam_fn_api_pb2.StopWorkerRequest(
                     worker_id=start_worker_request.worker_id),
                 None)
           self._worker_threads[start_worker_request.worker_id] = worker
   ```
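   
   For illustration only, the stop-before-overwrite pattern can be sketched in isolation. `FakeHarness` and `WorkerRegistry` below are hypothetical stand-ins, not Beam classes; the real `SdkHarness` shutdown path in this PR goes through `StopWorker` and may differ.

```python
class FakeHarness:
  """Hypothetical stand-in for SdkHarness; only records whether it was stopped."""
  def __init__(self, worker_id):
    self.worker_id = worker_id
    self.stopped = False

  def stop(self):
    self.stopped = True


class WorkerRegistry:
  """Hypothetical registry playing the role of `_worker_threads`."""
  def __init__(self):
    self._workers = {}

  def start(self, worker_id):
    # Stop any worker already registered under this ID so it is not
    # silently dropped (and leaked) when the entry is overwritten.
    if worker_id in self._workers:
      self.stop(worker_id)
    worker = FakeHarness(worker_id)
    self._workers[worker_id] = worker
    return worker

  def stop(self, worker_id):
    # pop() with a default makes stopping an unknown ID a harmless no-op.
    worker = self._workers.pop(worker_id, None)
    if worker is not None:
      worker.stop()
    return worker


registry = WorkerRegistry()
first = registry.start("worker_1")
second = registry.start("worker_1")  # same ID: `first` is stopped first
```

   With this shape, a repeated start under the same ID leaves exactly one live entry, and the displaced worker has been told to stop.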



##########
sdks/python/apache_beam/runners/portability/prism_runner_test.py:
##########
@@ -488,6 +493,59 @@ def test_singleton(self, enable_singleton):
       else:
         mock_prism_server.assert_called_once()
 
+  def test_loopback_worker_daemon_thread_accumulation(self):
+    """Verifies that in LOOPBACK mode, the external worker pool servicer properly
+    tracks active thread-based SdkHarness workers and cleanly shuts them down in
+    StopWorker via sentinel messages. This prevents background daemon threads from
+    accumulating across sequential pipeline executions and leaking resources.
+    """
+    servicer = worker_pool_main.BeamFnExternalWorkerPoolServicer(
+        use_process=False, state_cache_size=0, data_buffer_time_limit_ms=0)
+
+    active_workers = []
+    mock_responses = queue.Queue()
+
+    def mock_run(self_worker):
+      active_workers.append(self_worker)
+      mock_responses.get()
+      active_workers.remove(self_worker)
+
+    with mock.patch(
+        'apache_beam.runners.worker.sdk_worker.SdkHarness') as mock_harness:
+      mock_harness.return_value._responses = mock_responses
+      mock_harness.return_value.run = lambda: mock_run(mock_harness)
+
+      # Simulate starting Worker 1 for Pipeline 1
+      req1 = beam_fn_api_pb2.StartWorkerRequest(worker_id="worker_1")
+      req1.control_endpoint.url = "localhost:12345"
+      servicer.StartWorker(req1, None)
+
+      time.sleep(0.05)

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Using fixed `time.sleep()` calls in tests is a common source of flakiness, 
as the required wait time can vary significantly between local and CI 
environments. It is better to use a polling loop with a timeout or a 
synchronization primitive (like `threading.Event`) to wait for the worker to 
start/stop. This applies to lines 530, 540, and 546 as well.
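   
   A minimal sketch of both options follows, assuming the test can either signal from inside the mocked `run` or observe shared state. The `wait_until` helper, `fake_run`, and the timeout values are hypothetical and not part of this PR.

```python
import threading
import time


def wait_until(predicate, timeout=5.0, interval=0.01):
  """Polls predicate() until it returns True or `timeout` seconds elapse."""
  deadline = time.monotonic() + timeout
  while time.monotonic() < deadline:
    if predicate():
      return True
    time.sleep(interval)
  # One final check so a condition met right at the deadline is not missed.
  return predicate()


# Hypothetical stand-in for the mocked harness `run` used in the test.
started = threading.Event()
active_workers = []


def fake_run():
  active_workers.append("worker_1")
  started.set()  # signal the test thread that the worker is running


thread = threading.Thread(target=fake_run, daemon=True)
thread.start()

# Event-based wait: returns as soon as the worker signals, up to the timeout.
event_ok = started.wait(timeout=5.0)
# Polling wait: useful when only observable state is available.
poll_ok = wait_until(lambda: len(active_workers) == 1)
thread.join(timeout=5.0)
```

   Either form returns as soon as the condition holds, so the test waits no longer than needed on a fast machine and tolerates a slow CI host up to the timeout.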



##########
sdks/python/apache_beam/runners/worker/worker_pool_main.py:
##########
@@ -188,6 +190,15 @@ def StopWorker(
       _LOGGER.info("Stopping worker %s" % stop_worker_request.worker_id)
       kill_process_gracefully(worker_process)
 
+    # applicable for thread mode to ensure thread cleanup by
+    # unblocking the harness request stream.
+    worker_thread_harness = self._worker_threads.pop(
+        stop_worker_request.worker_id, None)
+    if worker_thread_harness:
+      _LOGGER.info("Stopping thread worker %s" % stop_worker_request.worker_id)
+      from apache_beam.utils.sentinel import Sentinel

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Inline imports are generally discouraged unless they are needed to avoid 
circular dependencies. Since `Sentinel` is a utility class, its import should 
be moved to the top-level imports of the module for better readability and 
consistency with PEP 8.
   
   <details>
   <summary>References</summary>
   
   1. Imports should be at the top of the file, after any module comments and 
docstrings, and before module globals and constants. 
<sup>([link](http://console.cloud.google.com/gemini-code-assist/agents-tools))</sup>
   </details>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
