This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 481a5aa1e15 Remove non splitter grpc sender (#17003)
481a5aa1e15 is described below
commit 481a5aa1e155c571ee6c6466a954fd29d4e0c90d
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Thu Oct 16 14:43:32 2025 +0200
Remove non splitter grpc sender (#17003)
---
.../tests/MultiStageEngineSmallBufferTest.java | 3 -
.../pinot/query/mailbox/GrpcSendingMailbox.java | 84 +++++++---------------
.../apache/pinot/query/mailbox/MailboxService.java | 27 +++----
.../mailbox/channel/MailboxContentObserver.java | 4 +-
.../query/service/dispatch/QueryDispatcher.java | 2 +
.../pinot/query/mailbox/MailboxServiceTest.java | 61 +++-------------
.../apache/pinot/spi/utils/CommonConstants.java | 13 ++--
7 files changed, 58 insertions(+), 136 deletions(-)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineSmallBufferTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineSmallBufferTest.java
index 660f62126b3..e0c34c3a9f5 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineSmallBufferTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineSmallBufferTest.java
@@ -37,7 +37,6 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import static
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT;
import static
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@@ -125,13 +124,11 @@ public class MultiStageEngineSmallBufferTest extends
BaseClusterIntegrationTestS
@Override
protected void overrideBrokerConf(PinotConfiguration brokerConf) {
brokerConf.setProperty(KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
INBOUND_BLOCK_SIZE);
- brokerConf.setProperty(KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT, true);
}
@Override
protected void overrideServerConf(PinotConfiguration serverConf) {
serverConf.setProperty(KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
INBOUND_BLOCK_SIZE);
- serverConf.setProperty(KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT, true);
}
@Test(invocationCount = 50)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index 50c4b1003eb..bdc9f5253a4 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -33,7 +33,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.datablock.DataBlock;
-import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.common.proto.Mailbox.MailboxContent;
@@ -68,19 +67,22 @@ public class GrpcSendingMailbox implements SendingMailbox {
private final long _deadlineMs;
private final StatMap<MailboxSendOperator.StatKey> _statMap;
private final MailboxStatusObserver _statusObserver = new
MailboxStatusObserver();
- private final Sender _sender;
+ private final int _maxByteStringSize;
private StreamObserver<MailboxContent> _contentObserver;
public GrpcSendingMailbox(String id, ChannelManager channelManager, String
hostname, int port, long deadlineMs,
- StatMap<MailboxSendOperator.StatKey> statMap, int maxByteStringSize) {
+ StatMap<MailboxSendOperator.StatKey> statMap, int maxInboundMessageSize)
{
_id = id;
_channelManager = channelManager;
_hostname = hostname;
_port = port;
_deadlineMs = deadlineMs;
_statMap = statMap;
- _sender = maxByteStringSize > 0 ? new SplitSender(this, maxByteStringSize)
: new NonSplitSender(this);
+ // TODO: tune the maxByteStringSize based on experiments. We know the
maxInboundMessageSize on the receiver side,
+ // but we want to leave some room for extra stuff for other fields like
metadata, mailbox id, etc, whose size
+ // we don't know at the time of writing into the stream as it is
serialized by protobuf.
+ _maxByteStringSize = Math.max(maxInboundMessageSize / 2, 1);
}
@Override
@@ -128,7 +130,7 @@ public class GrpcSendingMailbox implements SendingMailbox {
long start = System.currentTimeMillis();
try {
DataBlock dataBlock = MseBlockSerializer.toDataBlock(block,
serializedStats);
- int sizeInBytes = _sender.processAndSend(dataBlock);
+ int sizeInBytes = processAndSend(dataBlock);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Serialized block: {} to {} bytes", block, sizeInBytes);
}
@@ -138,6 +140,26 @@ public class GrpcSendingMailbox implements SendingMailbox {
}
}
+ /**
+ * Process the data block to split it into multiple ByteStrings that fit
into the maxByteStringSize, and send them
+ * one by one.
+ */
+ protected int processAndSend(DataBlock dataBlock)
+ throws IOException {
+ List<ByteString> byteStrings = toByteStrings(dataBlock,
_maxByteStringSize);
+ int sizeInBytes = 0;
+ for (ByteString byteString : byteStrings) {
+ sizeInBytes += byteString.size();
+ }
+ Iterator<ByteString> byteStringIt = byteStrings.iterator();
+ while (byteStringIt.hasNext()) {
+ ByteString byteString = byteStringIt.next();
+ boolean waitForMore = byteStringIt.hasNext();
+ sendContent(byteString, waitForMore);
+ }
+ return sizeInBytes;
+ }
+
@Override
public void cancel(Throwable t) {
if (isTerminated()) {
@@ -292,56 +314,4 @@ public class GrpcSendingMailbox implements SendingMailbox {
return result;
}
-
- private static abstract class Sender {
- protected final GrpcSendingMailbox _mailbox;
-
- protected Sender(GrpcSendingMailbox mailbox) {
- _mailbox = mailbox;
- }
-
- protected abstract int processAndSend(DataBlock dataBlock)
- throws IOException;
- }
-
- private static class SplitSender extends Sender {
- private final int _maxByteStringSize;
-
- public SplitSender(GrpcSendingMailbox mailbox, int maxByteStringSize) {
- super(mailbox);
- _maxByteStringSize = maxByteStringSize;
- }
-
- @Override
- protected int processAndSend(DataBlock dataBlock)
- throws IOException {
- List<ByteString> byteStrings = toByteStrings(dataBlock,
_maxByteStringSize);
- int sizeInBytes = 0;
- for (ByteString byteString : byteStrings) {
- sizeInBytes += byteString.size();
- }
- Iterator<ByteString> byteStringIt = byteStrings.iterator();
- while (byteStringIt.hasNext()) {
- ByteString byteString = byteStringIt.next();
- boolean waitForMore = byteStringIt.hasNext();
- _mailbox.sendContent(byteString, waitForMore);
- }
- return sizeInBytes;
- }
- }
-
- private static class NonSplitSender extends Sender {
- public NonSplitSender(GrpcSendingMailbox mailbox) {
- super(mailbox);
- }
-
- @Override
- protected int processAndSend(DataBlock dataBlock)
- throws IOException {
- ByteString byteString = DataBlockUtils.toByteString(dataBlock);
- int sizeInBytes = byteString.size();
- _mailbox.sendContent(byteString, false);
- return sizeInBytes;
- }
- }
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index 3a58293367e..1a330eb447a 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -73,7 +73,17 @@ public class MailboxService {
private final ChannelManager _channelManager;
@Nullable private final TlsConfig _tlsConfig;
@Nullable private final QueryAccessControlFactory _accessControlFactory;
- private final int _maxByteStringSize;
+ /**
+ * The max inbound message size for the gRPC server.
+ *
+ * If we try to send a message larger than this value, the gRPC server will
throw an exception and close the
+ * connection.
+ *
+ * The {@link GrpcSendingMailbox} will split the data block into smaller
chunks to fit into this limit, but a very
+ * small limit (lower than hundreds of KBs) will cause performance
degradation and may even fail, given that some extra
+ * bloating is added by gRPC and protobuf.
+ */
+ private final int _maxInboundMessageSize;
private GrpcMailboxServer _grpcMailboxServer;
@@ -93,21 +103,12 @@ public class MailboxService {
_instanceType = instanceType;
_config = config;
_tlsConfig = tlsConfig;
- int maxInboundMessageSize = config.getProperty(
+ _maxInboundMessageSize = config.getProperty(
CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
);
- _channelManager = new ChannelManager(tlsConfig, maxInboundMessageSize,
getIdleTimeout(config));
+ _channelManager = new ChannelManager(tlsConfig, _maxInboundMessageSize,
getIdleTimeout(config));
_accessControlFactory = accessControlFactory;
- boolean splitBlocks = config.getProperty(
-
CommonConstants.MultiStageQueryRunner.KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT,
-
CommonConstants.MultiStageQueryRunner.DEFAULT_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT);
- if (splitBlocks) {
- // so far we ensure payload is not bigger than maxBlockSize/2, we can
fine tune this later
- _maxByteStringSize = Math.max(maxInboundMessageSize / 2, 1);
- } else {
- _maxByteStringSize = 0;
- }
LOGGER.info("Initialized MailboxService with hostname: {}, port: {}",
hostname, port);
}
@@ -151,7 +152,7 @@ public class MailboxService {
return new InMemorySendingMailbox(mailboxId, this, deadlineMs, statMap);
} else {
return new GrpcSendingMailbox(mailboxId, _channelManager, hostname,
port, deadlineMs, statMap,
- _maxByteStringSize);
+ _maxInboundMessageSize);
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
index a37f93598f3..0efcd22020a 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
@@ -128,8 +128,8 @@ public class MailboxContentObserver implements
StreamObserver<MailboxContent> {
_mailboxBuffers.clear();
if (_mailbox != null) {
String msg = t != null ? t.getMessage() : "Unknown";
- _mailbox.setErrorBlock(ErrorMseBlock.fromError(
- QueryErrorCode.QUERY_CANCELLATION, "Cancelled by sender with
exception: " + msg), List.of());
+ String errorMessage = "GRPC mailbox cancelled by sender with exception:
" + msg;
+ _mailbox.setErrorBlock(ErrorMseBlock.fromError(QueryErrorCode.INTERNAL,
errorMessage), List.of());
} else {
LOGGER.error("Got error before mailbox is set up", t);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 84a8ac6f151..0d183d78a36 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -168,6 +168,7 @@ public class QueryDispatcher {
submit(requestId, dispatchableSubPlan, timeoutMs, servers, queryOptions);
QueryResult result = runReducer(dispatchableSubPlan, queryOptions,
_mailboxService);
if (result.getProcessingException() != null) {
+ LOGGER.warn("Query result includes processing exceptions. Trying to
cancel the other opchains");
MultiStageQueryStats statsFromCancel = cancelWithStats(requestId,
servers);
cancelled = true;
return result.withStats(statsFromCancel);
@@ -207,6 +208,7 @@ public class QueryDispatcher {
}
// in case of known exceptions (timeout or query exception), we need can
build here the erroneous QueryResult
// that include the stats.
+ LOGGER.warn("Query failed with a known exception. Trying to cancel the
other opchains");
MultiStageQueryStats stats = cancelWithStats(requestId, servers);
if (stats == null) {
throw ex;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
index 40049f513f2..ddbd5075ad8 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
@@ -21,13 +21,13 @@ package org.apache.pinot.query.mailbox;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.query.planner.physical.MailboxIdUtils;
-import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
import org.apache.pinot.query.runtime.blocks.MseBlock;
import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
@@ -36,7 +36,6 @@ import
org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.testutils.QueryTestUtils;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
@@ -387,7 +386,7 @@ public class MailboxServiceTest {
sendingMailbox.send(SuccessMseBlock.INSTANCE,
MultiStageQueryStats.emptyStats(SENDER_STAGE_ID).serialize());
// Wait until all the mails are delivered
- receiveMailLatch.await();
+ assertTrue(receiveMailLatch.await(10000, TimeUnit.MILLISECONDS), "Timed
out waiting for mailbox to receive");
assertEquals(numCallbacks.get(),
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS);
for (int i = 0; i < ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - 1; i++) {
@@ -422,7 +421,7 @@ public class MailboxServiceTest {
sendingMailbox.cancel(new Exception("TEST ERROR"));
// Wait until all the mails are delivered
- receiveMailLatch.await();
+ assertTrue(receiveMailLatch.await(10000, TimeUnit.MILLISECONDS), "Timed
out waiting for mailbox to receive");
assertEquals(numCallbacks.get(), 2);
// Data blocks will be cleaned up
@@ -456,7 +455,7 @@ public class MailboxServiceTest {
sendingMailbox.cancel(new Exception("TEST ERROR"));
// Wait until cancellation is delivered
- receiveMailLatch.await();
+ assertTrue(receiveMailLatch.await(10000, TimeUnit.MILLISECONDS), "Timed
out waiting for mailbox to receive");
assertEquals(numCallbacks.get(), 1);
// Data blocks will be cleaned up
@@ -472,51 +471,6 @@ public class MailboxServiceTest {
assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
}
- @Test
- public void testRemoteCancelledBecauseResourceExhausted()
- throws Exception {
- PinotConfiguration config =
- new
PinotConfiguration(Map.of(MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
1));
- MailboxService mailboxService3 =
- new MailboxService("localhost", QueryTestUtils.getAvailablePort(),
InstanceType.BROKER, config);
- mailboxService3.start();
- MailboxService mailboxService4 =
- new MailboxService("localhost", QueryTestUtils.getAvailablePort(),
InstanceType.SERVER, config);
- mailboxService4.start();
-
- String mailboxId = MailboxIdUtils.toMailboxId(_requestId++,
SENDER_STAGE_ID, 0, RECEIVER_STAGE_ID, 0);
- SendingMailbox sendingMailbox =
- mailboxService4.getSendingMailbox("localhost",
mailboxService3.getPort(), mailboxId, Long.MAX_VALUE, _stats);
- ReceivingMailbox receivingMailbox =
mailboxService3.getReceivingMailbox(mailboxId);
- AtomicInteger numCallbacks = new AtomicInteger();
- CountDownLatch receiveMailLatch = new CountDownLatch(1);
- receivingMailbox.registeredReader(() -> {
- numCallbacks.getAndIncrement();
- receiveMailLatch.countDown();
- });
-
- // Send some large data
- sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new
Object[]{"longer-amount-of-data-than-server-expects"}));
-
- // Wait until cancellation is delivered
- receiveMailLatch.await();
- assertEquals(numCallbacks.get(), 1);
-
- // Assert that error block is returned from server.
- assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
- MseBlock block = readBlock(receivingMailbox);
- assertNotNull(block);
- assertTrue(block.isError());
-
- assertTrue(block instanceof ErrorMseBlock);
- ErrorMseBlock errorMseBlock = (ErrorMseBlock) block;
-
assertEquals(errorMseBlock.getErrorMessages().get(QueryErrorCode.QUERY_CANCELLATION),
- "Cancelled by sender with exception: CANCELLED: client cancelled");
-
- mailboxService3.shutdown();
- mailboxService4.shutdown();
- }
-
@Test
public void testRemoteCancelledByReceiver()
throws Exception {
@@ -533,7 +487,7 @@ public class MailboxServiceTest {
// Send one data block and then cancel
sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new
Object[]{"0"}));
- receiveMailLatch.await();
+ assertTrue(receiveMailLatch.await(10000, TimeUnit.MILLISECONDS), "Timed
out waiting for mailbox to receive");
receivingMailbox.cancel();
assertEquals(numCallbacks.get(), 2);
@@ -567,7 +521,8 @@ public class MailboxServiceTest {
// Send one data block, receiver will time out after deadline
sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new
Object[]{"0"}));
- receiveMailLatch.await();
+ Thread.sleep(deadlineMs - System.currentTimeMillis() + 10);
+ assertTrue(receiveMailLatch.await(10000, TimeUnit.MILLISECONDS), "Timed
out waiting for mailbox to receive");
assertEquals(numCallbacks.get(), 2);
// Data blocks will be cleaned up
@@ -609,7 +564,7 @@ public class MailboxServiceTest {
// Next send will be blocked on the receiver side and cause exception
after timeout
// We need to send a data block, given we don't block on EOS
sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new
Object[]{"0"}));
- receiveMailLatch.await();
+ assertTrue(receiveMailLatch.await(10000, TimeUnit.MILLISECONDS), "Timed
out waiting for mailbox to receive");
assertEquals(numCallbacks.get(),
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS + 1);
// Data blocks will be cleaned up
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 00102c90a28..8d6e4012ee8 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1892,7 +1892,11 @@ public class CommonConstants {
*/
public static class MultiStageQueryRunner {
/**
- * Configuration for mailbox data block size
+ * Configuration for mailbox data block size.
+ *
+ * Ideally it should be in the order of a few MBs, to balance the
serialization/deserialization overhead and the
+ * number of messages to transfer. Values lower than hundreds of KBs are
not recommended and may lead to an excessive
number of messages, extra overhead and even errors.
*/
public static final String KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
= "pinot.query.runner.max.msg.size.bytes";
public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES =
16 * 1024 * 1024;
@@ -1913,13 +1917,6 @@ public class CommonConstants {
public static final String KEY_OF_CHANNEL_IDLE_TIMEOUT_SECONDS =
"pinot.query.runner.channel.idle.timeout.seconds";
public static final long DEFAULT_CHANNEL_IDLE_TIMEOUT_SECONDS = -1;
- /**
- * Enable splitting of data block payload during mailbox transfer.
- */
- public static final String KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT =
- "pinot.query.runner.enable.data.block.payload.split";
- public static final boolean DEFAULT_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT =
false;
-
/// Configuration for server port used to receive query plans.
public static final String KEY_OF_QUERY_SERVER_PORT =
"pinot.query.server.port";
public static final int DEFAULT_QUERY_SERVER_PORT = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]