Copilot commented on code in PR #16915:
URL: https://github.com/apache/pinot/pull/16915#discussion_r2402296568
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -139,16 +139,28 @@ public boolean send(MseBlock.Eos eosBlock,
List<DataBuffer> serializedStats)
// this may happen when the block exchange is itself used as a sending mailbox, like when using spools
mailboxIdToSendMetadata = -1;
}
+ RuntimeException firstException = null;
for (int i = 0; i < numMailboxes; i++) {
- SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
- List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ? serializedStats : Collections.emptyList();
+ try {
+ SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
+ List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ? serializedStats : Collections.emptyList();
- sendingMailbox.send(eosBlock, statsToSend);
- sendingMailbox.complete();
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Block sent: {} {} to {}", eosBlock, System.identityHashCode(eosBlock), sendingMailbox);
+ sendingMailbox.send(eosBlock, statsToSend);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Block sent: {} {} to {}", eosBlock, System.identityHashCode(eosBlock), sendingMailbox);
+ }
+ } catch (IOException | TimeoutException e) {
+ // We want to try to send EOS to all mailboxes, so we catch the exception and rethrow it at the end.
+ if (firstException == null) {
+ firstException = new RuntimeException("Failed to send EOS block to mailbox #" + i, e);
+ } else {
+ firstException.addSuppressed(e);
+ }
}
Review Comment:
The catch block handles only `IOException` and `TimeoutException`, but
`send()` can also throw a `RuntimeException` (for example, from the new
completion logic). An uncaught `RuntimeException` from one mailbox aborts the
loop, so EOS is never sent to the remaining mailboxes. That defeats the stated
goal of trying to send EOS to all mailboxes.
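One way to address this is to catch `RuntimeException` alongside the checked exceptions, so that every mailbox is attempted before anything is rethrown. The following is a minimal, self-contained sketch of that pattern, not the PR's code: the `Mailbox` interface and `sendEosToAll` method are hypothetical stand-ins for `SendingMailbox` and the loop in `BlockExchange.send`.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SendToAllSketch {
  /** Hypothetical stand-in for a sending mailbox; not Pinot's SendingMailbox. */
  interface Mailbox {
    void sendEos() throws IOException;
  }

  /** Attempts every mailbox, then rethrows the first failure with later ones suppressed. */
  static void sendEosToAll(List<Mailbox> mailboxes) {
    RuntimeException firstException = null;
    for (int i = 0; i < mailboxes.size(); i++) {
      try {
        mailboxes.get(i).sendEos();
      } catch (IOException | RuntimeException e) {
        // Catching RuntimeException too keeps the loop going for the remaining mailboxes.
        if (firstException == null) {
          firstException = new RuntimeException("Failed to send EOS block to mailbox #" + i, e);
        } else {
          firstException.addSuppressed(e);
        }
      }
    }
    if (firstException != null) {
      throw firstException;
    }
  }

  public static void main(String[] args) {
    List<String> delivered = new ArrayList<>();
    List<Mailbox> mailboxes = List.of(
        () -> delivered.add("m0"),
        () -> { throw new IllegalStateException("observer already closed"); },
        () -> delivered.add("m2"));
    try {
      sendEosToAll(mailboxes);
    } catch (RuntimeException e) {
      // The third mailbox was still reached despite the failure in the second one.
      System.out.println(delivered + " / " + e.getMessage());
    }
  }
}
```

In this sketch the unchecked failure from the second mailbox is recorded rather than propagated immediately, so the third mailbox still receives its EOS.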
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -97,7 +97,12 @@ public void send(MseBlock.Data data)
@Override
public void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
throws IOException, TimeoutException {
- sendInternal(block, serializedStats);
+ try {
+ sendInternal(block, serializedStats);
+ } finally {
+ LOGGER.debug("Completing mailbox: {}", _id);
+ _contentObserver.onCompleted();
+ }
Review Comment:
Because the completion call sits in a `finally` block,
`_contentObserver.onCompleted()` runs even when `sendInternal()` throws. That
can mean calling `onCompleted()` on a gRPC observer that is already in an error
state, which may cause undefined behavior. Consider checking that the observer
is still in a valid state before calling `onCompleted()`, or complete the
mailbox only after a successful send.
```suggestion
sendInternal(block, serializedStats);
LOGGER.debug("Completing mailbox: {}", _id);
_contentObserver.onCompleted();
```
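To illustrate the difference the suggestion makes, here is a small sketch under stated assumptions. `FakeObserver` is a hypothetical class, not gRPC's `StreamObserver`, and its rule that `onCompleted()` is rejected after a failure is an assumption made for the demonstration rather than documented gRPC behavior.

```java
import java.io.IOException;

public class CompleteOnSuccessSketch {
  /** Hypothetical observer that rejects onCompleted() once it has failed. */
  static class FakeObserver {
    boolean failed;
    boolean completed;

    void onCompleted() {
      if (failed) {
        throw new IllegalStateException("observer already failed");
      }
      completed = true;
    }
  }

  /** Completes the observer only when the send itself succeeded, as in the suggestion. */
  static void sendEos(FakeObserver observer, boolean sendFails) throws IOException {
    if (sendFails) {
      observer.failed = true; // simulate the stream moving to an error state
      throw new IOException("send failed");
    }
    observer.onCompleted(); // reached only on the success path
  }

  public static void main(String[] args) {
    FakeObserver ok = new FakeObserver();
    FakeObserver broken = new FakeObserver();
    try {
      sendEos(ok, false);
      sendEos(broken, true);
    } catch (IOException e) {
      System.out.println("ok.completed=" + ok.completed + ", broken.completed=" + broken.completed);
    }
  }
}
```

With completion on the success path only, the caller sees the original `IOException` and nothing else. With a `finally` block, this fake observer would throw a second exception that replaces the first.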