gemini-code-assist[bot] commented on code in PR #38432:
URL: https://github.com/apache/beam/pull/38432#discussion_r3220843478
##########
sdks/python/apache_beam/runners/worker/worker_pool_main.py:
##########
@@ -166,6 +167,7 @@ def StartWorker(
             worker_id=start_worker_request.worker_id,
             state_cache_size=self._state_cache_size,
             data_buffer_time_limit_ms=self._data_buffer_time_limit_ms)
+        self._worker_threads[start_worker_request.worker_id] = worker
Review Comment:

If `StartWorker` is called with a `worker_id` that is already present in
`_worker_threads`, the existing `SdkHarness` instance will be overwritten
without being stopped. This results in a thread leak for the previous worker,
which is the exact issue this PR aims to solve. Consider checking for the
existence of the ID and stopping the old worker before proceeding.
```suggestion
        if start_worker_request.worker_id in self._worker_threads:
          self.StopWorker(
              beam_fn_api_pb2.StopWorkerRequest(
                  worker_id=start_worker_request.worker_id),
              None)
        self._worker_threads[start_worker_request.worker_id] = worker
```
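
Note that passing `None` as the context argument assumes `StopWorker` never dereferences it, which holds for the implementation shown further down; if that assumption ever breaks, a small private helper that stops and evicts the stale worker directly would be a safer shape for the same fix.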
##########
sdks/python/apache_beam/runners/portability/prism_runner_test.py:
##########
@@ -488,6 +493,59 @@ def test_singleton(self, enable_singleton):
     else:
       mock_prism_server.assert_called_once()
+  def test_loopback_worker_daemon_thread_accumulation(self):
+    """Verifies that in LOOPBACK mode, the external worker pool servicer
+    properly tracks active thread-based SdkHarness workers and cleanly
+    shuts them down in StopWorker via sentinel messages. This prevents
+    background daemon threads from accumulating across sequential pipeline
+    executions and leaking resources.
+    """
+    servicer = worker_pool_main.BeamFnExternalWorkerPoolServicer(
+        use_process=False, state_cache_size=0, data_buffer_time_limit_ms=0)
+
+    active_workers = []
+    mock_responses = queue.Queue()
+
+    def mock_run(self_worker):
+      active_workers.append(self_worker)
+      mock_responses.get()
+      active_workers.remove(self_worker)
+
+    with mock.patch(
+        'apache_beam.runners.worker.sdk_worker.SdkHarness') as mock_harness:
+      mock_harness.return_value._responses = mock_responses
+      mock_harness.return_value.run = lambda: mock_run(mock_harness)
+
+      # Simulate starting Worker 1 for Pipeline 1
+      req1 = beam_fn_api_pb2.StartWorkerRequest(worker_id="worker_1")
+      req1.control_endpoint.url = "localhost:12345"
+      servicer.StartWorker(req1, None)
+
+      time.sleep(0.05)
Review Comment:

Using fixed `time.sleep()` calls in tests is a common source of flakiness,
as the required wait time can vary significantly between local and CI
environments. It is better to use a polling loop with a timeout or a
synchronization primitive (like `threading.Event`) to wait for the worker to
start/stop. This applies to lines 530, 540, and 546 as well.
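
As a minimal sketch of the polling approach (the `wait_until` helper is illustrative and not part of the PR; the `active_workers` condition reuses a name from the test above):
```python
import time


def wait_until(condition, timeout=5.0, interval=0.01):
  """Polls `condition` until it returns True, failing after `timeout` seconds."""
  deadline = time.monotonic() + timeout
  while time.monotonic() < deadline:
    if condition():
      return True
    time.sleep(interval)
  return False


# In the test, instead of a fixed `time.sleep(0.05)` after StartWorker:
# self.assertTrue(wait_until(lambda: len(active_workers) == 1))
```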
##########
sdks/python/apache_beam/runners/worker/worker_pool_main.py:
##########
@@ -188,6 +190,15 @@ def StopWorker(
       _LOGGER.info("Stopping worker %s" % stop_worker_request.worker_id)
       kill_process_gracefully(worker_process)
+    # applicable for thread mode to ensure thread cleanup by
+    # unblocking the harness request stream.
+    worker_thread_harness = self._worker_threads.pop(
+        stop_worker_request.worker_id, None)
+    if worker_thread_harness:
+      _LOGGER.info("Stopping thread worker %s" % stop_worker_request.worker_id)
+      from apache_beam.utils.sentinel import Sentinel
Review Comment:

Inline imports are generally discouraged unless they are needed to break a
circular dependency. Since `Sentinel` is a small utility class, its import
should be moved to the top-level imports of the module for better readability
and consistency with PEP 8.
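
A sketch of the suggested fix (assuming no circular dependency between `worker_pool_main` and `apache_beam.utils.sentinel` forces the inline import):
```python
# At the top of sdks/python/apache_beam/runners/worker/worker_pool_main.py,
# grouped with the other apache_beam imports:
from apache_beam.utils.sentinel import Sentinel
```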
<details>
<summary>References</summary>
1. Imports should be at the top of the file, after any module comments and
docstrings, and before module globals and constants.
<sup>([link](http://console.cloud.google.com/gemini-code-assist/agents-tools))</sup>
</details>
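
For context, the thread-mode shutdown in the diff above appears to work by enqueueing a `Sentinel` onto the harness's response stream so that a blocked `get()` returns and the daemon thread can exit. A generic, self-contained sketch of that queue-sentinel pattern (illustrative names, not Beam's actual internals):
```python
import queue
import threading

SHUTDOWN = object()  # sentinel signalling the consumer to stop


def consume(responses):
  # Worker loop: blocks on get() until a real item or the sentinel arrives.
  while True:
    item = responses.get()
    if item is SHUTDOWN:
      break  # unblocks the thread so it can exit cleanly


responses = queue.Queue()
worker = threading.Thread(target=consume, args=(responses,), daemon=True)
worker.start()
responses.put(SHUTDOWN)  # request shutdown; the blocked get() returns
worker.join(timeout=5.0)
```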
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]