gortiz commented on code in PR #11205:
URL: https://github.com/apache/pinot/pull/11205#discussion_r1280337032
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -51,16 +51,30 @@ public class ReceivingMailbox {
// TODO: Revisit if this is the correct way to apply back pressure
private final BlockingQueue<TransferableBlock> _blocks = new ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
private final AtomicReference<TransferableBlock> _errorBlock = new AtomicReference<>();
+ @Nullable
+ private volatile Reader _reader;
public ReceivingMailbox(String id, Consumer<OpChainId> receiveMailCallback) {
_id = id;
_receiveMailCallback = receiveMailCallback;
}
+ public void registeredReader(Reader reader) {
Review Comment:
I've applied your technique of using blocking queues instead of locks. I'm
not 100% sure that it is needed, but I'm OK with using it for now.
The race condition wasn't actually here but in BlockingToAsyncStream. There,
I notified the reader just before completing the completable future, so the
reader could start looking for blocks before the future was completed.
The solution is super simple: the main completable future should fetch from
the blocking source and return its value without notifying. Then a second
runnable, which notifies the reader, should be executed once the main one has
finished. That establishes a happens-before relationship between the main
completable future and the reader. Tests seem to pass consistently on my
computer.
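To make the ordering concrete, here is a minimal sketch of the idea, not the
actual code in the PR. The class name, the `fetchThenNotify` method and the
`Reader` interface below are hypothetical stand-ins, and the block type is
simplified to `String`. The first stage only takes from the blocking source;
the notification is a dependent stage, so it cannot run before the main future
is completed.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class NotifyAfterCompletionSketch {

  /** Hypothetical stand-in for the reader callback; not Pinot's Reader. */
  interface Reader {
    void blockReadyToRead();
  }

  /**
   * Fetches one block from the blocking source on the given executor and
   * notifies the reader only after the returned future has been completed.
   */
  static CompletableFuture<String> fetchThenNotify(
      BlockingQueue<String> source, Reader reader, ExecutorService executor) {
    // Main stage: block on the source and return the value, without notifying.
    CompletableFuture<String> main = CompletableFuture.supplyAsync(() -> {
      try {
        return source.take();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }, executor);
    // Second stage: runs only once `main` has completed normally, so the
    // reader always observes a finished future. Error handling is omitted.
    main.thenRun(reader::blockReadyToRead);
    return main;
  }

  public static void main(String[] args) throws Exception {
    BlockingQueue<String> source = new ArrayBlockingQueue<>(1);
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AtomicReference<CompletableFuture<String>> holder = new AtomicReference<>();
    CompletableFuture<Boolean> readerSawDone = new CompletableFuture<>();

    // The reader records whether the main future was done when it was notified.
    CompletableFuture<String> main = fetchThenNotify(
        source, () -> readerSawDone.complete(holder.get().isDone()), executor);
    holder.set(main);

    // Publish the block only after the holder is set, so the reader can see it.
    source.put("block-1");

    System.out.println("reader saw completed future: "
        + readerSawDone.get(5, TimeUnit.SECONDS));
    System.out.println("block: " + main.get(5, TimeUnit.SECONDS));
    executor.shutdown();
  }
}
```

Chaining the notification with `thenRun` is one way to get this ordering; the
alternative of notifying inside the supplier is exactly what produced the race
described above.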
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]