This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bc0a4b8  Forget to recycle MessageMetadata protobuf object (#1541)
bc0a4b8 is described below

commit bc0a4b87ff5c2a2015e9396c45f7c70dcdb632a5
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Sun Apr 15 20:40:14 2018 -0700

    Forget to recycle MessageMetadata protobuf object (#1541)
    
    * Forget to recycle MessageMetadata protobuf object
    
    * Fixed merge
---
 .../apache/pulsar/client/impl/ProducerImpl.java    | 31 ++++++++++++----------
 1 file changed, 17 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 92a512d..d866b28 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -260,7 +260,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         }
 
         MessageImpl<T> msg = (MessageImpl<T>) message;
-        MessageMetadata.Builder msgMetadata = msg.getMessageBuilder();
+        MessageMetadata.Builder msgMetadataBuilder = msg.getMessageBuilder();
         ByteBuf payload = msg.getDataBuffer();
 
         // If compression is enabled, we are compressing, otherwise it will 
simply use the same buffer
@@ -286,35 +286,35 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             return;
         }
 
-        if (!msg.isReplicated() && msgMetadata.hasProducerName()) {
+        if (!msg.isReplicated() && msgMetadataBuilder.hasProducerName()) {
             callback.sendComplete(new 
PulsarClientException.InvalidMessageException("Cannot re-use the same 
message"));
             compressedPayload.release();
             return;
         }
 
         if (schemaVersion.isPresent()) {
-            
msgMetadata.setSchemaVersion(ByteString.copyFrom(schemaVersion.get()));
+            
msgMetadataBuilder.setSchemaVersion(ByteString.copyFrom(schemaVersion.get()));
         }
 
         try {
             synchronized (this) {
                 long sequenceId;
-                if (!msgMetadata.hasSequenceId()) {
+                if (!msgMetadataBuilder.hasSequenceId()) {
                     sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
-                    msgMetadata.setSequenceId(sequenceId);
+                    msgMetadataBuilder.setSequenceId(sequenceId);
                 } else {
-                    sequenceId = msgMetadata.getSequenceId();
+                    sequenceId = msgMetadataBuilder.getSequenceId();
                 }
-                if (!msgMetadata.hasPublishTime()) {
-                    msgMetadata.setPublishTime(System.currentTimeMillis());
+                if (!msgMetadataBuilder.hasPublishTime()) {
+                    
msgMetadataBuilder.setPublishTime(System.currentTimeMillis());
 
-                    checkArgument(!msgMetadata.hasProducerName());
+                    checkArgument(!msgMetadataBuilder.hasProducerName());
 
-                    msgMetadata.setProducerName(producerName);
+                    msgMetadataBuilder.setProducerName(producerName);
 
                     if (conf.getCompressionType() != CompressionType.NONE) {
-                        
msgMetadata.setCompression(convertCompressionType(conf.getCompressionType()));
-                        msgMetadata.setUncompressedSize(uncompressedSize);
+                        
msgMetadataBuilder.setCompression(convertCompressionType(conf.getCompressionType()));
+                        
msgMetadataBuilder.setUncompressedSize(uncompressedSize);
                     }
                 }
 
@@ -332,8 +332,11 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                         doBatchSendAndAdd(msg, callback, payload);
                     }
                 } else {
-                    ByteBuf encryptedPayload = encryptMessage(msgMetadata, 
compressedPayload);
-                    ByteBufPair cmd = sendMessage(producerId, sequenceId, 1, 
msgMetadata.build(), encryptedPayload);
+                    ByteBuf encryptedPayload = 
encryptMessage(msgMetadataBuilder, compressedPayload);
+
+                    MessageMetadata msgMetadata = msgMetadataBuilder.build();
+                    ByteBufPair cmd = sendMessage(producerId, sequenceId, 1, 
msgMetadata, encryptedPayload);
+                    msgMetadataBuilder.recycle();
                     msgMetadata.recycle();
 
                     final OpSendMsg op = OpSendMsg.create(msg, cmd, 
sequenceId, callback);

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to