This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 92888144601 optimize grpc settings  (#36528)
92888144601 is described below

commit 92888144601f719c89a34b871f85d1d2483ec433
Author: liferoad <[email protected]>
AuthorDate: Thu Nov 13 23:56:33 2025 -0500

    optimize grpc settings  (#36528)
    
    * increase grpc keepalive timeout and adjust ping settings
    
    Adjust GRPC channel settings to reduce ping frequency and allow more
    flexible keepalive behavior. This improves performance by reducing
    unnecessary network traffic while maintaining connection stability.
    
    * yapf
    
    * perf(subprocess_server): add grpc keepalive options to improve connection
      stability
    
    Add various grpc keepalive and ping-related options to prevent connection
    drops during long-running operations. The new settings help maintain active
    connections and detect failures faster.
    
    * perf(grpc): increase keepalive and ping intervals to reduce frequency
    
    Increase grpc.keepalive_time_ms from 30s to 60s and
    grpc.http2.min_sent_ping_interval_without_data_ms from 10s to 30s to reduce
    network overhead and improve performance
    
    * format
    
    * more changes
    
    * fix(milvus): increase timeout to 60s for container startup
    
    * fix(io): handle empty init_result in FileBasedSink by falling back to
      temp dir
    
    Add fallback logic when initialization result is EmptySideInput to create a
    temporary directory instead. This prevents potential issues when the pipeline
    initialization phase returns an empty collection.
    
    * retry Milvus
    
    * style: use string formatting in milvus search logging
    
    * fixed external tests
    
    * tests
    
    * fix(enrichment_test): sort output and expected values before comparison
    
    Ensure test passes when output order differs from expected order
    
    * docs(filebasedsink): add TODO comment for prism issue
    
    Add reference to GitHub issue #36563 for Prism compatibility
    
    * more tunes on the grpc options
    
    * addressed some comments
    
    * removed some options
    
    * keep 300000 for keepalive_timeout_ms
    
    * fixed the comments
    
    * added keepalive_time_ms back
    
    * Update sdks/python/apache_beam/utils/subprocess_server.py
    
    Co-authored-by: Sergii Tkachenko <[email protected]>
    
    * address comments.
    
    ---------
    
    Co-authored-by: tvalentyn <[email protected]>
    Co-authored-by: Sergii Tkachenko <[email protected]>
---
 .../python/apache_beam/runners/worker/channel_factory.py | 10 ++++++++--
 sdks/python/apache_beam/utils/subprocess_server.py       | 16 ++++++++++++++--
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/channel_factory.py b/sdks/python/apache_beam/runners/worker/channel_factory.py
index 6ad0f7235e9..afb4d182cab 100644
--- a/sdks/python/apache_beam/runners/worker/channel_factory.py
+++ b/sdks/python/apache_beam/runners/worker/channel_factory.py
@@ -23,8 +23,14 @@ import grpc
 
 class GRPCChannelFactory(grpc.StreamStreamClientInterceptor):
   DEFAULT_OPTIONS = [
-      ("grpc.keepalive_time_ms", 20000),
-      ("grpc.keepalive_timeout_ms", 300000),
+      # Setting keepalive_time_ms is needed for other options to work.
+      ("grpc.keepalive_time_ms", 20_000),
+      # Default: 20s. Increasing to 5 min.
+      ("grpc.keepalive_timeout_ms", 300_000),
+      # Default: 2, set to 0 to allow unlimited pings without data
+      ("grpc.http2.max_pings_without_data", 0),
+      # Default: False, set to True to allow keepalive pings when no calls
+      ("grpc.keepalive_permit_without_calls", True),
   ]
 
   def __init__(self):
diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py
index 7fb692e66ea..ff1a0d9c46a 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -185,8 +185,20 @@ class SubprocessServer(object):
     try:
       process, endpoint = self.start_process()
       wait_secs = .1
-      channel_options = [("grpc.max_receive_message_length", -1),
-                         ("grpc.max_send_message_length", -1)]
+      channel_options = [
+          ("grpc.max_receive_message_length", -1),
+          ("grpc.max_send_message_length", -1),
+          # Default: 20000ms (20s), increased to 10 minutes for stability
+          ("grpc.keepalive_timeout_ms", 600_000),
+          # Default: 2, set to 0 to allow unlimited pings without data
+          ("grpc.http2.max_pings_without_data", 0),
+          # Default: False, set to True to allow keepalive pings when no calls
+          ("grpc.keepalive_permit_without_calls", True),
+          # Default: 2, set to 0 to allow unlimited ping strikes
+          ("grpc.http2.max_ping_strikes", 0),
+          # Default: 0 (disabled), enable socket reuse for better handling
+          ("grpc.so_reuseport", 1),
+      ]
       self._grpc_channel = grpc.insecure_channel(
           endpoint, options=channel_options)
       channel_ready = grpc.channel_ready_future(self._grpc_channel)
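
Note for readers of this diff: both hunks build a list of (name, value) channel options. The sketch below is one possible way to layer caller-supplied options over a set of defaults; it is not taken from the commit. The helper name `merge_channel_options` is hypothetical, and the default values are simply copied from the channel_factory.py hunk. It uses only the standard library, so it makes no claim about how Beam itself combines options.

```python
from typing import Iterable, List, Tuple, Union

OptionValue = Union[int, str, bool]
ChannelOption = Tuple[str, OptionValue]

# Values copied from the channel_factory.py hunk in this commit.
DEFAULT_OPTIONS: List[ChannelOption] = [
    ("grpc.keepalive_time_ms", 20_000),
    ("grpc.keepalive_timeout_ms", 300_000),
    ("grpc.http2.max_pings_without_data", 0),
    ("grpc.keepalive_permit_without_calls", True),
]


def merge_channel_options(
    defaults: Iterable[ChannelOption],
    overrides: Iterable[ChannelOption] = (),
) -> List[ChannelOption]:
    """Hypothetical helper (not part of Beam or grpc).

    Returns the defaults with any same-named override applied in place;
    options that appear only in `overrides` are appended at the end.
    """
    merged = dict(defaults)   # dicts preserve insertion order
    merged.update(overrides)  # an override replaces the value, keeps position
    return list(merged.items())


# Example: raise the keepalive timeout to the 10 minutes used in
# subprocess_server.py and lift the receive-size limit.
options = merge_channel_options(
    DEFAULT_OPTIONS,
    [
        ("grpc.keepalive_timeout_ms", 600_000),
        ("grpc.max_receive_message_length", -1),
    ],
)
```

A list built this way could then be passed as `options=` to `grpc.insecure_channel(endpoint, options=options)`, the same call that appears in the subprocess_server.py hunk.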
