shunping commented on code in PR #38572:
URL: https://github.com/apache/beam/pull/38572#discussion_r3278356173


##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -186,45 +186,52 @@ def __exit__(self, *unused_args):
     self.stop()
 
   def start(self):
-    try:
-      process, endpoint = self.start_process()
-      wait_secs = .1
-      channel_options = [
-          ("grpc.max_receive_message_length", -1),
-          ("grpc.max_send_message_length", -1),
-          # Default: 20000ms (20s), increased to 10 minutes for stability
-          ("grpc.keepalive_timeout_ms", 600_000),
-          # Default: 2, set to 0 to allow unlimited pings without data
-          ("grpc.http2.max_pings_without_data", 0),
-          # Default: False, set to True to allow keepalive pings when no calls
-          ("grpc.keepalive_permit_without_calls", True),
-          # Default: 2, set to 0 to allow unlimited ping strikes
-          ("grpc.http2.max_ping_strikes", 0),
-          # Default: 0 (disabled), enable socket reuse for better handling
-          ("grpc.so_reuseport", 1),
-      ]
-      self._grpc_channel = grpc.insecure_channel(
-          endpoint, options=channel_options)
-      channel_ready = grpc.channel_ready_future(self._grpc_channel)
-      while True:
-        if process is not None and process.poll() is not None:
-          _LOGGER.error("Started job service with %s", process.args)
-          raise RuntimeError(
-              'Service failed to start up with error %s' % process.poll())
-        try:
-          channel_ready.result(timeout=wait_secs)
-          break
-        except (grpc.FutureTimeoutError, grpc.RpcError):
-          wait_secs *= 1.2
-          logging.log(
-              logging.WARNING if wait_secs > 1 else logging.DEBUG,
-              'Waiting for grpc channel to be ready at %s.',
-              endpoint)
-      return self._stub_class(self._grpc_channel)
-    except:  # pylint: disable=bare-except
-      _LOGGER.exception("Error bringing up service")
-      self.stop()
-      raise
+    max_attempts = 3
+    for attempt in range(max_attempts):
+      try:
+        process, endpoint = self.start_process()
+        wait_secs = .1
+        channel_options = [
+            ("grpc.max_receive_message_length", -1),
+            ("grpc.max_send_message_length", -1),
+            # Default: 20000ms (20s), increased to 10 minutes for stability
+            ("grpc.keepalive_timeout_ms", 600_000),
+            # Default: 2, set to 0 to allow unlimited pings without data
+            ("grpc.http2.max_pings_without_data", 0),
+            # Default: False, set to True to allow keepalive pings when no calls
+            ("grpc.keepalive_permit_without_calls", True),
+            # Default: 2, set to 0 to allow unlimited ping strikes
+            ("grpc.http2.max_ping_strikes", 0),
+            # Default: 0 (disabled), enable socket reuse for better handling
+            ("grpc.so_reuseport", 1),
+        ]
+        self._grpc_channel = grpc.insecure_channel(
+            endpoint, options=channel_options)
+        channel_ready = grpc.channel_ready_future(self._grpc_channel)
+        while True:
+          if process is not None and process.poll() is not None:
+            _LOGGER.error("Started job service with %s", process.args)
+            raise RuntimeError(
+                'Service failed to start up with error %s' % process.poll())
+          try:
+            channel_ready.result(timeout=wait_secs)
+            break
+          except (grpc.FutureTimeoutError, grpc.RpcError):
+            wait_secs *= 1.2
+            logging.log(
+                logging.WARNING if wait_secs > 1 else logging.DEBUG,
+                'Waiting for grpc channel to be ready at %s.',
+                endpoint)
+        return self._stub_class(self._grpc_channel)
+      except Exception as e:
+        _LOGGER.warning(
+            "Error bringing up service on attempt %d: %s",
+            attempt + 1,
+            e,
+            exc_info=True)
+        self.stop()
+        if attempt == max_attempts - 1:
+          raise

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to