gortiz commented on code in PR #16903:
URL: https://github.com/apache/pinot/pull/16903#discussion_r2419188611


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -71,31 +79,42 @@ public void send(MseBlock.Eos block, List<DataBuffer> 
serializedStats)
     sendPrivate(block, serializedStats);
   }
 
-  private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats)
-      throws TimeoutException {
+  private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats) {
     if (isTerminated() || (isEarlyTerminated() && block.isData())) {
+      LOGGER.debug("Mailbox {} already terminated, ignoring block {}", _id, 
block);
       return;
     }
     if (_receivingMailbox == null) {
       _receivingMailbox = _mailboxService.getReceivingMailbox(_id);
     }
     _statMap.merge(MailboxSendOperator.StatKey.IN_MEMORY_MESSAGES, 1);
     long timeoutMs = _deadlineMs - System.currentTimeMillis();
-    ReceivingMailbox.ReceivingMailboxStatus status = 
_receivingMailbox.offer(block, serializedStats, timeoutMs);
-
+    ReceivingMailbox.ReceivingMailboxStatus status;
+    try {
+      status = _receivingMailbox.offer(block, serializedStats, timeoutMs);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new QueryException(QueryErrorCode.INTERNAL, "Interrupted while 
sending data to mailbox: " + _id, e);
+    } catch (TimeoutException e) {
+      throw new QueryException(QueryErrorCode.EXECUTION_TIMEOUT,
+          String.format("Timed out adding block into mailbox: %s with timeout: 
%dms", _id, timeoutMs));
+    }
+    _isEarlyTerminated = status != 
ReceivingMailbox.ReceivingMailboxStatus.SUCCESS;
     switch (status) {
       case SUCCESS:
+      case WAITING_EOS:
+        break;
+      case LAST_BLOCK:
+        _isTerminated = true;
         break;
-      case CANCELLED:
-        throw new QueryCancelledException(String.format("Mailbox: %s already 
cancelled from upstream", _id));
-      case ERROR:
-        throw new QueryException(QueryErrorCode.INTERNAL, String.format(
-            "Mailbox: %s already errored out (received error block before)", 
_id));
-      case TIMEOUT:
-        throw new QueryException(QueryErrorCode.EXECUTION_TIMEOUT,
-            String.format("Timed out adding block into mailbox: %s with 
timeout: %dms", _id, timeoutMs));
-      case EARLY_TERMINATED:
-        _isEarlyTerminated = true;
+      case ALREADY_TERMINATED:
+        if (_isTerminated) {
+          LOGGER.debug("Local mailbox received a late message once the stream 
was closed. This can happen due to "
+              + "race condition between sending the last block and closing the 
stream on the sender side");
+        } else {
+          throw new QueryException(QueryErrorCode.INTERNAL, String.format(

Review Comment:
   I've changed this code to log in the same way we log in GRPC mailboxes. Now 
we log a message saying this should not happen



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to