gemini-code-assist[bot] commented on code in PR #38581:
URL: https://github.com/apache/beam/pull/38581#discussion_r3282916770
##########
sdks/python/apache_beam/runners/worker/data_plane.py:
##########
@@ -478,9 +491,38 @@ def __init__(self, data_buffer_time_limit_ms=0):
   def close(self):
     # type: () -> None
-    self._to_send.put(self._WRITES_FINISHED, 0)
+    self._enqueue_to_send(self._WRITES_FINISHED)
+    if self._send_forwarder is not None:
+      self._send_forwarder.join()
     self._closed = True
+  def _start_send_forwarder(self):
+    # type: () -> None
+    forwarder = threading.Thread(
+        target=self._forward_pending_to_send,
+        name='forward_grpc_outputs')
+    forwarder.daemon = True
+    forwarder.start()
+    self._send_forwarder = forwarder
+
+  def _enqueue_to_send(self, elem):
+    # type: (DataOrTimers) -> None
+    self._pending_send.put(elem, self._get_element_size_bytes(elem))
+
+  def _forward_pending_to_send(self):
+    # type: () -> None
+    try:
+      while True:
+        elem = self._pending_send.get()
+        self._to_send.put(elem, self._get_element_size_bytes(elem))
+        if elem is self._WRITES_FINISHED:
+          return
+    except Exception as e:
+      if not self._closed:
+        _LOGGER.exception('Failed to forward outputs in the data plane.')
+        self._exception = e
+        raise
Review Comment:

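The `_forward_pending_to_send` method recomputes each element's size with
`self._get_element_size_bytes(elem)`, even though `_enqueue_to_send` already
computed that size when putting the element into the `_pending_send` queue.
The second call is redundant work and, for large elements, could become a
performance bottleneck. Consider passing `(element, size)` tuples through the
`_pending_send` queue so the size is computed only once. This also requires
`_enqueue_to_send` to enqueue the tuple, e.g. by computing
`size = self._get_element_size_bytes(elem)` and then calling
`self._pending_send.put((elem, size), size)`.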
```suggestion
  def _forward_pending_to_send(self):
    # type: () -> None
    try:
      while True:
        elem, size = self._pending_send.get()
        self._to_send.put(elem, size)
        if elem is self._WRITES_FINISHED:
          return
    except Exception as e:
      if not self._closed:
        _LOGGER.exception('Failed to forward outputs in the data plane.')
        self._exception = e
        raise
```
##########
sdks/python/apache_beam/runners/worker/data_plane.py:
##########
@@ -478,9 +491,38 @@ def __init__(self, data_buffer_time_limit_ms=0):
   def close(self):
     # type: () -> None
-    self._to_send.put(self._WRITES_FINISHED, 0)
+    self._enqueue_to_send(self._WRITES_FINISHED)
+    if self._send_forwarder is not None:
+      self._send_forwarder.join()
     self._closed = True
Review Comment:

The `close` method joins the `_send_forwarder` thread but never checks whether
that thread failed (a failure is recorded in `self._exception`). If the
forwarder thread fails, some data may not have been sent, and the caller should
be told. Consider checking `self._exception` after the join and re-raising it.
```suggestion
  def close(self):
    # type: () -> None
    self._enqueue_to_send(self._WRITES_FINISHED)
    if self._send_forwarder is not None:
      self._send_forwarder.join()
      if self._exception:
        raise self._exception
    self._closed = True
```
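A minimal sketch of the same idea outside Beam, assuming a worker thread that records its failure in `_exception` and a `close()` that joins it and then re-raises. `FailingForwarder` and its `sink` callable are hypothetical. Unlike the suggestion above, this sketch sets `_closed` before raising, so the object is marked closed even when the error propagates, and the worker stores the exception without re-raising it (that only avoids a duplicate traceback printed by `threading.excepthook`).

```python
import queue
import threading

_WRITES_FINISHED = object()  # end-of-stream sentinel (hypothetical)


class FailingForwarder:
  """Hypothetical worker-thread forwarder that surfaces failures in close()."""

  def __init__(self, sink):
    self._sink = sink  # callable that "sends" one element; it may raise
    self._pending = queue.Queue()  # unbounded, so put() never blocks
    self._exception = None
    self._closed = False
    self._worker = threading.Thread(target=self._forward, daemon=True)
    self._worker.start()

  def enqueue(self, elem):
    self._pending.put(elem)

  def _forward(self):
    try:
      while True:
        elem = self._pending.get()
        if elem is _WRITES_FINISHED:
          return
        self._sink(elem)
    except Exception as e:  # record any send failure for close()
      self._exception = e

  def close(self):
    self.enqueue(_WRITES_FINISHED)
    self._worker.join()
    # Mark closed first so the object is closed even if we raise below.
    self._closed = True
    if self._exception is not None:
      raise self._exception


sent = []
ok = FailingForwarder(sent.append)
ok.enqueue(1)
ok.enqueue(2)
ok.close()  # returns normally; sent is now [1, 2]


def failing_sink(elem):
  raise ValueError('send failed for %r' % (elem,))


bad = FailingForwarder(failing_sink)
bad.enqueue('x')
try:
  bad.close()
except ValueError as e:
  print('close() raised:', e)  # close() raised: send failed for 'x'
```

Because the pending queue in this sketch is unbounded, enqueueing the sentinel in `close()` cannot block even after the worker has died; with a bounded queue that case would need separate handling.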
--
This is an automated message from the Apache Git Service.