sergiitk commented on code in PR #36528:
URL: https://github.com/apache/beam/pull/36528#discussion_r2501689245
##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -185,8 +185,28 @@ def start(self):
try:
process, endpoint = self.start_process()
wait_secs = .1
- channel_options = [("grpc.max_receive_message_length", -1),
- ("grpc.max_send_message_length", -1)]
+ channel_options = [
+ ("grpc.max_receive_message_length", -1),
+ ("grpc.max_send_message_length", -1),
+ # Default: 30000ms (30s), increased to 180s to reduce ping frequency
+ ("grpc.keepalive_time_ms", 180000),
+ # Default: 5000ms (5s), increased to 10 minutes for stability
+ ("grpc.keepalive_timeout_ms", 600000),
Review Comment:
As discussed in another thread, this should be OK for your usage.
Note that if `grpc.keepalive_time_ms` is not set, the server sends a
keepalive ping every 2 hours.
So with the current setup, the server sends a ping every two hours, then waits
up to 10 minutes for the client to acknowledge it.
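A rough way to reason about this: an unresponsive peer is noticed, at worst, about one keepalive interval plus one keepalive timeout after the last activity. The sketch below assumes gRPC core's documented defaults (2 hours for a server's `grpc.keepalive_time_ms`, 20 seconds for `grpc.keepalive_timeout_ms`); the helper `worst_case_detection_ms` is hypothetical and not part of Beam or gRPC.

```python
# Assumed gRPC core server-side default for grpc.keepalive_time_ms: 2 hours.
SERVER_DEFAULT_KEEPALIVE_TIME_MS = 2 * 60 * 60 * 1000
# Assumed gRPC core default for grpc.keepalive_timeout_ms: 20 seconds.
DEFAULT_KEEPALIVE_TIMEOUT_MS = 20_000


def worst_case_detection_ms(options):
    """Estimate how long a server may take to notice a dead client.

    `options` is a list of (name, value) tuples, as passed to gRPC.
    The estimate is one keepalive interval (time until a ping is sent)
    plus one keepalive timeout (time allowed for the ping ack).
    """
    opts = dict(options)
    time_ms = opts.get("grpc.keepalive_time_ms",
                       SERVER_DEFAULT_KEEPALIVE_TIME_MS)
    timeout_ms = opts.get("grpc.keepalive_timeout_ms",
                          DEFAULT_KEEPALIVE_TIMEOUT_MS)
    return time_ms + timeout_ms


# Only the timeout is raised to 10 minutes; the interval stays at 2 hours.
print(worst_case_detection_ms([("grpc.keepalive_timeout_ms", 600000)]))
```

With only the timeout raised to 10 minutes, the estimate is 7800000ms, i.e. roughly 2 hours and 10 minutes before a dead client is detected.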
##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -185,8 +185,28 @@ def start(self):
try:
process, endpoint = self.start_process()
wait_secs = .1
- channel_options = [("grpc.max_receive_message_length", -1),
- ("grpc.max_send_message_length", -1)]
+ channel_options = [
+ ("grpc.max_receive_message_length", -1),
+ ("grpc.max_send_message_length", -1),
+ # Default: 30000ms (30s), increased to 180s to reduce ping frequency
+ ("grpc.keepalive_time_ms", 180000),
+ # Default: 5000ms (5s), increased to 10 minutes for stability
+ ("grpc.keepalive_timeout_ms", 600000),
+ # Default: 2, set to 0 to allow unlimited pings without data
+ ("grpc.http2.max_pings_without_data", 0),
+ # Default: False, set to True to allow keepalive pings when no calls
+ ("grpc.keepalive_permit_without_calls", True),
+ # Default: 300000ms (5min), increased to 10min for stability
+ ("grpc.http2.min_recv_ping_interval_without_data_ms", 600000),
+ # Default: 300000ms (5min), increased to 120s for conservative pings
+ ("grpc.http2.min_sent_ping_interval_without_data_ms", 120000),
+ # Default: 2, set to 0 to allow unlimited ping strikes
+ ("grpc.http2.max_ping_strikes", 0),
+ # Default: 0 (disabled), enable socket reuse for better handling
+ ("grpc.so_reuseport", 1),
Review Comment:
Great! With this option, you don't need to close the socket for the found
port anymore, as you'll be able to bind and serve on it:
https://github.com/apache/beam/blob/eba04b2a56759e6095b3cc9080c9941302543c57/sdks/python/apache_beam/utils/subprocess_server.py#L610-L612
You'll need to bind the initial socket with `SO_REUSEPORT`, ideally with
`SO_REUSEADDR` as well.
https://github.com/apache/beam/blob/eba04b2a56759e6095b3cc9080c9941302543c57/sdks/python/apache_beam/utils/subprocess_server.py#L595
This approach addresses the race condition where one process finds an unused
port and closes the socket, but before it starts listening on that port,
another process acquires it, resulting in `EADDRINUSE`.
By not closing the socket until the server stops listening, you'll prevent
other processes from seeing that port as unused.
Note that this only applies to systems where `SO_REUSEPORT` is supported.
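A minimal sketch of the "bind and keep open" approach, using only the standard `socket` module. The helpers `reserve_port` and `bind_server_on` are hypothetical (they do not reproduce Beam's code at the linked lines); `bind_server_on` stands in for the gRPC server that binds with `grpc.so_reuseport` enabled. It assumes a platform where `socket.SO_REUSEPORT` exists (for example, Linux or macOS).

```python
import socket


def reserve_port(host="127.0.0.1"):
    """Bind an OS-assigned port and keep the socket open (never listen).

    While the returned socket stays open, the port remains in use, so it
    is not reported as free to other callers; a server that also sets
    SO_REUSEPORT can still bind to it.
    """
    if not hasattr(socket, "SO_REUSEPORT"):
        raise OSError("SO_REUSEPORT is not supported on this platform")
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    sock.bind((host, 0))  # port 0: let the OS pick an unused port
    return sock, sock.getsockname()[1]


def bind_server_on(port, host="127.0.0.1"):
    """Stand-in for the real server: bind and listen with SO_REUSEPORT."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    server.bind((host, port))
    server.listen()
    return server
```

The reserving socket would be closed only after the server stops listening, so the port is never seen as unused in between.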
##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -185,8 +185,20 @@ def start(self):
try:
process, endpoint = self.start_process()
wait_secs = .1
- channel_options = [("grpc.max_receive_message_length", -1),
- ("grpc.max_send_message_length", -1)]
+ channel_options = [
+ ("grpc.max_receive_message_length", -1),
+ ("grpc.max_send_message_length", -1),
+ # Default: 5000ms (5s), increased to 10 minutes for stability
Review Comment:
The default for `grpc.keepalive_timeout_ms` is 20000ms (20 seconds), not 5000ms.
##########
sdks/python/apache_beam/runners/worker/channel_factory.py:
##########
@@ -23,8 +23,12 @@
class GRPCChannelFactory(grpc.StreamStreamClientInterceptor):
DEFAULT_OPTIONS = [
- ("grpc.keepalive_time_ms", 20000),
- ("grpc.keepalive_timeout_ms", 300000),
+ # Default: 20000ms (20s), increased to 10 minutes for stability
Review Comment:
Client-side keepalive (`grpc.keepalive_time_ms`) is disabled by default, and
unless you enable it, none of the other settings you added
(`grpc.http2.max_pings_without_data`, `grpc.keepalive_permit_without_calls`)
will have any effect.
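The point above can be turned into a quick check on an options list. This is a hypothetical helper, not a gRPC or Beam API; it mirrors the reviewer's reasoning and assumes client-channel semantics, where an unset `grpc.keepalive_time_ms` defaults to INT_MAX (disabled), and treats an explicit INT_MAX the same way.

```python
# INT_MAX as used by gRPC core; a keepalive time of INT_MAX means "disabled".
INT_MAX = 2**31 - 1

# Options that, per the review discussion, only matter once keepalive
# pings are actually being sent.
KEEPALIVE_DEPENDENT_OPTIONS = (
    "grpc.keepalive_timeout_ms",
    "grpc.keepalive_permit_without_calls",
    "grpc.http2.max_pings_without_data",
)


def ineffective_keepalive_options(options):
    """Return keepalive-dependent options that are set but inert.

    `options` is a list of (name, value) tuples for a client channel.
    """
    opts = dict(options)
    keepalive_time_ms = opts.get("grpc.keepalive_time_ms", INT_MAX)
    if keepalive_time_ms < INT_MAX:
        return []  # keepalive is enabled, so the dependent options apply
    return [name for name in KEEPALIVE_DEPENDENT_OPTIONS if name in opts]
```

For example, an options list containing only `("grpc.keepalive_timeout_ms", 300000)` would be flagged, while adding `("grpc.keepalive_time_ms", 20000)` clears the warning.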
##########
sdks/python/apache_beam/runners/worker/channel_factory.py:
##########
@@ -23,8 +23,12 @@
class GRPCChannelFactory(grpc.StreamStreamClientInterceptor):
DEFAULT_OPTIONS = [
- ("grpc.keepalive_time_ms", 20000),
+ # keep 5 minutes for now.
("grpc.keepalive_timeout_ms", 300000),
Review Comment:
Again, you want to keep `grpc.keepalive_time_ms`, but consider the value of
`grpc.keepalive_timeout_ms` carefully.
##########
sdks/python/apache_beam/runners/worker/channel_factory.py:
##########
@@ -23,8 +23,24 @@
class GRPCChannelFactory(grpc.StreamStreamClientInterceptor):
DEFAULT_OPTIONS = [
- ("grpc.keepalive_time_ms", 20000),
- ("grpc.keepalive_timeout_ms", 300000),
+ # Default: 30000ms (30s), increased to 180s to reduce ping frequency
+ ("grpc.keepalive_time_ms", 180000),
+ # Default: 5000ms (5s), increased to 10 minutes for stability
+ ("grpc.keepalive_timeout_ms", 600000),
Review Comment:
@tvalentyn
> per [comment
above](https://github.com/apache/beam/pull/36528/files#r2482014530),
keepalive_time_ms is INTMAX. Is that correct, @sergiitk ? Then this concern
doesn't apply?
Setting `grpc.keepalive_time_ms` to INT_MAX disables keepalive pings. That
means `keepalive_timeout_ms` won't apply either, nor will any of the other
keepalive settings.
--
This is an automated message from the Apache Git Service.