This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 930b94cceb6 Fix test hang in subprocess expansion service on port bind failure (#38572)
930b94cceb6 is described below

commit 930b94cceb69e33bd9a9a2f1287ebe5c75533536
Author: Shunping Huang <[email protected]>
AuthorDate: Thu May 21 09:11:01 2026 -0400

    Fix test hang in subprocess expansion service on port bind failure (#38572)
    
    * Fix silent test hang in subprocess expansion service on port bind failure
    
    * Formatting
    
    * Add retry when starting subprocess server.
    
    * Add sleep before retrying.
---
 .../runners/portability/expansion_service_main.py  | 14 +++-
 sdks/python/apache_beam/utils/subprocess_server.py | 86 ++++++++++++----------
 2 files changed, 58 insertions(+), 42 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/expansion_service_main.py b/sdks/python/apache_beam/runners/portability/expansion_service_main.py
index 269d02b3efb..f2d03e0e898 100644
--- a/sdks/python/apache_beam/runners/portability/expansion_service_main.py
+++ b/sdks/python/apache_beam/runners/portability/expansion_service_main.py
@@ -55,7 +55,9 @@ def main(argv):
   with fully_qualified_named_transform.FullyQualifiedNamedTransform.with_filter(
       known_args.fully_qualified_name_glob):
 
-    address = '0.0.0.0:{}'.format(known_args.port)
+    # Bind to localhost instead of 0.0.0.0 to ensure compatibility with loopback
+    # connections on dual-stack (IPv4/IPv6) systems.
+    address = 'localhost:{}'.format(known_args.port)
     server = grpc.server(thread_pool_executor.shared_unbounded_instance())
     if known_args.serve_loopback_worker:
       beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
@@ -71,9 +73,15 @@ def main(argv):
         artifact_service.ArtifactRetrievalService(
             artifact_service.BeamFilesystemHandler(None).file_reader),
         server)
-    server.add_insecure_port(address)
+    # Ensure gRPC server successfully binds. If this fails (e.g., due to port collision),
+    # add_insecure_port returns 0. We raise an error to crash the subprocess immediately,
+    # allowing the parent process to detect it and fail fast rather than hanging.
+    bound_port = server.add_insecure_port(address)
+    if not bound_port:
+      raise RuntimeError(
+          "Failed to bind expansion service to {}".format(address))
     server.start()
-    _LOGGER.info('Listening for expansion requests at %d', known_args.port)
+    _LOGGER.info('Listening for expansion requests at %d', bound_port)
 
     def cleanup(unused_signum, unused_frame):
       _LOGGER.info('Shutting down expansion service.')
diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py
index d21cb486b8f..b22e6badb5e 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -186,45 +186,53 @@ class SubprocessServer(object):
     self.stop()
 
   def start(self):
-    try:
-      process, endpoint = self.start_process()
-      wait_secs = .1
-      channel_options = [
-          ("grpc.max_receive_message_length", -1),
-          ("grpc.max_send_message_length", -1),
-          # Default: 20000ms (20s), increased to 10 minutes for stability
-          ("grpc.keepalive_timeout_ms", 600_000),
-          # Default: 2, set to 0 to allow unlimited pings without data
-          ("grpc.http2.max_pings_without_data", 0),
-          # Default: False, set to True to allow keepalive pings when no calls
-          ("grpc.keepalive_permit_without_calls", True),
-          # Default: 2, set to 0 to allow unlimited ping strikes
-          ("grpc.http2.max_ping_strikes", 0),
-          # Default: 0 (disabled), enable socket reuse for better handling
-          ("grpc.so_reuseport", 1),
-      ]
-      self._grpc_channel = grpc.insecure_channel(
-          endpoint, options=channel_options)
-      channel_ready = grpc.channel_ready_future(self._grpc_channel)
-      while True:
-        if process is not None and process.poll() is not None:
-          _LOGGER.error("Started job service with %s", process.args)
-          raise RuntimeError(
-              'Service failed to start up with error %s' % process.poll())
-        try:
-          channel_ready.result(timeout=wait_secs)
-          break
-        except (grpc.FutureTimeoutError, grpc.RpcError):
-          wait_secs *= 1.2
-          logging.log(
-              logging.WARNING if wait_secs > 1 else logging.DEBUG,
-              'Waiting for grpc channel to be ready at %s.',
-              endpoint)
-      return self._stub_class(self._grpc_channel)
-    except:  # pylint: disable=bare-except
-      _LOGGER.exception("Error bringing up service")
-      self.stop()
-      raise
+    max_attempts = 3
+    for attempt in range(max_attempts):
+      try:
+        process, endpoint = self.start_process()
+        wait_secs = .1
+        channel_options = [
+            ("grpc.max_receive_message_length", -1),
+            ("grpc.max_send_message_length", -1),
+            # Default: 20000ms (20s), increased to 10 minutes for stability
+            ("grpc.keepalive_timeout_ms", 600_000),
+            # Default: 2, set to 0 to allow unlimited pings without data
+            ("grpc.http2.max_pings_without_data", 0),
+            # Default: False, set to True to allow keepalive pings when no calls
+            ("grpc.keepalive_permit_without_calls", True),
+            # Default: 2, set to 0 to allow unlimited ping strikes
+            ("grpc.http2.max_ping_strikes", 0),
+            # Default: 0 (disabled), enable socket reuse for better handling
+            ("grpc.so_reuseport", 1),
+        ]
+        self._grpc_channel = grpc.insecure_channel(
+            endpoint, options=channel_options)
+        channel_ready = grpc.channel_ready_future(self._grpc_channel)
+        while True:
+          if process is not None and process.poll() is not None:
+            _LOGGER.error("Started job service with %s", process.args)
+            raise RuntimeError(
+                'Service failed to start up with error %s' % process.poll())
+          try:
+            channel_ready.result(timeout=wait_secs)
+            break
+          except (grpc.FutureTimeoutError, grpc.RpcError):
+            wait_secs *= 1.2
+            logging.log(
+                logging.WARNING if wait_secs > 1 else logging.DEBUG,
+                'Waiting for grpc channel to be ready at %s.',
+                endpoint)
+        return self._stub_class(self._grpc_channel)
+      except Exception as e:
+        _LOGGER.warning(
+            "Error bringing up service on attempt %d: %s",
+            attempt + 1,
+            e,
+            exc_info=True)
+        self.stop()
+        if attempt == max_attempts - 1:
+          raise
+        time.sleep(1)
 
   def start_process(self):
     if self._owner_id is not None:

Reply via email to