gortiz commented on code in PR #18519:
URL: https://github.com/apache/pinot/pull/18519#discussion_r3266798239
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -208,17 +263,65 @@ public boolean isTerminated() {
return _senderSideClosed || _statusObserver.isFinished();
}
- private StreamObserver<MailboxContent> getContentObserver() {
+ private ClientCallStreamObserver<MailboxContent> getContentObserver() {
Metadata metadata = new Metadata();
metadata.put(ChannelUtils.MAILBOX_ID_METADATA_KEY, _id);
- return PinotMailboxGrpc.newStub(_channelManager.getChannel(_hostname, _port))
+ // We wrap `_statusObserver` in a ClientResponseObserver so we can register the on-ready handler through
+ // `beforeStart` — gRPC rejects setOnReadyHandler() if it is called after open() returns. Wrapping (rather than
+ // making MailboxStatusObserver itself a ClientResponseObserver) keeps the back-pressure plumbing local to this
+ // class. The wrapper delegates the data callbacks unchanged, and signals our `_readyCond` on stream close so a
+ // blocked sender wakes up to observe `_statusObserver.isFinished()` becoming true.
+ ClientResponseObserver<MailboxContent, MailboxStatus> responseObserver =
+ new ClientResponseObserver<MailboxContent, MailboxStatus>() {
+ @Override
+ public void beforeStart(ClientCallStreamObserver<MailboxContent> requestStream) {
+ // Fires on a gRPC channel/Netty thread whenever isReady() transitions false -> true. Just signal; the
+ // sender re-checks the predicate after waking.
+ requestStream.setOnReadyHandler(GrpcSendingMailbox.this::wakeWaiters);
+ }
+
+ @Override
+ public void onNext(MailboxStatus value) {
+ _statusObserver.onNext(value);
Review Comment:
Fixed in commit `db9f04549c` ("Only wake the sender on receiver
early-terminate, not on every ACK"). The wrapper's `onNext` now calls
`wakeWaiters()` specifically when the incoming `MailboxStatus` carries
`MAILBOX_METADATA_REQUEST_EARLY_TERMINATE` (`GrpcSendingMailbox.java:297-299`).
I kept it scoped to early-terminate rather than every `onNext` because the
receiver's buffer-size ACKs don't change any predicate that `awaitReady` waits
on. Transport readiness already reaches the parked sender via
`setOnReadyHandler` (registered in `beforeStart`), so signalling on every ACK
would only force a spurious wake-up and re-park per ACK. Early-terminate is the
one change that arrives purely as a status message while the stream stays open,
so it is the one case where the application gate needs an explicit wake-up to
observe it promptly. There's a comment at that site explaining the choice.
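
For readers without the PR checked out, here is a minimal, dependency-free sketch of the pattern described above. It is not the code from the commit: the class name, the `onStatus`/`awaitEarlyTerminate`/`wakeCount` methods, and the metadata key string are all hypothetical stand-ins, and the real `MailboxStatus` is modelled as a plain `Map`. It only illustrates signalling the condition on an early-terminate status while letting ordinary ACKs pass through without a wake-up.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class EarlyTerminateWakeSketch {
  // Hypothetical placeholder; the real key is a Pinot constant whose value is not shown in this thread.
  static final String EARLY_TERMINATE_KEY = "early_terminate_placeholder";

  private final ReentrantLock lock = new ReentrantLock();
  private final Condition readyCond = lock.newCondition();
  private boolean earlyTerminated;
  private int wakeCount;

  // Stand-in for the wrapper's onNext: signal waiters only when the status carries early-terminate.
  void onStatus(Map<String, String> metadata) {
    if (!metadata.containsKey(EARLY_TERMINATE_KEY)) {
      return; // Plain ACK: no predicate changed, so nobody is woken.
    }
    lock.lock();
    try {
      earlyTerminated = true;
      wakeCount++;
      readyCond.signalAll();
    } finally {
      lock.unlock();
    }
  }

  // Stand-in for the sender-side wait: re-checks the predicate after every wake-up.
  boolean awaitEarlyTerminate(long timeoutMs) {
    long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    lock.lock();
    try {
      while (!earlyTerminated) {
        if (nanos <= 0) {
          return false; // Timed out without seeing the flag.
        }
        nanos = readyCond.awaitNanos(nanos);
      }
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      lock.unlock();
    }
  }

  // Number of times waiters were signalled; used only to make the behaviour observable.
  int wakeCount() {
    lock.lock();
    try {
      return wakeCount;
    } finally {
      lock.unlock();
    }
  }
}
```

In this sketch, calling `onStatus` with an ACK-only map leaves `wakeCount()` unchanged, while a map containing the placeholder key increments it and lets a subsequent `awaitEarlyTerminate` return `true` without blocking.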
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]