This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0be79c9b37 [multistage] adding error block bubble up for GRPC
exceptions (#9143)
0be79c9b37 is described below
commit 0be79c9b370dc0af05eae0ab9b21a5867b06efba
Author: Rong Rong <[email protected]>
AuthorDate: Tue Aug 2 20:41:15 2022 -0700
[multistage] adding error block bubble up for GRPC exceptions (#9143)
* adding error block bubble up for GRPC exceptions
* adding configuration for max inbound message size so that the test now
properly throws an exception
Co-authored-by: Rong Rong <[email protected]>
---
.../MultiStageBrokerRequestHandler.java | 2 +-
.../pinot/query/mailbox/GrpcMailboxService.java | 5 ++-
.../pinot/query/mailbox/GrpcSendingMailbox.java | 6 +++
.../query/mailbox/channel/ChannelManager.java | 5 ++-
.../pinot/query/mailbox/channel/ChannelUtils.java | 1 +
.../query/mailbox/channel/GrpcMailboxServer.java | 10 +++--
.../channel/MailboxContentStreamObserver.java | 48 ++++++++++++++--------
.../apache/pinot/query/runtime/QueryRunner.java | 2 +-
.../runtime/operator/MailboxReceiveOperator.java | 7 ++--
.../apache/pinot/query/service/QueryConfig.java | 4 +-
.../query/mailbox/GrpcMailboxServiceTest.java | 39 ++++++++++++++++++
.../query/mailbox/GrpcMailboxServiceTestBase.java | 7 +++-
.../pinot/query/runtime/QueryRunnerTestBase.java | 3 +-
13 files changed, 107 insertions(+), 32 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index d7b7a1c802..43adfd6045 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -90,7 +90,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
CalciteSchemaBuilder.asRootSchema(new PinotCatalog(tableCache)),
new WorkerManager(_reducerHostname, _reducerPort, routingManager));
_queryDispatcher = new QueryDispatcher();
- _mailboxService = new GrpcMailboxService(_reducerHostname, _reducerPort);
+ _mailboxService = new GrpcMailboxService(_reducerHostname, _reducerPort,
config);
// TODO: move this to a startUp() function.
_mailboxService.start();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
index 30f0518a19..c4d81fd76a 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
@@ -22,6 +22,7 @@ import io.grpc.ManagedChannel;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pinot.common.proto.Mailbox.MailboxContent;
import org.apache.pinot.query.mailbox.channel.ChannelManager;
+import org.apache.pinot.spi.env.PinotConfiguration;
/**
@@ -53,10 +54,10 @@ public class GrpcMailboxService implements
MailboxService<MailboxContent> {
private final ConcurrentHashMap<String, SendingMailbox<MailboxContent>>
_sendingMailboxMap =
new ConcurrentHashMap<>();
- public GrpcMailboxService(String hostname, int mailboxPort) {
+ public GrpcMailboxService(String hostname, int mailboxPort,
PinotConfiguration extraConfig) {
_hostname = hostname;
_mailboxPort = mailboxPort;
- _channelManager = new ChannelManager(this);
+ _channelManager = new ChannelManager(this, extraConfig);
}
@Override
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index 2aff8a03e3..506d66e01c 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.common.proto.Mailbox.MailboxContent;
import org.apache.pinot.common.proto.PinotMailboxGrpc;
+import org.apache.pinot.query.mailbox.channel.ChannelUtils;
import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
/**
@@ -48,6 +49,11 @@ public class GrpcSendingMailbox implements
SendingMailbox<MailboxContent> {
PinotMailboxGrpc.PinotMailboxStub stub = PinotMailboxGrpc.newStub(channel);
_statusStreamObserver = new MailboxStatusStreamObserver();
_statusStreamObserver.init(stub.open(_statusStreamObserver));
+ // send a begin-of-stream message.
+ _statusStreamObserver.send(MailboxContent.newBuilder()
+ .setMailboxId(_mailboxId)
+ .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, "true")
+ .build());
_initialized.set(true);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
index 72d90c07ac..006442cf1c 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
@@ -23,6 +23,7 @@ import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
/**
@@ -40,9 +41,9 @@ public class ChannelManager {
private final ConcurrentHashMap<String, ManagedChannel> _channelMap = new
ConcurrentHashMap<>();
- public ChannelManager(GrpcMailboxService mailboxService) {
+ public ChannelManager(GrpcMailboxService mailboxService, PinotConfiguration
extraConfig) {
_mailboxService = mailboxService;
- _grpcMailboxServer = new GrpcMailboxServer(_mailboxService,
_mailboxService.getMailboxPort());
+ _grpcMailboxServer = new GrpcMailboxServer(_mailboxService,
_mailboxService.getMailboxPort(), extraConfig);
}
public void init() {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
index 1662047263..c9ecade883 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.mailbox.channel;
public class ChannelUtils {
public static final String MAILBOX_METADATA_BUFFER_SIZE_KEY = "buffer.size";
+ public static final String MAILBOX_METADATA_BEGIN_OF_STREAM_KEY =
"begin.of.stream";
public static final String MAILBOX_METADATA_END_OF_STREAM_KEY =
"end.of.stream";
private ChannelUtils() {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
index 2a6706b709..0b3ec4a8ea 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
@@ -27,6 +27,7 @@ import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.proto.PinotMailboxGrpc;
import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,10 +45,13 @@ public class GrpcMailboxServer extends
PinotMailboxGrpc.PinotMailboxImplBase {
private final GrpcMailboxService _mailboxService;
private final Server _server;
- public GrpcMailboxServer(GrpcMailboxService mailboxService, int port) {
+ public GrpcMailboxServer(GrpcMailboxService mailboxService, int port,
PinotConfiguration extraConfig) {
_mailboxService = mailboxService;
- _server = ServerBuilder.forPort(port).addService(this)
-
.maxInboundMessageSize(QueryConfig.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE).build();
+ _server = ServerBuilder.forPort(port)
+ .addService(this)
+
.maxInboundMessageSize(extraConfig.getProperty(QueryConfig.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE,
+ QueryConfig.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE))
+ .build();
LOGGER.info("Initialized GrpcMailboxServer on port: {}", port);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
index 5b79549a27..72deae4976 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -18,13 +18,16 @@
*/
package org.apache.pinot.query.mailbox.channel;
+import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
+import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.mailbox.GrpcReceivingMailbox;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +48,7 @@ public class MailboxContentStreamObserver implements
StreamObserver<Mailbox.Mail
private final boolean _isEnabledFeedback;
private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
+ private String _mailboxId;
private ArrayBlockingQueue<Mailbox.MailboxContent> _receivingBuffer;
public MailboxContentStreamObserver(GrpcMailboxService mailboxService,
@@ -80,30 +84,40 @@ public class MailboxContentStreamObserver implements
StreamObserver<Mailbox.Mail
@Override
public void onNext(Mailbox.MailboxContent mailboxContent) {
- GrpcReceivingMailbox receivingMailbox =
- (GrpcReceivingMailbox)
_mailboxService.getReceivingMailbox(mailboxContent.getMailboxId());
+ _mailboxId = mailboxContent.getMailboxId();
+ GrpcReceivingMailbox receivingMailbox = (GrpcReceivingMailbox)
_mailboxService.getReceivingMailbox(_mailboxId);
receivingMailbox.init(this);
- // when the receiving end receives a message put it in the mailbox queue.
- _receivingBuffer.offer(mailboxContent);
- if (_isEnabledFeedback) {
- // TODO: this has race conditions with onCompleted() because sender
blindly closes connection channels once
- // it has finished sending all the data packets.
- int remainingCapacity = _receivingBuffer.remainingCapacity() - 1;
- Mailbox.MailboxStatus.Builder builder =
-
Mailbox.MailboxStatus.newBuilder().setMailboxId(mailboxContent.getMailboxId())
- .putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY,
String.valueOf(remainingCapacity));
- if
(mailboxContent.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY)
!= null) {
- builder.putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY,
"true");
+ if
(!mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY))
{
+ // when the receiving end receives a message put it in the mailbox queue.
+ _receivingBuffer.offer(mailboxContent);
+ if (_isEnabledFeedback) {
+ // TODO: this has race conditions with onCompleted() because sender
blindly closes connection channels once
+ // it has finished sending all the data packets.
+ int remainingCapacity = _receivingBuffer.remainingCapacity() - 1;
+ Mailbox.MailboxStatus.Builder builder =
+
Mailbox.MailboxStatus.newBuilder().setMailboxId(mailboxContent.getMailboxId())
+ .putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY,
String.valueOf(remainingCapacity));
+ if
(mailboxContent.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY)
!= null) {
+ builder.putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY,
"true");
+ }
+ Mailbox.MailboxStatus status = builder.build();
+ // returns the buffer available size to sender for rate controller /
throttling.
+ _responseObserver.onNext(status);
}
- Mailbox.MailboxStatus status = builder.build();
- // returns the buffer available size to sender for rate controller /
throttling.
- _responseObserver.onNext(status);
}
}
@Override
public void onError(Throwable e) {
- throw new RuntimeException(e);
+ try {
+ _receivingBuffer.offer(Mailbox.MailboxContent.newBuilder()
+ .setPayload(ByteString.copyFrom(
+ TransferableBlockUtils.getErrorTransferableBlock(new
RuntimeException(e)).toBytes()))
+ .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY,
"true").build());
+ throw new RuntimeException(e);
+ } catch (IOException ioe) {
+ throw new RuntimeException("Unable to encode exception for cascade
reporting: " + e, ioe);
+ }
}
@Override
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 2f7f23f264..bf53684d32 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -71,7 +71,7 @@ public class QueryRunner {
CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName;
_port = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT,
QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
try {
- _mailboxService = new GrpcMailboxService(_hostname, _port);
+ _mailboxService = new GrpcMailboxService(_hostname, _port, config);
_serverExecutor = new ServerQueryExecutorV1Impl();
_serverExecutor.init(config, instanceDataManager, serverMetrics);
_workerExecutor = new WorkerQueryExecutor();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 92309f946b..91d5de3d8d 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
@@ -127,10 +128,10 @@ public class MailboxReceiveOperator extends
BaseOperator<TransferableBlock> {
}
if (System.nanoTime() >= timeoutWatermark) {
LOGGER.error("Timed out after polling mailboxes: {}",
_sendingStageInstances);
+ return
TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
+ } else {
+ return
TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
}
- // TODO: we need to at least return one data table with schema if there's
no error.
- // we need to condition this on whether there's already things being
returned or not.
- return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
}
public RelDistribution.Type getExchangeType() {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
index f2a3d12d14..2b90c93f4c 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
@@ -22,9 +22,11 @@ package org.apache.pinot.query.service;
* Configuration for setting up query runtime.
*/
public class QueryConfig {
- public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE =
128 * 1024 * 1024;
public static final long DEFAULT_TIMEOUT_NANO = 10_000_000_000L;
+ public static final String KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE =
"pinot.query.runner.max.msg.size";
+ public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE =
128 * 1024 * 1024;
+
public static final String KEY_OF_QUERY_SERVER_PORT =
"pinot.query.server.port";
public static final int DEFAULT_QUERY_SERVER_PORT = 0;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
index 5661d9cae7..9ba1b52ddc 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
@@ -22,10 +22,13 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
import org.apache.pinot.core.common.datablock.DataBlockUtils;
+import org.apache.pinot.core.common.datablock.MetadataBlock;
import org.apache.pinot.query.mailbox.channel.ChannelUtils;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.util.TestUtils;
@@ -65,6 +68,34 @@ public class GrpcMailboxServiceTest extends
GrpcMailboxServiceTestBase {
}, 5000L, "Receiving mailbox is not closed properly!");
}
+ @Test
+ public void testGrpcException()
+ throws Exception {
+ Preconditions.checkState(_mailboxServices.size() >= 2);
+ Map.Entry<Integer, GrpcMailboxService> sender =
_mailboxServices.firstEntry();
+ Map.Entry<Integer, GrpcMailboxService> receiver =
_mailboxServices.lastEntry();
+ String mailboxId =
+ String.format("exception:localhost:%d:localhost:%d", sender.getKey(),
receiver.getKey());
+ SendingMailbox<Mailbox.MailboxContent> sendingMailbox =
sender.getValue().getSendingMailbox(mailboxId);
+ ReceivingMailbox<Mailbox.MailboxContent> receivingMailbox =
receiver.getValue().getReceivingMailbox(mailboxId);
+
+ // create mock object
+ Mailbox.MailboxContent testContent = getTooLargeMailboxContent(mailboxId);
+ sendingMailbox.send(testContent);
+
+ // wait for receiving mailbox to be created.
+ TestUtils.waitForCondition(aVoid -> {
+ return receivingMailbox.isInitialized();
+ }, 5000L, "Receiving mailbox initialize failed!");
+
+ Mailbox.MailboxContent receivedContent = receivingMailbox.receive();
+ Assert.assertNotNull(receivedContent);
+ ByteBuffer byteBuffer =
receivedContent.getPayload().asReadOnlyByteBuffer();
+ Assert.assertTrue(byteBuffer.hasRemaining());
+ BaseDataBlock dataBlock = DataBlockUtils.getDataBlock(byteBuffer);
+ Assert.assertTrue(dataBlock instanceof MetadataBlock &&
!dataBlock.getExceptions().isEmpty());
+ }
+
private Mailbox.MailboxContent getTestMailboxContent(String mailboxId)
throws IOException {
return Mailbox.MailboxContent.newBuilder().setMailboxId(mailboxId)
@@ -75,4 +106,12 @@ public class GrpcMailboxServiceTest extends
GrpcMailboxServiceTestBase {
).toBytes()))
.build();
}
+
+ private Mailbox.MailboxContent getTooLargeMailboxContent(String mailboxId)
+ throws IOException {
+ return Mailbox.MailboxContent.newBuilder().setMailboxId(mailboxId)
+ .putAllMetadata(ImmutableMap.of("key", "value",
ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true"))
+ .setPayload(ByteString.copyFrom(new byte[16_000_000]))
+ .build();
+ }
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTestBase.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTestBase.java
index 8a5badd36a..a874ce46fc 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTestBase.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTestBase.java
@@ -18,8 +18,11 @@
*/
package org.apache.pinot.query.mailbox;
+import java.util.Collections;
import java.util.TreeMap;
import org.apache.pinot.query.QueryEnvironmentTestUtils;
+import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -31,9 +34,11 @@ public abstract class GrpcMailboxServiceTestBase {
@BeforeClass
public void setUp()
throws Exception {
+ PinotConfiguration extraConfig = new
PinotConfiguration(Collections.singletonMap(
+ QueryConfig.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE,
4_000_000));
for (int i = 0; i < MAILBOX_TEST_SIZE; i++) {
int availablePort = QueryEnvironmentTestUtils.getAvailablePort();
- GrpcMailboxService grpcMailboxService = new
GrpcMailboxService("localhost", availablePort);
+ GrpcMailboxService grpcMailboxService = new
GrpcMailboxService("localhost", availablePort, extraConfig);
grpcMailboxService.start();
_mailboxServices.put(availablePort, grpcMailboxService);
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index e32ee1db49..22cf24e60f 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -36,6 +36,7 @@ import org.apache.pinot.query.QueryServerEnclosure;
import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.routing.WorkerInstance;
import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -83,7 +84,7 @@ public class QueryRunnerTestBase {
Map<String, Object> reducerConfig = new HashMap<>();
reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, _reducerGrpcPort);
reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME,
_reducerHostname);
- _mailboxService = new GrpcMailboxService(_reducerHostname,
_reducerGrpcPort);
+ _mailboxService = new GrpcMailboxService(_reducerHostname,
_reducerGrpcPort, new PinotConfiguration(reducerConfig));
_mailboxService.start();
_queryEnvironment =
QueryEnvironmentTestUtils.getQueryEnvironment(_reducerGrpcPort,
server1.getPort(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]