sergiitk commented on code in PR #36528:
URL: https://github.com/apache/beam/pull/36528#discussion_r2479611499


##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -185,8 +185,28 @@ def start(self):
     try:
       process, endpoint = self.start_process()
       wait_secs = .1
-      channel_options = [("grpc.max_receive_message_length", -1),
-                         ("grpc.max_send_message_length", -1)]
+      channel_options = [
+          ("grpc.max_receive_message_length", -1),
+          ("grpc.max_send_message_length", -1),
+          # Default: 30000ms (30s), increased to 180s to reduce ping frequency
+          ("grpc.keepalive_time_ms", 180000),
+          # Default: 5000ms (5s), increased to 10 minutes for stability
+          ("grpc.keepalive_timeout_ms", 600000),

Review Comment:
   Same logic as in https://github.com/apache/beam/pull/36528#discussion_r2479659838 applies here.



##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -185,8 +185,28 @@ def start(self):
     try:
       process, endpoint = self.start_process()
       wait_secs = .1
-      channel_options = [("grpc.max_receive_message_length", -1),
-                         ("grpc.max_send_message_length", -1)]
+      channel_options = [
+          ("grpc.max_receive_message_length", -1),
+          ("grpc.max_send_message_length", -1),
+          # Default: 30000ms (30s), increased to 180s to reduce ping frequency
+          ("grpc.keepalive_time_ms", 180000),
+          # Default: 5000ms (5s), increased to 10 minutes for stability
+          ("grpc.keepalive_timeout_ms", 600000),
+          # Default: 2, set to 0 to allow unlimited pings without data
+          ("grpc.http2.max_pings_without_data", 0),
+          # Default: False, set to True to allow keepalive pings when no calls
+          ("grpc.keepalive_permit_without_calls", True),
+          # Default: 300000ms (5min), increased to 10min for stability
+          ("grpc.http2.min_recv_ping_interval_without_data_ms", 600000),

Review Comment:
   The name should be `grpc.http2.min_ping_interval_without_data_ms`: https://github.com/grpc/grpc/blob/caaa581253b963b2ef3fb5b409254af1a657518a/include/grpc/impl/channel_arg_names.h#L112-L116
   
   ```c
   /** Minimum allowed time between a server receiving successive ping frames
      without sending any data/header frame. Int valued, milliseconds
    */
   #define GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS \
     "grpc.http2.min_ping_interval_without_data_ms"
   ```
   
   However, I don't think you even need that. See
   
   > `GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS`
   > If there are no data/header frames being sent on the transport, this channel argument on the server side controls the minimum time (in milliseconds) that gRPC Core would expect between receiving successive pings. If the time between successive pings is less than this time, then the ping will be considered a bad ping from the peer. Such a ping counts as a ‘ping strike’. On the client side, this does not have any effect.
   > — https://github.com/grpc/grpc/blob/master/doc/keepalive.md
   
   Later you allow unlimited ping strikes.
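A stdlib-only sketch of the point above, assuming the option strings from the linked `channel_arg_names.h` and the server default of 2 ping strikes listed in keepalive.md. The helper `interval_is_enforced` is hypothetical; it only illustrates that once `grpc.http2.max_ping_strikes` is 0, the minimum receive interval no longer has anything to enforce.

```python
# Server-side ping policing options. Key strings follow channel_arg_names.h;
# the values are illustrative, not recommendations.
SERVER_PING_POLICY = [
    # GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS (server-only).
    ("grpc.http2.min_ping_interval_without_data_ms", 300000),
    # GRPC_ARG_HTTP2_MAX_PING_STRIKES: 0 means unlimited strikes.
    ("grpc.http2.max_ping_strikes", 0),
]

# Server default for max_ping_strikes, per grpc/doc/keepalive.md.
DEFAULT_MAX_PING_STRIKES = 2


def interval_is_enforced(options):
    """Hypothetical helper: True if pinging faster than the minimum interval
    can eventually get the peer disconnected."""
    opts = dict(options)
    strikes = opts.get("grpc.http2.max_ping_strikes", DEFAULT_MAX_PING_STRIKES)
    # With unlimited strikes, bad pings never exhaust the strike budget.
    return strikes != 0


print(interval_is_enforced(SERVER_PING_POLICY))
```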



##########
sdks/python/apache_beam/runners/worker/channel_factory.py:
##########
@@ -23,8 +23,24 @@
 
 class GRPCChannelFactory(grpc.StreamStreamClientInterceptor):
   DEFAULT_OPTIONS = [
-      ("grpc.keepalive_time_ms", 20000),
-      ("grpc.keepalive_timeout_ms", 300000),
+      # Default: 30000ms (30s), increased to 180s to reduce ping frequency
+      ("grpc.keepalive_time_ms", 180000),
+      # Default: 5000ms (5s), increased to 10 minutes for stability
+      ("grpc.keepalive_timeout_ms", 600000),
+      # Default: 2, set to 0 to allow unlimited pings without data
+      ("grpc.http2.max_pings_without_data", 0),
+      # Default: False, set to True to allow keepalive pings when no calls
+      ("grpc.keepalive_permit_without_calls", True),
+      # Default: 300000ms (5min), increased to 10min for stability
+      ("grpc.http2.min_recv_ping_interval_without_data_ms", 600000),
+      # Default: 300000ms (5min), increased to 120s for conservative pings
+      ("grpc.http2.min_sent_ping_interval_without_data_ms", 120000),

Review Comment:
   This option existed with a different name, but has been deprecated and now has no effect:
   
   ```c
   /** (DEPRECATED) Does not have any effect.
       Earlier, this arg configured the minimum time between successive ping frames
       without receiving any data/header frame, Int valued, milliseconds. This put
       unnecessary constraints on the configuration of keepalive pings,
       requiring users to set this channel arg along with
       GRPC_ARG_KEEPALIVE_TIME_MS. This arg also limited the activity of the other
       source of pings in gRPC Core - BDP pings, but BDP pings are only sent when
       there is receive-side data activity, making this arg unuseful for BDP pings
       too.  */
   #define GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS \
     "grpc.http2.min_time_between_pings_ms"
   ```
   
   
https://github.com/grpc/grpc/blob/caaa581253b963b2ef3fb5b409254af1a657518a/include/grpc/impl/channel_arg_names.h#L101-L111
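Because a misspelled or deprecated key is easy to miss in review, a small lint pass can flag it. A minimal sketch, assuming the only problem keys are the ones identified in this review thread; `SUSPECT_OPTION_KEYS` and `lint_options` are hypothetical names, not part of gRPC or Beam.

```python
# Keys flagged in this review; not an exhaustive list of gRPC channel args.
SUSPECT_OPTION_KEYS = {
    "grpc.http2.min_recv_ping_interval_without_data_ms":
        "not a gRPC key; the server-side arg is "
        "grpc.http2.min_ping_interval_without_data_ms",
    "grpc.http2.min_sent_ping_interval_without_data_ms":
        "not a gRPC key; the corresponding arg "
        "grpc.http2.min_time_between_pings_ms is deprecated and has no effect",
    "grpc.http2.min_time_between_pings_ms":
        "deprecated; has no effect",
}


def lint_options(options):
    """Return (key, reason) pairs for option keys that look wrong."""
    return [(key, SUSPECT_OPTION_KEYS[key])
            for key, _ in options
            if key in SUSPECT_OPTION_KEYS]


# A subset of the options proposed in the PR's diff.
PR_OPTIONS = [
    ("grpc.keepalive_time_ms", 180000),
    ("grpc.http2.min_recv_ping_interval_without_data_ms", 600000),
    ("grpc.http2.min_sent_ping_interval_without_data_ms", 120000),
]

for key, reason in lint_options(PR_OPTIONS):
    print(f"{key}: {reason}")
```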



##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -185,8 +185,28 @@ def start(self):
     try:
       process, endpoint = self.start_process()
       wait_secs = .1
-      channel_options = [("grpc.max_receive_message_length", -1),
-                         ("grpc.max_send_message_length", -1)]
+      channel_options = [
+          ("grpc.max_receive_message_length", -1),
+          ("grpc.max_send_message_length", -1),
+          # Default: 30000ms (30s), increased to 180s to reduce ping frequency
+          ("grpc.keepalive_time_ms", 180000),
+          # Default: 5000ms (5s), increased to 10 minutes for stability
+          ("grpc.keepalive_timeout_ms", 600000),
+          # Default: 2, set to 0 to allow unlimited pings without data
+          ("grpc.http2.max_pings_without_data", 0),
+          # Default: False, set to True to allow keepalive pings when no calls
+          ("grpc.keepalive_permit_without_calls", True),
+          # Default: 300000ms (5min), increased to 10min for stability
+          ("grpc.http2.min_recv_ping_interval_without_data_ms", 600000),
+          # Default: 300000ms (5min), increased to 120s for conservative pings
+          ("grpc.http2.min_sent_ping_interval_without_data_ms", 120000),
+          # Default: 2, set to 0 to allow unlimited ping strikes
+          ("grpc.http2.max_ping_strikes", 0),

Review Comment:
   Seems reasonable



##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -185,8 +185,28 @@ def start(self):
     try:
       process, endpoint = self.start_process()
       wait_secs = .1
-      channel_options = [("grpc.max_receive_message_length", -1),
-                         ("grpc.max_send_message_length", -1)]
+      channel_options = [
+          ("grpc.max_receive_message_length", -1),
+          ("grpc.max_send_message_length", -1),
+          # Default: 30000ms (30s), increased to 180s to reduce ping frequency
+          ("grpc.keepalive_time_ms", 180000),
+          # Default: 5000ms (5s), increased to 10 minutes for stability
+          ("grpc.keepalive_timeout_ms", 600000),
+          # Default: 2, set to 0 to allow unlimited pings without data
+          ("grpc.http2.max_pings_without_data", 0),
+          # Default: False, set to True to allow keepalive pings when no calls
+          ("grpc.keepalive_permit_without_calls", True),
+          # Default: 300000ms (5min), increased to 10min for stability
+          ("grpc.http2.min_recv_ping_interval_without_data_ms", 600000),
+          # Default: 300000ms (5min), increased to 120s for conservative pings
+          ("grpc.http2.min_sent_ping_interval_without_data_ms", 120000),
+          # Default: 2, set to 0 to allow unlimited ping strikes
+          ("grpc.http2.max_ping_strikes", 0),
+          # Default: 0 (disabled), enable socket reuse for better handling
+          ("grpc.so_reuseport", 1),

Review Comment:
   It's not going to do anything useful unless you specifically bind multiple server listeners to the same port. See https://github.com/grpc/grpc/tree/master/examples/python/multiprocessing#calculating-prime-numbers-with-multiple-processes.
   
   This can also be useful when you want to bind a socket to a random (`:0`) port, get the actual port number, and only then bind the server to it.
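The port-0 pattern mentioned above, modeled on the `_reserve_port` helper in the linked gRPC multiprocessing example, can be sketched with the standard library alone. `reserve_port` is a hypothetical name, and `SO_REUSEPORT` is not defined on every platform (Windows, for example), hence the guard.

```python
import contextlib
import socket


@contextlib.contextmanager
def reserve_port(host="127.0.0.1"):
    """Bind to port 0 so the OS picks a free port, and yield that port.

    SO_REUSEPORT is set so that a gRPC server created with
    ("grpc.so_reuseport", 1) can bind the same port while this socket
    is still held open.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        if hasattr(socket, "SO_REUSEPORT"):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        sock.bind((host, 0))
        yield sock.getsockname()[1]
    finally:
        sock.close()


with reserve_port() as port:
    # A server would be started on this port here, before the socket closes.
    print(f"reserved 127.0.0.1:{port}")
```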



##########
sdks/python/apache_beam/runners/worker/channel_factory.py:
##########
@@ -23,8 +23,24 @@
 
 class GRPCChannelFactory(grpc.StreamStreamClientInterceptor):
   DEFAULT_OPTIONS = [
-      ("grpc.keepalive_time_ms", 20000),
-      ("grpc.keepalive_timeout_ms", 300000),
+      # Default: 30000ms (30s), increased to 180s to reduce ping frequency
+      ("grpc.keepalive_time_ms", 180000),
+      # Default: 5000ms (5s), increased to 10 minutes for stability
+      ("grpc.keepalive_timeout_ms", 600000),
+      # Default: 2, set to 0 to allow unlimited pings without data
+      ("grpc.http2.max_pings_without_data", 0),

Review Comment:
   Seems reasonable. https://github.com/grpc/grpc/blob/master/doc/keepalive.md implies `0` should've been the default in the first place:
   
   > `GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA`
   > This channel argument controls the maximum number of pings that can be sent when there is no data/header frame to be sent. gRPC Core will not continue sending pings if we run over the limit. Setting it to 0 allows sending pings without such a restriction.
   > (Note that this is an unfortunate setting that does not agree with [A8-client-side-keepalive.md](https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md). There should ideally be no such restriction on the keepalive ping and we plan to deprecate it in the future.)
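A toy model of the limit described in the quote, assuming the counter resets whenever a data/header frame is sent. It is not gRPC Core's implementation; it only illustrates why the default of 2 can quietly stop keepalive pings on a long-idle connection while `0` does not.

```python
def keepalive_pings_sent(events, max_pings_without_data=2):
    """Count keepalive pings actually sent for a sequence of events.

    events: iterable of "ping" (a keepalive timer firing) or "data"
    (a data/header frame being sent). A limit of 0 means unlimited.
    """
    sent = 0
    since_data = 0
    for event in events:
        if event == "data":
            since_data = 0  # assumed: sending data resets the budget
        elif event == "ping":
            if max_pings_without_data == 0 or since_data < max_pings_without_data:
                sent += 1
                since_data += 1
    return sent


idle = ["ping"] * 5
print(keepalive_pings_sent(idle))     # default limit of 2 -> 2
print(keepalive_pings_sent(idle, 0))  # no limit -> 5
```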
   



##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -185,8 +185,28 @@ def start(self):
     try:
       process, endpoint = self.start_process()
       wait_secs = .1
-      channel_options = [("grpc.max_receive_message_length", -1),
-                         ("grpc.max_send_message_length", -1)]
+      channel_options = [
+          ("grpc.max_receive_message_length", -1),
+          ("grpc.max_send_message_length", -1),
+          # Default: 30000ms (30s), increased to 180s to reduce ping frequency
+          ("grpc.keepalive_time_ms", 180000),
+          # Default: 5000ms (5s), increased to 10 minutes for stability
+          ("grpc.keepalive_timeout_ms", 600000),
+          # Default: 2, set to 0 to allow unlimited pings without data
+          ("grpc.http2.max_pings_without_data", 0),
+          # Default: False, set to True to allow keepalive pings when no calls
+          ("grpc.keepalive_permit_without_calls", True),
+          # Default: 300000ms (5min), increased to 10min for stability
+          ("grpc.http2.min_recv_ping_interval_without_data_ms", 600000),
+          # Default: 300000ms (5min), increased to 120s for conservative pings
+          ("grpc.http2.min_sent_ping_interval_without_data_ms", 120000),
+          # Default: 2, set to 0 to allow unlimited ping strikes
+          ("grpc.http2.max_ping_strikes", 0),
+          # Default: 0 (disabled), enable socket reuse for better handling
+          ("grpc.so_reuseport", 1),
+          # Default: 0 (disabled), set 30s TCP timeout for connection control
+          ("grpc.tcp_user_timeout_ms", 30000),

Review Comment:
   Same as on the channel side: I do not believe this option exists.
   
   



##########
sdks/python/apache_beam/runners/worker/channel_factory.py:
##########
@@ -23,8 +23,24 @@
 
 class GRPCChannelFactory(grpc.StreamStreamClientInterceptor):
   DEFAULT_OPTIONS = [
-      ("grpc.keepalive_time_ms", 20000),
-      ("grpc.keepalive_timeout_ms", 300000),
+      # Default: 30000ms (30s), increased to 180s to reduce ping frequency
+      ("grpc.keepalive_time_ms", 180000),
+      # Default: 5000ms (5s), increased to 10 minutes for stability

Review Comment:
   The default is 20 seconds
   
   Channel Argument| Client|Server
   ----------------|-------|------
   GRPC_ARG_KEEPALIVE_TIMEOUT_MS|20000 (20 seconds)|20000 (20 seconds)
   
   https://github.com/grpc/grpc/blob/master/doc/keepalive.md



##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -185,8 +185,28 @@ def start(self):
     try:
       process, endpoint = self.start_process()
       wait_secs = .1
-      channel_options = [("grpc.max_receive_message_length", -1),
-                         ("grpc.max_send_message_length", -1)]
+      channel_options = [
+          ("grpc.max_receive_message_length", -1),
+          ("grpc.max_send_message_length", -1),
+          # Default: 30000ms (30s), increased to 180s to reduce ping frequency
+          ("grpc.keepalive_time_ms", 180000),
+          # Default: 5000ms (5s), increased to 10 minutes for stability
+          ("grpc.keepalive_timeout_ms", 600000),
+          # Default: 2, set to 0 to allow unlimited pings without data
+          ("grpc.http2.max_pings_without_data", 0),
+          # Default: False, set to True to allow keepalive pings when no calls
+          ("grpc.keepalive_permit_without_calls", True),
+          # Default: 300000ms (5min), increased to 10min for stability
+          ("grpc.http2.min_recv_ping_interval_without_data_ms", 600000),
+          # Default: 300000ms (5min), increased to 120s for conservative pings
+          ("grpc.http2.min_sent_ping_interval_without_data_ms", 120000),

Review Comment:
   Same as on the channel side, see https://github.com/apache/beam/pull/36528#discussion_r2479586543



##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -185,8 +185,28 @@ def start(self):
     try:
       process, endpoint = self.start_process()
       wait_secs = .1
-      channel_options = [("grpc.max_receive_message_length", -1),
-                         ("grpc.max_send_message_length", -1)]
+      channel_options = [

Review Comment:
   I recommend keeping these separate. 
   
   1. There are server-side only options
   2. You don't always want to tune the client and the server exactly the same way
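One way to follow this recommendation is to share only the options both sides genuinely need and keep the rest in side-specific lists. A sketch under assumptions: the list names are hypothetical, the values are illustrative rather than tuned, and the server-only set contains just the keys this thread identifies as server-side.

```python
# Options both sides need (values copied from the PR's diff).
_COMMON_OPTIONS = [
    ("grpc.max_receive_message_length", -1),
    ("grpc.max_send_message_length", -1),
]

# Client-only tuning; values are illustrative, not recommendations.
CLIENT_OPTIONS = _COMMON_OPTIONS + [
    ("grpc.keepalive_time_ms", 180000),
    ("grpc.keepalive_timeout_ms", 20000),  # the documented default
    ("grpc.http2.max_pings_without_data", 0),
    ("grpc.keepalive_permit_without_calls", True),
]

# Server-only tuning; values are illustrative, not recommendations.
SERVER_OPTIONS = _COMMON_OPTIONS + [
    # Accept client pings as often as every 60s without counting a strike.
    ("grpc.http2.min_ping_interval_without_data_ms", 60000),
    ("grpc.http2.max_ping_strikes", 0),
]

# Keys identified as server-only in this review thread.
SERVER_ONLY_KEYS = {
    "grpc.http2.min_ping_interval_without_data_ms",
    "grpc.http2.max_ping_strikes",
    "grpc.so_reuseport",
}


def server_only_keys_in(options):
    """Return, sorted, any server-only keys that appear in `options`."""
    return sorted(key for key, _ in options if key in SERVER_ONLY_KEYS)


# Guard against server knobs leaking into the client list.
assert server_only_keys_in(CLIENT_OPTIONS) == []
```

The client list would then be passed to `grpc.insecure_channel(target, options=CLIENT_OPTIONS)` and the server list to `grpc.server(executor, options=SERVER_OPTIONS)`, so each side can be tuned independently.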



##########
sdks/python/apache_beam/runners/worker/channel_factory.py:
##########
@@ -23,8 +23,24 @@
 
 class GRPCChannelFactory(grpc.StreamStreamClientInterceptor):
   DEFAULT_OPTIONS = [
-      ("grpc.keepalive_time_ms", 20000),
-      ("grpc.keepalive_timeout_ms", 300000),
+      # Default: 30000ms (30s), increased to 180s to reduce ping frequency
+      ("grpc.keepalive_time_ms", 180000),
+      # Default: 5000ms (5s), increased to 10 minutes for stability
+      ("grpc.keepalive_timeout_ms", 600000),
+      # Default: 2, set to 0 to allow unlimited pings without data
+      ("grpc.http2.max_pings_without_data", 0),
+      # Default: False, set to True to allow keepalive pings when no calls
+      ("grpc.keepalive_permit_without_calls", True),
+      # Default: 300000ms (5min), increased to 10min for stability
+      ("grpc.http2.min_recv_ping_interval_without_data_ms", 600000),
+      # Default: 300000ms (5min), increased to 120s for conservative pings
+      ("grpc.http2.min_sent_ping_interval_without_data_ms", 120000),
+      # Default: 2, set to 0 to allow unlimited ping strikes
+      ("grpc.http2.max_ping_strikes", 0),

Review Comment:
   Yes, server-only.



##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -185,8 +185,28 @@ def start(self):
     try:
       process, endpoint = self.start_process()
       wait_secs = .1
-      channel_options = [("grpc.max_receive_message_length", -1),
-                         ("grpc.max_send_message_length", -1)]
+      channel_options = [
+          ("grpc.max_receive_message_length", -1),
+          ("grpc.max_send_message_length", -1),
+          # Default: 30000ms (30s), increased to 180s to reduce ping frequency

Review Comment:
   According to https://github.com/grpc/grpc/blob/master/doc/keepalive.md, the default on the server side is 2 hours:
   
   Channel Argument| Client|Server
   ----------------|-------|------
   GRPC_ARG_KEEPALIVE_TIME_MS|INT_MAX (disabled)|7200000 (2 hours)
   
   
   Also see https://github.com/grpc/grpc/blob/caaa581253b963b2ef3fb5b409254af1a657518a/include/grpc/impl/channel_arg_names.h#L150-L153
   
   ```c
   /** After a duration of this time the client/server pings its peer to see if the
       transport is still alive. Int valued, milliseconds. Defaults to 7200000 (2
       hours). */
   #define GRPC_ARG_KEEPALIVE_TIME_MS "grpc.keepalive_time_ms"
   ```
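The reviewers cite rows of the keepalive.md table several times in this thread. A sketch that encodes those documented defaults and checks the PR's `# Default:` comments against them; the dictionaries and helpers are hypothetical, and `None` stands in for the client's "INT_MAX (disabled)".

```python
# Documented keepalive defaults from grpc/doc/keepalive.md as (client, server),
# in milliseconds. None stands for INT_MAX, i.e. client keepalive disabled.
KEEPALIVE_DEFAULTS_MS = {
    "grpc.keepalive_time_ms": (None, 7200000),
    "grpc.keepalive_timeout_ms": (20000, 20000),
}

# Defaults as claimed by the "# Default:" comments in the PR's diff.
PR_CLAIMED_DEFAULTS_MS = {
    "grpc.keepalive_time_ms": 30000,
    "grpc.keepalive_timeout_ms": 5000,
}


def documented_default(key, side):
    """Return the documented default for `key` on side "client" or "server"."""
    client, server = KEEPALIVE_DEFAULTS_MS[key]
    if side == "client":
        return client
    if side == "server":
        return server
    raise ValueError(f"unknown side: {side!r}")


def wrong_claims(claimed, side):
    """Return, sorted, the keys whose claimed default differs from the docs."""
    return sorted(key for key, value in claimed.items()
                  if documented_default(key, side) != value)


for side in ("client", "server"):
    print(side, wrong_claims(PR_CLAIMED_DEFAULTS_MS, side))
```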



##########
sdks/python/apache_beam/runners/worker/channel_factory.py:
##########
@@ -23,8 +23,24 @@
 
 class GRPCChannelFactory(grpc.StreamStreamClientInterceptor):
   DEFAULT_OPTIONS = [
-      ("grpc.keepalive_time_ms", 20000),
-      ("grpc.keepalive_timeout_ms", 300000),
+      # Default: 30000ms (30s), increased to 180s to reduce ping frequency
+      ("grpc.keepalive_time_ms", 180000),
+      # Default: 5000ms (5s), increased to 10 minutes for stability
+      ("grpc.keepalive_timeout_ms", 600000),

Review Comment:
   This worries me. Consider what this option does:
   
   > `GRPC_ARG_KEEPALIVE_TIMEOUT_MS`
   > This channel argument controls the amount of time (in milliseconds) the sender of the keepalive ping waits for an acknowledgement. If it does not receive an acknowledgment within this time, it will close the connection.
   > — https://github.com/grpc/grpc/blob/master/doc/keepalive.md
   
   ```c
   /** After waiting for a duration of this time, if the keepalive ping sender does
       not receive the ping ack, it will close the transport. Int valued,
       milliseconds. Defaults to 20000 (20 seonds). */
   #define GRPC_ARG_KEEPALIVE_TIMEOUT_MS "grpc.keepalive_timeout_ms"
   ```
   > — https://github.com/grpc/grpc/blob/caaa581253b963b2ef3fb5b409254af1a657518a/include/grpc/impl/channel_arg_names.h#L154-L157
   
   Are you really comfortable waiting 10 minutes before a dead channel is declared dead?
   Also note that because this value is now larger than `grpc.keepalive_time_ms`, it'll result in 3 outstanding keepalive pings while you're still waiting on the response to the first one.
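The arithmetic behind this concern can be made explicit. A rough sketch, assuming a silently dead peer on an otherwise idle connection is noticed only after one full keepalive interval plus the ack timeout; gRPC Core's real timer handling is more involved, so treat the results as ballpark figures. Both helper names are hypothetical.

```python
def worst_case_detection_ms(keepalive_time_ms, keepalive_timeout_ms):
    """Approximate upper bound on how long an idle dead connection goes
    unnoticed: wait for the next keepalive ping, then wait for its ack."""
    return keepalive_time_ms + keepalive_timeout_ms


def intervals_within_timeout(keepalive_time_ms, keepalive_timeout_ms):
    """How many further keepalive intervals elapse while one ack is pending."""
    return keepalive_timeout_ms // keepalive_time_ms


old = worst_case_detection_ms(20000, 300000)   # settings before the PR
new = worst_case_detection_ms(180000, 600000)  # settings proposed in the PR
print(f"before: {old / 60000:.1f} min, after: {new / 60000:.1f} min")
print(intervals_within_timeout(180000, 600000))
```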
   



##########
sdks/python/apache_beam/runners/worker/channel_factory.py:
##########
@@ -23,8 +23,24 @@
 
 class GRPCChannelFactory(grpc.StreamStreamClientInterceptor):
   DEFAULT_OPTIONS = [
-      ("grpc.keepalive_time_ms", 20000),
-      ("grpc.keepalive_timeout_ms", 300000),
+      # Default: 30000ms (30s), increased to 180s to reduce ping frequency
+      ("grpc.keepalive_time_ms", 180000),
+      # Default: 5000ms (5s), increased to 10 minutes for stability
+      ("grpc.keepalive_timeout_ms", 600000),
+      # Default: 2, set to 0 to allow unlimited pings without data
+      ("grpc.http2.max_pings_without_data", 0),
+      # Default: False, set to True to allow keepalive pings when no calls
+      ("grpc.keepalive_permit_without_calls", True),
+      # Default: 300000ms (5min), increased to 10min for stability
+      ("grpc.http2.min_recv_ping_interval_without_data_ms", 600000),
+      # Default: 300000ms (5min), increased to 120s for conservative pings
+      ("grpc.http2.min_sent_ping_interval_without_data_ms", 120000),
+      # Default: 2, set to 0 to allow unlimited ping strikes
+      ("grpc.http2.max_ping_strikes", 0),
+      # Default: 0 (disabled), enable socket reuse for better handling
+      ("grpc.so_reuseport", 1),
+      # Default: 0 (disabled), set 30s TCP timeout for connection control
+      ("grpc.tcp_user_timeout_ms", 30000),

Review Comment:
   I don't think this option even exists. See https://github.com/grpc/proposal/blob/master/A18-tcp-user-timeout.md.
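For context, the A18 proposal concerns the kernel's `TCP_USER_TIMEOUT` socket option. A stdlib sketch that sets that OS-level option on a plain socket, only to show what it controls at that layer; it does not configure gRPC, and the option is Linux-specific, so the hypothetical helper `set_tcp_user_timeout` returns `None` on other platforms.

```python
import socket


def set_tcp_user_timeout(sock, timeout_ms):
    """Set TCP_USER_TIMEOUT (how long transmitted data may stay
    unacknowledged before the kernel closes the connection) and return the
    value read back. Returns None where the platform lacks the option."""
    if not hasattr(socket, "TCP_USER_TIMEOUT"):
        return None
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, timeout_ms)
    return sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT)


with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    print(set_tcp_user_timeout(sock, 30000))
```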



##########
sdks/python/apache_beam/runners/worker/channel_factory.py:
##########
@@ -23,8 +23,24 @@
 
 class GRPCChannelFactory(grpc.StreamStreamClientInterceptor):
   DEFAULT_OPTIONS = [
-      ("grpc.keepalive_time_ms", 20000),
-      ("grpc.keepalive_timeout_ms", 300000),
+      # Default: 30000ms (30s), increased to 180s to reduce ping frequency

Review Comment:
   According to https://github.com/grpc/grpc/blob/master/doc/keepalive.md, there's no default value for this on the client side because by default the feature is disabled.
   
   Channel Argument| Client|Server
   ----------------|-------|------
   GRPC_ARG_KEEPALIVE_TIME_MS|INT_MAX (disabled)|7200000 (2 hours)
   



##########
sdks/python/apache_beam/runners/worker/channel_factory.py:
##########
@@ -23,8 +23,24 @@
 
 class GRPCChannelFactory(grpc.StreamStreamClientInterceptor):
   DEFAULT_OPTIONS = [
-      ("grpc.keepalive_time_ms", 20000),
-      ("grpc.keepalive_timeout_ms", 300000),
+      # Default: 30000ms (30s), increased to 180s to reduce ping frequency
+      ("grpc.keepalive_time_ms", 180000),
+      # Default: 5000ms (5s), increased to 10 minutes for stability
+      ("grpc.keepalive_timeout_ms", 600000),
+      # Default: 2, set to 0 to allow unlimited pings without data
+      ("grpc.http2.max_pings_without_data", 0),
+      # Default: False, set to True to allow keepalive pings when no calls
+      ("grpc.keepalive_permit_without_calls", True),
+      # Default: 300000ms (5min), increased to 10min for stability
+      ("grpc.http2.min_recv_ping_interval_without_data_ms", 600000),
+      # Default: 300000ms (5min), increased to 120s for conservative pings
+      ("grpc.http2.min_sent_ping_interval_without_data_ms", 120000),
+      # Default: 2, set to 0 to allow unlimited ping strikes
+      ("grpc.http2.max_ping_strikes", 0),
+      # Default: 0 (disabled), enable socket reuse for better handling
+      ("grpc.so_reuseport", 1),

Review Comment:
   Agreed, this is server-only: it lets multiple server sockets bind to the same port.



##########
sdks/python/apache_beam/runners/worker/channel_factory.py:
##########
@@ -23,8 +23,24 @@
 
 class GRPCChannelFactory(grpc.StreamStreamClientInterceptor):
   DEFAULT_OPTIONS = [
-      ("grpc.keepalive_time_ms", 20000),
-      ("grpc.keepalive_timeout_ms", 300000),
+      # Default: 30000ms (30s), increased to 180s to reduce ping frequency
+      ("grpc.keepalive_time_ms", 180000),
+      # Default: 5000ms (5s), increased to 10 minutes for stability
+      ("grpc.keepalive_timeout_ms", 600000),
+      # Default: 2, set to 0 to allow unlimited pings without data
+      ("grpc.http2.max_pings_without_data", 0),
+      # Default: False, set to True to allow keepalive pings when no calls
+      ("grpc.keepalive_permit_without_calls", True),
+      # Default: 300000ms (5min), increased to 10min for stability
+      ("grpc.http2.min_recv_ping_interval_without_data_ms", 600000),

Review Comment:
   Yes, this is server-side.



-- 
This is an automated message from the Apache Git Service.