This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ce299cfe80 try fixing mailbox cancel race condition (#10432)
ce299cfe80 is described below
commit ce299cfe80754fb307325fb037c3ddb22808f970
Author: Rong Rong <[email protected]>
AuthorDate: Wed Mar 15 23:26:14 2023 -0700
try fixing mailbox cancel race condition (#10432)
Co-authored-by: Rong Rong <[email protected]>
---
.../org/apache/pinot/query/mailbox/GrpcSendingMailbox.java | 12 +++++++++---
1 file changed, 9 insertions(+), 3 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index 3fa6d6c229..aef88a7d46 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -32,6 +32,7 @@ import org.apache.pinot.common.proto.Mailbox.MailboxContent;
import org.apache.pinot.query.mailbox.channel.ChannelUtils;
import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +49,7 @@ public class GrpcSendingMailbox implements
SendingMailbox<TransferableBlock> {
private final Function<Long, StreamObserver<MailboxContent>>
_mailboxContentStreamObserverSupplier;
private final MailboxStatusStreamObserver _statusObserver;
private final long _deadlineMs;
+ private TransferableBlock _errorBlock;
public GrpcSendingMailbox(String mailboxId, MailboxStatusStreamObserver
statusObserver,
Function<Long, StreamObserver<MailboxContent>>
contentStreamObserverSupplier, long deadlineMs) {
@@ -55,6 +57,7 @@ public class GrpcSendingMailbox implements
SendingMailbox<TransferableBlock> {
_mailboxContentStreamObserverSupplier = contentStreamObserverSupplier;
_statusObserver = statusObserver;
_deadlineMs = deadlineMs;
+ _errorBlock = null;
}
@Override
@@ -63,7 +66,7 @@ public class GrpcSendingMailbox implements
SendingMailbox<TransferableBlock> {
if (!_initialized.get()) {
open();
}
- Preconditions.checkState(!_statusObserver.isFinished(),
+ Preconditions.checkState(!_statusObserver.isFinished() || _errorBlock !=
null,
"Called send when stream is already closed for mailbox=" + _mailboxId);
MailboxContent data = toMailboxContent(block.getDataBlock());
_mailboxContentStreamObserver.onNext(data);
@@ -72,6 +75,7 @@ public class GrpcSendingMailbox implements
SendingMailbox<TransferableBlock> {
@Override
public void complete()
throws Exception {
+ // TODO: should wait for _mailboxContentStreamObserver.onNext() finish
before calling onComplete().
_mailboxContentStreamObserver.onCompleted();
}
@@ -85,8 +89,10 @@ public class GrpcSendingMailbox implements
SendingMailbox<TransferableBlock> {
if (_initialized.get() && !_statusObserver.isFinished()) {
LOGGER.warn("GrpcSendingMailbox={} cancelling stream", _mailboxId);
try {
- _mailboxContentStreamObserver.onError(Status.fromThrowable(
- new RuntimeException("Cancelled by the
sender")).asRuntimeException());
+ RuntimeException e = new RuntimeException("Cancelled by the sender");
+ _errorBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
+
_mailboxContentStreamObserver.onNext(toMailboxContent(_errorBlock.getDataBlock()));
+
_mailboxContentStreamObserver.onError(Status.fromThrowable(e).asRuntimeException());
} catch (Exception e) {
// TODO: We don't necessarily need to log this since this is
relatively quite likely to happen. Logging this
// anyways as info for now so we can see how frequently this happens.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]