gortiz commented on code in PR #18519:
URL: https://github.com/apache/pinot/pull/18519#discussion_r3274174484


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -208,17 +266,84 @@ public boolean isTerminated() {
     return _senderSideClosed || _statusObserver.isFinished();
   }
 
-  private StreamObserver<MailboxContent> getContentObserver() {
+  private ClientCallStreamObserver<MailboxContent> getContentObserver() {
     Metadata metadata = new Metadata();
     metadata.put(ChannelUtils.MAILBOX_ID_METADATA_KEY, _id);
 
-    return PinotMailboxGrpc.newStub(_channelManager.getChannel(_hostname, _port))
+    // We wrap `_statusObserver` in a ClientResponseObserver so we can register the on-ready handler through
+    // `beforeStart` — gRPC rejects setOnReadyHandler() if it is called after open() returns. Wrapping (rather than
+    // making MailboxStatusObserver itself a ClientResponseObserver) keeps the back-pressure plumbing local to this
+    // class. The wrapper delegates the data callbacks unchanged, and signals our `_readyCond` on stream close so a
+    // blocked sender wakes up to observe `_statusObserver.isFinished()` becoming true.
+    ClientResponseObserver<MailboxContent, MailboxStatus> responseObserver =
+        new ClientResponseObserver<MailboxContent, MailboxStatus>() {
+          @Override
+          public void beforeStart(ClientCallStreamObserver<MailboxContent> requestStream) {
+            // Fires on a gRPC channel/Netty thread whenever isReady() transitions false -> true. Just signal; the
+            // sender re-checks the predicate after waking.
+            requestStream.setOnReadyHandler(GrpcSendingMailbox.this::wakeWaiters);
+          }
+
+          @Override
+          public void onNext(MailboxStatus value) {
+            _statusObserver.onNext(value);
+            // Only wake on receiver early-terminate. Transport-level isReady() transitions reach a parked
+            // sender through setOnReadyHandler (registered in beforeStart above); normal buffer-size ACKs
+            // do not change any predicate awaitReady() actually waits on, so signalling them would force a
+            // spurious park/unpark cycle on every receiver ACK. Early-terminate is the one status-only
+            // change (the stream stays open) that awaitReady() must observe promptly, so we still signal
+            // here when its metadata is set.
+            if (Boolean.parseBoolean(
+                value.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE))) {
+              wakeWaiters();
+            }
+          }
+
+          @Override
+          public void onError(Throwable t) {
+            try {
+              _statusObserver.onError(t);
+            } finally {
+              wakeWaiters();
+            }
+          }
+
+          @Override
+          public void onCompleted() {
+            try {
+              _statusObserver.onCompleted();
+            } finally {
+              wakeWaiters();
+            }
+          }
+        };
+
+    return (ClientCallStreamObserver<MailboxContent>) PinotMailboxGrpc.newStub(
+            _channelManager.getChannel(_hostname, _port))
         .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
         .withDeadlineAfter(_deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS)
-        .open(_statusObserver);
+        .open(responseObserver);
   }
 
   protected void sendContent(ByteString byteString, boolean waitForMore) {
+    sendContent(byteString, waitForMore, false);
+  }
+
+  protected void sendContent(ByteString byteString, boolean waitForMore, boolean bypassReady) {
+    if (!awaitReady(bypassReady)) {
+      // Either the mailbox was cancelled while we were waiting (normal path) or the gRPC stream is already dead
+      // (bypass path). Either way, skip the send.
+      return;
+    }
+    // Narrow-window race mitigation: a concurrent cancel() may have run between awaitReady() returning true and
+    // here, setting _senderSideClosed and pushing its own error EOS. If we proceed, both threads would call
+    // onNext() on the same non-thread-safe ClientCallStreamObserver. Re-checking after the gate reduces (but
+    // does not fully eliminate) that window; fully eliminating it would require serializing all onNext() calls
+    // under _readyLock, which is more invasive. The bypass path (cancel/close) must push through regardless,
+    // so this guard only applies when bypassReady == false.
+    if (!bypassReady && isTerminated()) {

Review Comment:
   Fixed in `c39e12f1a4` by going with your "clean fix" — `_readyLock` (the 
existing `ReentrantLock` already used for `_readyCond`) now serializes 
**every** outbound observer call. Three sites are now lock-protected:
   
   1. `sendContent`: lock spans the post-`awaitReady` `isTerminated()` re-check 
and the `onNext`. `awaitReady` itself stays outside the lock — its slow path 
takes `_readyLock` internally for `_readyCond.await`, so wrapping it would just 
nest. The fast `isReady() == true` path remains lock-free.
   2. `send(MseBlock.Eos, …)`: lock spans `_senderSideClosed = true` + 
`onCompleted` so the success-path close is atomic against any racing 
`sendContent`.
   3. `cancel`: lock spans `processAndSend(errorBlock, bypassReady=true)` + 
`onCompleted`. The inner `sendContent` re-acquires the lock through 
`ReentrantLock`'s same-thread reentry, which only increments the hold count.
   
   The previous in-code paragraph that explained the narrow-window mitigation 
is gone — the race is no longer narrow, it's closed. Class-level Javadoc gained 
a one-liner: "_readyLock serializes every call to _contentObserver.onNext / 
onCompleted. Acquire it around any new outbound call site you add." so future 
maintainers don't accidentally add an unprotected fourth site.
   
   Rationale for picking the full fix over the "elevate the comment to 
class-level Javadoc and file a follow-up" alternative: this PR already paid the 
equivalent cost in `573356ee74` for the symmetric cancel-vs-EOS race. Leaving 
the data-path variant behind a re-check that visibly doesn't close it (and 
writing that fact down) felt like the wrong invariant to ship. The cost of the 
lock is one uncontended `lock()/unlock()` per chunk — tens of ns against 
`onNext`, no measurable bench impact.
   
   Tests: 40/40 mailbox tests still pass, including 
`testRemoteCancelledBySender*` in `GrpcSendingMailboxTest` (which is exactly 
the cancel-vs-send race scenario).
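
The three lock-protected sites listed in the review comment can be illustrated with a minimal, gRPC-free sketch. Everything here is an assumption made for illustration, not the code from the PR: `Sink` and `RecordingSink` are hypothetical stand-ins for the non-thread-safe `ClientCallStreamObserver`, chunks are plain strings, `awaitReady` is omitted, and the "already closed" guards in `sendEos` and `cancel` are assumed rather than taken from the source. Only the locking shape (re-check plus `onNext` under one lock hold, close and cancel under the same lock, reentrant inner send) follows the description above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class SerializedSenderSketch {
  /** Hypothetical stand-in for the non-thread-safe request observer. */
  interface Sink {
    void onNext(String chunk);
    void onCompleted();
  }

  /** Records calls in order so the resulting sequence can be inspected. */
  static final class RecordingSink implements Sink {
    final List<String> calls = new ArrayList<>();
    @Override public void onNext(String chunk) { calls.add("next:" + chunk); }
    @Override public void onCompleted() { calls.add("completed"); }
  }

  private final ReentrantLock _readyLock = new ReentrantLock();
  private final Sink _sink;
  private volatile boolean _senderSideClosed;

  SerializedSenderSketch(Sink sink) { _sink = sink; }

  boolean isTerminated() { return _senderSideClosed; }

  /** Site 1: the isTerminated() re-check and onNext share one lock hold. */
  void sendContent(String chunk, boolean bypassReady) {
    _readyLock.lock();
    try {
      if (!bypassReady && isTerminated()) {
        return; // a racing close or cancel already won; drop the chunk
      }
      _sink.onNext(chunk);
    } finally {
      _readyLock.unlock();
    }
  }

  /** Site 2: success-path close is atomic against any racing sendContent. */
  void sendEos() {
    _readyLock.lock();
    try {
      if (_senderSideClosed) {
        return; // assumed guard: never complete the stream twice
      }
      _senderSideClosed = true;
      _sink.onCompleted();
    } finally {
      _readyLock.unlock();
    }
  }

  /** Site 3: cancel pushes its error chunk through the bypass path, then closes. */
  void cancel(String errorChunk) {
    _readyLock.lock();
    try {
      if (_senderSideClosed) {
        return; // assumed guard, as in sendEos
      }
      _senderSideClosed = true;
      sendContent(errorChunk, true); // same-thread reentry on _readyLock
      _sink.onCompleted();
    } finally {
      _readyLock.unlock();
    }
  }

  public static void main(String[] args) {
    RecordingSink sink = new RecordingSink();
    SerializedSenderSketch sender = new SerializedSenderSketch(sink);
    sender.sendContent("a", false);
    sender.cancel("err");
    sender.sendContent("b", false); // dropped: the mailbox is already terminated
    sender.sendEos();               // no-op: already closed
    System.out.println(sink.calls); // prints [next:a, next:err, completed]
  }
}
```

Because every path that touches the sink holds `_readyLock`, a chunk sent after `cancel` can never reach `onNext`, and the sink sees at most one `onCompleted`. The cost is one lock acquisition per outbound call, which is uncontended in the common single-sender case.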



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

