This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1703affb54cc2443dcb1bc185c5b2af7859710af Author: Shoothzj <[email protected]> AuthorDate: Tue Sep 7 23:41:06 2021 +0800 [Issue 11936] forget to call SendCallback on producer close (#11939) * forget to call SendCallback on producer close * add unit tests * add unit tests (cherry picked from commit d494c43cc5cb249a7d139a0ee1a600103805bb85) --- .../pulsar/client/impl/ProducerCloseTest.java | 82 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ProducerImpl.java | 31 ++++---- 2 files changed, 97 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java new file mode 100644 index 0000000..0c4df15 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import lombok.Cleanup; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.common.api.proto.CommandSuccess; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +@Test(groups = "broker-impl") +public class ProducerCloseTest extends ProducerConsumerBase { + + @Override + @BeforeMethod + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @Override + @AfterMethod(alwaysRun = true) + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = 10_000) + public void testProducerCloseCallback() throws Exception { + initClient(); + @Cleanup + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer() + .topic("testProducerClose") + .sendTimeout(5, TimeUnit.SECONDS) + .maxPendingMessages(0) + .enableBatching(false) + .create(); + final TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage(); + final TypedMessageBuilder<byte[]> value = messageBuilder.value("test-msg".getBytes(StandardCharsets.UTF_8)); + producer.getClientCnx().channel().config().setAutoRead(false); + final CompletableFuture<MessageId> completableFuture = value.sendAsync(); + producer.closeAsync(); + final CommandSuccess commandSuccess = new CommandSuccess(); + PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient; + commandSuccess.setRequestId(clientImpl.newRequestId() -1); + producer.getClientCnx().handleSuccess(commandSuccess); + Thread.sleep(3000); + Assert.assertEquals(completableFuture.isDone(), true); + } + + private void initClient() throws PulsarClientException { + pulsarClient = PulsarClient.builder(). + serviceUrl(lookupUrl.toString()) + .build(); + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index e531345..84062fb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -871,16 +871,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne synchronized (this) { setState(State.Closed); client.cleanupProducer(this); - PulsarClientException ex = new PulsarClientException.AlreadyClosedException( - format("The producer %s of the topic %s was already closed when closing the producers", - producerName, topic)); - pendingMessages.forEach(msg -> { - client.getMemoryLimitController().releaseMemory(msg.uncompressedSize); - msg.sendComplete(ex); - msg.cmd.release(); - msg.recycle(); - }); - pendingMessages.clear(); + clearPendingMessagesWhenClose(); } return CompletableFuture.completedFuture(null); @@ -898,12 +889,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne synchronized (ProducerImpl.this) { log.info("[{}] [{}] Closed Producer", topic, producerName); setState(State.Closed); - pendingMessages.forEach(msg -> { - client.getMemoryLimitController().releaseMemory(msg.uncompressedSize); - msg.cmd.release(); - msg.recycle(); - }); - pendingMessages.clear(); + clearPendingMessagesWhenClose(); } closeFuture.complete(null); @@ -918,6 +904,19 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne return closeFuture; } + private void clearPendingMessagesWhenClose() { + PulsarClientException ex = new PulsarClientException.AlreadyClosedException( + format("The producer %s of the topic %s was already closed when closing the producers", + producerName, topic)); + pendingMessages.forEach(msg -> { + client.getMemoryLimitController().releaseMemory(msg.uncompressedSize); + msg.sendComplete(ex); + msg.cmd.release(); + msg.recycle(); + }); + pendingMessages.clear(); + } + @Override public boolean isConnected() { return connectionHandler.cnx() != null && (getState() == State.Ready);
