MarvinCai commented on a change in pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#discussion_r749802832
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -60,6 +60,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
+import lombok.Data;
Review comment:
nit: this import seems unnecessary.
##########
File path:
pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -706,4 +730,94 @@ private void
initEntryCacheSizeAllocator(PulsarConnectorConfig connectorConfig)
}
}
+ private RawMessage processChunkedMessages(RawMessage message) {
+ final String uuid = message.getUUID();
+ final int chunkId = message.getChunkId();
+ final int totalChunkMsgSize = message.getTotalChunkMsgSize();
+ final int numChunks = message.getNumChunksFromMsg();
+
+ RawMessageIdImpl rawMessageId = (RawMessageIdImpl)
message.getMessageId();
+ if (rawMessageId.getLedgerId() > pulsarSplit.getEndPositionLedgerId()
+ && !chunkedMessagesMap.containsKey(uuid)) {
+ // If the message is out of the split range, we only care about
the incomplete chunked messages.
+ message.release();
+ return null;
+ }
+ if (chunkId == 0) {
+ ByteBuf chunkedMsgBuffer =
Unpooled.directBuffer(totalChunkMsgSize, totalChunkMsgSize);
+ chunkedMessagesMap.computeIfAbsent(uuid, (key) ->
ChunkedMessageCtx.get(numChunks, chunkedMsgBuffer));
+ }
+
+ ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(uuid);
+ if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
+ || chunkId != (chunkedMsgCtx.lastChunkedMessageId + 1) ||
chunkId >= numChunks) {
+ // Means we lost the first chunk, it will happens when the
beginning chunk didn't belong to this split.
+ log.info("Received unexpected chunk. messageId: %s, last-chunk-id:
%s chunkId: %s, totalChunks: %s",
+ message.getMessageId(),
+ (chunkedMsgCtx != null ?
chunkedMsgCtx.lastChunkedMessageId : null), chunkId,
+ numChunks);
+ if (chunkedMsgCtx != null) {
+ if (chunkedMsgCtx.chunkedMsgBuffer != null) {
+
ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+ }
+ chunkedMsgCtx.recycle();
+ }
+ chunkedMessagesMap.remove(uuid);
+ message.release();
+ return null;
+ }
+
+ // append the chunked payload and update lastChunkedMessage-id
+ chunkedMsgCtx.chunkedMsgBuffer.writeBytes(message.getData());
+ chunkedMsgCtx.lastChunkedMessageId = chunkId;
+
+ // if final chunk is not received yet then release payload and return
+ if (chunkId != (numChunks - 1)) {
+ message.release();
+ return null;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Chunked message completed. chunkId: %s, totalChunks:
%s, msgId: %s, sequenceId: %s",
+ chunkId, numChunks, rawMessageId, message.getSequenceId());
+ }
+ chunkedMessagesMap.remove(uuid);
+ ByteBuf unCompressedPayload = chunkedMsgCtx.chunkedMsgBuffer;
+ chunkedMsgCtx.recycle();
+ return ((RawMessageImpl)
message).updatePayloadForChunkedMessage(unCompressedPayload);
+ }
+
Review comment:
If I understand correctly, this buffers the chunked-message payload until the whole message has been received. It would be good to add a brief doc comment explaining that.
##########
File path:
pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -706,4 +730,94 @@ private void
initEntryCacheSizeAllocator(PulsarConnectorConfig connectorConfig)
}
}
+ private RawMessage processChunkedMessages(RawMessage message) {
+ final String uuid = message.getUUID();
+ final int chunkId = message.getChunkId();
+ final int totalChunkMsgSize = message.getTotalChunkMsgSize();
+ final int numChunks = message.getNumChunksFromMsg();
+
+ RawMessageIdImpl rawMessageId = (RawMessageIdImpl)
message.getMessageId();
+ if (rawMessageId.getLedgerId() > pulsarSplit.getEndPositionLedgerId()
+ && !chunkedMessagesMap.containsKey(uuid)) {
+ // If the message is out of the split range, we only care about
the incomplete chunked messages.
+ message.release();
+ return null;
+ }
+ if (chunkId == 0) {
+ ByteBuf chunkedMsgBuffer =
Unpooled.directBuffer(totalChunkMsgSize, totalChunkMsgSize);
+ chunkedMessagesMap.computeIfAbsent(uuid, (key) ->
ChunkedMessageCtx.get(numChunks, chunkedMsgBuffer));
+ }
+
+ ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(uuid);
+ if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
+ || chunkId != (chunkedMsgCtx.lastChunkedMessageId + 1) ||
chunkId >= numChunks) {
+ // Means we lost the first chunk, it will happens when the
beginning chunk didn't belong to this split.
Review comment:
```suggestion
// Means we lost the first chunk, it will happen when the
beginning chunk didn't belong to this split.
```
##########
File path:
pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -278,15 +287,25 @@ public void accept(Entry entry) {
// start time for message
queue read
metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
- while (true) {
- if
(!haveAvailableCacheSize(
-
messageQueueCacheSizeAllocator, messageQueue)
- ||
!messageQueue.offer(message)) {
- Thread.sleep(1);
- } else {
-
messageQueueCacheSizeAllocator.allocate(
-
message.getData().readableBytes());
- break;
+ if
(message.getNumChunksFromMsg() > 1) {
+ message =
processChunkedMessages(message);
+ } else if
(entryExceedSplitEndPosition(entry)) {
Review comment:
Will this ever be true? We already check this condition at line 276.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]