gemini-code-assist[bot] commented on code in PR #39159:
URL: https://github.com/apache/beam/pull/39159#discussion_r3494373412
##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -201,45 +201,53 @@ def __exit__(self, *unused_args):
self.stop()
def start(self):
- try:
- process, endpoint = self.start_process()
- wait_secs = .1
- channel_options = [
- ("grpc.max_receive_message_length", -1),
- ("grpc.max_send_message_length", -1),
- # Default: 20000ms (20s), increased to 10 minutes for stability
- ("grpc.keepalive_timeout_ms", 600_000),
- # Default: 2, set to 0 to allow unlimited pings without data
- ("grpc.http2.max_pings_without_data", 0),
- # Default: False, set to True to allow keepalive pings when no calls
- ("grpc.keepalive_permit_without_calls", True),
- # Default: 2, set to 0 to allow unlimited ping strikes
- ("grpc.http2.max_ping_strikes", 0),
- # Default: 0 (disabled), enable socket reuse for better handling
- ("grpc.so_reuseport", 1),
- ]
- self._grpc_channel = grpc.insecure_channel(
- endpoint, options=channel_options)
- channel_ready = grpc.channel_ready_future(self._grpc_channel)
- while True:
- if process is not None and process.poll() is not None:
- _LOGGER.error("Failed to start job service with %s", process.args)
- raise RuntimeError(
- 'Service failed to start up with error %s' % process.poll())
- try:
- channel_ready.result(timeout=wait_secs)
- break
- except (grpc.FutureTimeoutError, grpc.RpcError):
- wait_secs *= 1.2
- logging.log(
- logging.WARNING if wait_secs > 1 else logging.DEBUG,
- 'Waiting for grpc channel to be ready at %s.',
- endpoint)
- return self._stub_class(self._grpc_channel)
- except: # pylint: disable=bare-except
- _LOGGER.exception("Error bringing up service")
- self.stop_force()
- raise
+ max_retries = 3
+ for attempt in range(max_retries):
+ try:
+ process, endpoint = self.start_process()
+ wait_secs = .1
+ channel_options = [
+ ("grpc.max_receive_message_length", -1),
+ ("grpc.max_send_message_length", -1),
+ # Default: 20000ms (20s), increased to 10 minutes for stability
+ ("grpc.keepalive_timeout_ms", 600_000),
+ # Default: 2, set to 0 to allow unlimited pings without data
+ ("grpc.http2.max_pings_without_data", 0),
+ # Default: False, set to True to allow keepalive pings when no calls
+ ("grpc.keepalive_permit_without_calls", True),
+ # Default: 2, set to 0 to allow unlimited ping strikes
+ ("grpc.http2.max_ping_strikes", 0),
+ # Default: 0 (disabled), enable socket reuse for better handling
+ ("grpc.so_reuseport", 1),
+ ]
+ self._grpc_channel = grpc.insecure_channel(
+ endpoint, options=channel_options)
+ channel_ready = grpc.channel_ready_future(self._grpc_channel)
+ while True:
+ if process is not None and process.poll() is not None:
+ _LOGGER.error("Failed to start job service with %s", process.args)
+ raise RuntimeError(
+ 'Service failed to start up with error %s' % process.poll())
+ try:
+ channel_ready.result(timeout=wait_secs)
+ break
+ except (grpc.FutureTimeoutError, grpc.RpcError):
+ wait_secs *= 1.2
+ logging.log(
+ logging.WARNING if wait_secs > 1 else logging.DEBUG,
+ 'Waiting for grpc channel to be ready at %s.',
+ endpoint)
+ return self._stub_class(self._grpc_channel)
+ except Exception as e:
+ _LOGGER.warning(
+ "Error bringing up service (attempt %d of %d): %s",
+ attempt + 1,
+ max_retries,
+ e)
+ self.stop_force()
+ if attempt == max_retries - 1:
+ raise
+ time.sleep(1)
Review Comment:

The original implementation used a bare `except:` block (which catches
`BaseException`, including `KeyboardInterrupt` and `SystemExit`) to ensure that
`self.stop_force()` is always called to clean up the subprocess on any failure
or interruption.
With this changed to `except Exception as e:`, interruptions such as
`KeyboardInterrupt` or `SystemExit` bypass the handler. `self.stop_force()` is
then never called, and the started subprocess leaks.
To prevent this leak, add a bare `except:` clause after the `except Exception`
clause. It catches the remaining `BaseException` subclasses, calls
`self.stop_force()`, and re-raises immediately without retrying.
```suggestion
except Exception as e:
_LOGGER.warning(
"Error bringing up service (attempt %d of %d): %s",
attempt + 1,
max_retries,
e)
self.stop_force()
if attempt == max_retries - 1:
raise
time.sleep(1)
except: # pylint: disable=bare-except
self.stop_force()
raise
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]