This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 168bbab4ca5d13afda5eaa173dbcf9f923f22427
Author: Michael Marshall <[email protected]>
AuthorDate: Thu Dec 9 13:17:13 2021 -0600

    [Java Client] Send CloseProducer on timeout (#13161)
    
    * [Java Client] Send CloseProducer on timeout
    
    * Fix failed test from ClientErrorsTest
    
    (cherry picked from commit 27d542994703ef40a09363f5de4e92cc3c9d3116)
---
 .../apache/pulsar/client/api/ClientErrorsTest.java | 58 +++++++++++++++++++++-
 .../apache/pulsar/client/impl/ProducerImpl.java    | 12 ++++-
 2 files changed, 67 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
index f66b198..d7507d2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
@@ -39,6 +39,7 @@ import 
org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -170,6 +171,7 @@ public class ClientErrorsTest {
         PulsarClient client = 
PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress())
                 .operationTimeout(1, TimeUnit.SECONDS).build();
         final AtomicInteger counter = new AtomicInteger(0);
+        final AtomicInteger closeProducerCounter = new AtomicInteger(0);
 
         mockBrokerService.setHandleProducer((ctx, producer) -> {
             if (counter.incrementAndGet() == 2) {
@@ -182,6 +184,10 @@ public class ClientErrorsTest {
             ctx.writeAndFlush(Commands.newError(producer.getRequestId(), 
ServerError.ServiceNotReady, "msg"));
         });
 
+        mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> {
+            closeProducerCounter.incrementAndGet();
+        });
+
         try {
             client.newProducer().topic(topic).create();
             fail("Should have failed");
@@ -189,8 +195,58 @@ public class ClientErrorsTest {
             // we fail even on the retriable error
             assertTrue(e instanceof PulsarClientException);
         }
+        // There is a small race condition here because the producer's timeout 
both fails the client creation
+        // and triggers sending CloseProducer.
+        Awaitility.await().until(() -> closeProducerCounter.get() == 1);
+        mockBrokerService.resetHandleProducer();
+        mockBrokerService.resetHandleCloseProducer();
+    }
+
+    @Test
+    public void testCreatedProducerSendsCloseProducerAfterTimeout() throws 
Exception {
+        // Same CloseProducer-on-timeout scenario as the partitioned variant below, but on topic t1.
+        producerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/t1");
+    }
+
+    @Test
+    public void testCreatedPartitionedProducerSendsCloseProducerAfterTimeout() 
throws Exception {
+        // Partitioned-producer variant of the CloseProducer-on-timeout scenario, using topic part-t1.
+        
producerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/part-t1");
+    }
+
+    /**
+     * Shared scenario: the mock broker accepts the first Producer command and then closes that producer,
+     * never answers the client's second Producer command (so it times out under the 1-second operation
+     * timeout), and accepts any later ones. Verifies that the client sends a CloseProducer after that timeout.
+     */
+    private void producerCreatedThenFailsRetryTimeout(String topic) throws 
Exception {
+        @Cleanup
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress())
+                .operationTimeout(1, TimeUnit.SECONDS).build();
+        final AtomicInteger producerCounter = new AtomicInteger(0);
+        final AtomicInteger closeProducerCounter = new AtomicInteger(0);
 
+        mockBrokerService.setHandleProducer((ctx, producer) -> {
+            int producerCount = producerCounter.incrementAndGet();
+            if (producerCount == 1) {
+                
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), 
"producer1",
+                        SchemaVersion.Empty));
+                // Trigger reconnect: a broker-initiated CloseProducer makes the client recreate the producer
+                
ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1));
+            } else if (producerCount != 2) {
+                // Respond to subsequent requests to prevent timeouts
+                
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), 
"producer1",
+                        SchemaVersion.Empty));
+            }
+            // Don't respond to the second Producer command to ensure timeout
+        });
+
+        mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> {
+            closeProducerCounter.incrementAndGet();
+            
ctx.writeAndFlush(Commands.newSuccess(closeProducer.getRequestId()));
+        });
+
+        // Creating the producer should succeed. When the broker then closes it, the client
+        // should reattempt creation. The first reattempt (the second Producer command) is
+        // never answered and times out, which triggers CloseProducer. The client might send
+        // the third Producer command before the assertions below run, so we accept a
+        // Producer count of 2 or 3.
+        client.newProducer().topic(topic).create();
+        Awaitility.await().until(() -> closeProducerCounter.get() == 1);
+        Awaitility.await().until(() -> producerCounter.get() == 2 || 
producerCounter.get() == 3);
         mockBrokerService.resetHandleProducer();
+        mockBrokerService.resetHandleCloseProducer();
     }
 
     @Test
@@ -491,7 +547,6 @@ public class ClientErrorsTest {
 
         mockBrokerService.resetHandleProducer();
         mockBrokerService.resetHandleCloseProducer();
-        client.close();
     }
 
     // failed to connect to partition at sending step if a producer which 
connects to broker as lazy-loading mode
@@ -552,7 +607,6 @@ public class ClientErrorsTest {
 
         mockBrokerService.resetHandleProducer();
         mockBrokerService.resetHandleCloseProducer();
-        client.close();
     }
 
     // if a producer which doesn't connect as lazy-loading mode fails to 
connect while creating partitioned producer,
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index e33fb5f..686fa7d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1483,8 +1483,18 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                         cnx.channel().close();
                         return null;
                     }
+
+                    if (cause instanceof TimeoutException) {
+                        // Creating the producer has timed out. We need to 
ensure the broker closes the producer
+                        // in case it was indeed created, otherwise it might 
prevent new create producer operation,
+                        // since we are not necessarily closing the connection.
+                        long closeRequestId = client.newRequestId();
+                        ByteBuf cmd = Commands.newCloseProducer(producerId, 
closeRequestId);
+                        cnx.sendRequestWithId(cmd, closeRequestId);
+                    }
+
                     log.error("[{}] [{}] Failed to create producer: {}", 
topic, producerName, cause.getMessage());
-                    // Close the producer since topic does not exists.
+                    // Close the producer since topic does not exist.
                     if (cause instanceof 
PulsarClientException.TopicDoesNotExistException) {
                         closeAsync().whenComplete((v, ex) -> {
                             if (ex != null) {

Reply via email to