This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 82589e24df [multistage] [bugfix] Temp fix for grpc receiving buffer
full issue (#10099)
82589e24df is described below
commit 82589e24dfb5fa59263172a63440b6959fd9364f
Author: Yao Liu <[email protected]>
AuthorDate: Tue Jan 17 11:37:53 2023 -0800
[multistage] [bugfix] Temp fix for grpc receiving buffer full issue (#10099)
* unbounded mailbox queue
* unbounded mailbox buffer
* log max buffer size
---
.../channel/MailboxContentStreamObserver.java | 42 ++++++++++++++++++----
1 file changed, 36 insertions(+), 6 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
index 238c3cf60c..9fd3a4b2ba 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -21,11 +21,13 @@ package org.apache.pinot.query.mailbox.channel;
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
+import javax.annotation.concurrent.GuardedBy;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.mailbox.GrpcReceivingMailbox;
@@ -35,6 +37,8 @@ import
org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.lang.Math.max;
+
/**
* {@code MailboxContentStreamObserver} is the content streaming observer used
to receive mailbox content.
@@ -59,13 +63,32 @@ public class MailboxContentStreamObserver implements
StreamObserver<Mailbox.Mail
private final boolean _isEnabledFeedback;
private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
- private final ArrayBlockingQueue<Mailbox.MailboxContent> _receivingBuffer;
-
- ReadWriteLock _errorLock = new ReentrantReadWriteLock();
- private Mailbox.MailboxContent _errorContent = null; // Guarded by
_errorLock.
+ private final BlockingQueue<Mailbox.MailboxContent> _receivingBuffer;
+
+ private ReadWriteLock _bufferSizeLock = new ReentrantReadWriteLock();
+ @GuardedBy("bufferSizeLock")
+ private int _maxBufferSize = 0;
+ private ReadWriteLock _errorLock = new ReentrantReadWriteLock();
+ @GuardedBy("_errorLock")
+ private Mailbox.MailboxContent _errorContent = null;
private StringMailboxIdentifier _mailboxId;
private Consumer<MailboxIdentifier> _gotMailCallback;
+ private void updateMaxBufferSize() {
+ _bufferSizeLock.writeLock().lock();
+ _maxBufferSize = max(_maxBufferSize, _receivingBuffer.size());
+ _bufferSizeLock.writeLock().unlock();
+ }
+
+ private int getMaxBufferSize() {
+ try {
+ _bufferSizeLock.readLock().lock();
+ return _maxBufferSize;
+ } finally {
+ _bufferSizeLock.readLock().unlock();
+ }
+ }
+
public MailboxContentStreamObserver(GrpcMailboxService mailboxService,
StreamObserver<Mailbox.MailboxStatus> responseObserver) {
this(mailboxService, responseObserver, false);
@@ -75,7 +98,9 @@ public class MailboxContentStreamObserver implements
StreamObserver<Mailbox.Mail
StreamObserver<Mailbox.MailboxStatus> responseObserver, boolean
isEnabledFeedback) {
_mailboxService = mailboxService;
_responseObserver = responseObserver;
- _receivingBuffer = new
ArrayBlockingQueue<>(DEFAULT_MAILBOX_QUEUE_CAPACITY);
+ // TODO: Replace unbounded queue with bounded queue when we have
backpressure in place.
+ // It is possible this will create high memory pressure since we have
memory leak issues.
+ _receivingBuffer = new LinkedBlockingQueue();
_isEnabledFeedback = isEnabledFeedback;
}
@@ -124,6 +149,7 @@ public class MailboxContentStreamObserver implements
StreamObserver<Mailbox.Mail
_errorContent = createErrorContent(e);
} catch (IOException ioe) {
e = new RuntimeException("Unable to encode exception for cascade
reporting: " + e, ioe);
+ LOGGER.error("MaxBufferSize:", getMaxBufferSize(), " for mailbox:",
_mailboxId);
LOGGER.error(e.getMessage());
throw e;
} finally {
@@ -132,6 +158,8 @@ public class MailboxContentStreamObserver implements
StreamObserver<Mailbox.Mail
}
_gotMailCallback.accept(_mailboxId);
+ updateMaxBufferSize();
+
if (_isEnabledFeedback) {
// TODO: this has race conditions with onCompleted() because sender
blindly closes connection channels once
// it has finished sending all the data packets.
@@ -161,6 +189,7 @@ public class MailboxContentStreamObserver implements
StreamObserver<Mailbox.Mail
throw new RuntimeException("Unable to encode exception for cascade
reporting: " + e, ioe);
} finally {
_errorLock.writeLock().unlock();
+ LOGGER.error("MaxBufferSize:", getMaxBufferSize(), " for mailbox:",
_mailboxId);
}
}
@@ -168,5 +197,6 @@ public class MailboxContentStreamObserver implements
StreamObserver<Mailbox.Mail
public void onCompleted() {
_isCompleted.set(true);
_responseObserver.onCompleted();
+ LOGGER.debug("MaxBufferSize:", getMaxBufferSize(), " for mailbox:",
_mailboxId);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]