This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b261f0a399 Fix Fn API data plane deadlock when outbound queue is full (#38581)
1b261f0a399 is described below

commit 1b261f0a39915eebb9e53f384d59cba8fa726d8a
Author: Abdelrahman Ibrahim <[email protected]>
AuthorDate: Tue May 26 17:01:39 2026 +0200

    Fix Fn API data plane deadlock when outbound queue is full (#38581)
---
 .../apache_beam/runners/worker/data_plane.py       | 60 +++++++++++++++++++---
 1 file changed, 52 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py
index a5589ac33a1..cfefa37d76b 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -68,6 +68,9 @@ _FLUSH_MAX_SIZE = (2 << 30) - 100  # 2GB less some overhead, protobuf/grpc limit
 # Keep a set of completed instructions to discard late received data. The set
 # can have up to _MAX_CLEANED_INSTRUCTIONS items. See _GrpcDataChannel.
 _MAX_CLEANED_INSTRUCTIONS = 10000
+_DEFAULT_SEND_QUEUE_MAX_ELEMENTS = 10000
+_DEFAULT_SEND_QUEUE_MAX_BYTES = 100 << 20  # 100MB
+_DEFAULT_RECEIVE_QUEUE_MAX_ELEMENTS = 5
 
 # retry on transient UNAVAILABLE grpc error from data channels.
 _GRPC_SERVICE_CONFIG = json.dumps({
@@ -459,10 +462,20 @@ class _GrpcDataChannel(DataChannel):
 
     self._data_buffer_time_limit_ms = data_buffer_time_limit_ms
     self._to_send = ByteLimitedQueue(
-        maxsize=10000,
-        maxbytes=100 << 20)  # type: ByteLimitedQueue[DataOrTimers]
+        maxsize=_DEFAULT_SEND_QUEUE_MAX_ELEMENTS,
+        maxbytes=_DEFAULT_SEND_QUEUE_MAX_BYTES
+    )  # type: ByteLimitedQueue[DataOrTimers]
+    # Staging queue so a full send buffer does not block reading inputs.
+    self._pending_send = ByteLimitedQueue(
+        maxsize=_DEFAULT_SEND_QUEUE_MAX_ELEMENTS,
+        maxbytes=_DEFAULT_SEND_QUEUE_MAX_BYTES
+    )  # type: ByteLimitedQueue[DataOrTimers]
+    self._send_forwarder = None  # type: Optional[threading.Thread]
+    self._start_send_forwarder()
     self._received = collections.defaultdict(
-        lambda: ByteLimitedQueue(maxsize=5, maxbytes=100 << 20)
+        lambda: ByteLimitedQueue(
+            maxsize=_DEFAULT_RECEIVE_QUEUE_MAX_ELEMENTS, maxbytes=
+            _DEFAULT_SEND_QUEUE_MAX_BYTES)
     )  # type: DefaultDict[str, ByteLimitedQueue[DataOrTimers]]
 
     # Keep a cache of completed instructions. Data for completed instructions
@@ -478,9 +491,40 @@ class _GrpcDataChannel(DataChannel):
 
   def close(self):
     # type: () -> None
-    self._to_send.put(self._WRITES_FINISHED, 0)
+    self._enqueue_to_send(self._WRITES_FINISHED)
+    if self._send_forwarder is not None:
+      self._send_forwarder.join()
+    if self._exception:
+      raise self._exception
     self._closed = True
 
+  def _start_send_forwarder(self):
+    # type: () -> None
+    forwarder = threading.Thread(
+        target=self._forward_pending_to_send, name='forward_grpc_outputs')
+    forwarder.daemon = True
+    forwarder.start()
+    self._send_forwarder = forwarder
+
+  def _enqueue_to_send(self, elem):
+    # type: (DataOrTimers) -> None
+    size = self._get_element_size_bytes(elem)
+    self._pending_send.put((elem, size), size)
+
+  def _forward_pending_to_send(self):
+    # type: () -> None
+    try:
+      while True:
+        elem, size = self._pending_send.get()
+        self._to_send.put(elem, size)
+        if elem is self._WRITES_FINISHED:
+          return
+    except Exception as e:
+      if not self._closed:
+        _LOGGER.exception('Failed to forward outputs in the data plane.')
+        self._exception = e
+      raise
+
   def wait(self, timeout=None):
     # type: (Optional[int]) -> None
     self._reads_finished.wait(timeout)
@@ -591,7 +635,7 @@ class _GrpcDataChannel(DataChannel):
       if data:
         elem = beam_fn_api_pb2.Elements.Data(
             instruction_id=instruction_id, transform_id=transform_id, data=data)
-        self._to_send.put(elem, self._get_element_size_bytes(elem))
+        self._enqueue_to_send(elem)
 
     def close_callback(data):
       # type: (bytes) -> None
@@ -601,7 +645,7 @@ class _GrpcDataChannel(DataChannel):
           instruction_id=instruction_id,
           transform_id=transform_id,
           is_last=True)
-      self._to_send.put(elem, self._get_element_size_bytes(elem))
+      self._enqueue_to_send(elem)
 
     return ClosableOutputStream.create(
         close_callback, add_to_send_queue, self._data_buffer_time_limit_ms)
@@ -622,7 +666,7 @@ class _GrpcDataChannel(DataChannel):
             timer_family_id=timer_family_id,
             timers=timer,
             is_last=False)
-        self._to_send.put(elem, self._get_element_size_bytes(elem))
+        self._enqueue_to_send(elem)
 
     def close_callback(timer):
       # type: (bytes) -> None
@@ -632,7 +676,7 @@ class _GrpcDataChannel(DataChannel):
           transform_id=transform_id,
           timer_family_id=timer_family_id,
           is_last=True)
-      self._to_send.put(elem, self._get_element_size_bytes(elem))
+      self._enqueue_to_send(elem)
 
     return ClosableOutputStream.create(
         close_callback, add_to_send_queue, self._data_buffer_time_limit_ms)

Reply via email to