walterddr commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177074193
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -18,55 +18,118 @@
*/
package org.apache.pinot.query.mailbox;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import javax.annotation.Nullable;
-import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
-import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Mailbox that's used to receive data. Ownership of the ReceivingMailbox is
with the MailboxService, which is unlike
- * the {@link SendingMailbox} whose ownership lies with the {@link
MailboxSendOperator}. This is because the
- * ReceivingMailbox can be initialized even before the corresponding OpChain
is registered on the receiver, whereas
- * the SendingMailbox is initialized when the MailboxSendOperator is running.
Also see {@link #isInitialized()}.
- *
- * @param <T> the unit of data that each {@link #receive()} call returns.
+ * the {@link SendingMailbox} whose ownership lies with the send operator.
This is because the ReceivingMailbox can be
+ * initialized even before the corresponding OpChain is registered on the
receiver, whereas the SendingMailbox is
+ * initialized when the send operator is running.
*/
-public interface ReceivingMailbox<T> {
+public class ReceivingMailbox {
+ public static final int DEFAULT_MAX_PENDING_BLOCKS = 5;
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ReceivingMailbox.class);
+ private static final TransferableBlock CANCELLED_ERROR_BLOCK =
+ TransferableBlockUtils.getErrorTransferableBlock(new
RuntimeException("Cancelled by receiver"));
+
+ private final String _id;
+ private final Consumer<String> _receiveMailCallback;
+ // TODO: Make the queue size configurable
+ // TODO: Revisit if this is the correct way to apply back pressure
+ private final BlockingQueue<TransferableBlock> _blocks = new
ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
+ private final AtomicReference<TransferableBlock> _errorBlock = new
AtomicReference<>();
+
+ public ReceivingMailbox(String id, Consumer<String> receiveMailCallback) {
+ _id = id;
+ _receiveMailCallback = receiveMailCallback;
+ }
- MailboxIdentifier getId();
+ public String getId() {
+ return _id;
+ }
/**
- * Returns a unit of data. Implementations are allowed to return null, in
which case {@link MailboxReceiveOperator}
- * will assume that this mailbox doesn't have any data to return and it will
instead poll the other mailbox (if any).
+ * Offers a non-error block into the mailbox within the timeout specified,
returns whether the block is successfully
+ * added. If the block is not added, an error block is added to the mailbox.
*/
- @Nullable
- T receive() throws Exception;
+ public boolean offer(TransferableBlock block, long timeoutMs) {
+ if (_errorBlock.get() != null) {
+ LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring
the late block", _id);
+ return false;
+ }
+ if (timeoutMs < 0) {
+ LOGGER.debug("Mailbox: {} is already timed out", _id);
+ setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(
+ new TimeoutException("Timed out while offering data to mailbox: " +
_id)));
+ return false;
+ }
+ try {
+ if (_blocks.offer(block, timeoutMs, TimeUnit.MILLISECONDS)) {
+ if (_errorBlock.get() == null) {
+ _receiveMailCallback.accept(_id);
+ return true;
+ } else {
+ LOGGER.debug("Mailbox: {} is already cancelled or errored out,
ignoring the late block", _id);
+ _blocks.clear();
+ return false;
+ }
+ } else {
+ LOGGER.debug("Failed to offer block into mailbox: {} within: {}ms",
_id, timeoutMs);
+ setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(
+ new TimeoutException("Timed out while waiting for receive operator
to consume data from mailbox: " + _id)));
+ return false;
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error("Interrupted while offering block into mailbox: {}", _id);
+ setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(e));
+ return false;
+ }
+ }
/**
- * A ReceivingMailbox is considered initialized when it has a reference to
the underlying channel used for receiving
- * the data. The underlying channel may be a gRPC stream, in-memory queue,
etc. Once a receiving mailbox is
- * initialized, it has the ability to close the underlying channel via the
{@link #cancel()} method.
+ * Sets an error block into the mailbox. No more blocks are accepted after
calling this method.
*/
- boolean isInitialized();
+ public void setErrorBlock(TransferableBlock errorBlock) {
+ if (_errorBlock.compareAndSet(null, errorBlock)) {
+ _blocks.clear();
+ _receiveMailCallback.accept(_id);
+ }
+ }
/**
- * A ReceivingMailbox is considered closed if it has sent all the data to
the receiver and doesn't have any more data
- * to send.
+ * Returns the first block from the mailbox, or {@code null} if there is no
block received yet. Error block is
+ * returned if exists.
*/
- boolean isClosed();
+ @Nullable
+ public TransferableBlock poll() {
+ TransferableBlock errorBlock = _errorBlock.get();
+ return errorBlock != null ? errorBlock : _blocks.poll();
+ }
/**
- * A ReceivingMailbox may hold a reference to the underlying channel.
Usually the channel would be automatically
- * closed once all the data has been received by the receiver, and in such
cases {@link #isClosed()} returns true.
- * However in failure scenarios the underlying channel may not be released,
and the receiver can use this method to
- * ensure the same.
- *
- * This API should ensure that the underlying channel is "released" if it
hasn't been already. If the channel has
- * already been released, the API shouldn't throw and instead return
gracefully.
- *
- * <p>
- * This method may be called multiple times, so implementations should
ensure this is idempotent.
- * </p>
+ * Cancels the mailbox. No more blocks are accepted after calling this
method. Should only be called by the receive
+ * operator to clean up the remaining blocks.
*/
- void cancel();
+ public void cancel() {
+ LOGGER.debug("Cancelling mailbox: {}", _id);
+ if (_errorBlock.compareAndSet(null, CANCELLED_ERROR_BLOCK)) {
+ _blocks.clear();
+ }
+ }
Review Comment:
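This should accept the upstream throwable here. As written, `cancel()` always stores the fixed `CANCELLED_ERROR_BLOCK` (a generic `RuntimeException("Cancelled by receiver")`), so the actual cause of the cancellation is lost. Passing the throwable in lets the error block, and the debug log, carry the real reason: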
```suggestion
public void cancel(Throwable t) {
LOGGER.debug("Cancelling mailbox: {}", _id, t);
if (_errorBlock.compareAndSet(null,
TransferableBlockUtils.getErrorTransferableBlock(t))) {
_blocks.clear();
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]