This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new bc0a4b8  Forget to recycle MessageMetadata protobuf object (#1541)
bc0a4b8 is described below

commit bc0a4b87ff5c2a2015e9396c45f7c70dcdb632a5
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Sun Apr 15 20:40:14 2018 -0700

    Forget to recycle MessageMetadata protobuf object (#1541)

    * Forget to recycle MessageMetadata protobuf object

    * Fixed merge
---
 .../apache/pulsar/client/impl/ProducerImpl.java | 31 ++++++++++++----------
 1 file changed, 17 insertions(+), 14 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 92a512d..d866b28 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -260,7 +260,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
 
         MessageImpl<T> msg = (MessageImpl<T>) message;
-        MessageMetadata.Builder msgMetadata = msg.getMessageBuilder();
+        MessageMetadata.Builder msgMetadataBuilder = msg.getMessageBuilder();
         ByteBuf payload = msg.getDataBuffer();
 
         // If compression is enabled, we are compressing, otherwise it will simply use the same buffer
@@ -286,35 +286,35 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             return;
         }
 
-        if (!msg.isReplicated() && msgMetadata.hasProducerName()) {
+        if (!msg.isReplicated() && msgMetadataBuilder.hasProducerName()) {
             callback.sendComplete(new PulsarClientException.InvalidMessageException("Cannot re-use the same message"));
             compressedPayload.release();
             return;
         }
 
         if (schemaVersion.isPresent()) {
-            msgMetadata.setSchemaVersion(ByteString.copyFrom(schemaVersion.get()));
+            msgMetadataBuilder.setSchemaVersion(ByteString.copyFrom(schemaVersion.get()));
         }
 
         try {
             synchronized (this) {
                 long sequenceId;
-                if (!msgMetadata.hasSequenceId()) {
+                if (!msgMetadataBuilder.hasSequenceId()) {
                     sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
-                    msgMetadata.setSequenceId(sequenceId);
+                    msgMetadataBuilder.setSequenceId(sequenceId);
                 } else {
-                    sequenceId = msgMetadata.getSequenceId();
+                    sequenceId = msgMetadataBuilder.getSequenceId();
                 }
-                if (!msgMetadata.hasPublishTime()) {
-                    msgMetadata.setPublishTime(System.currentTimeMillis());
+                if (!msgMetadataBuilder.hasPublishTime()) {
+                    msgMetadataBuilder.setPublishTime(System.currentTimeMillis());
 
-                    checkArgument(!msgMetadata.hasProducerName());
+                    checkArgument(!msgMetadataBuilder.hasProducerName());
 
-                    msgMetadata.setProducerName(producerName);
+                    msgMetadataBuilder.setProducerName(producerName);
 
                     if (conf.getCompressionType() != CompressionType.NONE) {
-                        msgMetadata.setCompression(convertCompressionType(conf.getCompressionType()));
-                        msgMetadata.setUncompressedSize(uncompressedSize);
+                        msgMetadataBuilder.setCompression(convertCompressionType(conf.getCompressionType()));
+                        msgMetadataBuilder.setUncompressedSize(uncompressedSize);
                     }
                 }
 
@@ -332,8 +332,11 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                         doBatchSendAndAdd(msg, callback, payload);
                     }
                 } else {
-                    ByteBuf encryptedPayload = encryptMessage(msgMetadata, compressedPayload);
-                    ByteBufPair cmd = sendMessage(producerId, sequenceId, 1, msgMetadata.build(), encryptedPayload);
+                    ByteBuf encryptedPayload = encryptMessage(msgMetadataBuilder, compressedPayload);
+
+                    MessageMetadata msgMetadata = msgMetadataBuilder.build();
+                    ByteBufPair cmd = sendMessage(producerId, sequenceId, 1, msgMetadata, encryptedPayload);
+                    msgMetadataBuilder.recycle();
                     msgMetadata.recycle();
 
                     final OpSendMsg op = OpSendMsg.create(msg, cmd, sequenceId, callback);

--
To stop receiving notification emails like this one, please contact mme...@apache.org.