This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ce299cfe80 try fixing mailbox cancel race condition (#10432)
ce299cfe80 is described below

commit ce299cfe80754fb307325fb037c3ddb22808f970
Author: Rong Rong <[email protected]>
AuthorDate: Wed Mar 15 23:26:14 2023 -0700

    try fixing mailbox cancel race condition (#10432)
    
    Co-authored-by: Rong Rong <[email protected]>
---
 .../org/apache/pinot/query/mailbox/GrpcSendingMailbox.java   | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index 3fa6d6c229..aef88a7d46 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -32,6 +32,7 @@ import org.apache.pinot.common.proto.Mailbox.MailboxContent;
 import org.apache.pinot.query.mailbox.channel.ChannelUtils;
 import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +49,7 @@ public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
   private final Function<Long, StreamObserver<MailboxContent>> _mailboxContentStreamObserverSupplier;
   private final MailboxStatusStreamObserver _statusObserver;
   private final long _deadlineMs;
+  private TransferableBlock _errorBlock;
 
   public GrpcSendingMailbox(String mailboxId, MailboxStatusStreamObserver statusObserver,
       Function<Long, StreamObserver<MailboxContent>> contentStreamObserverSupplier, long deadlineMs) {
@@ -55,6 +57,7 @@ public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
     _mailboxContentStreamObserverSupplier = contentStreamObserverSupplier;
     _statusObserver = statusObserver;
     _deadlineMs = deadlineMs;
+    _errorBlock = null;
   }
 
   @Override
@@ -63,7 +66,7 @@ public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
     if (!_initialized.get()) {
       open();
     }
-    Preconditions.checkState(!_statusObserver.isFinished(),
+    Preconditions.checkState(!_statusObserver.isFinished() || _errorBlock != null,
         "Called send when stream is already closed for mailbox=" + _mailboxId);
     MailboxContent data = toMailboxContent(block.getDataBlock());
     _mailboxContentStreamObserver.onNext(data);
@@ -72,6 +75,7 @@ public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
   @Override
   public void complete()
       throws Exception {
+    // TODO: should wait for _mailboxContentStreamObserver.onNext() finish before calling onComplete().
     _mailboxContentStreamObserver.onCompleted();
   }
 
@@ -85,8 +89,10 @@ public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
     if (_initialized.get() && !_statusObserver.isFinished()) {
       LOGGER.warn("GrpcSendingMailbox={} cancelling stream", _mailboxId);
       try {
-        _mailboxContentStreamObserver.onError(Status.fromThrowable(
-            new RuntimeException("Cancelled by the sender")).asRuntimeException());
+        RuntimeException e = new RuntimeException("Cancelled by the sender");
+        _errorBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
+        _mailboxContentStreamObserver.onNext(toMailboxContent(_errorBlock.getDataBlock()));
+        _mailboxContentStreamObserver.onError(Status.fromThrowable(e).asRuntimeException());
       } catch (Exception e) {
         // TODO: We don't necessarily need to log this since this is relatively quite likely to happen. Logging this
         //  anyways as info for now so we can see how frequently this happens.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to