merlimat commented on a change in pull request #5443: [pulsar-client] Fix message corruption on OOM for batch messages
URL: https://github.com/apache/pulsar/pull/5443#discussion_r337780285
 
 

 ##########
 File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
 ##########
 @@ -82,11 +82,32 @@ public void add(MessageImpl<?> msg, SendCallback callback) {
     }
 
     private ByteBuf getCompressedBatchMetadataAndPayload() {
-        for (MessageImpl<?> msg : messages) {
+        batchedMessageMetadataAndPayload.markWriterIndex();
+        batchedMessageMetadataAndPayload.markReaderIndex();
+        int lastSerializedMessageIndex = 0;
+        
+        for(MessageImpl<?> msg : messages) {
             PulsarApi.MessageMetadata.Builder msgBuilder = msg.getMessageBuilder();
-            batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder,
-                    msg.getDataBuffer(), batchedMessageMetadataAndPayload);
-            msgBuilder.recycle();
+            msg.getDataBuffer().markReaderIndex();
+            try {
+                batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder,
+                        msg.getDataBuffer(), batchedMessageMetadataAndPayload);
+            } catch (Throwable th) {
+                // serializing batch message can corrupt the index of message and batch-message. Reset the index so,
+                // next iteration doesn't send corrupt message to broker.
+                for (int j = 0; j <= lastSerializedMessageIndex; j++) {
+                    MessageImpl<?> previousMsg = messages.get(j);
+                    previousMsg.getDataBuffer().resetReaderIndex();
+                }
+                batchedMessageMetadataAndPayload.resetWriterIndex();
+                batchedMessageMetadataAndPayload.resetReaderIndex();
+                throw new RuntimeException(th);
+            }
+            lastSerializedMessageIndex++;
+        }
+        // Recycle messages only once they serialized successfully in batch 
+        for (MessageImpl<?> msg : messages) {
+            msg.getMessageBuilder().recycle();
 
 Review comment:
   Is it necessary to loop again? What is the effect of recycling the messages
that were successfully added?
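
   For reference, the mark/reset rollback that the diff relies on can be sketched in isolation. This is only an illustration under stated assumptions, not the PR's code: it uses `java.nio.ByteBuffer` with `mark()`/`reset()` as a stand-in for Netty's `ByteBuf` `markReaderIndex()`/`resetReaderIndex()`, and the names `BatchRollbackSketch` and `appendAll` are hypothetical.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the batch container; not part of Pulsar.
class BatchRollbackSketch {

    /**
     * Copies every payload into the batch buffer. If any copy fails, the
     * position of the batch and of every payload touched so far is restored,
     * so a later retry starts from uncorrupted indexes. The failure is rethrown.
     */
    static void appendAll(ByteBuffer batch, List<ByteBuffer> payloads) {
        batch.mark();      // remember where the batch started
        int touched = 0;   // number of payloads that have been marked
        try {
            for (ByteBuffer payload : payloads) {
                payload.mark();     // remember this payload's read position
                touched++;
                batch.put(payload); // throws BufferOverflowException if it does not fit
            }
        } catch (RuntimeException e) {
            // Roll back every payload that was marked, including the failing one.
            for (int j = 0; j < touched; j++) {
                payloads.get(j).reset();
            }
            batch.reset();
            throw e;
        }
    }

    public static void main(String[] args) {
        ByteBuffer batch = ByteBuffer.allocate(4);
        ByteBuffer first = ByteBuffer.wrap(new byte[] {1, 2, 3});
        ByteBuffer second = ByteBuffer.wrap(new byte[] {4, 5, 6});
        try {
            appendAll(batch, Arrays.asList(first, second));
        } catch (BufferOverflowException e) {
            // The second payload did not fit; all positions were restored.
            System.out.println(batch.position() + " " + first.position() + " " + second.position());
        }
    }
}
```

   In this sketch nothing is released inside the loop, so a failure part-way through leaves every input reusable. That is the property the second loop in the diff appears to be protecting by deferring `recycle()` until all messages have been serialized.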
