This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 3e1eb0d136f [fix][client] Fix client side memory leak when call
MessageImpl.create and fix imprecise client-side metrics:
pendingMessagesUpDownCounter, pendingBytesUpDownCounter, latencyHistogram
(#22393)
3e1eb0d136f is described below
commit 3e1eb0d136f4500b7cfa1f74ea3836c7116ef82e
Author: fengyubiao <[email protected]>
AuthorDate: Sun Apr 7 09:28:04 2024 +0800
[fix][client] Fix client side memory leak when call MessageImpl.create and
fix imprecise client-side metrics: pendingMessagesUpDownCounter,
pendingBytesUpDownCounter, latencyHistogram (#22393)
(cherry picked from commit 2469b97b7e4de10fec64cc7ff1f4f46a410ad125)
---
.../client/api/SimpleProducerConsumerTest.java | 144 +++++++++++++++++++++
.../apache/pulsar/client/impl/ProducerImpl.java | 127 +++++++++---------
2 files changed, 211 insertions(+), 60 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 4c106d39e7a..7552b84a1c5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -51,6 +51,7 @@ import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -69,6 +70,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -92,6 +94,7 @@ import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -99,11 +102,13 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
+import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -4692,4 +4697,143 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
consumer.close();
admin.topics().delete(topic, false);
}
+
+ @DataProvider(name = "enableBatchSend")
+ public Object[][] enableBatchSend() {
+ return new Object[][]{
+ {true},
+ {false}
+ };
+ }
+
+ @Test(dataProvider = "enableBatchSend")
+ public void testPublishWithCreateMessageManually(boolean enableBatchSend)
throws Exception {
+ final int messageCount = 10;
+ final List<MessageImpl> messageArrayBeforeSend =
Collections.synchronizedList(new ArrayList<>());
+ final List<MessageImpl> messageArrayOnSendAcknowledgement =
Collections.synchronizedList(new ArrayList<>());
+ // Create an interceptor to verify the ref count of Message.payload is
as expected.
+ AtomicBoolean payloadWasReleasedWhenIntercept = new
AtomicBoolean(false);
+ ProducerInterceptor interceptor = new ProducerInterceptor(){
+
+ @Override
+ public void close() {
+
+ }
+ @Override
+ public Message beforeSend(Producer producer, Message message) {
+ MessageImpl msgImpl = (MessageImpl) message;
+ log.info("payload.refCnf before send: {}",
msgImpl.getDataBuffer().refCnt());
+ if (msgImpl.getDataBuffer().refCnt() < 1) {
+ payloadWasReleasedWhenIntercept.set(true);
+ }
+ messageArrayBeforeSend.add(msgImpl);
+ return message;
+ }
+
+ @Override
+ public void onSendAcknowledgement(Producer producer, Message
message, MessageId msgId,
+ Throwable exception) {
+ MessageImpl msgImpl = (MessageImpl) message;
+ log.info("payload.refCnf on send acknowledgement: {}",
msgImpl.getDataBuffer().refCnt());
+ if (msgImpl.getDataBuffer().refCnt() < 1) {
+ payloadWasReleasedWhenIntercept.set(true);
+ }
+ messageArrayOnSendAcknowledgement.add(msgImpl);
+ }
+ };
+
+ final String topic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp");
+ admin.topics().createNonPartitionedTopic(topic);
+ ProducerBase producerBase = (ProducerBase)
pulsarClient.newProducer().topic(topic).intercept(interceptor)
+ .enableBatching(enableBatchSend).create();
+
+ // Publish message.
+ // Note: "ProducerBase.sendAsync" is not the same as
"Producer.sendAsync".
+ final MessageImpl[] messageArraySent = new MessageImpl[messageCount];
+ final ByteBuf[] payloads = new ByteBuf[messageCount];
+ List<CompletableFuture<MessageId>> sendFutureList = new ArrayList<>();
+ List<CompletableFuture> releaseFutureList = new ArrayList<>();
+ for (int i = 0; i < messageCount; i++) {
+ // Create message payload, refCnt = 1 now.
+ ByteBuf payload = PulsarByteBufAllocator.DEFAULT.heapBuffer(1);
+ payloads[i] = payload;
+ log.info("payload_{}.refCnf 1st: {}", i, payload.refCnt());
+ payload.writeByte(i);
+ // refCnt = 2 now.
+ payload.retain();
+ log.info("payload_{}.refCnf 2nd: {}", i, payload.refCnt());
+ MessageMetadata messageMetadata = new MessageMetadata();
+ messageMetadata.setUncompressedSize(1);
+ MessageImpl<byte[]> message1 = MessageImpl.create(topic, null,
messageMetadata, payload, Optional.empty(),
+ null, Schema.BYTES, 0, true, 0);
+ messageArraySent[i] = message1;
+ // Release ByteBuf the first time, refCnt = 1 now.
+ CompletableFuture<MessageId> future =
producerBase.sendAsync(message1);
+ sendFutureList.add(future);
+ final int indexForLog = i;
+ future.whenComplete((v, ex) -> {
+ message1.release();
+ log.info("payload_{}.refCnf 3rd after_complete_refCnf: {}, ex:
{}", indexForLog, payload.refCnt(),
+ ex == null ? "null" : ex.getMessage());
+ });
+ }
+ sendFutureList.get(messageCount - 1).join();
+
+ // Wait 2 seconds for the code in the finally-block to finish, which
keeps this test from being flaky.
+ Thread.sleep(1000 * 2);
+
+ // Verify: payload's refCnt.
+ for (int i = 0; i < messageCount; i++) {
+ log.info("payload_{}.refCnf 4th: {}", i, payloads[i].refCnt());
+ assertEquals(payloads[i].refCnt(), 1);
+ }
+
+ // Verify: the messages have not been released when the interceptor is called.
+ assertFalse(payloadWasReleasedWhenIntercept.get());
+
+ // Verify: the order of send complete event.
+ MessageIdImpl messageIdPreviousOne = null;
+ for (int i = 0; i < messageCount; i++) {
+ MessageIdImpl messageId = (MessageIdImpl)
sendFutureList.get(i).get();
+ if (messageIdPreviousOne != null) {
+ assertTrue(compareMessageIds(messageIdPreviousOne, messageId)
> 0);
+ }
+ messageIdPreviousOne = messageId;
+ }
+
+ // Verify: the order of interceptor events.
+ for (int i = 0; i < messageCount; i++) {
+ assertTrue(messageArraySent[i] == messageArrayBeforeSend.get(i));
+ assertTrue(messageArraySent[i] ==
messageArrayOnSendAcknowledgement.get(i));
+ }
+
+ // cleanup.
+ for (int i = 0; i < messageCount; i++) {
+ payloads[i].release();
+ }
+ producerBase.close();
+ admin.topics().delete(topic, false);
+ }
+
+ private int compareMessageIds(MessageIdImpl messageId1, MessageIdImpl
messageId2) {
+ if (messageId2.getLedgerId() < messageId1.getLedgerId()) {
+ return -1;
+ }
+ if (messageId2.getLedgerId() > messageId1.getLedgerId()) {
+ return 1;
+ }
+ if (messageId2.getEntryId() < messageId1.getEntryId()) {
+ return -1;
+ }
+ if (messageId2.getEntryId() > messageId1.getEntryId()) {
+ return 1;
+ }
+ if (messageId2 instanceof BatchMessageIdImpl && messageId1 instanceof
BatchMessageIdImpl) {
+ BatchMessageIdImpl batchMessageId1 = (BatchMessageIdImpl)
messageId1;
+ BatchMessageIdImpl batchMessageId2 = (BatchMessageIdImpl)
messageId2;
+ return batchMessageId2.getBatchIndex() -
batchMessageId1.getBatchIndex();
+ } else {
+ return 0;
+ }
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 880185f7a97..04dfb01eeaa 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -337,73 +337,80 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
if (interceptors != null) {
interceptorMessage.getProperties();
}
- sendAsync(interceptorMessage, new SendCallback() {
- SendCallback nextCallback = null;
- MessageImpl<?> nextMsg = null;
- long createdAt = System.nanoTime();
- @Override
- public CompletableFuture<MessageId> getFuture() {
- return future;
- }
+ int msgSize = interceptorMessage.getDataBuffer().readableBytes();
+ sendAsync(interceptorMessage, new DefaultSendMessageCallback(future,
interceptorMessage, msgSize));
+ return future;
+ }
- @Override
- public SendCallback getNextSendCallback() {
- return nextCallback;
- }
+ private class DefaultSendMessageCallback implements SendCallback {
- @Override
- public MessageImpl<?> getNextMessage() {
- return nextMsg;
- }
+ CompletableFuture<MessageId> sendFuture;
+ MessageImpl<?> currentMsg;
+ int msgSize;
+ long createdAt = System.nanoTime();
+ SendCallback nextCallback = null;
+ MessageImpl<?> nextMsg = null;
- @Override
- public void sendComplete(Exception e) {
- try {
- if (e != null) {
- stats.incrementSendFailed();
- onSendAcknowledgement(interceptorMessage, null, e);
- future.completeExceptionally(e);
- } else {
- onSendAcknowledgement(interceptorMessage,
interceptorMessage.getMessageId(), null);
- future.complete(interceptorMessage.getMessageId());
- stats.incrementNumAcksReceived(System.nanoTime() -
createdAt);
- }
- } finally {
- interceptorMessage.getDataBuffer().release();
- }
+ DefaultSendMessageCallback(CompletableFuture<MessageId> sendFuture,
MessageImpl<?> currentMsg, int msgSize) {
+ this.sendFuture = sendFuture;
+ this.currentMsg = currentMsg;
+ this.msgSize = msgSize;
+ }
- while (nextCallback != null) {
- SendCallback sendCallback = nextCallback;
- MessageImpl<?> msg = nextMsg;
- // Retain the buffer used by interceptors callback to get
message. Buffer will release after
- // complete interceptors.
- try {
- msg.getDataBuffer().retain();
- if (e != null) {
- stats.incrementSendFailed();
- onSendAcknowledgement(msg, null, e);
- sendCallback.getFuture().completeExceptionally(e);
- } else {
- onSendAcknowledgement(msg, msg.getMessageId(),
null);
-
sendCallback.getFuture().complete(msg.getMessageId());
- stats.incrementNumAcksReceived(System.nanoTime() -
createdAt);
- }
- nextMsg = nextCallback.getNextMessage();
- nextCallback = nextCallback.getNextSendCallback();
- } finally {
- msg.getDataBuffer().release();
- }
- }
- }
+ @Override
+ public CompletableFuture<MessageId> getFuture() {
+ return sendFuture;
+ }
- @Override
- public void addCallback(MessageImpl<?> msg, SendCallback scb) {
- nextMsg = msg;
- nextCallback = scb;
+ @Override
+ public SendCallback getNextSendCallback() {
+ return nextCallback;
+ }
+
+ @Override
+ public MessageImpl<?> getNextMessage() {
+ return nextMsg;
+ }
+
+ @Override
+ public void sendComplete(Exception e) {
+ SendCallback loopingCallback = this;
+ MessageImpl<?> loopingMsg = currentMsg;
+ while (loopingCallback != null) {
+ onSendComplete(e, loopingCallback, loopingMsg);
+ loopingMsg = loopingCallback.getNextMessage();
+ loopingCallback = loopingCallback.getNextSendCallback();
+ }
+ }
+
+ private void onSendComplete(Exception e, SendCallback sendCallback,
MessageImpl<?> msg) {
+ long createdAt = (sendCallback instanceof
ProducerImpl.DefaultSendMessageCallback)
+ ? ((DefaultSendMessageCallback) sendCallback).createdAt :
this.createdAt;
+ long latencyNanos = System.nanoTime() - createdAt;
+ ByteBuf payload = msg.getDataBuffer();
+ if (payload == null) {
+ log.error("[{}] [{}] Payload is null when calling
onSendComplete, which is not expected.",
+ topic, producerName);
+ } else {
+ ReferenceCountUtil.safeRelease(payload);
}
- });
- return future;
+ if (e != null) {
+ stats.incrementSendFailed();
+ onSendAcknowledgement(msg, null, e);
+ sendCallback.getFuture().completeExceptionally(e);
+ } else {
+ stats.incrementNumAcksReceived(latencyNanos);
+ onSendAcknowledgement(msg, msg.getMessageId(), null);
+ sendCallback.getFuture().complete(msg.getMessageId());
+ }
+ }
+
+ @Override
+ public void addCallback(MessageImpl<?> msg, SendCallback scb) {
+ nextMsg = msg;
+ nextCallback = scb;
+ }
}
@Override