This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 34c6e26112c Make SubprocessServer shared cache purging idempotent (#38455)
34c6e26112c is described below

commit 34c6e26112c20f12b2a4941a2eba3bb9b1e19612
Author: Shunping Huang <[email protected]>
AuthorDate: Tue May 12 10:10:32 2026 -0400

    Make SubprocessServer shared cache purging idempotent (#38455)
    
    * Make SubprocessServer shared cache purging idempotent
    
    * Reformat
---
 sdks/python/apache_beam/utils/subprocess_server.py |  6 ++++-
 .../apache_beam/utils/subprocess_server_test.py    | 29 +++++++++++++++++++---
 2 files changed, 30 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py
index 5752a49dde2..fed5ee591bc 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -91,7 +91,11 @@ class _SharedCache:
     to_delete = []
     with self._lock:
       if owner not in self._live_owners:
-        raise ValueError(f"{owner} not in {self._live_owners}")
+        _LOGGER.warning(
+            "Subprocess owner %s already purged. If this occurs during atexit "
+            "shutdown, the subprocess was already cleaned up earlier.",
+            owner)
+        return
       self._live_owners.remove(owner)
       for key, entry in list(self._cache.items()):
         if owner in entry.owners:
diff --git a/sdks/python/apache_beam/utils/subprocess_server_test.py b/sdks/python/apache_beam/utils/subprocess_server_test.py
index 0f25d9904f0..efd357c4c13 100644
--- a/sdks/python/apache_beam/utils/subprocess_server_test.py
+++ b/sdks/python/apache_beam/utils/subprocess_server_test.py
@@ -421,10 +421,31 @@ class CacheTest(unittest.TestCase):
     t1.join()
     t2.join()
 
-    # Exactly one thread should raise the expected ValueError because they are cleanly serialized
-    self.assertEqual(len(exceptions), 1)
-    self.assertIsInstance(exceptions[0], ValueError)
-    self.assertNotIsInstance(exceptions[0], KeyError)
+    # Both threads should succeed cleanly without raising an exception under idempotent purging.
+    self.assertEqual(len(exceptions), 0)
+
+  def test_stop_process_after_cache_purged(self):
+    # Reproduce the ValueError when stop_process() (called by atexit)
+    # runs after the cache/owner was already purged during test teardown.
+    cache = subprocess_server._SharedCache(
+        lambda *args: "dummy_process", lambda obj: None)
+
+    class DummySubprocessServer(subprocess_server.SubprocessServer):
+      _cache = cache
+
+      def __init__(self):
+        super().__init__(lambda channel: None, ["dummy_cmd"], port=12345)
+
+    server = DummySubprocessServer()
+    server.start_process()
+    owner_id = server._owner_id
+
+    # Simulate pipeline context exit or test teardown purging the cache directly
+    cache.purge(owner_id)
+
+    # Calling stop_process() (which happens during atexit) should succeed cleanly
+    # without raising ValueError.
+    server.stop_process()
 
 
 if __name__ == '__main__':

Reply via email to