gortiz commented on code in PR #16903:
URL: https://github.com/apache/pinot/pull/16903#discussion_r2425549831
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -122,11 +116,10 @@ public boolean send(MseBlock.Data block)
* API to send a block to the destination mailboxes.
* @param eosBlock the block to be transferred
* @return true if all the mailboxes has been early terminated.
- * @throws IOException when sending stream unexpectedly closed.
- * @throws TimeoutException when sending stream timeout.
+ * @throws org.apache.pinot.spi.exception.QueryException if any mailbox fails to send the block, including on timeout.
*/
- public boolean send(MseBlock.Eos eosBlock, List<DataBuffer> serializedStats)
- throws IOException, TimeoutException {
+ // TODO: Remove throws, as they are never thrown
Review Comment:
Nope. Removed.
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -60,44 +67,50 @@ public boolean isLocal() {
}
@Override
- public void send(MseBlock.Data data)
- throws IOException, TimeoutException {
+ public void send(MseBlock.Data data) {
sendPrivate(data, Collections.emptyList());
}
@Override
- public void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
- throws IOException, TimeoutException {
+ public void send(MseBlock.Eos block, List<DataBuffer> serializedStats) {
sendPrivate(block, serializedStats);
_isTerminated = true;
}
- private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats)
- throws TimeoutException {
+ private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats) {
if (isTerminated() || (isEarlyTerminated() && block.isData())) {
+ LOGGER.debug("Mailbox {} already terminated, ignoring block {}", _id, block);
return;
}
if (_receivingMailbox == null) {
_receivingMailbox = _mailboxService.getReceivingMailbox(_id);
}
_statMap.merge(MailboxSendOperator.StatKey.IN_MEMORY_MESSAGES, 1);
long timeoutMs = _deadlineMs - System.currentTimeMillis();
- ReceivingMailbox.ReceivingMailboxStatus status = _receivingMailbox.offer(block, serializedStats, timeoutMs);
-
+ ReceivingMailbox.ReceivingMailboxStatus status;
+ try {
+ status = _receivingMailbox.offer(block, serializedStats, timeoutMs);
+ } catch (InterruptedException e) {
+ // We are not restoring the interrupt status because we are already throwing an exception
+ // Code that catches this exception must finish the work fast enough to comply the interrupt contract
+ // See https://github.com/apache/pinot/pull/16903#discussion_r2409003423
+ throw new QueryException(QueryErrorCode.INTERNAL, "Interrupted while sending data to mailbox: " + _id, e);
+ } catch (TimeoutException e) {
+ throw new QueryException(QueryErrorCode.EXECUTION_TIMEOUT, "Timed out adding block into mailbox: " + _id
+ + " with timeout: " + Duration.of(timeoutMs, ChronoUnit.MILLIS), e);
+ }
switch (status) {
case SUCCESS:
break;
- case CANCELLED:
- throw new QueryCancelledException(String.format("Mailbox: %s already cancelled from upstream", _id));
- case ERROR:
- throw new QueryException(QueryErrorCode.INTERNAL, String.format(
- "Mailbox: %s already errored out (received error block before)", _id));
- case TIMEOUT:
- throw new QueryException(QueryErrorCode.EXECUTION_TIMEOUT,
- String.format("Timed out adding block into mailbox: %s with timeout: %dms", _id, timeoutMs));
- case EARLY_TERMINATED:
+ case WAITING_EOS:
_isEarlyTerminated = true;
break;
+ case LAST_BLOCK:
+ _isTerminated = true;
+ break;
+ case ALREADY_TERMINATED:
+ LOGGER.warn("Trying to offer blocks to the already closed mailbox {}. This should not happen", _id);
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]