BewareMyPower commented on code in PR #23738: URL: https://github.com/apache/pulsar/pull/23738#discussion_r1889607083
########## pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.awaitility.reflect.WhiteboxImpl.getInternalState; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; 
+import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class ProducerMemoryLeakTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTopicLevelPoliciesEnabled(false); + } + + @Test + public void testSendQueueIsFull() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING) + .blockIfQueueFull(false).maxPendingMessages(1) + .enableBatching(true).topic(topicName).create(); + List<MsgPayloadTouchableMessageBuilder<String>> msgBuilderList = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + msgBuilderList.add(newMessage(producer)); + } + int indexCalledSend = 0; Review Comment: I think it can just initialize `indexCalledSend` with ```java final int indexCalledSend = msgBuilderList.size() - 1; ``` or just replace the following for loops with: ```java for (int i = msgBuilderList.size() - 1; i > -1; i--) { ``` ########## pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.awaitility.reflect.WhiteboxImpl.getInternalState; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class ProducerMemoryLeakTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() 
throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTopicLevelPoliciesEnabled(false); + } + + @Test + public void testSendQueueIsFull() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING) + .blockIfQueueFull(false).maxPendingMessages(1) + .enableBatching(true).topic(topicName).create(); + List<MsgPayloadTouchableMessageBuilder<String>> msgBuilderList = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + msgBuilderList.add(newMessage(producer)); + } + int indexCalledSend = 0; + List<CompletableFuture> sendFutureList = new ArrayList<>(); + for (MsgPayloadTouchableMessageBuilder<String> msgBuilder: msgBuilderList) { + sendFutureList.add(msgBuilder.value("msg-1").sendAsync()); + if (indexCalledSend != 99) { + indexCalledSend++; + } + } + try{ + sendFutureList.get(sendFutureList.size() - 1).join(); + } catch (Exception ex) { + // Ignore the error PulsarClientException$ProducerQueueIsFullError. + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.ProducerQueueIsFullError); + } + + producer.close(); + for (int i = indexCalledSend; i > -1; i--) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + assertEquals(msgBuilder.payload.refCnt(), 1); + } + + // cleanup. + for (int i = indexCalledSend; i > -1; i--) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + } Review Comment: Could you simplify these two for loops into one? 
```java for (int i = indexCalledSend; i > -1; i--) { MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); assertEquals(msgBuilder.payload.refCnt(), 1); msgBuilder.release(); assertEquals(msgBuilder.payload.refCnt(), 0); } ``` BTW, I don't understand why you use a reversed for loop rather than a simple `for (int i = 0; i < msgBuilderList.size(); i++)` ########## pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java: ########## @@ -45,10 +46,10 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> { private static final ByteBuffer EMPTY_CONTENT = ByteBuffer.allocate(0); - private final transient ProducerBase<?> producer; - private final transient MessageMetadata msgMetadata = new MessageMetadata(); - private final transient Schema<T> schema; - private transient ByteBuffer content; + protected final transient ProducerBase<?> producer; + protected final transient MessageMetadata msgMetadata = new MessageMetadata(); + protected final transient Schema<T> schema; + protected transient ByteBuffer content; Review Comment: You don't need to change these access modifiers, or the `beforeSend` method. You can reuse the `getMessage` method when you want to mock it in `MsgPayloadTouchableMessageBuilder`. ```java @Override public Message<T> getMessage() { final var msg = (MessageImpl<T>) super.getMessage(); payload = msg.getPayload().retain(); if (msgMocker == null) { return msg; } else { return msgMocker.apply(msg); } } ``` ########## pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.awaitility.reflect.WhiteboxImpl.getInternalState; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class ProducerMemoryLeakTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() 
throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTopicLevelPoliciesEnabled(false); + } + + @Test + public void testSendQueueIsFull() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING) + .blockIfQueueFull(false).maxPendingMessages(1) + .enableBatching(true).topic(topicName).create(); + List<MsgPayloadTouchableMessageBuilder<String>> msgBuilderList = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + msgBuilderList.add(newMessage(producer)); + } + int indexCalledSend = 0; + List<CompletableFuture> sendFutureList = new ArrayList<>(); + for (MsgPayloadTouchableMessageBuilder<String> msgBuilder: msgBuilderList) { + sendFutureList.add(msgBuilder.value("msg-1").sendAsync()); + if (indexCalledSend != 99) { + indexCalledSend++; + } + } + try{ + sendFutureList.get(sendFutureList.size() - 1).join(); + } catch (Exception ex) { + // Ignore the error PulsarClientException$ProducerQueueIsFullError. + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.ProducerQueueIsFullError); + } + + producer.close(); + for (int i = indexCalledSend; i > -1; i--) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + assertEquals(msgBuilder.payload.refCnt(), 1); + } + + // cleanup. 
+ for (int i = indexCalledSend; i > -1; i--) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + } + for (int i = indexCalledSend + 1; i < 100; i++) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + } + admin.topics().delete(topicName); + } + + /** + * The content size of msg(value is "msg-1") will be "5". + * Then provides two param: 1 and 5. + * 1: reach the limitation before adding the message metadata. + * 2: reach the limitation after adding the message metadata. + */ + @DataProvider(name = "maxMessageSizeAndCompressions") + public Object[][] maxMessageSizeAndCompressions(){ + return new Object[][] { + {1, CompressionType.NONE}, + {5, CompressionType.NONE}, + {1, CompressionType.LZ4}, + {5, CompressionType.LZ4} + }; + } + + @Test(dataProvider = "maxMessageSizeAndCompressions") + public void testSendMessageSizeExceeded(int maxMessageSize, CompressionType compressionType) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .compressionType(compressionType) + .enableBatching(false) + .compressionType(CompressionType.NONE) + .create(); + final ClientCnx cnx = producer.getClientCnx(); + producer.getConnectionHandler().setMaxMessageSize(maxMessageSize); + MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer); + /** + * Mock an error: reached max message size, see more details {@link #maxMessageSizeAndCompressions()}. 
+ */ + try { + msgBuilder.value("msg-1").send(); + fail("expected an error that reached the max message size"); + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.InvalidMessageException); + } + + // Verify: message payload has been released. + // Since "MsgPayloadTouchableMessageBuilder" has called "buffer.retain" once, "refCnt()" should be "1". + producer.close(); + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder.payload.refCnt(), 1); + + // cleanup. + cnx.ctx().close(); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + /** + * The content size of msg(value is "msg-1") will be "5". + * Then provides two param: 1 and 5. + * 1: Less than the limitation when adding the message into the batch-container. + * 3: Less than the limitation when building batched messages payload. + * 2: Equals the limitation when building batched messages payload. 
+ */ + @DataProvider(name = "maxMessageSizes") + public Object[][] maxMessageSizes(){ + return new Object[][] { + {1}, + {3}, + {26} + }; + } + + @Test(dataProvider = "maxMessageSizes") + public void testBatchedSendMessageSizeExceeded(int maxMessageSize) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .enableBatching(true) + .compressionType(CompressionType.NONE) + .create(); + final ClientCnx cnx = producer.getClientCnx(); + producer.getConnectionHandler().setMaxMessageSize(maxMessageSize); + MsgPayloadTouchableMessageBuilder<String> msgBuilder1 = newMessage(producer); + MsgPayloadTouchableMessageBuilder<String> msgBuilder2 = newMessage(producer); + /** + * Mock an error: reached max message size. see more detail {@link #maxMessageSizes()}. + */ + msgBuilder1.value("msg-1").sendAsync(); + try { + msgBuilder2.value("msg-1").send(); + if (maxMessageSize != 26) { + fail("expected an error that reached the max message size"); + } + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.InvalidMessageException); + } + + // Verify: message payload has been released. + // Since "MsgPayloadTouchableMessageBuilder" has called "buffer.retain" once, "refCnt()" should be "1". + producer.close(); + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder1.payload.refCnt(), 1); + assertEquals(msgBuilder2.payload.refCnt(), 1); + + // cleanup. 
+ cnx.ctx().close(); + msgBuilder1.release(); + msgBuilder2.release(); + assertEquals(msgBuilder1.payload.refCnt(), 0); + assertEquals(msgBuilder2.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + @Test + public void testSendAfterClosedProducer() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = + (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + // Publish after the producer was closed. + MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer); + producer.close(); + try { + msgBuilder.value("msg-1").send(); + fail("expected an error that the producer has closed"); + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.AlreadyClosedException); + } + + // Verify: message payload has been released. + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder.payload.refCnt(), 1); + + // cleanup. + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + admin.topics().delete(topicName); + //admin.topics().deleteAsync(topicName).get(10, TimeUnit.SECONDS); TODO fix bug. 
+ } + + @DataProvider + public Object[][] failedInterceptAt() { + return new Object[][]{ + {"eligible"}, + {"beforeSend"}, + {"onSendAcknowledgement"}, + }; + } + + @Test(dataProvider = "failedInterceptAt") + public void testInterceptorError(String method) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .intercept( + + new ProducerInterceptor() { + @Override + public void close() { + if (method.equals("close")) { + throw new RuntimeException("Mocked error"); + } + } + + @Override + public boolean eligible(Message message) { + if (method.equals("eligible")) { + throw new RuntimeException("Mocked error"); + } + return false; + } + + @Override + public Message beforeSend(Producer producer, Message message) { + if (method.equals("beforeSend")) { + throw new RuntimeException("Mocked error"); + } + return message; + } + + @Override + public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, + Throwable exception) { + if (method.equals("onSendAcknowledgement")) { + throw new RuntimeException("Mocked error"); + } + + } + }).create(); + + MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer); + try { + msgBuilder.value("msg-1").sendAsync().get(3, TimeUnit.SECONDS); + // It may throw error. + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("Mocked")); + } + + // Verify: message payload has been released. + producer.close(); + assertEquals(msgBuilder.payload.refCnt(), 1); + + // cleanup. 
+ msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + private <T> MsgPayloadTouchableMessageBuilder<T> newMessage(ProducerImpl<T> producer){ + Schema<T> schema = getInternalState(producer, "schema"); + return new MsgPayloadTouchableMessageBuilder<T>(producer, schema); + } + + private static class MsgPayloadTouchableMessageBuilder<T> extends TypedMessageBuilderImpl { + + public volatile ByteBuf payload; + + private volatile Function<MessageImpl<T>, MessageImpl<T>> msgMocker; Review Comment: This field is never used? ########## pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import static org.awaitility.reflect.WhiteboxImpl.getInternalState; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class ProducerMemoryLeakTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTopicLevelPoliciesEnabled(false); + } + + @Test + public void testSendQueueIsFull() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = 
(ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING) + .blockIfQueueFull(false).maxPendingMessages(1) + .enableBatching(true).topic(topicName).create(); + List<MsgPayloadTouchableMessageBuilder<String>> msgBuilderList = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + msgBuilderList.add(newMessage(producer)); + } + int indexCalledSend = 0; + List<CompletableFuture> sendFutureList = new ArrayList<>(); + for (MsgPayloadTouchableMessageBuilder<String> msgBuilder: msgBuilderList) { + sendFutureList.add(msgBuilder.value("msg-1").sendAsync()); + if (indexCalledSend != 99) { + indexCalledSend++; + } + } + try{ + sendFutureList.get(sendFutureList.size() - 1).join(); + } catch (Exception ex) { + // Ignore the error PulsarClientException$ProducerQueueIsFullError. + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.ProducerQueueIsFullError); + } + + producer.close(); + for (int i = indexCalledSend; i > -1; i--) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + assertEquals(msgBuilder.payload.refCnt(), 1); + } + + // cleanup. + for (int i = indexCalledSend; i > -1; i--) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + } + for (int i = indexCalledSend + 1; i < 100; i++) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + } + admin.topics().delete(topicName); + } + + /** + * The content size of msg(value is "msg-1") will be "5". + * Then provides two param: 1 and 5. + * 1: reach the limitation before adding the message metadata. + * 2: reach the limitation after adding the message metadata. 
+ */ + @DataProvider(name = "maxMessageSizeAndCompressions") + public Object[][] maxMessageSizeAndCompressions(){ + return new Object[][] { + {1, CompressionType.NONE}, + {5, CompressionType.NONE}, + {1, CompressionType.LZ4}, + {5, CompressionType.LZ4} + }; + } + + @Test(dataProvider = "maxMessageSizeAndCompressions") + public void testSendMessageSizeExceeded(int maxMessageSize, CompressionType compressionType) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .compressionType(compressionType) + .enableBatching(false) + .compressionType(CompressionType.NONE) + .create(); + final ClientCnx cnx = producer.getClientCnx(); + producer.getConnectionHandler().setMaxMessageSize(maxMessageSize); + MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer); + /** + * Mock an error: reached max message size, see more details {@link #maxMessageSizeAndCompressions()}. + */ + try { + msgBuilder.value("msg-1").send(); + fail("expected an error that reached the max message size"); + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.InvalidMessageException); + } + + // Verify: message payload has been released. + // Since "MsgPayloadTouchableMessageBuilder" has called "buffer.retain" once, "refCnt()" should be "1". + producer.close(); + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder.payload.refCnt(), 1); + + // cleanup. + cnx.ctx().close(); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + /** + * The content size of msg(value is "msg-1") will be "5". + * Then provides two param: 1 and 5. 
+ * 1: Less than the limitation when adding the message into the batch-container. + * 3: Less than the limitation when building batched messages payload. + * 2: Equals the limitation when building batched messages payload. + */ + @DataProvider(name = "maxMessageSizes") + public Object[][] maxMessageSizes(){ + return new Object[][] { + {1}, + {3}, + {26} + }; + } + + @Test(dataProvider = "maxMessageSizes") + public void testBatchedSendMessageSizeExceeded(int maxMessageSize) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .enableBatching(true) + .compressionType(CompressionType.NONE) + .create(); + final ClientCnx cnx = producer.getClientCnx(); + producer.getConnectionHandler().setMaxMessageSize(maxMessageSize); + MsgPayloadTouchableMessageBuilder<String> msgBuilder1 = newMessage(producer); + MsgPayloadTouchableMessageBuilder<String> msgBuilder2 = newMessage(producer); + /** + * Mock an error: reached max message size. see more detail {@link #maxMessageSizes()}. + */ + msgBuilder1.value("msg-1").sendAsync(); + try { + msgBuilder2.value("msg-1").send(); + if (maxMessageSize != 26) { + fail("expected an error that reached the max message size"); + } + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.InvalidMessageException); + } + + // Verify: message payload has been released. + // Since "MsgPayloadTouchableMessageBuilder" has called "buffer.retain" once, "refCnt()" should be "1". + producer.close(); + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder1.payload.refCnt(), 1); + assertEquals(msgBuilder2.payload.refCnt(), 1); + + // cleanup. 
+ cnx.ctx().close(); + msgBuilder1.release(); + msgBuilder2.release(); + assertEquals(msgBuilder1.payload.refCnt(), 0); + assertEquals(msgBuilder2.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + @Test + public void testSendAfterClosedProducer() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = + (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + // Publish after the producer was closed. + MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer); + producer.close(); + try { + msgBuilder.value("msg-1").send(); + fail("expected an error that the producer has closed"); + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.AlreadyClosedException); + } + + // Verify: message payload has been released. + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder.payload.refCnt(), 1); + + // cleanup. + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + admin.topics().delete(topicName); + //admin.topics().deleteAsync(topicName).get(10, TimeUnit.SECONDS); TODO fix bug. Review Comment: Could you open an issue instead of leaving a comment? And I don't think it's a bug. It's expected that the 2nd `delete` will fail if the topic has already been deleted. ########## pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.awaitility.reflect.WhiteboxImpl.getInternalState; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class ProducerMemoryLeakTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() 
throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTopicLevelPoliciesEnabled(false); + } + + @Test + public void testSendQueueIsFull() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING) + .blockIfQueueFull(false).maxPendingMessages(1) + .enableBatching(true).topic(topicName).create(); + List<MsgPayloadTouchableMessageBuilder<String>> msgBuilderList = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + msgBuilderList.add(newMessage(producer)); + } + int indexCalledSend = 0; + List<CompletableFuture> sendFutureList = new ArrayList<>(); + for (MsgPayloadTouchableMessageBuilder<String> msgBuilder: msgBuilderList) { + sendFutureList.add(msgBuilder.value("msg-1").sendAsync()); + if (indexCalledSend != 99) { + indexCalledSend++; + } + } + try{ + sendFutureList.get(sendFutureList.size() - 1).join(); + } catch (Exception ex) { + // Ignore the error PulsarClientException$ProducerQueueIsFullError. + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.ProducerQueueIsFullError); + } + + producer.close(); + for (int i = indexCalledSend; i > -1; i--) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + assertEquals(msgBuilder.payload.refCnt(), 1); + } + + // cleanup. 
+ for (int i = indexCalledSend; i > -1; i--) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + } + for (int i = indexCalledSend + 1; i < 100; i++) { Review Comment: This `for` loop is never entered: `indexCalledSend` is always 99 by this point, so `indexCalledSend + 1` is already 100 and the condition `i < 100` is false on the first check. ########## pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import static org.awaitility.reflect.WhiteboxImpl.getInternalState; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class ProducerMemoryLeakTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTopicLevelPoliciesEnabled(false); + } + + @Test + public void testSendQueueIsFull() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = 
(ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING) + .blockIfQueueFull(false).maxPendingMessages(1) + .enableBatching(true).topic(topicName).create(); + List<MsgPayloadTouchableMessageBuilder<String>> msgBuilderList = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + msgBuilderList.add(newMessage(producer)); + } + int indexCalledSend = 0; + List<CompletableFuture> sendFutureList = new ArrayList<>(); + for (MsgPayloadTouchableMessageBuilder<String> msgBuilder: msgBuilderList) { + sendFutureList.add(msgBuilder.value("msg-1").sendAsync()); + if (indexCalledSend != 99) { + indexCalledSend++; + } + } + try{ + sendFutureList.get(sendFutureList.size() - 1).join(); + } catch (Exception ex) { + // Ignore the error PulsarClientException$ProducerQueueIsFullError. + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.ProducerQueueIsFullError); + } + + producer.close(); + for (int i = indexCalledSend; i > -1; i--) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + assertEquals(msgBuilder.payload.refCnt(), 1); + } + + // cleanup. + for (int i = indexCalledSend; i > -1; i--) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + } + for (int i = indexCalledSend + 1; i < 100; i++) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + } + admin.topics().delete(topicName); + } + + /** + * The content size of msg(value is "msg-1") will be "5". + * Then provides two param: 1 and 5. + * 1: reach the limitation before adding the message metadata. + * 2: reach the limitation after adding the message metadata. 
+ */ + @DataProvider(name = "maxMessageSizeAndCompressions") + public Object[][] maxMessageSizeAndCompressions(){ + return new Object[][] { + {1, CompressionType.NONE}, + {5, CompressionType.NONE}, + {1, CompressionType.LZ4}, + {5, CompressionType.LZ4} + }; + } + + @Test(dataProvider = "maxMessageSizeAndCompressions") + public void testSendMessageSizeExceeded(int maxMessageSize, CompressionType compressionType) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .compressionType(compressionType) + .enableBatching(false) + .compressionType(CompressionType.NONE) Review Comment: ```suggestion ``` This config overwrites the previous `compressionType` config ########## pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import static org.awaitility.reflect.WhiteboxImpl.getInternalState; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class ProducerMemoryLeakTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTopicLevelPoliciesEnabled(false); + } + Review Comment: ```suggestion ``` Disabling the topic level policies seems unnecessary? 
########## pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.awaitility.reflect.WhiteboxImpl.getInternalState; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; 
+import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class ProducerMemoryLeakTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTopicLevelPoliciesEnabled(false); + } + + @Test + public void testSendQueueIsFull() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING) + .blockIfQueueFull(false).maxPendingMessages(1) + .enableBatching(true).topic(topicName).create(); + List<MsgPayloadTouchableMessageBuilder<String>> msgBuilderList = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + msgBuilderList.add(newMessage(producer)); + } + int indexCalledSend = 0; + List<CompletableFuture> sendFutureList = new ArrayList<>(); + for (MsgPayloadTouchableMessageBuilder<String> msgBuilder: msgBuilderList) { + sendFutureList.add(msgBuilder.value("msg-1").sendAsync()); + if (indexCalledSend != 99) { + indexCalledSend++; + } + } + try{ + sendFutureList.get(sendFutureList.size() - 1).join(); + } catch (Exception ex) { + // Ignore the error PulsarClientException$ProducerQueueIsFullError. 
+ assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.ProducerQueueIsFullError); + } + + producer.close(); + for (int i = indexCalledSend; i > -1; i--) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + assertEquals(msgBuilder.payload.refCnt(), 1); + } + + // cleanup. + for (int i = indexCalledSend; i > -1; i--) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + } + for (int i = indexCalledSend + 1; i < 100; i++) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + } + admin.topics().delete(topicName); + } + + /** + * The content size of msg(value is "msg-1") will be "5". + * Then provides two param: 1 and 5. + * 1: reach the limitation before adding the message metadata. + * 2: reach the limitation after adding the message metadata. 
+ */ + @DataProvider(name = "maxMessageSizeAndCompressions") + public Object[][] maxMessageSizeAndCompressions(){ + return new Object[][] { + {1, CompressionType.NONE}, + {5, CompressionType.NONE}, + {1, CompressionType.LZ4}, + {5, CompressionType.LZ4} + }; + } + + @Test(dataProvider = "maxMessageSizeAndCompressions") + public void testSendMessageSizeExceeded(int maxMessageSize, CompressionType compressionType) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .compressionType(compressionType) + .enableBatching(false) + .compressionType(CompressionType.NONE) + .create(); + final ClientCnx cnx = producer.getClientCnx(); + producer.getConnectionHandler().setMaxMessageSize(maxMessageSize); + MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer); + /** + * Mock an error: reached max message size, see more details {@link #maxMessageSizeAndCompressions()}. + */ + try { + msgBuilder.value("msg-1").send(); + fail("expected an error that reached the max message size"); + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.InvalidMessageException); + } + + // Verify: message payload has been released. + // Since "MsgPayloadTouchableMessageBuilder" has called "buffer.retain" once, "refCnt()" should be "1". + producer.close(); + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder.payload.refCnt(), 1); + + // cleanup. + cnx.ctx().close(); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + /** + * The content size of msg(value is "msg-1") will be "5". + * Then provides two param: 1 and 5. 
+ * 1: Less than the limitation when adding the message into the batch-container. + * 3: Less than the limitation when building batched messages payload. + * 2: Equals the limitation when building batched messages payload. + */ + @DataProvider(name = "maxMessageSizes") + public Object[][] maxMessageSizes(){ + return new Object[][] { + {1}, + {3}, + {26} + }; + } + + @Test(dataProvider = "maxMessageSizes") + public void testBatchedSendMessageSizeExceeded(int maxMessageSize) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .enableBatching(true) + .compressionType(CompressionType.NONE) + .create(); + final ClientCnx cnx = producer.getClientCnx(); + producer.getConnectionHandler().setMaxMessageSize(maxMessageSize); + MsgPayloadTouchableMessageBuilder<String> msgBuilder1 = newMessage(producer); + MsgPayloadTouchableMessageBuilder<String> msgBuilder2 = newMessage(producer); + /** + * Mock an error: reached max message size. see more detail {@link #maxMessageSizes()}. + */ + msgBuilder1.value("msg-1").sendAsync(); + try { + msgBuilder2.value("msg-1").send(); + if (maxMessageSize != 26) { + fail("expected an error that reached the max message size"); + } + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.InvalidMessageException); + } + + // Verify: message payload has been released. + // Since "MsgPayloadTouchableMessageBuilder" has called "buffer.retain" once, "refCnt()" should be "1". + producer.close(); + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder1.payload.refCnt(), 1); + assertEquals(msgBuilder2.payload.refCnt(), 1); + + // cleanup. 
+ cnx.ctx().close(); + msgBuilder1.release(); + msgBuilder2.release(); + assertEquals(msgBuilder1.payload.refCnt(), 0); + assertEquals(msgBuilder2.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + @Test + public void testSendAfterClosedProducer() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = + (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + // Publish after the producer was closed. + MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer); + producer.close(); + try { + msgBuilder.value("msg-1").send(); + fail("expected an error that the producer has closed"); + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.AlreadyClosedException); + } + + // Verify: message payload has been released. + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder.payload.refCnt(), 1); + + // cleanup. + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + admin.topics().delete(topicName); + //admin.topics().deleteAsync(topicName).get(10, TimeUnit.SECONDS); TODO fix bug. 
+ } + + @DataProvider + public Object[][] failedInterceptAt() { + return new Object[][]{ + {"eligible"}, + {"beforeSend"}, + {"onSendAcknowledgement"}, + }; + } + + @Test(dataProvider = "failedInterceptAt") + public void testInterceptorError(String method) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .intercept( + + new ProducerInterceptor() { + @Override + public void close() { + if (method.equals("close")) { + throw new RuntimeException("Mocked error"); + } + } + + @Override + public boolean eligible(Message message) { + if (method.equals("eligible")) { + throw new RuntimeException("Mocked error"); + } + return false; + } + + @Override + public Message beforeSend(Producer producer, Message message) { + if (method.equals("beforeSend")) { + throw new RuntimeException("Mocked error"); + } + return message; + } + + @Override + public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, + Throwable exception) { + if (method.equals("onSendAcknowledgement")) { + throw new RuntimeException("Mocked error"); + } + + } + }).create(); + + MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer); + try { + msgBuilder.value("msg-1").sendAsync().get(3, TimeUnit.SECONDS); + // It may throw error. + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("Mocked")); + } + + // Verify: message payload has been released. + producer.close(); + assertEquals(msgBuilder.payload.refCnt(), 1); + + // cleanup. 
+ msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + private <T> MsgPayloadTouchableMessageBuilder<T> newMessage(ProducerImpl<T> producer){ + Schema<T> schema = getInternalState(producer, "schema"); + return new MsgPayloadTouchableMessageBuilder<T>(producer, schema); + } + + private static class MsgPayloadTouchableMessageBuilder<T> extends TypedMessageBuilderImpl { + + public volatile ByteBuf payload; + + private volatile Function<MessageImpl<T>, MessageImpl<T>> msgMocker; + + public <T> MsgPayloadTouchableMessageBuilder(ProducerBase producer, Schema<T> schema) { + super(producer, schema); + } + public void setMsgMocker(Function<MessageImpl<T>, MessageImpl<T>> msgMocker) { + this.msgMocker = msgMocker; + } Review Comment: ```suggestion ``` Remove unused method ########## pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import static org.awaitility.reflect.WhiteboxImpl.getInternalState; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class ProducerMemoryLeakTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTopicLevelPoliciesEnabled(false); + } + + @Test + public void testSendQueueIsFull() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = 
(ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING) + .blockIfQueueFull(false).maxPendingMessages(1) + .enableBatching(true).topic(topicName).create(); + List<MsgPayloadTouchableMessageBuilder<String>> msgBuilderList = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + msgBuilderList.add(newMessage(producer)); + } + int indexCalledSend = 0; + List<CompletableFuture> sendFutureList = new ArrayList<>(); + for (MsgPayloadTouchableMessageBuilder<String> msgBuilder: msgBuilderList) { + sendFutureList.add(msgBuilder.value("msg-1").sendAsync()); + if (indexCalledSend != 99) { + indexCalledSend++; + } + } + try{ + sendFutureList.get(sendFutureList.size() - 1).join(); + } catch (Exception ex) { + // Ignore the error PulsarClientException$ProducerQueueIsFullError. + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.ProducerQueueIsFullError); + } + + producer.close(); + for (int i = indexCalledSend; i > -1; i--) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + assertEquals(msgBuilder.payload.refCnt(), 1); + } + + // cleanup. + for (int i = indexCalledSend; i > -1; i--) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + } + for (int i = indexCalledSend + 1; i < 100; i++) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + } + admin.topics().delete(topicName); + } + + /** + * The content size of msg(value is "msg-1") will be "5". + * Then provides two param: 1 and 5. + * 1: reach the limitation before adding the message metadata. + * 2: reach the limitation after adding the message metadata. 
+ */ + @DataProvider(name = "maxMessageSizeAndCompressions") + public Object[][] maxMessageSizeAndCompressions(){ + return new Object[][] { + {1, CompressionType.NONE}, + {5, CompressionType.NONE}, + {1, CompressionType.LZ4}, + {5, CompressionType.LZ4} + }; + } + + @Test(dataProvider = "maxMessageSizeAndCompressions") + public void testSendMessageSizeExceeded(int maxMessageSize, CompressionType compressionType) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .compressionType(compressionType) + .enableBatching(false) + .compressionType(CompressionType.NONE) + .create(); + final ClientCnx cnx = producer.getClientCnx(); + producer.getConnectionHandler().setMaxMessageSize(maxMessageSize); + MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer); + /** + * Mock an error: reached max message size, see more details {@link #maxMessageSizeAndCompressions()}. + */ + try { + msgBuilder.value("msg-1").send(); + fail("expected an error that reached the max message size"); + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.InvalidMessageException); + } + + // Verify: message payload has been released. + // Since "MsgPayloadTouchableMessageBuilder" has called "buffer.retain" once, "refCnt()" should be "1". + producer.close(); + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder.payload.refCnt(), 1); + + // cleanup. + cnx.ctx().close(); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + /** + * The content size of msg(value is "msg-1") will be "5". + * Then provides two param: 1 and 5. 
+ * 1: Less than the limitation when adding the message into the batch-container. + * 3: Less than the limitation when building batched messages payload. + * 2: Equals the limitation when building batched messages payload. + */ + @DataProvider(name = "maxMessageSizes") + public Object[][] maxMessageSizes(){ + return new Object[][] { + {1}, + {3}, + {26} + }; + } + + @Test(dataProvider = "maxMessageSizes") + public void testBatchedSendMessageSizeExceeded(int maxMessageSize) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .enableBatching(true) + .compressionType(CompressionType.NONE) + .create(); + final ClientCnx cnx = producer.getClientCnx(); + producer.getConnectionHandler().setMaxMessageSize(maxMessageSize); + MsgPayloadTouchableMessageBuilder<String> msgBuilder1 = newMessage(producer); + MsgPayloadTouchableMessageBuilder<String> msgBuilder2 = newMessage(producer); + /** + * Mock an error: reached max message size. see more detail {@link #maxMessageSizes()}. + */ + msgBuilder1.value("msg-1").sendAsync(); + try { + msgBuilder2.value("msg-1").send(); + if (maxMessageSize != 26) { + fail("expected an error that reached the max message size"); + } + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.InvalidMessageException); + } + + // Verify: message payload has been released. + // Since "MsgPayloadTouchableMessageBuilder" has called "buffer.retain" once, "refCnt()" should be "1". + producer.close(); + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder1.payload.refCnt(), 1); + assertEquals(msgBuilder2.payload.refCnt(), 1); + + // cleanup. 
+ cnx.ctx().close(); + msgBuilder1.release(); + msgBuilder2.release(); + assertEquals(msgBuilder1.payload.refCnt(), 0); + assertEquals(msgBuilder2.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + @Test + public void testSendAfterClosedProducer() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = + (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + // Publish after the producer was closed. + MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer); + producer.close(); + try { + msgBuilder.value("msg-1").send(); + fail("expected an error that the producer has closed"); + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.AlreadyClosedException); + } + + // Verify: message payload has been released. + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder.payload.refCnt(), 1); + + // cleanup. + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + admin.topics().delete(topicName); + //admin.topics().deleteAsync(topicName).get(10, TimeUnit.SECONDS); TODO fix bug. 
+ } + + @DataProvider + public Object[][] failedInterceptAt() { + return new Object[][]{ + {"eligible"}, + {"beforeSend"}, + {"onSendAcknowledgement"}, + }; + } + + @Test(dataProvider = "failedInterceptAt") + public void testInterceptorError(String method) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .intercept( + + new ProducerInterceptor() { + @Override + public void close() { + if (method.equals("close")) { + throw new RuntimeException("Mocked error"); + } + } + + @Override + public boolean eligible(Message message) { + if (method.equals("eligible")) { + throw new RuntimeException("Mocked error"); + } + return false; + } + + @Override + public Message beforeSend(Producer producer, Message message) { + if (method.equals("beforeSend")) { + throw new RuntimeException("Mocked error"); + } + return message; + } + + @Override + public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, + Throwable exception) { + if (method.equals("onSendAcknowledgement")) { + throw new RuntimeException("Mocked error"); + } + + } + }).create(); + + MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer); + try { + msgBuilder.value("msg-1").sendAsync().get(3, TimeUnit.SECONDS); + // It may throw error. + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("Mocked")); + } + + // Verify: message payload has been released. + producer.close(); + assertEquals(msgBuilder.payload.refCnt(), 1); + + // cleanup. 
+ msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + private <T> MsgPayloadTouchableMessageBuilder<T> newMessage(ProducerImpl<T> producer){ + Schema<T> schema = getInternalState(producer, "schema"); Review Comment: ```suggestion Schema<T> schema = producer.schema; ``` ########## pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import static org.awaitility.reflect.WhiteboxImpl.getInternalState; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class ProducerMemoryLeakTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTopicLevelPoliciesEnabled(false); + } + + @Test + public void testSendQueueIsFull() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = 
(ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING) + .blockIfQueueFull(false).maxPendingMessages(1) + .enableBatching(true).topic(topicName).create(); + List<MsgPayloadTouchableMessageBuilder<String>> msgBuilderList = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + msgBuilderList.add(newMessage(producer)); + } + int indexCalledSend = 0; + List<CompletableFuture> sendFutureList = new ArrayList<>(); + for (MsgPayloadTouchableMessageBuilder<String> msgBuilder: msgBuilderList) { + sendFutureList.add(msgBuilder.value("msg-1").sendAsync()); + if (indexCalledSend != 99) { + indexCalledSend++; + } + } + try{ + sendFutureList.get(sendFutureList.size() - 1).join(); + } catch (Exception ex) { + // Ignore the error PulsarClientException$ProducerQueueIsFullError. + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.ProducerQueueIsFullError); + } + + producer.close(); + for (int i = indexCalledSend; i > -1; i--) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + assertEquals(msgBuilder.payload.refCnt(), 1); + } + + // cleanup. + for (int i = indexCalledSend; i > -1; i--) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + } + for (int i = indexCalledSend + 1; i < 100; i++) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + } + admin.topics().delete(topicName); + } + + /** + * The content size of msg(value is "msg-1") will be "5". + * Then provides two param: 1 and 5. + * 1: reach the limitation before adding the message metadata. + * 2: reach the limitation after adding the message metadata. 
+ */ + @DataProvider(name = "maxMessageSizeAndCompressions") + public Object[][] maxMessageSizeAndCompressions(){ + return new Object[][] { + {1, CompressionType.NONE}, + {5, CompressionType.NONE}, + {1, CompressionType.LZ4}, + {5, CompressionType.LZ4} + }; + } + + @Test(dataProvider = "maxMessageSizeAndCompressions") + public void testSendMessageSizeExceeded(int maxMessageSize, CompressionType compressionType) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .compressionType(compressionType) + .enableBatching(false) + .compressionType(CompressionType.NONE) + .create(); + final ClientCnx cnx = producer.getClientCnx(); + producer.getConnectionHandler().setMaxMessageSize(maxMessageSize); + MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer); + /** + * Mock an error: reached max message size, see more details {@link #maxMessageSizeAndCompressions()}. + */ + try { + msgBuilder.value("msg-1").send(); + fail("expected an error that reached the max message size"); + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.InvalidMessageException); + } + + // Verify: message payload has been released. + // Since "MsgPayloadTouchableMessageBuilder" has called "buffer.retain" once, "refCnt()" should be "1". + producer.close(); + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder.payload.refCnt(), 1); + + // cleanup. + cnx.ctx().close(); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + /** + * The content size of msg(value is "msg-1") will be "5". + * Then provides two param: 1 and 5. 
+ * 1: Less than the limitation when adding the message into the batch-container. + * 3: Less than the limitation when building batched messages payload. + * 2: Equals the limitation when building batched messages payload. + */ + @DataProvider(name = "maxMessageSizes") + public Object[][] maxMessageSizes(){ + return new Object[][] { + {1}, + {3}, + {26} + }; + } + + @Test(dataProvider = "maxMessageSizes") + public void testBatchedSendMessageSizeExceeded(int maxMessageSize) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .enableBatching(true) + .compressionType(CompressionType.NONE) + .create(); + final ClientCnx cnx = producer.getClientCnx(); + producer.getConnectionHandler().setMaxMessageSize(maxMessageSize); + MsgPayloadTouchableMessageBuilder<String> msgBuilder1 = newMessage(producer); + MsgPayloadTouchableMessageBuilder<String> msgBuilder2 = newMessage(producer); + /** + * Mock an error: reached max message size. see more detail {@link #maxMessageSizes()}. + */ + msgBuilder1.value("msg-1").sendAsync(); + try { + msgBuilder2.value("msg-1").send(); + if (maxMessageSize != 26) { + fail("expected an error that reached the max message size"); + } + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.InvalidMessageException); + } + + // Verify: message payload has been released. + // Since "MsgPayloadTouchableMessageBuilder" has called "buffer.retain" once, "refCnt()" should be "1". + producer.close(); + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder1.payload.refCnt(), 1); + assertEquals(msgBuilder2.payload.refCnt(), 1); + + // cleanup. 
+ cnx.ctx().close(); + msgBuilder1.release(); + msgBuilder2.release(); + assertEquals(msgBuilder1.payload.refCnt(), 0); + assertEquals(msgBuilder2.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + @Test + public void testSendAfterClosedProducer() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = + (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + // Publish after the producer was closed. + MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer); + producer.close(); + try { + msgBuilder.value("msg-1").send(); + fail("expected an error that the producer has closed"); + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.AlreadyClosedException); + } + + // Verify: message payload has been released. + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder.payload.refCnt(), 1); + + // cleanup. + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + admin.topics().delete(topicName); + //admin.topics().deleteAsync(topicName).get(10, TimeUnit.SECONDS); TODO fix bug. 
+ } + + @DataProvider + public Object[][] failedInterceptAt() { + return new Object[][]{ + {"eligible"}, + {"beforeSend"}, + {"onSendAcknowledgement"}, + }; + } + + @Test(dataProvider = "failedInterceptAt") + public void testInterceptorError(String method) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .intercept( + + new ProducerInterceptor() { + @Override + public void close() { + if (method.equals("close")) { + throw new RuntimeException("Mocked error"); + } + } + + @Override + public boolean eligible(Message message) { + if (method.equals("eligible")) { + throw new RuntimeException("Mocked error"); + } + return false; + } + + @Override + public Message beforeSend(Producer producer, Message message) { + if (method.equals("beforeSend")) { + throw new RuntimeException("Mocked error"); + } + return message; + } + + @Override + public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, + Throwable exception) { + if (method.equals("onSendAcknowledgement")) { + throw new RuntimeException("Mocked error"); + } + + } + }).create(); + + MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer); + try { + msgBuilder.value("msg-1").sendAsync().get(3, TimeUnit.SECONDS); + // It may throw error. + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("Mocked")); + } + + // Verify: message payload has been released. + producer.close(); + assertEquals(msgBuilder.payload.refCnt(), 1); + + // cleanup. 
+ msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + private <T> MsgPayloadTouchableMessageBuilder<T> newMessage(ProducerImpl<T> producer){ + Schema<T> schema = getInternalState(producer, "schema"); + return new MsgPayloadTouchableMessageBuilder<T>(producer, schema); + } + + private static class MsgPayloadTouchableMessageBuilder<T> extends TypedMessageBuilderImpl { + + public volatile ByteBuf payload; + + private volatile Function<MessageImpl<T>, MessageImpl<T>> msgMocker; + + public <T> MsgPayloadTouchableMessageBuilder(ProducerBase producer, Schema<T> schema) { + super(producer, schema); + } + public void setMsgMocker(Function<MessageImpl<T>, MessageImpl<T>> msgMocker) { + this.msgMocker = msgMocker; + } + + @Override + public Message<T> getMessage() { + beforeSend(); + MessageImpl<T> msg = MessageImpl.create(msgMetadata, content, schema, + producer != null ? producer.getTopic() : null); + payload = getInternalState(msg, "payload"); Review Comment: Whenever possible, don't use reflection unless it's hard to change the visibility of the private field. In this case, `MessageImpl` already provides a `getPayload` method that is visible for testing. Even if it did not provide one, it would be better to add one than to use reflection. ########## pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.awaitility.reflect.WhiteboxImpl.getInternalState; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class ProducerMemoryLeakTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + 
super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTopicLevelPoliciesEnabled(false); + } + + @Test + public void testSendQueueIsFull() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING) + .blockIfQueueFull(false).maxPendingMessages(1) + .enableBatching(true).topic(topicName).create(); + List<MsgPayloadTouchableMessageBuilder<String>> msgBuilderList = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + msgBuilderList.add(newMessage(producer)); + } + int indexCalledSend = 0; + List<CompletableFuture> sendFutureList = new ArrayList<>(); + for (MsgPayloadTouchableMessageBuilder<String> msgBuilder: msgBuilderList) { + sendFutureList.add(msgBuilder.value("msg-1").sendAsync()); + if (indexCalledSend != 99) { + indexCalledSend++; + } + } + try{ + sendFutureList.get(sendFutureList.size() - 1).join(); + } catch (Exception ex) { + // Ignore the error PulsarClientException$ProducerQueueIsFullError. + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.ProducerQueueIsFullError); + } + + producer.close(); + for (int i = indexCalledSend; i > -1; i--) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + assertEquals(msgBuilder.payload.refCnt(), 1); + } + + // cleanup. 
+ for (int i = indexCalledSend; i > -1; i--) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + } + for (int i = indexCalledSend + 1; i < 100; i++) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + } + admin.topics().delete(topicName); + } + + /** + * The content size of msg(value is "msg-1") will be "5". + * Then provides two param: 1 and 5. + * 1: reach the limitation before adding the message metadata. + * 2: reach the limitation after adding the message metadata. + */ + @DataProvider(name = "maxMessageSizeAndCompressions") + public Object[][] maxMessageSizeAndCompressions(){ + return new Object[][] { + {1, CompressionType.NONE}, + {5, CompressionType.NONE}, + {1, CompressionType.LZ4}, + {5, CompressionType.LZ4} + }; + } + + @Test(dataProvider = "maxMessageSizeAndCompressions") + public void testSendMessageSizeExceeded(int maxMessageSize, CompressionType compressionType) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .compressionType(compressionType) + .enableBatching(false) + .compressionType(CompressionType.NONE) + .create(); + final ClientCnx cnx = producer.getClientCnx(); + producer.getConnectionHandler().setMaxMessageSize(maxMessageSize); + MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer); + /** + * Mock an error: reached max message size, see more details {@link #maxMessageSizeAndCompressions()}. 
+ */ + try { + msgBuilder.value("msg-1").send(); + fail("expected an error that reached the max message size"); + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.InvalidMessageException); + } + + // Verify: message payload has been released. + // Since "MsgPayloadTouchableMessageBuilder" has called "buffer.retain" once, "refCnt()" should be "1". + producer.close(); + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder.payload.refCnt(), 1); + + // cleanup. + cnx.ctx().close(); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + /** + * The content size of msg(value is "msg-1") will be "5". + * Then provides two param: 1 and 5. + * 1: Less than the limitation when adding the message into the batch-container. + * 3: Less than the limitation when building batched messages payload. + * 2: Equals the limitation when building batched messages payload. 
+ */ + @DataProvider(name = "maxMessageSizes") + public Object[][] maxMessageSizes(){ + return new Object[][] { + {1}, + {3}, + {26} + }; + } + + @Test(dataProvider = "maxMessageSizes") + public void testBatchedSendMessageSizeExceeded(int maxMessageSize) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .enableBatching(true) + .compressionType(CompressionType.NONE) + .create(); + final ClientCnx cnx = producer.getClientCnx(); + producer.getConnectionHandler().setMaxMessageSize(maxMessageSize); + MsgPayloadTouchableMessageBuilder<String> msgBuilder1 = newMessage(producer); + MsgPayloadTouchableMessageBuilder<String> msgBuilder2 = newMessage(producer); + /** + * Mock an error: reached max message size. see more detail {@link #maxMessageSizes()}. + */ + msgBuilder1.value("msg-1").sendAsync(); + try { + msgBuilder2.value("msg-1").send(); + if (maxMessageSize != 26) { + fail("expected an error that reached the max message size"); + } + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.InvalidMessageException); + } + + // Verify: message payload has been released. + // Since "MsgPayloadTouchableMessageBuilder" has called "buffer.retain" once, "refCnt()" should be "1". + producer.close(); + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder1.payload.refCnt(), 1); + assertEquals(msgBuilder2.payload.refCnt(), 1); + + // cleanup. 
+ cnx.ctx().close(); + msgBuilder1.release(); + msgBuilder2.release(); + assertEquals(msgBuilder1.payload.refCnt(), 0); + assertEquals(msgBuilder2.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + @Test + public void testSendAfterClosedProducer() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = + (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + // Publish after the producer was closed. + MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer); + producer.close(); + try { + msgBuilder.value("msg-1").send(); + fail("expected an error that the producer has closed"); + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.AlreadyClosedException); + } + + // Verify: message payload has been released. + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder.payload.refCnt(), 1); + + // cleanup. + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + admin.topics().delete(topicName); + //admin.topics().deleteAsync(topicName).get(10, TimeUnit.SECONDS); TODO fix bug. + } + + @DataProvider + public Object[][] failedInterceptAt() { + return new Object[][]{ + {"eligible"}, + {"beforeSend"}, + {"onSendAcknowledgement"}, Review Comment: Should you add `{"close"}` as well? BTW, the indentation here looks odd: these entries are indented by only two spaces. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org