This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 34c6e26112c Make SubprocessServer shared cache purging idempotent (#38455)
34c6e26112c is described below
commit 34c6e26112c20f12b2a4941a2eba3bb9b1e19612
Author: Shunping Huang <[email protected]>
AuthorDate: Tue May 12 10:10:32 2026 -0400
Make SubprocessServer shared cache purging idempotent (#38455)
* Make SubprocessServer shared cache purging idempotent
* Reformat
---
sdks/python/apache_beam/utils/subprocess_server.py | 6 ++++-
.../apache_beam/utils/subprocess_server_test.py | 29 +++++++++++++++++++---
2 files changed, 30 insertions(+), 5 deletions(-)
diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py
index 5752a49dde2..fed5ee591bc 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -91,7 +91,11 @@ class _SharedCache:
to_delete = []
with self._lock:
if owner not in self._live_owners:
- raise ValueError(f"{owner} not in {self._live_owners}")
+ _LOGGER.warning(
+ "Subprocess owner %s already purged. If this occurs during atexit "
+ "shutdown, the subprocess was already cleaned up earlier.",
+ owner)
+ return
self._live_owners.remove(owner)
for key, entry in list(self._cache.items()):
if owner in entry.owners:
diff --git a/sdks/python/apache_beam/utils/subprocess_server_test.py b/sdks/python/apache_beam/utils/subprocess_server_test.py
index 0f25d9904f0..efd357c4c13 100644
--- a/sdks/python/apache_beam/utils/subprocess_server_test.py
+++ b/sdks/python/apache_beam/utils/subprocess_server_test.py
@@ -421,10 +421,31 @@ class CacheTest(unittest.TestCase):
t1.join()
t2.join()
- # Exactly one thread should raise the expected ValueError because they are cleanly serialized
- self.assertEqual(len(exceptions), 1)
- self.assertIsInstance(exceptions[0], ValueError)
- self.assertNotIsInstance(exceptions[0], KeyError)
+ # Both threads should succeed cleanly without raising an exception under idempotent purging.
+ self.assertEqual(len(exceptions), 0)
+
+ def test_stop_process_after_cache_purged(self):
+ # Reproduce the ValueError when stop_process() (called by atexit)
+ # runs after the cache/owner was already purged during test teardown.
+ cache = subprocess_server._SharedCache(
+ lambda *args: "dummy_process", lambda obj: None)
+
+ class DummySubprocessServer(subprocess_server.SubprocessServer):
+ _cache = cache
+
+ def __init__(self):
+ super().__init__(lambda channel: None, ["dummy_cmd"], port=12345)
+
+ server = DummySubprocessServer()
+ server.start_process()
+ owner_id = server._owner_id
+
+ # Simulate pipeline context exit or test teardown purging the cache directly
+ cache.purge(owner_id)
+
+ # Calling stop_process() (which happens during atexit) should succeed cleanly
+ # without raising ValueError.
+ server.stop_process()
if __name__ == '__main__':