This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d49bd3c96e0 Remove SendingMailbox.complete method and add that logic when EOS is sent (#16915)
d49bd3c96e0 is described below
commit d49bd3c96e09123918c609c0a2f9a2242576744a
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Wed Oct 8 17:52:39 2025 +0200
Remove SendingMailbox.complete method and add that logic when EOS is sent (#16915)
---
.../pinot/query/mailbox/GrpcSendingMailbox.java | 12 +----
.../query/mailbox/InMemorySendingMailbox.java | 6 +--
.../apache/pinot/query/mailbox/SendingMailbox.java | 26 +++++-----
.../runtime/operator/exchange/BlockExchange.java | 58 +++++++++++++---------
.../pinot/query/mailbox/MailboxServiceTest.java | 5 --
.../operator/exchange/BlockExchangeTest.java | 7 ++-
6 files changed, 52 insertions(+), 62 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index cce43b228f7..4ffdd58e684 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -98,6 +98,8 @@ public class GrpcSendingMailbox implements SendingMailbox {
public void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
throws IOException, TimeoutException {
sendInternal(block, serializedStats);
+ LOGGER.debug("Completing mailbox: {}", _id);
+ _contentObserver.onCompleted();
}
private void sendInternal(MseBlock block, List<DataBuffer> serializedStats)
@@ -140,16 +142,6 @@ public class GrpcSendingMailbox implements SendingMailbox {
}
}
- @Override
- public void complete() {
- if (isTerminated()) {
- LOGGER.debug("Already terminated mailbox: {}", _id);
- return;
- }
- LOGGER.debug("Completing mailbox: {}", _id);
- _contentObserver.onCompleted();
- }
-
@Override
public void cancel(Throwable t) {
if (isTerminated()) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index 43cfb7bf9fe..b566100f276 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -69,6 +69,7 @@ public class InMemorySendingMailbox implements SendingMailbox {
public void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
throws IOException, TimeoutException {
sendPrivate(block, serializedStats);
+ _isTerminated = true;
}
private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats)
@@ -102,11 +103,6 @@ public class InMemorySendingMailbox implements SendingMailbox {
}
}
- @Override
- public void complete() {
- _isTerminated = true;
- }
-
@Override
public void cancel(Throwable t) {
if (_isTerminated) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
index a3f37007670..d11329f112b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
@@ -28,6 +28,18 @@ import org.apache.pinot.segment.spi.memory.DataBuffer;
/**
* Mailbox that's used to send data.
+ *
+ * Usages of this interface should follow the pattern:
+ *
+ * <ol>
+ * <li>Zero or more calls to {@link #send(MseBlock.Data)}</li>
+ * <li>Then exactly one of:
+ * <ul>
+ * <li>One call to {@link #send(MseBlock.Eos, List)} if the receiver is not early terminated</li>
+ * <li>One call to {@link #cancel(Throwable)} if the sender wants to cancel the receiver</li>
+ * </ul>
+ * </li>
+ * </ol>
*/
public interface SendingMailbox {
@@ -53,20 +65,6 @@ public interface SendingMailbox {
void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
throws IOException, TimeoutException;
- /**
- * Called when there is no more data to be sent by the {@link BlockExchange}. This is also a signal for the
- * SendingMailbox that the sender is done sending data from its end. Note that this doesn't mean that the receiver
- * has received all the data.
- *
- * <p>
- * <b>Note:</b> While this is similar to a close() method that's usually provided with objects that hold releasable
- * resources, the key difference is that a SendingMailbox cannot completely release the resources on its end
- * gracefully, since it would be waiting for the receiver to ack that it has received all the data. See
- * {@link #cancel} which can allow callers to force release the underlying resources.
- * </p>
- */
- void complete();
-
/**
* Cancels the mailbox and notifies the receiver of the cancellation so that it can release the underlying resources.
* No more blocks can be sent after calling this method.
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index ccc170fad27..261c032e965 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -127,7 +127,6 @@ public abstract class BlockExchange {
*/
public boolean send(MseBlock.Eos eosBlock, List<DataBuffer> serializedStats)
throws IOException, TimeoutException {
- int numMailboxes = _sendingMailboxes.size();
int mailboxIdToSendMetadata;
if (!serializedStats.isEmpty()) {
mailboxIdToSendMetadata = _statsIndexChooser.apply(_sendingMailboxes);
@@ -139,14 +138,36 @@ public abstract class BlockExchange {
// this may happen when the block exchange is itself used as a sending mailbox, like when using spools
mailboxIdToSendMetadata = -1;
}
+ Exception firstException = null;
+ int numMailboxes = _sendingMailboxes.size();
for (int i = 0; i < numMailboxes; i++) {
- SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
- List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ? serializedStats : Collections.emptyList();
+ try {
+ SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
+ List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ? serializedStats : Collections.emptyList();
- sendingMailbox.send(eosBlock, statsToSend);
- sendingMailbox.complete();
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Block sent: {} {} to {}", eosBlock, System.identityHashCode(eosBlock), sendingMailbox);
+ sendingMailbox.send(eosBlock, statsToSend);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Block sent: {} {} to {}", eosBlock, System.identityHashCode(eosBlock), sendingMailbox);
+ }
+ } catch (IOException | TimeoutException | RuntimeException e) {
+ // We want to try to send EOS to all mailboxes, so we catch the exception and rethrow it at the end.
+ if (firstException == null) {
+ firstException = e;
+ } else {
+ firstException.addSuppressed(e);
+ }
+ }
+ }
+ if (firstException != null) {
+ // This is ugly, but necessary to be sure we throw the right exception, which is later caught by the
+ // QueryRunner and handled properly.
+ if (firstException instanceof IOException) {
+ throw (IOException) firstException;
+ } else if (firstException instanceof TimeoutException) {
+ throw (TimeoutException) firstException;
+ } else {
+ Preconditions.checkState(firstException instanceof RuntimeException);
+ throw (RuntimeException) firstException;
}
}
return false;
@@ -218,30 +239,19 @@ public abstract class BlockExchange {
@Override
public void send(MseBlock.Data data)
throws IOException, TimeoutException {
- sendPrivate(data, Collections.emptyList());
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Exchange mailbox {} echoing data block {} {}", this, data, System.identityHashCode(data));
+ }
+ _earlyTerminated = BlockExchange.this.send(data);
}
@Override
public void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
throws IOException, TimeoutException {
- sendPrivate(block, serializedStats);
- }
-
- private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats)
- throws IOException, TimeoutException {
if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Exchange mailbox {} echoing {} {}", this, block, System.identityHashCode(block));
- }
- if (block.isData()) {
- Preconditions.checkArgument(serializedStats.isEmpty(), "Data block cannot have stats");
- _earlyTerminated = BlockExchange.this.send(((MseBlock.Data) block));
- } else {
- _earlyTerminated = BlockExchange.this.send(((MseBlock.Eos) block), serializedStats);
+ LOGGER.trace("Exchange mailbox {} echoing EOS block {} {}", this, block, System.identityHashCode(block));
}
- }
-
- @Override
- public void complete() {
+ _earlyTerminated = BlockExchange.this.send(block, serializedStats);
_completed = true;
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
index b9bd4d3cb4b..f5f1544e31e 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
@@ -92,7 +92,6 @@ public class MailboxServiceTest {
sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new
Object[]{Integer.toString(i)}));
}
sendingMailbox.send(SuccessMseBlock.INSTANCE,
MultiStageQueryStats.emptyStats(SENDER_STAGE_ID).serialize());
- sendingMailbox.complete();
ReceivingMailbox receivingMailbox =
_mailboxService1.getReceivingMailbox(mailboxId);
receivingMailbox.registeredReader(() -> {
@@ -129,7 +128,6 @@ public class MailboxServiceTest {
sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new
Object[]{Integer.toString(i)}));
}
sendingMailbox.send(SuccessMseBlock.INSTANCE,
MultiStageQueryStats.emptyStats(SENDER_STAGE_ID).serialize());
- sendingMailbox.complete();
assertEquals(numCallbacks.get(),
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS);
@@ -342,7 +340,6 @@ public class MailboxServiceTest {
sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new
Object[]{Integer.toString(i)}));
}
sendingMailbox.send(SuccessMseBlock.INSTANCE,
MultiStageQueryStats.emptyStats(SENDER_STAGE_ID).serialize());
- sendingMailbox.complete();
// Wait until all the mails are delivered
ReceivingMailbox receivingMailbox =
_mailboxService1.getReceivingMailbox(mailboxId);
@@ -388,7 +385,6 @@ public class MailboxServiceTest {
sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new
Object[]{Integer.toString(i)}));
}
sendingMailbox.send(SuccessMseBlock.INSTANCE,
MultiStageQueryStats.emptyStats(SENDER_STAGE_ID).serialize());
- sendingMailbox.complete();
// Wait until all the mails are delivered
receiveMailLatch.await();
@@ -669,7 +665,6 @@ public class MailboxServiceTest {
sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new
Object[]{"0"}));
// send a metadata block
sendingMailbox.send(SuccessMseBlock.INSTANCE,
MultiStageQueryStats.emptyStats(SENDER_STAGE_ID).serialize());
- sendingMailbox.complete();
// sending side should early terminate
TestUtils.waitForCondition(aVoid -> sendingMailbox.isEarlyTerminated(),
1000L, "Failed to early-terminate sender");
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
index e1fb8ab68eb..4ea75fcad74 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.query.runtime.operator.exchange;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.common.utils.DataSchema;
@@ -71,16 +70,16 @@ public class BlockExchangeTest {
BlockExchange exchange = new TestBlockExchange(destinations);
// When:
- exchange.send(SuccessMseBlock.INSTANCE, Collections.emptyList());
+ exchange.send(SuccessMseBlock.INSTANCE, List.of());
// Then:
ArgumentCaptor<MseBlock.Eos> captor =
ArgumentCaptor.forClass(MseBlock.Eos.class);
- Mockito.verify(_mailbox1).complete();
+ Mockito.verify(_mailbox1).send(SuccessMseBlock.INSTANCE, List.of());
Mockito.verify(_mailbox1, Mockito.times(1)).send(captor.capture(),
anyList());
Assert.assertTrue(captor.getValue().isEos());
- Mockito.verify(_mailbox2).complete();
+ Mockito.verify(_mailbox2).send(SuccessMseBlock.INSTANCE, List.of());
Mockito.verify(_mailbox2, Mockito.times(1)).send(captor.capture(),
anyList());
Assert.assertTrue(captor.getValue().isEos());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]