This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 92888144601 optimize grpc settings (#36528)
92888144601 is described below
commit 92888144601f719c89a34b871f85d1d2483ec433
Author: liferoad
AuthorDate: Thu Nov 13 23:56:33 2025 -0500
optimize grpc settings (#36528)
* increase grpc keepalive timeout and adjust ping settings
Adjust GRPC channel settings to reduce ping frequency and allow more
flexible keepalive behavior. This improves performance by reducing unnecessary
network traffic while maintaining connection stability.
* yapf
* perf(subprocess_server): add grpc keepalive options to improve connection
stability
Add various grpc keepalive and ping-related options to prevent connection
drops during long-running operations. The new settings help maintain active
connections and detect failures faster.
* perf(grpc): increase keepalive and ping intervals to reduce frequency
Increase grpc.keepalive_time_ms from 30s to 60s and
grpc.http2.min_sent_ping_interval_without_data_ms from 10s to 30s to reduce
network overhead and improve performance.
* format
* more changes
* fix(milvus): increase timeout to 60s for container startup
* fix(io): handle empty init_result in FileBasedSink by falling back to
temp dir
Add fallback logic that creates a temporary directory when the initialization
result is EmptySideInput. This prevents potential issues when the pipeline
initialization phase returns an empty collection.
* retry Milvus
* style: use string formatting in milvus search logging
* fixed external tests
* tests
* fix(enrichment_test): sort output and expected values before comparison
Ensure the test passes when the output order differs from the expected order.
* docs(filebasedsink): add TODO comment for prism issue
Add a reference to GitHub issue #36563 for Prism compatibility.
* more tunes on the grpc options
* addressed some comments
* removed some options
* keep 300000 for keepalive_timeout_ms
* fixed the comments
* added keepalive_time_ms back
* Update sdks/python/apache_beam/utils/subprocess_server.py
Co-authored-by: Sergii Tkachenko
* address comments.
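The FileBasedSink change listed above falls back to a temporary directory when the initialization result is EmptySideInput. A minimal sketch of that control flow follows. It is not Beam's implementation: `EmptySideInput` below is a local stand-in class for the marker the commit message names, and `resolve_temp_dir` and the example path are hypothetical.

```python
import os
import tempfile


class EmptySideInput:
    """Local stand-in for the empty-side-input marker (hypothetical)."""


def resolve_temp_dir(init_result):
    """Return init_result, or a new temporary directory if it is empty.

    Hypothetical helper sketching the fallback; Beam's FileBasedSink code
    may differ.
    """
    if isinstance(init_result, EmptySideInput):
        # Initialization produced no value: create a temp directory instead.
        return tempfile.mkdtemp(prefix="beam-temp-")
    # Otherwise keep the directory chosen during initialization.
    return init_result


print(resolve_temp_dir("/data/out/.temp-1234"))  # /data/out/.temp-1234
fallback_dir = resolve_temp_dir(EmptySideInput())
print(os.path.isdir(fallback_dir))  # True
os.rmdir(fallback_dir)  # clean up the example directory
```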
---------
Co-authored-by: tvalentyn
Co-authored-by: Sergii Tkachenko
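The enrichment_test fix listed above compares sorted copies of the output and the expected values. A minimal sketch of that check, assuming the elements are mutually comparable (here, hypothetical `(id, value)` tuples). `assert_same_elements` is a hypothetical helper, not the test's actual code.

```python
def assert_same_elements(output, expected):
    """Fail unless both sequences hold the same elements, in any order."""
    # Sorting both sides removes any dependence on the runner's output order.
    actual_sorted, expected_sorted = sorted(output), sorted(expected)
    assert actual_sorted == expected_sorted, (actual_sorted, expected_sorted)


# Same rows in a different order: passes.
assert_same_elements([(2, "b"), (1, "a")], [(1, "a"), (2, "b")])
```

Sorting requires an ordering between elements, so unorderable values such as dicts would need a key function or a conversion first. Inside a pipeline, Beam's `apache_beam.testing.util.equal_to` matcher already ignores element order.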
---
.../python/apache_beam/runners/worker/channel_factory.py | 10 ++++++++--
sdks/python/apache_beam/utils/subprocess_server.py | 16 ++++++++++++++--
2 files changed, 22 insertions(+), 4 deletions(-)
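One intermediate commit in the list above raised grpc.keepalive_time_ms from 30 s to 60 s and grpc.http2.min_sent_ping_interval_without_data_ms from 10 s to 30 s. As a rough illustration of what such a change buys, the sketch below computes an upper bound on pings per hour, assuming at most one ping per interval. This is a simplification, not gRPC's actual scheduling, and `max_pings_per_hour` is a hypothetical helper. These were intermediate values: in the final diff, channel_factory.py keeps grpc.keepalive_time_ms at 20_000, and neither file sets min_sent_ping_interval_without_data_ms.

```python
MS_PER_HOUR = 60 * 60 * 1000


def max_pings_per_hour(interval_ms):
    """Upper bound on pings per hour, assuming at most one ping per interval.

    Hypothetical helper; gRPC's real ping scheduling is more involved.
    """
    return MS_PER_HOUR // interval_ms


# grpc.keepalive_time_ms: 30s -> 60s
print(max_pings_per_hour(30_000), "->", max_pings_per_hour(60_000))  # 120 -> 60
# grpc.http2.min_sent_ping_interval_without_data_ms: 10s -> 30s
print(max_pings_per_hour(10_000), "->", max_pings_per_hour(30_000))  # 360 -> 120
```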
diff --git a/sdks/python/apache_beam/runners/worker/channel_factory.py b/sdks/python/apache_beam/runners/worker/channel_factory.py
index 6ad0f7235e9..afb4d182cab 100644
--- a/sdks/python/apache_beam/runners/worker/channel_factory.py
+++ b/sdks/python/apache_beam/runners/worker/channel_factory.py
@@ -23,8 +23,14 @@ import grpc
 
 class GRPCChannelFactory(grpc.StreamStreamClientInterceptor):
   DEFAULT_OPTIONS = [
-      ("grpc.keepalive_time_ms", 20000),
-      ("grpc.keepalive_timeout_ms", 300000),
+      # Setting keepalive_time_ms is needed for other options to work.
+      ("grpc.keepalive_time_ms", 20_000),
+      # Default: 20s. Increasing to 5 min.
+      ("grpc.keepalive_timeout_ms", 300_000),
+      # Default: 2, set to 0 to allow unlimited pings without data
+      ("grpc.http2.max_pings_without_data", 0),
+      # Default: False, set to True to allow keepalive pings when no calls
+      ("grpc.keepalive_permit_without_calls", True),
   ]
 
   def __init__(self):
diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py
index 7fb692e66ea..ff1a0d9c46a 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -185,8 +185,20 @@ class SubprocessServer(object):
     try:
       process, endpoint = self.start_process()
       wait_secs = .1
-      channel_options = [("grpc.max_receive_message_length", -1),
-                         ("grpc.max_send_message_length", -1)]
+      channel_options = [
+          ("grpc.max_receive_message_length", -1),
+          ("grpc.max_send_message_length", -1),
+          # Default: 20000ms (20s), increased to 10 minutes for stability
+          ("grpc.keepalive_timeout_ms", 600_000),
+          # Default: 2, set to 0 to allow unlimited pings without data
+          ("grpc.http2.max_pings_without_data", 0),
+          # Default: False, set to True to allow keepalive pings when no calls
+          ("grpc.keepalive_permit_without_calls", True),
+          # Default: 2, set to 0 to allow unlimited ping strikes
+          ("grpc.http2.max_ping_strikes", 0),
+          # Default: 0 (disabled), enable socket reuse for better handling
+          ("grpc.so_reuseport", 1),
+      ]
       self._grpc_channel = grpc.insecure_channel(
           endpoint, options=channel_options)
       channel_ready = grpc.channel_ready_future(self._grpc_channel)
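Comparing the two option lists touched by this commit shows where they diverge. The sketch below copies both lists from the hunks above and diffs them as dictionaries, rendering millisecond-valued options as durations. `readable` and `compare_options` are hypothetical helpers; they compare the lists only and do not model how gRPC applies the options.

```python
from datetime import timedelta

# Option lists copied from the channel_factory.py and subprocess_server.py hunks.
WORKER_OPTIONS = [
    ("grpc.keepalive_time_ms", 20_000),
    ("grpc.keepalive_timeout_ms", 300_000),
    ("grpc.http2.max_pings_without_data", 0),
    ("grpc.keepalive_permit_without_calls", True),
]
SUBPROCESS_OPTIONS = [
    ("grpc.max_receive_message_length", -1),
    ("grpc.max_send_message_length", -1),
    ("grpc.keepalive_timeout_ms", 600_000),
    ("grpc.http2.max_pings_without_data", 0),
    ("grpc.keepalive_permit_without_calls", True),
    ("grpc.http2.max_ping_strikes", 0),
    ("grpc.so_reuseport", 1),
]


def readable(name, value):
    """Render *_ms options as H:MM:SS durations; leave other values as-is."""
    if name.endswith("_ms"):
        return str(timedelta(milliseconds=value))
    return value


def compare_options(a, b):
    """Return (only_in_a, only_in_b, differing) for two (key, value) lists.

    If a key repeats within one list, the last value wins here (dict semantics).
    """
    da, db = dict(a), dict(b)
    only_a = sorted(da.keys() - db.keys())
    only_b = sorted(db.keys() - da.keys())
    differing = {
        k: (readable(k, da[k]), readable(k, db[k]))
        for k in sorted(da.keys() & db.keys())
        if da[k] != db[k]
    }
    return only_a, only_b, differing


only_worker, only_subprocess, differing = compare_options(
    WORKER_OPTIONS, SUBPROCESS_OPTIONS)
print("only in channel_factory:", only_worker)
print("only in subprocess_server:", only_subprocess)
print("differing:", differing)
```

Both lists are passed as plain `(key, value)` pairs through the `options=` argument of `grpc.insecure_channel`, as the subprocess_server.py hunk shows. The comparison surfaces one asymmetry: subprocess_server.py sets grpc.keepalive_timeout_ms without grpc.keepalive_time_ms, while the comment in channel_factory.py states that keepalive_time_ms is needed for the other options to work.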