This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 82589e24df [multistage] [bugfix] Temp fix for grpc receiving buffer full issue (#10099)
82589e24df is described below

commit 82589e24dfb5fa59263172a63440b6959fd9364f
Author: Yao Liu <[email protected]>
AuthorDate: Tue Jan 17 11:37:53 2023 -0800

    [multistage] [bugfix] Temp fix for grpc receiving buffer full issue (#10099)
    
    * unbounded mailbox queue
    
    * unbounded mailbox buffer
    
    * log max buffer size
---
 .../channel/MailboxContentStreamObserver.java      | 42 ++++++++++++++++++----
 1 file changed, 36 insertions(+), 6 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
index 238c3cf60c..9fd3a4b2ba 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -21,11 +21,13 @@ package org.apache.pinot.query.mailbox.channel;
 import com.google.protobuf.ByteString;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
+import javax.annotation.concurrent.GuardedBy;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.query.mailbox.GrpcMailboxService;
 import org.apache.pinot.query.mailbox.GrpcReceivingMailbox;
@@ -35,6 +37,8 @@ import 
org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.lang.Math.max;
+
 
 /**
  * {@code MailboxContentStreamObserver} is the content streaming observer used 
to receive mailbox content.
@@ -59,13 +63,32 @@ public class MailboxContentStreamObserver implements 
StreamObserver<Mailbox.Mail
   private final boolean _isEnabledFeedback;
 
   private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
-  private final ArrayBlockingQueue<Mailbox.MailboxContent> _receivingBuffer;
-
-  ReadWriteLock _errorLock = new ReentrantReadWriteLock();
-  private Mailbox.MailboxContent _errorContent = null; // Guarded by 
_errorLock.
+  private final BlockingQueue<Mailbox.MailboxContent> _receivingBuffer;
+
+  private ReadWriteLock _bufferSizeLock = new ReentrantReadWriteLock();
+  @GuardedBy("bufferSizeLock")
+  private int _maxBufferSize = 0;
+  private ReadWriteLock _errorLock = new ReentrantReadWriteLock();
+  @GuardedBy("_errorLock")
+  private Mailbox.MailboxContent _errorContent = null;
   private StringMailboxIdentifier _mailboxId;
   private Consumer<MailboxIdentifier> _gotMailCallback;
 
+  /**
+   * Records the largest size observed so far for {@code _receivingBuffer}.
+   *
+   * <p>Holds the write lock of {@code _bufferSizeLock} while {@code _maxBufferSize} is updated.
+   */
+  private void updateMaxBufferSize() {
+    _bufferSizeLock.writeLock().lock();
+    try {
+      _maxBufferSize = max(_maxBufferSize, _receivingBuffer.size());
+    } finally {
+      // Release in finally so an exception while sampling the queue size cannot leak the write lock.
+      _bufferSizeLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Returns the current value of {@code _maxBufferSize}, read under the read lock of
+   * {@code _bufferSizeLock}.
+   *
+   * @return the value last stored by {@code updateMaxBufferSize()}, or 0 if it has never been updated
+   */
+  private int getMaxBufferSize() {
+    // Acquire before entering try: if lock() itself fails, finally must not unlock a lock we never held.
+    _bufferSizeLock.readLock().lock();
+    try {
+      return _maxBufferSize;
+    } finally {
+      _bufferSizeLock.readLock().unlock();
+    }
+  }
+
   public MailboxContentStreamObserver(GrpcMailboxService mailboxService,
       StreamObserver<Mailbox.MailboxStatus> responseObserver) {
     this(mailboxService, responseObserver, false);
@@ -75,7 +98,9 @@ public class MailboxContentStreamObserver implements 
StreamObserver<Mailbox.Mail
       StreamObserver<Mailbox.MailboxStatus> responseObserver, boolean 
isEnabledFeedback) {
     _mailboxService = mailboxService;
     _responseObserver = responseObserver;
-    _receivingBuffer = new 
ArrayBlockingQueue<>(DEFAULT_MAILBOX_QUEUE_CAPACITY);
+    // TODO: Replace unbounded queue with bounded queue when we have 
backpressure in place.
+    // It is possible this will create high memory pressure since we have 
memory leak issues.
+    _receivingBuffer = new LinkedBlockingQueue();
     _isEnabledFeedback = isEnabledFeedback;
   }
 
@@ -124,6 +149,7 @@ public class MailboxContentStreamObserver implements 
StreamObserver<Mailbox.Mail
           _errorContent = createErrorContent(e);
         } catch (IOException ioe) {
           e = new RuntimeException("Unable to encode exception for cascade 
reporting: " + e, ioe);
+          LOGGER.error("MaxBufferSize:", getMaxBufferSize(), " for mailbox:", 
_mailboxId);
           LOGGER.error(e.getMessage());
           throw e;
         } finally {
@@ -132,6 +158,8 @@ public class MailboxContentStreamObserver implements 
StreamObserver<Mailbox.Mail
       }
       _gotMailCallback.accept(_mailboxId);
 
+      updateMaxBufferSize();
+
       if (_isEnabledFeedback) {
         // TODO: this has race conditions with onCompleted() because sender 
blindly closes connection channels once
         // it has finished sending all the data packets.
@@ -161,6 +189,7 @@ public class MailboxContentStreamObserver implements 
StreamObserver<Mailbox.Mail
       throw new RuntimeException("Unable to encode exception for cascade 
reporting: " + e, ioe);
     } finally {
       _errorLock.writeLock().unlock();
+      LOGGER.error("MaxBufferSize:", getMaxBufferSize(), " for mailbox:", 
_mailboxId);
     }
   }
 
@@ -168,5 +197,6 @@ public class MailboxContentStreamObserver implements 
StreamObserver<Mailbox.Mail
   public void onCompleted() {
     _isCompleted.set(true);
     _responseObserver.onCompleted();
+    LOGGER.debug("MaxBufferSize:", getMaxBufferSize(), " for mailbox:", 
_mailboxId);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to