walterddr commented on code in PR #9422:
URL: https://github.com/apache/pinot/pull/9422#discussion_r981451980


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -192,14 +199,17 @@ private static List<BaseDataBlock> constructPartitionedDataBlock(BaseDataBlock d
     }
   }
 
-  private void sendDataTableBlock(ServerInstance serverInstance, BaseDataBlock dataBlock)
+  private void sendDataTableBlock(ServerInstance serverInstance, TransferableBlock block)
       throws IOException {
+    List<TransferableBlock> chunks = TransferableBlockUtils.splitBlock(block, _maxBlockSize);
     String mailboxId = toMailboxId(serverInstance);
-    SendingMailbox<Mailbox.MailboxContent> sendingMailbox = _mailboxService.getSendingMailbox(mailboxId);
-    Mailbox.MailboxContent mailboxContent = toMailboxContent(mailboxId, dataBlock);
-    sendingMailbox.send(mailboxContent);
-    if (mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY)) {
-      sendingMailbox.complete();
+    for (TransferableBlock chunk : chunks) {
+      SendingMailbox<Mailbox.MailboxContent> sendingMailbox = _mailboxService.getSendingMailbox(mailboxId);
+      Mailbox.MailboxContent mailboxContent = toMailboxContent(mailboxId, chunk.getDataBlock());
+      sendingMailbox.send(mailboxContent);
+      if (mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY)) {
+        sendingMailbox.complete();
+      }

Review Comment:
   This should be done outside of the loop. Maybe we can just check whether the
   transferable block contains a metadata block.
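
A rough sketch of what moving the completion check out of the loop could look like. `RecordingMailbox` and `Block` are hypothetical stand-ins invented for this illustration, not the actual Pinot `SendingMailbox` or `TransferableBlock` classes, and the end-of-stream flag here stands in for whatever metadata check the real code would use.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SendLoopSketch {
  /** Hypothetical stand-in for a sending mailbox; it only records calls. */
  static class RecordingMailbox {
    final List<String> sent = new ArrayList<>();
    int completeCalls = 0;

    void send(String payload) {
      sent.add(payload);
    }

    void complete() {
      completeCalls++;
    }
  }

  /** Hypothetical stand-in for a block: payload chunks plus an end-of-stream flag. */
  static class Block {
    final List<String> chunks;
    final boolean endOfStream;

    Block(List<String> chunks, boolean endOfStream) {
      this.chunks = chunks;
      this.endOfStream = endOfStream;
    }
  }

  /** Sends every chunk, then completes the mailbox at most once, after the loop. */
  static void sendBlock(RecordingMailbox mailbox, Block block) {
    for (String chunk : block.chunks) {
      mailbox.send(chunk);
    }
    // The check is made against the whole block, not against each chunk.
    if (block.endOfStream) {
      mailbox.complete();
    }
  }

  public static void main(String[] args) {
    RecordingMailbox mailbox = new RecordingMailbox();
    sendBlock(mailbox, new Block(Arrays.asList("a", "b", "c"), true));
    System.out.println(mailbox.sent.size() + " sends, " + mailbox.completeCalls + " complete call(s)");
  }
}
```

With this shape the number of `complete()` calls no longer depends on how many chunks the split produced.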



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java:
##########
@@ -156,7 +157,7 @@ public boolean isErrorBlock() {
 
   public byte[] toBytes()
       throws IOException {
-    return _dataBlock.toBytes();
+    return getDataBlock().toBytes();
   }

Review Comment:
   Add a new API call and rename the current one:
   ```
   public byte[] getDataBlock() {
     // ...
   }
   public List<byte[]> getDataBlockTrunk() {
     // ...
   }
   ```
   



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java:
##########
@@ -43,4 +45,45 @@ public static TransferableBlock getErrorTransferableBlock(Map<Integer, String> e
   public static boolean isEndOfStream(TransferableBlock transferableBlock) {
     return transferableBlock.isEndOfStreamBlock();
   }
+
+  /**
+   *  Split a block into multiple block so that each block size is within maxBlockSize.
+   *  Currently, we only support split for row type dataBlock.
+   *  When row size is greater than maxBlockSize, we pack each row as a separate block.
+   */
+  public static List<TransferableBlock> splitBlock(TransferableBlock block, int maxBlockSize) {

Review Comment:
   Don't make this public; encapsulate it inside `TransferableBlock` instead.
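
One possible shape for that encapsulation, as a sketch only. `RowBlockSketch`, `getChunks`, and the greedy row packing are hypothetical names and choices made for illustration; they are not the PR's actual implementation. It assumes a block is a list of serialized rows and that a row larger than the limit goes into a chunk of its own, as the Javadoc in the diff describes.

```java
import java.util.ArrayList;
import java.util.List;

class RowBlockSketch {
  private final List<byte[]> _rows;

  RowBlockSketch(List<byte[]> rows) {
    _rows = rows;
  }

  /** Entry point for callers: the block hands out its own chunks. */
  List<List<byte[]>> getChunks(int maxChunkBytes) {
    return split(_rows, maxChunkBytes);
  }

  /** Private helper, so the splitting logic is not part of any public utility API. */
  private static List<List<byte[]>> split(List<byte[]> rows, int maxChunkBytes) {
    List<List<byte[]>> chunks = new ArrayList<>();
    List<byte[]> current = new ArrayList<>();
    int currentBytes = 0;
    for (byte[] row : rows) {
      // Close the current chunk if adding this row would exceed the limit.
      if (!current.isEmpty() && currentBytes + row.length > maxChunkBytes) {
        chunks.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      // A row larger than the limit still lands in a chunk, alone.
      current.add(row);
      currentBytes += row.length;
    }
    if (!current.isEmpty()) {
      chunks.add(current);
    }
    return chunks;
  }
}
```

Callers such as the send operator would then only see `getChunks(...)` and never the splitting helper itself.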



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

