This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 948b33a3ff0cbb799d2ad39b3f2645ee9fe32c06
Author: pengxiangrui127 <[email protected]>
AuthorDate: Fri Dec 20 19:17:13 2024 +0800

    [Fix][Client] Fix pending message not complete when closeAsync (#23761)
    
    (cherry picked from commit e0a9e4c7b5d3533f2a1e5b7757b180168412c35e)
---
 .../apache/pulsar/client/impl/ProducerImpl.java    |  5 ++-
 .../pulsar/client/impl/ProducerImplTest.java       | 43 ++++++++++++++++++++--
 2 files changed, 43 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 10e0ee2ee3d..54d337925dc 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1177,11 +1177,11 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         CompletableFuture<Void> closeFuture = new CompletableFuture<>();
         cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> {
             cnx.removeProducer(producerId);
+            closeAndClearPendingMessages();
             if (exception == null || !cnx.ctx().channel().isActive()) {
                 // Either we've received the success response for the close 
producer command from the broker, or the
                 // connection did break in the meantime. In any case, the 
producer is gone.
                 log.info("[{}] [{}] Closed Producer", topic, producerName);
-                closeAndClearPendingMessages();
                 closeFuture.complete(null);
             } else {
                 closeFuture.completeExceptionally(exception);
@@ -1193,7 +1193,8 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         return closeFuture;
     }
 
-    private synchronized void closeAndClearPendingMessages() {
+    @VisibleForTesting
+    protected synchronized void closeAndClearPendingMessages() {
         setState(State.Closed);
         client.cleanupProducer(this);
         PulsarClientException ex = new 
PulsarClientException.AlreadyClosedException(
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
index f9df6375939..5f690ead6c5 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
@@ -22,12 +22,19 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.withSettings;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
+
 import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import io.netty.util.HashedWheelTimer;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
@@ -68,4 +75,34 @@ public class ProducerImplTest {
         verify(msg).setSchemaState(MessageImpl.SchemaState.Ready);
     }
 
+    @Test
+    public void testClearPendingMessageWhenCloseAsync() {
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        Mockito.doReturn(1L).when(client).newProducerId();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setStatsIntervalSeconds(-1);
+        Mockito.doReturn(clientConf).when(client).getConfiguration();
+        Mockito.doReturn(new 
InstrumentProvider(null)).when(client).instrumentProvider();
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        Mockito.doReturn(1).when(connectionPool).genRandomKeyToSelectCon();
+        Mockito.doReturn(connectionPool).when(client).getCnxPool();
+        HashedWheelTimer timer = mock(HashedWheelTimer.class);
+        Mockito.doReturn(null).when(timer).newTimeout(Mockito.any(), 
Mockito.anyLong(), Mockito.any());
+        Mockito.doReturn(timer).when(client).timer();
+        ProducerConfigurationData producerConf = new 
ProducerConfigurationData();
+        producerConf.setSendTimeoutMs(-1);
+        ProducerImpl<?> producer = Mockito.spy(new ProducerImpl<>(client, 
"topicName", producerConf, null, 0, null, null, Optional.empty()));
+        
+        // make sure throw exception when send request to broker
+        ClientCnx clientCnx = mock(ClientCnx.class);
+        CompletableFuture<ProducerResponse> tCompletableFuture = new 
CompletableFuture<>();
+        tCompletableFuture.completeExceptionally(new 
PulsarClientException("error"));
+        when(clientCnx.sendRequestWithId(Mockito.any(), 
Mockito.anyLong())).thenReturn(tCompletableFuture);
+        Mockito.doReturn(clientCnx).when(producer).cnx();
+
+        // run closeAsync and verify
+        CompletableFuture<Void> voidCompletableFuture = producer.closeAsync();
+        verify(producer).closeAndClearPendingMessages();
+    }
+
 }

Reply via email to