shunping commented on code in PR #38432:
URL: https://github.com/apache/beam/pull/38432#discussion_r3221125068
##########
sdks/python/apache_beam/runners/portability/prism_runner_test.py:
##########
@@ -488,6 +493,59 @@ def test_singleton(self, enable_singleton):
else:
mock_prism_server.assert_called_once()
+ def test_loopback_worker_daemon_thread_accumulation(self):
+ """Verifies that in LOOPBACK mode, the external worker pool servicer
properly
+ tracks active thread-based SdkHarness workers and cleanly shuts them down
in
+ StopWorker via sentinel messages. This prevents background daemon threads
from
+ accumulating across sequential pipeline executions and leaking resources.
+ """
+ servicer = worker_pool_main.BeamFnExternalWorkerPoolServicer(
+ use_process=False, state_cache_size=0, data_buffer_time_limit_ms=0)
+
+ active_workers = []
+ mock_responses = queue.Queue()
+
+ def mock_run(self_worker):
+ active_workers.append(self_worker)
+ mock_responses.get()
+ active_workers.remove(self_worker)
+
+ with mock.patch(
+ 'apache_beam.runners.worker.sdk_worker.SdkHarness') as mock_harness:
+ mock_harness.return_value._responses = mock_responses
+ mock_harness.return_value.run = lambda: mock_run(mock_harness)
+
+ # Simulate starting Worker 1 for Pipeline 1
+ req1 = beam_fn_api_pb2.StartWorkerRequest(worker_id="worker_1")
+ req1.control_endpoint.url = "localhost:12345"
+ servicer.StartWorker(req1, None)
+
+ time.sleep(0.05)
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]