This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ab1b5c00565 [fix] [client] Fix memory leak when publishing encountered
a corner case error (#23738)
ab1b5c00565 is described below
commit ab1b5c00565adfe877719130127fc23ac9c5a0c1
Author: fengyubiao <[email protected]>
AuthorDate: Fri Dec 20 10:59:04 2024 +0800
[fix] [client] Fix memory leak when publishing encountered a corner case
error (#23738)
Co-authored-by: Yunze Xu <[email protected]>
---
.../pulsar/client/impl/ProducerMemoryLeakTest.java | 364 +++++++++++++++++++++
.../client/impl/BatchMessageContainerImpl.java | 4 +-
.../apache/pulsar/client/impl/ProducerImpl.java | 54 ++-
.../pulsar/client/impl/ProducerInterceptors.java | 12 +-
4 files changed, 417 insertions(+), 17 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java
new file mode 100644
index 00000000000..dcdfd136476
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mockStatic;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
+import org.apache.pulsar.common.protocol.ByteBufPair;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.mockito.MockedStatic;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/** Verifies that message payload buffers are released when publishing hits corner-case errors. */
+@Slf4j
+@Test(groups = "broker-api")
+public class ProducerMemoryLeakTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testSendQueueIsFull() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+        admin.topics().createNonPartitionedTopic(topicName);
+        ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING)
+                .blockIfQueueFull(false).maxPendingMessages(1)
+                .enableBatching(true).topic(topicName).create();
+        List<MsgPayloadTouchableMessageBuilder<String>> msgBuilderList = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            msgBuilderList.add(newMessage(producer));
+        }
+
+        CompletableFuture<?> latestSendFuture = null;
+        for (MsgPayloadTouchableMessageBuilder<String> msgBuilder : msgBuilderList) {
+            latestSendFuture = msgBuilder.value("msg-1").sendAsync();
+        }
+        try {
+            latestSendFuture.join();
+        } catch (Exception ex) {
+            // Ignore the error PulsarClientException$ProducerQueueIsFullError.
+            assertTrue(FutureUtil.unwrapCompletionException(ex)
+                    instanceof PulsarClientException.ProducerQueueIsFullError);
+        }
+
+        // Verify: only the builder's own retain is left on every payload.
+        producer.close();
+        for (int i = 0; i < msgBuilderList.size(); i++) {
+            MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i);
+            assertEquals(msgBuilder.payload.refCnt(), 1);
+            msgBuilder.release();
+            assertEquals(msgBuilder.payload.refCnt(), 0);
+        }
+        admin.topics().delete(topicName);
+    }
+
+    /**
+     * The content size of msg(value is "msg-1") is 5 bytes before compression.
+     * Each case provides a max message size and a compression type:
+     * 1: the limit is reached before the message metadata is added.
+     * 5 (NONE) / 6 (LZ4): the limit is reached after the message metadata is added.
+     */
+    @DataProvider(name = "maxMessageSizeAndCompressions")
+    public Object[][] maxMessageSizeAndCompressions() {
+        return new Object[][] {
+            {1, CompressionType.NONE},
+            {5, CompressionType.NONE},
+            {1, CompressionType.LZ4},
+            {6, CompressionType.LZ4}
+        };
+    }
+
+    @Test(dataProvider = "maxMessageSizeAndCompressions")
+    public void testSendMessageSizeExceeded(int maxMessageSize, CompressionType compressionType) throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+        admin.topics().createNonPartitionedTopic(topicName);
+        ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName)
+                .compressionType(compressionType)
+                .enableBatching(false)
+                .create();
+        producer.getConnectionHandler().setMaxMessageSize(maxMessageSize);
+        MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer);
+        /*
+         * Trigger an error: reached max message size, see more details in maxMessageSizeAndCompressions().
+         */
+        try (MockedStatic<ByteBufPair> theMock = mockStatic(ByteBufPair.class)) {
+            List<ByteBufPair> generatedByteBufPairs = Collections.synchronizedList(new ArrayList<>());
+            theMock.when(() -> ByteBufPair.get(any(ByteBuf.class), any(ByteBuf.class))).then(invocation -> {
+                ByteBufPair byteBufPair = (ByteBufPair) invocation.callRealMethod();
+                generatedByteBufPairs.add(byteBufPair);
+                byteBufPair.retain(); // Keep the pair alive so its refCnt can be verified below.
+                return byteBufPair;
+            });
+            try {
+                msgBuilder.value("msg-1").send();
+                fail("expected an error that reached the max message size");
+            } catch (Exception ex) {
+                assertTrue(FutureUtil.unwrapCompletionException(ex)
+                        instanceof PulsarClientException.InvalidMessageException);
+            }
+
+            // Verify: message payload has been released.
+            // Only the builder's extra "retain" (plus any ByteBufPair retained by the mock) may remain.
+            producer.close();
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(producer.getPendingQueueSize(), 0);
+            });
+            // Verify: ByteBufPair generated for the Pulsar command (none if the limit is hit before metadata).
+            if (maxMessageSize == 1) {
+                assertEquals(generatedByteBufPairs.size(), 0);
+            } else {
+                assertEquals(generatedByteBufPairs.size(), 1);
+                if (compressionType == CompressionType.NONE) {
+                    assertEquals(msgBuilder.payload.refCnt(), 2);
+                } else {
+                    assertEquals(msgBuilder.payload.refCnt(), 1);
+                }
+                for (ByteBufPair byteBufPair : generatedByteBufPairs) {
+                    assertEquals(byteBufPair.refCnt(), 1);
+                    byteBufPair.release();
+                    assertEquals(byteBufPair.refCnt(), 0);
+                }
+            }
+            // Verify: message.payload
+            assertEquals(msgBuilder.payload.refCnt(), 1);
+            msgBuilder.release();
+            assertEquals(msgBuilder.payload.refCnt(), 0);
+        }
+
+        // cleanup.
+        assertEquals(msgBuilder.payload.refCnt(), 0);
+        admin.topics().delete(topicName);
+    }
+
+    /**
+     * The content size of msg(value is "msg-1") is 5 bytes.
+     * Each case provides a max message size:
+     * 1: exceeded when adding the message into the batch container.
+     * 3: exceeded when building the batched messages payload.
+     * 26: equals the size of the batched messages payload, so the send succeeds.
+     */
+    @DataProvider(name = "maxMessageSizes")
+    public Object[][] maxMessageSizes() {
+        return new Object[][] {
+            {1},
+            {3},
+            {26}
+        };
+    }
+
+    @Test(dataProvider = "maxMessageSizes")
+    public void testBatchedSendMessageSizeExceeded(int maxMessageSize) throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+        admin.topics().createNonPartitionedTopic(topicName);
+        ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName)
+                .enableBatching(true)
+                .compressionType(CompressionType.NONE)
+                .create();
+        final ClientCnx cnx = producer.getClientCnx();
+        producer.getConnectionHandler().setMaxMessageSize(maxMessageSize);
+        MsgPayloadTouchableMessageBuilder<String> msgBuilder1 = newMessage(producer);
+        MsgPayloadTouchableMessageBuilder<String> msgBuilder2 = newMessage(producer);
+        /*
+         * Trigger an error: reached max message size, see more details in maxMessageSizes().
+         */
+        msgBuilder1.value("msg-1").sendAsync();
+        try {
+            msgBuilder2.value("msg-1").send();
+            if (maxMessageSize != 26) {
+                fail("expected an error that reached the max message size");
+            }
+        } catch (Exception ex) {
+            assertTrue(FutureUtil.unwrapCompletionException(ex)
+                    instanceof PulsarClientException.InvalidMessageException);
+        }
+
+        // Verify: message payload has been released.
+        // Since "MsgPayloadTouchableMessageBuilder" has called "buffer.retain" once, "refCnt()" should be "1".
+        producer.close();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(producer.getPendingQueueSize(), 0);
+        });
+        assertEquals(msgBuilder1.payload.refCnt(), 1);
+        assertEquals(msgBuilder2.payload.refCnt(), 1);
+
+        // cleanup.
+        cnx.ctx().close();
+        msgBuilder1.release();
+        msgBuilder2.release();
+        assertEquals(msgBuilder1.payload.refCnt(), 0);
+        assertEquals(msgBuilder2.payload.refCnt(), 0);
+        admin.topics().delete(topicName);
+    }
+
+    @Test
+    public void testSendAfterClosedProducer() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+        admin.topics().createNonPartitionedTopic(topicName);
+        ProducerImpl<String> producer =
+                (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        // Publish after the producer was closed.
+        MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer);
+        producer.close();
+        try {
+            msgBuilder.value("msg-1").send();
+            fail("expected an error that the producer has closed");
+        } catch (Exception ex) {
+            assertTrue(FutureUtil.unwrapCompletionException(ex)
+                    instanceof PulsarClientException.AlreadyClosedException);
+        }
+
+        // Verify: message payload has been released.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(producer.getPendingQueueSize(), 0);
+        });
+        assertEquals(msgBuilder.payload.refCnt(), 1);
+
+        // cleanup.
+        msgBuilder.release();
+        assertEquals(msgBuilder.payload.refCnt(), 0);
+        admin.topics().delete(topicName);
+    }
+
+    @DataProvider
+    public Object[][] failedInterceptAt() {
+        return new Object[][]{
+            {"close"},
+            {"eligible"},
+            {"beforeSend"},
+            {"onSendAcknowledgement"},
+        };
+    }
+
+    @Test(dataProvider = "failedInterceptAt")
+    public void testInterceptorError(String method) throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+        admin.topics().createNonPartitionedTopic(topicName);
+        ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName)
+                .intercept(
+                    new ProducerInterceptor() {
+                        @Override
+                        public void close() {
+                            if (method.equals("close")) {
+                                throw new RuntimeException("Mocked error");
+                            }
+                        }
+
+                        @Override
+                        public boolean eligible(Message message) {
+                            if (method.equals("eligible")) {
+                                throw new RuntimeException("Mocked error");
+                            }
+                            // Must be eligible, otherwise beforeSend/onSendAcknowledgement are never invoked.
+                            return true;
+                        }
+
+                        @Override
+                        public Message beforeSend(Producer producer, Message message) {
+                            if (method.equals("beforeSend")) {
+                                throw new RuntimeException("Mocked error");
+                            }
+                            return message;
+                        }
+
+                        @Override
+                        public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId,
+                                                          Throwable exception) {
+                            if (method.equals("onSendAcknowledgement")) {
+                                throw new RuntimeException("Mocked error");
+                            }
+                        }
+                    }).create();
+
+        MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer);
+        try {
+            msgBuilder.value("msg-1").sendAsync().get(3, TimeUnit.SECONDS);
+            // Whether the send fails depends on which interceptor method throws.
+        } catch (Exception ex) {
+            assertTrue(ex.getMessage().contains("Mocked"));
+        }
+
+        // Verify: message payload has been released.
+        producer.close();
+        assertEquals(msgBuilder.payload.refCnt(), 1);
+
+        // cleanup.
+        msgBuilder.release();
+        assertEquals(msgBuilder.payload.refCnt(), 0);
+        admin.topics().delete(topicName);
+    }
+
+    private <T> MsgPayloadTouchableMessageBuilder<T> newMessage(ProducerImpl<T> producer) {
+        return new MsgPayloadTouchableMessageBuilder<T>(producer, producer.schema);
+    }
+
+    private static class MsgPayloadTouchableMessageBuilder<T> extends TypedMessageBuilderImpl {
+
+        public volatile ByteBuf payload; // Captured in getMessage(); the test holds one extra reference to it.
+
+        public MsgPayloadTouchableMessageBuilder(ProducerBase producer, Schema<T> schema) {
+            super(producer, schema);
+        }
+
+        @Override
+        public Message<T> getMessage() {
+            MessageImpl<T> msg = (MessageImpl<T>) super.getMessage();
+            payload = msg.getPayload();
+            // Retain the payload so it is not freed and reused by another task while the test inspects it.
+            payload.retain();
+            return msg;
+        }
+
+        public void release() {
+            payload.release();
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 44f1fb27465..7262cfd11e0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -290,8 +290,8 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
messages.forEach(msg -> producer.client.getMemoryLimitController()
.releaseMemory(msg.getUncompressedSize()));
producer.client.getMemoryLimitController().releaseMemory(batchAllocatedSizeBytes);
- discard(new PulsarClientException.InvalidMessageException(
- "Message size is bigger than " + getMaxMessageSize() + "
bytes"));
+ discard(new PulsarClientException.InvalidMessageException("Message
size "
+ + encryptedPayload.readableBytes() + " is bigger than " +
getMaxMessageSize() + " bytes"));
return null;
}
messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index b686252b58a..10e0ee2ee3d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -32,6 +32,9 @@ import static org.apache.pulsar.common.protocol.Commands.readChecksum;
import static
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandler;
+import io.netty.channel.ChannelPromise;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
@@ -483,19 +486,46 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
return compressedPayload;
}
+ /**
+ * Note on ByteBuf Release Behavior.
+ *
+ * <p>If you have a customized callback, please ignore the note below.</p>
+ *
+ * <p>When using the default callback, please confirm that the {@code
refCnt()} value of the {@code message}
+ * (as returned by {@link MessageImpl#getDataBuffer}) is {@code 2} when
you call this method. This is because
+ * the {@code ByteBuf} will be released twice under the following
conditions:</p>
+ *
+ * <ul>
+ * <li><b>Batch Messaging Enabled:</b>
+ * <ol>
+ * <li>Release 1: When the message is pushed into the batched
message queue (see {@link #doBatchSendAndAdd}).
+ * </li>
+ * <li>Release 2: In the method {@link
SendCallback#sendComplete(Throwable, OpSendMsgStats)}.</li>
+ * </ol>
+ * </li>
+ * <li><b>Single Message (Batch Messaging Disabled):</b>
+ * <ol>
+ * <li>Release 1: When the message is written out by
+ * {@link ChannelOutboundHandler#write(ChannelHandlerContext,
Object, ChannelPromise)}.</li>
+ * <li>Release 2: In the method {@link
SendCallback#sendComplete(Throwable, OpSendMsgStats)}.</li>
+ * </ol>
+ * </li>
+ * </ul>
+ */
public void sendAsync(Message<?> message, SendCallback callback) {
checkArgument(message instanceof MessageImpl);
-
- if (!isValidProducerState(callback, message.getSequenceId())) {
- return;
- }
-
MessageImpl<?> msg = (MessageImpl<?>) message;
MessageMetadata msgMetadata = msg.getMessageBuilder();
ByteBuf payload = msg.getDataBuffer();
final int uncompressedSize = payload.readableBytes();
+ if (!isValidProducerState(callback, message.getSequenceId())) {
+ payload.release();
+ return;
+ }
+
if (!canEnqueueRequest(callback, message.getSequenceId(),
uncompressedSize)) {
+ payload.release();
return;
}
@@ -573,6 +603,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
for (int i = 0; i < (totalChunks - 1); i++) {
if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback,
message.getSequenceId(),
0 /* The memory was already reserved */)) {
+ compressedPayload.release();
client.getMemoryLimitController().releaseMemory(uncompressedSize);
semaphoreRelease(i + 1);
return;
@@ -603,6 +634,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
if (chunkId > 0 && conf.isBlockIfQueueFull() &&
!canEnqueueRequest(callback,
message.getSequenceId(), 0 /* The memory was already
reserved */)) {
+ compressedPayload.release();
client.getMemoryLimitController().releaseMemory(uncompressedSize -
readStartIndex);
semaphoreRelease(totalChunks - chunkId);
return;
@@ -723,10 +755,13 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
} else {
// handle boundary cases where message being added
would exceed
// batch size and/or max message size
- boolean isBatchFull = batchMessageContainer.add(msg,
callback);
- lastSendFuture = callback.getFuture();
- payload.release();
- triggerSendIfFullOrScheduleFlush(isBatchFull);
+ try {
+ boolean isBatchFull =
batchMessageContainer.add(msg, callback);
+ lastSendFuture = callback.getFuture();
+ triggerSendIfFullOrScheduleFlush(isBatchFull);
+ } finally {
+ payload.release();
+ }
}
isLastSequenceIdPotentialDuplicated = false;
}
@@ -2304,6 +2339,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
batchMessageAndSend(false);
}
if (isMessageSizeExceeded(op)) {
+ op.cmd.release();
return;
}
pendingMessages.add(op);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
index 97f16c37b5d..38492ceae84 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
@@ -60,10 +60,10 @@ public class ProducerInterceptors implements Closeable {
public Message beforeSend(Producer producer, Message message) {
Message interceptorMessage = message;
for (ProducerInterceptor interceptor : interceptors) {
- if (!interceptor.eligible(message)) {
- continue;
- }
try {
+ if (!interceptor.eligible(message)) {
+ continue;
+ }
interceptorMessage = interceptor.beforeSend(producer,
interceptorMessage);
} catch (Throwable e) {
if (producer != null) {
@@ -93,10 +93,10 @@ public class ProducerInterceptors implements Closeable {
*/
public void onSendAcknowledgement(Producer producer, Message message,
MessageId msgId, Throwable exception) {
for (ProducerInterceptor interceptor : interceptors) {
- if (!interceptor.eligible(message)) {
- continue;
- }
try {
+ if (!interceptor.eligible(message)) {
+ continue;
+ }
interceptor.onSendAcknowledgement(producer, message, msgId,
exception);
} catch (Throwable e) {
log.warn("Error executing interceptor onSendAcknowledgement
callback ", e);