This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0be79c9b37 [multistage] adding error block bubble up for GRPC exceptions (#9143)
0be79c9b37 is described below

commit 0be79c9b370dc0af05eae0ab9b21a5867b06efba
Author: Rong Rong <[email protected]>
AuthorDate: Tue Aug 2 20:41:15 2022 -0700

    [multistage] adding error block bubble up for GRPC exceptions (#9143)
    
    * adding error block bubble up for GRPC exceptions
    
    * adding configuration for max inbound message size so that the test now properly throws an exception
    
    Co-authored-by: Rong Rong <[email protected]>
---
 .../MultiStageBrokerRequestHandler.java            |  2 +-
 .../pinot/query/mailbox/GrpcMailboxService.java    |  5 ++-
 .../pinot/query/mailbox/GrpcSendingMailbox.java    |  6 +++
 .../query/mailbox/channel/ChannelManager.java      |  5 ++-
 .../pinot/query/mailbox/channel/ChannelUtils.java  |  1 +
 .../query/mailbox/channel/GrpcMailboxServer.java   | 10 +++--
 .../channel/MailboxContentStreamObserver.java      | 48 ++++++++++++++--------
 .../apache/pinot/query/runtime/QueryRunner.java    |  2 +-
 .../runtime/operator/MailboxReceiveOperator.java   |  7 ++--
 .../apache/pinot/query/service/QueryConfig.java    |  4 +-
 .../query/mailbox/GrpcMailboxServiceTest.java      | 39 ++++++++++++++++++
 .../query/mailbox/GrpcMailboxServiceTestBase.java  |  7 +++-
 .../pinot/query/runtime/QueryRunnerTestBase.java   |  3 +-
 13 files changed, 107 insertions(+), 32 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index d7b7a1c802..43adfd6045 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -90,7 +90,7 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
         CalciteSchemaBuilder.asRootSchema(new PinotCatalog(tableCache)),
         new WorkerManager(_reducerHostname, _reducerPort, routingManager));
     _queryDispatcher = new QueryDispatcher();
-    _mailboxService = new GrpcMailboxService(_reducerHostname, _reducerPort);
+    _mailboxService = new GrpcMailboxService(_reducerHostname, _reducerPort, 
config);
 
     // TODO: move this to a startUp() function.
     _mailboxService.start();
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
index 30f0518a19..c4d81fd76a 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
@@ -22,6 +22,7 @@ import io.grpc.ManagedChannel;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.pinot.common.proto.Mailbox.MailboxContent;
 import org.apache.pinot.query.mailbox.channel.ChannelManager;
+import org.apache.pinot.spi.env.PinotConfiguration;
 
 
 /**
@@ -53,10 +54,10 @@ public class GrpcMailboxService implements 
MailboxService<MailboxContent> {
   private final ConcurrentHashMap<String, SendingMailbox<MailboxContent>> 
_sendingMailboxMap =
       new ConcurrentHashMap<>();
 
-  public GrpcMailboxService(String hostname, int mailboxPort) {
+  public GrpcMailboxService(String hostname, int mailboxPort, 
PinotConfiguration extraConfig) {
     _hostname = hostname;
     _mailboxPort = mailboxPort;
-    _channelManager = new ChannelManager(this);
+    _channelManager = new ChannelManager(this, extraConfig);
   }
 
   @Override
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index 2aff8a03e3..506d66e01c 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.common.proto.Mailbox.MailboxContent;
 import org.apache.pinot.common.proto.PinotMailboxGrpc;
+import org.apache.pinot.query.mailbox.channel.ChannelUtils;
 import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
 
 /**
@@ -48,6 +49,11 @@ public class GrpcSendingMailbox implements 
SendingMailbox<MailboxContent> {
     PinotMailboxGrpc.PinotMailboxStub stub = PinotMailboxGrpc.newStub(channel);
     _statusStreamObserver = new MailboxStatusStreamObserver();
     _statusStreamObserver.init(stub.open(_statusStreamObserver));
+    // send a begin-of-stream message.
+    _statusStreamObserver.send(MailboxContent.newBuilder()
+        .setMailboxId(_mailboxId)
+        .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, "true")
+        .build());
     _initialized.set(true);
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
index 72d90c07ac..006442cf1c 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
@@ -23,6 +23,7 @@ import io.grpc.ManagedChannelBuilder;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.pinot.query.mailbox.GrpcMailboxService;
 import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
 
 
 /**
@@ -40,9 +41,9 @@ public class ChannelManager {
 
   private final ConcurrentHashMap<String, ManagedChannel> _channelMap = new 
ConcurrentHashMap<>();
 
-  public ChannelManager(GrpcMailboxService mailboxService) {
+  public ChannelManager(GrpcMailboxService mailboxService, PinotConfiguration 
extraConfig) {
     _mailboxService = mailboxService;
-    _grpcMailboxServer = new GrpcMailboxServer(_mailboxService, 
_mailboxService.getMailboxPort());
+    _grpcMailboxServer = new GrpcMailboxServer(_mailboxService, 
_mailboxService.getMailboxPort(), extraConfig);
   }
 
   public void init() {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
index 1662047263..c9ecade883 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.mailbox.channel;
 public class ChannelUtils {
 
   public static final String MAILBOX_METADATA_BUFFER_SIZE_KEY = "buffer.size";
+  public static final String MAILBOX_METADATA_BEGIN_OF_STREAM_KEY = 
"begin.of.stream";
   public static final String MAILBOX_METADATA_END_OF_STREAM_KEY = 
"end.of.stream";
 
   private ChannelUtils() {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
index 2a6706b709..0b3ec4a8ea 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
@@ -27,6 +27,7 @@ import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.PinotMailboxGrpc;
 import org.apache.pinot.query.mailbox.GrpcMailboxService;
 import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,10 +45,13 @@ public class GrpcMailboxServer extends 
PinotMailboxGrpc.PinotMailboxImplBase {
   private final GrpcMailboxService _mailboxService;
   private final Server _server;
 
-  public GrpcMailboxServer(GrpcMailboxService mailboxService, int port) {
+  public GrpcMailboxServer(GrpcMailboxService mailboxService, int port, 
PinotConfiguration extraConfig) {
     _mailboxService = mailboxService;
-    _server = ServerBuilder.forPort(port).addService(this)
-        
.maxInboundMessageSize(QueryConfig.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE).build();
+    _server = ServerBuilder.forPort(port)
+        .addService(this)
+        
.maxInboundMessageSize(extraConfig.getProperty(QueryConfig.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE,
+            QueryConfig.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE))
+        .build();
     LOGGER.info("Initialized GrpcMailboxServer on port: {}", port);
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
index 5b79549a27..72deae4976 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -18,13 +18,16 @@
  */
 package org.apache.pinot.query.mailbox.channel;
 
+import com.google.protobuf.ByteString;
 import io.grpc.stub.StreamObserver;
+import java.io.IOException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.query.mailbox.GrpcMailboxService;
 import org.apache.pinot.query.mailbox.GrpcReceivingMailbox;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +48,7 @@ public class MailboxContentStreamObserver implements 
StreamObserver<Mailbox.Mail
   private final boolean _isEnabledFeedback;
 
   private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
+  private String _mailboxId;
   private ArrayBlockingQueue<Mailbox.MailboxContent> _receivingBuffer;
 
   public MailboxContentStreamObserver(GrpcMailboxService mailboxService,
@@ -80,30 +84,40 @@ public class MailboxContentStreamObserver implements 
StreamObserver<Mailbox.Mail
 
   @Override
   public void onNext(Mailbox.MailboxContent mailboxContent) {
-    GrpcReceivingMailbox receivingMailbox =
-        (GrpcReceivingMailbox) 
_mailboxService.getReceivingMailbox(mailboxContent.getMailboxId());
+    _mailboxId = mailboxContent.getMailboxId();
+    GrpcReceivingMailbox receivingMailbox = (GrpcReceivingMailbox) 
_mailboxService.getReceivingMailbox(_mailboxId);
     receivingMailbox.init(this);
-    // when the receiving end receives a message put it in the mailbox queue.
-    _receivingBuffer.offer(mailboxContent);
-    if (_isEnabledFeedback) {
-      // TODO: this has race conditions with onCompleted() because sender 
blindly closes connection channels once
-      // it has finished sending all the data packets.
-      int remainingCapacity = _receivingBuffer.remainingCapacity() - 1;
-      Mailbox.MailboxStatus.Builder builder =
-          
Mailbox.MailboxStatus.newBuilder().setMailboxId(mailboxContent.getMailboxId())
-              .putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY, 
String.valueOf(remainingCapacity));
-      if 
(mailboxContent.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY)
 != null) {
-        builder.putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, 
"true");
+    if 
(!mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY))
 {
+      // when the receiving end receives a message put it in the mailbox queue.
+      _receivingBuffer.offer(mailboxContent);
+      if (_isEnabledFeedback) {
+        // TODO: this has race conditions with onCompleted() because sender 
blindly closes connection channels once
+        // it has finished sending all the data packets.
+        int remainingCapacity = _receivingBuffer.remainingCapacity() - 1;
+        Mailbox.MailboxStatus.Builder builder =
+            
Mailbox.MailboxStatus.newBuilder().setMailboxId(mailboxContent.getMailboxId())
+                .putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY, 
String.valueOf(remainingCapacity));
+        if 
(mailboxContent.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY)
 != null) {
+          builder.putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, 
"true");
+        }
+        Mailbox.MailboxStatus status = builder.build();
+        // returns the buffer available size to sender for rate controller / 
throttling.
+        _responseObserver.onNext(status);
       }
-      Mailbox.MailboxStatus status = builder.build();
-      // returns the buffer available size to sender for rate controller / 
throttling.
-      _responseObserver.onNext(status);
     }
   }
 
   @Override
   public void onError(Throwable e) {
-    throw new RuntimeException(e);
+    try {
+      _receivingBuffer.offer(Mailbox.MailboxContent.newBuilder()
+          .setPayload(ByteString.copyFrom(
+              TransferableBlockUtils.getErrorTransferableBlock(new 
RuntimeException(e)).toBytes()))
+          .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, 
"true").build());
+      throw new RuntimeException(e);
+    } catch (IOException ioe) {
+      throw new RuntimeException("Unable to encode exception for cascade 
reporting: " + e, ioe);
+    }
   }
 
   @Override
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 2f7f23f264..bf53684d32 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -71,7 +71,7 @@ public class QueryRunner {
         CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName;
     _port = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 
QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
     try {
-      _mailboxService = new GrpcMailboxService(_hostname, _port);
+      _mailboxService = new GrpcMailboxService(_hostname, _port, config);
       _serverExecutor = new ServerQueryExecutorV1Impl();
       _serverExecutor.init(config, instanceDataManager, serverMetrics);
       _workerExecutor = new WorkerQueryExecutor();
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 92309f946b..91d5de3d8d 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
@@ -127,10 +128,10 @@ public class MailboxReceiveOperator extends 
BaseOperator<TransferableBlock> {
     }
     if (System.nanoTime() >= timeoutWatermark) {
       LOGGER.error("Timed out after polling mailboxes: {}", 
_sendingStageInstances);
+      return 
TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
+    } else {
+      return 
TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
     }
-    // TODO: we need to at least return one data table with schema if there's 
no error.
-    // we need to condition this on whether there's already things being 
returned or not.
-    return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
   }
 
   public RelDistribution.Type getExchangeType() {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
index f2a3d12d14..2b90c93f4c 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
@@ -22,9 +22,11 @@ package org.apache.pinot.query.service;
  * Configuration for setting up query runtime.
  */
 public class QueryConfig {
-  public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE = 
128 * 1024 * 1024;
   public static final long DEFAULT_TIMEOUT_NANO = 10_000_000_000L;
 
+  public static final String KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE = 
"pinot.query.runner.max.msg.size";
+  public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE = 
128 * 1024 * 1024;
+
   public static final String KEY_OF_QUERY_SERVER_PORT = 
"pinot.query.server.port";
   public static final int DEFAULT_QUERY_SERVER_PORT = 0;
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
index 5661d9cae7..9ba1b52ddc 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
@@ -22,10 +22,13 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Map;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
 import org.apache.pinot.core.common.datablock.DataBlockUtils;
+import org.apache.pinot.core.common.datablock.MetadataBlock;
 import org.apache.pinot.query.mailbox.channel.ChannelUtils;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.util.TestUtils;
@@ -65,6 +68,34 @@ public class GrpcMailboxServiceTest extends 
GrpcMailboxServiceTestBase {
     }, 5000L, "Receiving mailbox is not closed properly!");
   }
 
+  @Test
+  public void testGrpcException()
+      throws Exception {
+    Preconditions.checkState(_mailboxServices.size() >= 2);
+    Map.Entry<Integer, GrpcMailboxService> sender = 
_mailboxServices.firstEntry();
+    Map.Entry<Integer, GrpcMailboxService> receiver = 
_mailboxServices.lastEntry();
+    String mailboxId =
+        String.format("exception:localhost:%d:localhost:%d", sender.getKey(), 
receiver.getKey());
+    SendingMailbox<Mailbox.MailboxContent> sendingMailbox = 
sender.getValue().getSendingMailbox(mailboxId);
+    ReceivingMailbox<Mailbox.MailboxContent> receivingMailbox = 
receiver.getValue().getReceivingMailbox(mailboxId);
+
+    // create mock object
+    Mailbox.MailboxContent testContent = getTooLargeMailboxContent(mailboxId);
+    sendingMailbox.send(testContent);
+
+    // wait for receiving mailbox to be created.
+    TestUtils.waitForCondition(aVoid -> {
+      return receivingMailbox.isInitialized();
+    }, 5000L, "Receiving mailbox initialize failed!");
+
+    Mailbox.MailboxContent receivedContent = receivingMailbox.receive();
+    Assert.assertNotNull(receivedContent);
+    ByteBuffer byteBuffer = 
receivedContent.getPayload().asReadOnlyByteBuffer();
+    Assert.assertTrue(byteBuffer.hasRemaining());
+    BaseDataBlock dataBlock = DataBlockUtils.getDataBlock(byteBuffer);
+    Assert.assertTrue(dataBlock instanceof MetadataBlock && 
!dataBlock.getExceptions().isEmpty());
+  }
+
   private Mailbox.MailboxContent getTestMailboxContent(String mailboxId)
       throws IOException {
     return Mailbox.MailboxContent.newBuilder().setMailboxId(mailboxId)
@@ -75,4 +106,12 @@ public class GrpcMailboxServiceTest extends 
GrpcMailboxServiceTestBase {
         ).toBytes()))
         .build();
   }
+
+  private Mailbox.MailboxContent getTooLargeMailboxContent(String mailboxId)
+      throws IOException {
+    return Mailbox.MailboxContent.newBuilder().setMailboxId(mailboxId)
+        .putAllMetadata(ImmutableMap.of("key", "value", 
ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true"))
+        .setPayload(ByteString.copyFrom(new byte[16_000_000]))
+        .build();
+  }
 }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTestBase.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTestBase.java
index 8a5badd36a..a874ce46fc 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTestBase.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTestBase.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import java.util.Collections;
 import java.util.TreeMap;
 import org.apache.pinot.query.QueryEnvironmentTestUtils;
+import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 
@@ -31,9 +34,11 @@ public abstract class GrpcMailboxServiceTestBase {
   @BeforeClass
   public void setUp()
       throws Exception {
+    PinotConfiguration extraConfig = new 
PinotConfiguration(Collections.singletonMap(
+        QueryConfig.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_BYTES_SIZE, 
4_000_000));
     for (int i = 0; i < MAILBOX_TEST_SIZE; i++) {
       int availablePort = QueryEnvironmentTestUtils.getAvailablePort();
-      GrpcMailboxService grpcMailboxService = new 
GrpcMailboxService("localhost", availablePort);
+      GrpcMailboxService grpcMailboxService = new 
GrpcMailboxService("localhost", availablePort, extraConfig);
       grpcMailboxService.start();
       _mailboxServices.put(availablePort, grpcMailboxService);
     }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index e32ee1db49..22cf24e60f 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -36,6 +36,7 @@ import org.apache.pinot.query.QueryServerEnclosure;
 import org.apache.pinot.query.mailbox.GrpcMailboxService;
 import org.apache.pinot.query.routing.WorkerInstance;
 import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 
@@ -83,7 +84,7 @@ public class QueryRunnerTestBase {
     Map<String, Object> reducerConfig = new HashMap<>();
     reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, _reducerGrpcPort);
     reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME, 
_reducerHostname);
-    _mailboxService = new GrpcMailboxService(_reducerHostname, 
_reducerGrpcPort);
+    _mailboxService = new GrpcMailboxService(_reducerHostname, 
_reducerGrpcPort, new PinotConfiguration(reducerConfig));
     _mailboxService.start();
 
     _queryEnvironment = 
QueryEnvironmentTestUtils.getQueryEnvironment(_reducerGrpcPort, 
server1.getPort(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to