rdhabalia commented on a change in pull request #5443: [pulsar-client] Fix message corruption on OOM for batch messages
URL: https://github.com/apache/pulsar/pull/5443#discussion_r337786504
 
 

 ##########
 File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
 ##########
 @@ -82,11 +82,32 @@ public void add(MessageImpl<?> msg, SendCallback callback) {
     }
 
     private ByteBuf getCompressedBatchMetadataAndPayload() {
-        for (MessageImpl<?> msg : messages) {
+        batchedMessageMetadataAndPayload.markWriterIndex();
+        batchedMessageMetadataAndPayload.markReaderIndex();
+        int lastSerializedMessageIndex = 0;
+        
+        for(MessageImpl<?> msg : messages) {
             PulsarApi.MessageMetadata.Builder msgBuilder = msg.getMessageBuilder();
-            batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder,
-                    msg.getDataBuffer(), batchedMessageMetadataAndPayload);
-            msgBuilder.recycle();
+            msg.getDataBuffer().markReaderIndex();
+            try {
+                batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder,
+                        msg.getDataBuffer(), batchedMessageMetadataAndPayload);
+            } catch (Throwable th) {
+                // serializing batch message can corrupt the index of message and batch-message. Reset the index so,
+                // next iteration doesn't send corrupt message to broker.
+                for (int j = 0; j <= lastSerializedMessageIndex; j++) {
+                    MessageImpl<?> previousMsg = messages.get(j);
+                    previousMsg.getDataBuffer().resetReaderIndex();
+                }
+                batchedMessageMetadataAndPayload.resetWriterIndex();
+                batchedMessageMetadataAndPayload.resetReaderIndex();
+                throw new RuntimeException(th);
+            }
+            lastSerializedMessageIndex++;
+        }
+        // Recycle messages only once they serialized successfully in batch 
+        for (MessageImpl<?> msg : messages) {
+            msg.getMessageBuilder().recycle();
 
 Review comment:
   Yes, this is necessary. The builders must be recycled together, and only
   after every message in the batch has been serialized. Otherwise an NPE is
   thrown when the batch scheduler retries and uses an already-recycled
   msgBuilder.
   ```
   java.lang.NullPointerException
        at io.netty.util.Recycler.recycle(Recycler.java:179)
        at org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata$Builder.recycle(PulsarApi.java:3841)
        at org.apache.pulsar.client.impl.BatchMessageContainerImpl.getCompressedBatchMetadataAndPayload(BatchMessageContainerImpl.java:89)
        at org.apache.pulsar.client.impl.BatchMessageContainerImpl.createOpSendMsg(BatchMessageContainerImpl.java:144)
        at org.apache.pulsar.client.impl.ProducerImpl.batchMessageAndSend(ProducerImpl.java:1296)
        at org.apache.pulsar.client.impl.ProducerImpl.access$4(ProducerImpl.java:1283)
        at org.apache.pulsar.client.impl.ProducerImpl$1.run(ProducerImpl.java:1253)
        at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:682)
        at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:757)
        at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:485)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:745)
   ```
   For example:
   1. A batch has 4 messages; the first 2 are serialized successfully and
      their builders are recycled.
   2. Serialization of the 3rd message fails with an exception.
   3. The batch scheduler retries and serializes all 4 messages again. This
      time serialization throws an NPE on the first 2 messages because their
      builders have already been recycled.
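
   The retry scenario above can be reproduced with a small standalone
   sketch. It does not use Pulsar or Netty: `PooledBuilder`,
   `serializeBatch` and `attemptThenRetry` are hypothetical names made up
   for illustration, and an IllegalStateException stands in for the NPE
   that the real Recycler raises.

```java
import java.util.ArrayList;
import java.util.List;

final class RecycleAfterBatchSketch {

    /** Hypothetical stand-in for a pooled builder; not the Pulsar class. */
    static final class PooledBuilder {
        private String payload; // set to null once the builder is recycled

        PooledBuilder(String payload) {
            this.payload = payload;
        }

        String build() {
            if (payload == null) {
                // Models the failure seen when a recycled builder is reused.
                throw new IllegalStateException("builder already recycled");
            }
            return payload;
        }

        void recycle() {
            payload = null;
        }
    }

    /**
     * Serializes the batch. With recycleEagerly the builder is recycled right
     * after each message (old behaviour); otherwise all builders are recycled
     * only after the whole batch succeeded (behaviour proposed in the diff).
     * failAtIndex simulates a serialization failure; -1 means no failure.
     */
    static String serializeBatch(List<PooledBuilder> batch, boolean recycleEagerly, int failAtIndex) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < batch.size(); i++) {
            if (i == failAtIndex) {
                throw new RuntimeException("simulated serialization failure");
            }
            out.append(batch.get(i).build());
            if (recycleEagerly) {
                batch.get(i).recycle();
            }
        }
        if (!recycleEagerly) {
            for (PooledBuilder b : batch) {
                b.recycle();
            }
        }
        return out.toString();
    }

    /** First attempt fails on the 3rd message, then the same batch is retried. */
    static String attemptThenRetry(boolean recycleEagerly) {
        List<PooledBuilder> batch = new ArrayList<>();
        for (String s : new String[] {"a", "b", "c", "d"}) {
            batch.add(new PooledBuilder(s));
        }
        try {
            serializeBatch(batch, recycleEagerly, 2);
        } catch (RuntimeException expected) {
            // The scheduler would retry the batch later.
        }
        try {
            return serializeBatch(batch, recycleEagerly, -1);
        } catch (IllegalStateException e) {
            return "retry failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(attemptThenRetry(true));  // retry failed: builder already recycled
        System.out.println(attemptThenRetry(false)); // abcd
    }
}
```

   With eager recycling the retry trips over the first message, whose
   builder is already gone. With deferred recycling the retry still sees
   all four builders intact.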

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
