61yao commented on code in PR #9422:
URL: https://github.com/apache/pinot/pull/9422#discussion_r981641204
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java:
##########
@@ -156,7 +157,7 @@ public boolean isErrorBlock() {
public byte[] toBytes()
throws IOException {
- return _dataBlock.toBytes();
+ return getDataBlock().toBytes();
}
Review Comment:
Done
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java:
##########
@@ -43,4 +45,45 @@ public static TransferableBlock
getErrorTransferableBlock(Map<Integer, String> e
public static boolean isEndOfStream(TransferableBlock transferableBlock) {
return transferableBlock.isEndOfStreamBlock();
}
+
+ /**
+ * Split a block into multiple block so that each block size is within
maxBlockSize.
+ * Currently, we only support split for row type dataBlock.
+ * When row size is greater than maxBlockSize, we pack each row as a
separate block.
+ */
+ public static List<TransferableBlock> splitBlock(TransferableBlock block,
int maxBlockSize) {
Review Comment:
Done
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -192,14 +199,17 @@ private static List<BaseDataBlock>
constructPartitionedDataBlock(BaseDataBlock d
}
}
- private void sendDataTableBlock(ServerInstance serverInstance, BaseDataBlock
dataBlock)
+ private void sendDataTableBlock(ServerInstance serverInstance,
TransferableBlock block)
throws IOException {
+ List<TransferableBlock> chunks = TransferableBlockUtils.splitBlock(block,
_maxBlockSize);
String mailboxId = toMailboxId(serverInstance);
- SendingMailbox<Mailbox.MailboxContent> sendingMailbox =
_mailboxService.getSendingMailbox(mailboxId);
- Mailbox.MailboxContent mailboxContent = toMailboxContent(mailboxId,
dataBlock);
- sendingMailbox.send(mailboxContent);
- if
(mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY))
{
- sendingMailbox.complete();
+ for (TransferableBlock chunk : chunks) {
+ SendingMailbox<Mailbox.MailboxContent> sendingMailbox =
_mailboxService.getSendingMailbox(mailboxId);
+ Mailbox.MailboxContent mailboxContent = toMailboxContent(mailboxId,
chunk.getDataBlock());
+ sendingMailbox.send(mailboxContent);
+ if
(mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY))
{
+ sendingMailbox.complete();
+ }
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]