MarvinCai commented on a change in pull request #12720:
URL: https://github.com/apache/pulsar/pull/12720#discussion_r749802832



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -60,6 +60,7 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import lombok.Data;

Review comment:
       nit: seems unnecessary import.

##########
File path: 
pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -706,4 +730,94 @@ private void 
initEntryCacheSizeAllocator(PulsarConnectorConfig connectorConfig)
         }
     }
 
+    private RawMessage processChunkedMessages(RawMessage message) {
+        final String uuid = message.getUUID();
+        final int chunkId = message.getChunkId();
+        final int totalChunkMsgSize = message.getTotalChunkMsgSize();
+        final int numChunks = message.getNumChunksFromMsg();
+
+        RawMessageIdImpl rawMessageId = (RawMessageIdImpl) 
message.getMessageId();
+        if (rawMessageId.getLedgerId() > pulsarSplit.getEndPositionLedgerId()
+                && !chunkedMessagesMap.containsKey(uuid)) {
+            // If the message is out of the split range, we only care about 
the incomplete chunked messages.
+            message.release();
+            return null;
+        }
+        if (chunkId == 0) {
+            ByteBuf chunkedMsgBuffer = 
Unpooled.directBuffer(totalChunkMsgSize, totalChunkMsgSize);
+            chunkedMessagesMap.computeIfAbsent(uuid, (key) -> 
ChunkedMessageCtx.get(numChunks, chunkedMsgBuffer));
+        }
+
+        ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(uuid);
+        if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
+                || chunkId != (chunkedMsgCtx.lastChunkedMessageId + 1) || 
chunkId >= numChunks) {
+            // Means we lost the first chunk, it will happens when the 
beginning chunk didn't belong to this split.
+            log.info("Received unexpected chunk. messageId: %s, last-chunk-id: 
%s chunkId: %s, totalChunks: %s",
+                    message.getMessageId(),
+                    (chunkedMsgCtx != null ? 
chunkedMsgCtx.lastChunkedMessageId : null), chunkId,
+                    numChunks);
+            if (chunkedMsgCtx != null) {
+                if (chunkedMsgCtx.chunkedMsgBuffer != null) {
+                    
ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+                }
+                chunkedMsgCtx.recycle();
+            }
+            chunkedMessagesMap.remove(uuid);
+            message.release();
+            return null;
+        }
+
+        // append the chunked payload and update lastChunkedMessage-id
+        chunkedMsgCtx.chunkedMsgBuffer.writeBytes(message.getData());
+        chunkedMsgCtx.lastChunkedMessageId = chunkId;
+
+        // if final chunk is not received yet then release payload and return
+        if (chunkId != (numChunks - 1)) {
+            message.release();
+            return null;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Chunked message completed. chunkId: %s, totalChunks: 
%s, msgId: %s, sequenceId: %s",
+                    chunkId, numChunks, rawMessageId, message.getSequenceId());
+        }
+        chunkedMessagesMap.remove(uuid);
+        ByteBuf unCompressedPayload = chunkedMsgCtx.chunkedMsgBuffer;
+        chunkedMsgCtx.recycle();
+        return ((RawMessageImpl) 
message).updatePayloadForChunkedMessage(unCompressedPayload);
+    }
+

Review comment:
       if I understand correctly, this is to buffer the chunk message payload 
till we receive the whole message, it'll be good if we can add brief doc for it.

##########
File path: 
pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -706,4 +730,94 @@ private void 
initEntryCacheSizeAllocator(PulsarConnectorConfig connectorConfig)
         }
     }
 
+    private RawMessage processChunkedMessages(RawMessage message) {
+        final String uuid = message.getUUID();
+        final int chunkId = message.getChunkId();
+        final int totalChunkMsgSize = message.getTotalChunkMsgSize();
+        final int numChunks = message.getNumChunksFromMsg();
+
+        RawMessageIdImpl rawMessageId = (RawMessageIdImpl) 
message.getMessageId();
+        if (rawMessageId.getLedgerId() > pulsarSplit.getEndPositionLedgerId()
+                && !chunkedMessagesMap.containsKey(uuid)) {
+            // If the message is out of the split range, we only care about 
the incomplete chunked messages.
+            message.release();
+            return null;
+        }
+        if (chunkId == 0) {
+            ByteBuf chunkedMsgBuffer = 
Unpooled.directBuffer(totalChunkMsgSize, totalChunkMsgSize);
+            chunkedMessagesMap.computeIfAbsent(uuid, (key) -> 
ChunkedMessageCtx.get(numChunks, chunkedMsgBuffer));
+        }
+
+        ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(uuid);
+        if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
+                || chunkId != (chunkedMsgCtx.lastChunkedMessageId + 1) || 
chunkId >= numChunks) {
+            // Means we lost the first chunk, it will happens when the 
beginning chunk didn't belong to this split.

Review comment:
       ```suggestion
               // Means we lost the first chunk, it will happen when the 
beginning chunk didn't belong to this split.
   ```

##########
File path: 
pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -278,15 +287,25 @@ public void accept(Entry entry) {
                                                 // start time for message 
queue read
                                                 
metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
 
-                                                while (true) {
-                                                    if 
(!haveAvailableCacheSize(
-                                                            
messageQueueCacheSizeAllocator, messageQueue)
-                                                            || 
!messageQueue.offer(message)) {
-                                                        Thread.sleep(1);
-                                                    } else {
-                                                        
messageQueueCacheSizeAllocator.allocate(
-                                                                
message.getData().readableBytes());
-                                                        break;
+                                                if 
(message.getNumChunksFromMsg() > 1)  {
+                                                    message = 
processChunkedMessages(message);
+                                                } else if 
(entryExceedSplitEndPosition(entry)) {

Review comment:
       will this ever be true? as we've checked at line 276




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to