This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a8e7ffab371 Fix race conditions, error recovery, and exit handlers in job servers (#38423)
a8e7ffab371 is described below

commit a8e7ffab3716459518aeea2e81fe7181c6885179
Author: Shunping Huang <[email protected]>
AuthorDate: Fri May 8 22:21:39 2026 -0400

    Fix race conditions, error recovery, and exit handlers in job servers (#38423)
---
 .../apache_beam/runners/portability/job_server.py  |   9 +-
 sdks/python/apache_beam/utils/subprocess_server.py |  16 +--
 .../apache_beam/utils/subprocess_server_test.py    | 124 +++++++++++++++++++++
 3 files changed, 140 insertions(+), 9 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py
index 9fdaabd1a17..53688e0be95 100644
--- a/sdks/python/apache_beam/runners/portability/job_server.py
+++ b/sdks/python/apache_beam/runners/portability/job_server.py
@@ -94,8 +94,13 @@ class StopOnExitJobServer(JobServer):
   def stop(self):
     with self._lock:
       if self._started:
-        self._job_server.stop()
-        self._started = False
+        try:
+          self._job_server.stop()
+        finally:
+          self._started = False
+          # Unregister the atexit handler to prevent duplicate
+          # registrations when the server is restarted/reused.
+          atexit.unregister(self.stop)
 
 
 class SubprocessJobServer(JobServer):
diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py
index 988bd680b92..5752a49dde2 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -88,11 +88,11 @@ class _SharedCache:
     return owner
 
   def purge(self, owner):
-    if owner not in self._live_owners:
-      raise ValueError(f"{owner} not in {self._live_owners}")
-    self._live_owners.remove(owner)
     to_delete = []
     with self._lock:
+      if owner not in self._live_owners:
+        raise ValueError(f"{owner} not in {self._live_owners}")
+      self._live_owners.remove(owner)
       for key, entry in list(self._cache.items()):
         if owner in entry.owners:
           entry.owners.remove(owner)
@@ -255,15 +255,17 @@ class SubprocessServer(object):
 
   def stop_process(self):
     if self._owner_id is not None:
-      self._cache.purge(self._owner_id)
-      self._owner_id = None
+      try:
+        self._cache.purge(self._owner_id)
+      finally:
+        # Make sure _owner_id is set to None even if purge fails.
+        self._owner_id = None
     if self._grpc_channel:
       try:
         self._grpc_channel.close()
       except:  # pylint: disable=bare-except
         _LOGGER.error(
-            "Could not close the gRPC channel started for the "
-            "expansion service")
+            "Could not close the gRPC channel started with cmd %s", self._cmd)
       finally:
         self._grpc_channel = None
 
diff --git a/sdks/python/apache_beam/utils/subprocess_server_test.py b/sdks/python/apache_beam/utils/subprocess_server_test.py
index c848595db35..0f25d9904f0 100644
--- a/sdks/python/apache_beam/utils/subprocess_server_test.py
+++ b/sdks/python/apache_beam/utils/subprocess_server_test.py
@@ -19,6 +19,7 @@
 
 # pytype: skip-file
 
+import atexit
 import glob
 import os
 import random
@@ -29,7 +30,9 @@ import subprocess
 import tempfile
 import threading
 import unittest
+from unittest.mock import patch
 
+from apache_beam.runners.portability import job_server
 from apache_beam.utils import subprocess_server
 
 
@@ -302,6 +305,127 @@ class CacheTest(unittest.TestCase):
     self.assertNotEqual(cache.get('b'), b)
     cache.purge(owner3)
 
+  def test_destructor_exception_partial_state(self):
+    # In SubprocessServer.stop_process(), we need to make sure self._owner_id is always
+    # set to None if it is not already set, even if a destructor exception happens
+    # during purge(owner_id).
+
+    destructor_calls = []
+
+    def faulty_destructor(obj):
+      destructor_calls.append(obj)
+      raise RuntimeError("Destructor failed")
+
+    custom_cache = subprocess_server._SharedCache(
+        lambda *args: "process_obj", faulty_destructor)
+
+    class CustomServer(subprocess_server.SubprocessServer):
+      _cache = custom_cache
+
+      def __init__(self):
+        super().__init__(lambda channel: None, ["dummy_cmd"], port=12345)
+
+    server = CustomServer()
+    server.start_process()
+    owner_id = server._owner_id
+    self.assertIsNotNone(owner_id)
+    self.assertIn(owner_id, custom_cache._live_owners)
+
+    # First stop attempt fails in the destructor
+    with self.assertRaises(RuntimeError):
+      server.stop_process()
+
+    # Verify fixed state: owner is purged from cache set, AND self._owner_id is successfully cleared to None
+    self.assertNotIn(owner_id, custom_cache._live_owners)
+    self.assertIsNone(server._owner_id)
+
+    # Second stop attempt safely does nothing (no ValueError raised)
+    try:
+      server.stop_process()
+    except ValueError:
+      self.fail("ValueError should not be raised here.")
+
+  def test_duplicate_atexit_registration_on_restart(self):
+    # Make sure we don't have duplicate atexit registration when reusing a
+    # StopOnExitJobServer instance.
+
+    class DummyJobServer(job_server.JobServer):
+      def start(self):
+        return "localhost:8080"
+
+      def stop(self):
+        pass
+
+    wrapper = job_server.StopOnExitJobServer(DummyJobServer())
+
+    registered_callbacks = []
+
+    def mock_register(cb):
+      registered_callbacks.append(cb)
+
+    def mock_unregister(cb):
+      if cb in registered_callbacks:
+        registered_callbacks.remove(cb)
+
+    with patch('atexit.register', side_effect=mock_register), \
+         patch('atexit.unregister', side_effect=mock_unregister, create=True):
+      # First start registers stop callback
+      wrapper.start()
+      self.assertTrue(wrapper._started)
+      self.assertEqual(len(registered_callbacks), 1)
+
+      # Explicit stop clears _started AND unregisters the callback
+      wrapper.stop()
+      self.assertFalse(wrapper._started)
+      self.assertEqual(len(registered_callbacks), 0)
+
+      # Re-starting registers the callback again, leaving exactly 1 active callback
+      wrapper.start()
+      self.assertTrue(wrapper._started)
+      self.assertEqual(len(registered_callbacks), 1)
+
+  def test_concurrent_purge_race_condition(self):
+    # Concurrent threads attempting to check membership and call purge for the same owner.
+    # Here we explicitly define a synchronized set to mimic the behavior of _live_owners.
+    # This set will block two threads on __contains__, allowing us to test the race condition.
+    cache = subprocess_server._SharedCache(lambda x: "obj", lambda x: None)
+    owner = cache.register()
+
+    barrier = threading.Barrier(2)
+    exceptions = []
+
+    class SynchronizedSet(set):
+      def __contains__(self, item):
+        res = super().__contains__(item)
+        try:
+          # Force both threads to align right after checking membership but before removal
+          barrier.wait(timeout=0.2)
+        except threading.BrokenBarrierError:
+          pass
+        return res
+
+    cache._live_owners = SynchronizedSet(cache._live_owners)
+
+    def purge_worker():
+      try:
+        cache.purge(owner)
+      except Exception as e:
+        exceptions.append(e)
+
+    t1 = threading.Thread(target=purge_worker)
+    t2 = threading.Thread(target=purge_worker)
+
+    t1.start()
+    t2.start()
+
+    t1.join()
+    t2.join()
+
+    # Exactly one thread should raise the expected ValueError because they are cleanly serialized
+    self.assertEqual(len(exceptions), 1)
+    self.assertIsInstance(exceptions[0], ValueError)
+    self.assertNotIsInstance(exceptions[0], KeyError)
+
 
 if __name__ == '__main__':
   unittest.main()

Reply via email to