This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c6b7665ac050bbf2dae0e18bc8360838c8f8505d
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Wed Jan 8 22:19:37 2020 -0800

    [pulsar-broker] Clean up closed producer to avoid publish-time  for 
producer (#5988)
    
    * [pulsar-broker] Clean up closed producer to avoid publish-time  for 
producer
    
    * fix test cases
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  5 +-
 .../pulsar/broker/service/ServerCnxTest.java       | 53 ++++++++++++++++------
 2 files changed, 43 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index ec60a8d..2a78508 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1210,14 +1210,15 @@ public class ServerCnx extends PulsarHandler {
         if (!producerFuture.isDone() && producerFuture
                 .completeExceptionally(new IllegalStateException("Closed 
producer before creation was complete"))) {
             // We have received a request to close the producer before it was 
actually completed, we have marked the
-            // producer future as failed and we can tell the client the close 
operation was successful. When the actual
-            // create operation will complete, the new producer will be 
discarded.
+            // producer future as failed and we can tell the client the close 
operation was successful.
             log.info("[{}] Closed producer {} before its creation was 
completed", remoteAddress, producerId);
             ctx.writeAndFlush(Commands.newSuccess(requestId));
+            producers.remove(producerId, producerFuture);
             return;
         } else if (producerFuture.isCompletedExceptionally()) {
             log.info("[{}] Closed producer {} that already failed to be 
created", remoteAddress, producerId);
             ctx.writeAndFlush(Commands.newSuccess(requestId));
+            producers.remove(producerId, producerFuture);
             return;
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 7fdbd3f..1a4231e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -77,6 +77,7 @@ import 
org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.ServerCnx.State;
+import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
 import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
@@ -789,7 +790,7 @@ public class ServerCnxTest {
                 producerName, Collections.emptyMap());
         channel.writeInbound(createProducer2);
 
-        // Complete the topic opening
+        // Complete the topic opening: It will make 2nd producer creation 
successful
         openTopicFuture.get().run();
 
         // Close succeeds
@@ -797,13 +798,11 @@ public class ServerCnxTest {
         assertEquals(response.getClass(), CommandSuccess.class);
         assertEquals(((CommandSuccess) response).getRequestId(), 2);
 
-        // 2nd producer fails to create
+        // 2nd producer will be successfully created as topic is open by then
         response = getResponse();
-        assertEquals(response.getClass(), CommandError.class);
-        assertEquals(((CommandError) response).getRequestId(), 3);
+        assertEquals(response.getClass(), CommandProducerSuccess.class);
+        assertEquals(((CommandProducerSuccess) response).getRequestId(), 3);
 
-        // We should not receive response for 1st producer, since it was 
cancelled by the close
-        assertTrue(channel.outboundMessages().isEmpty());
         assertTrue(channel.isActive());
 
         channel.finish();
@@ -927,7 +926,7 @@ public class ServerCnxTest {
                 producerName, Collections.emptyMap());
         channel.writeInbound(createProducer2);
 
-        // Now the topic gets opened
+        // Now the topic gets opened.. It will make 2nd producer creation 
successful
         openFailedTopic.get().run();
 
         // Close succeeds
@@ -935,10 +934,10 @@ public class ServerCnxTest {
         assertEquals(response.getClass(), CommandSuccess.class);
         assertEquals(((CommandSuccess) response).getRequestId(), 2);
 
-        // 2nd producer fails
+        // 2nd producer success as topic is opened
         response = getResponse();
-        assertEquals(response.getClass(), CommandError.class);
-        assertEquals(((CommandError) response).getRequestId(), 3);
+        assertEquals(response.getClass(), CommandProducerSuccess.class);
+        assertEquals(((CommandProducerSuccess) response).getRequestId(), 3);
 
         // Wait till the failtopic timeout interval
         Thread.sleep(500);
@@ -946,10 +945,10 @@ public class ServerCnxTest {
                 producerName, Collections.emptyMap());
         channel.writeInbound(createProducer3);
 
-        // 3rd producer succeeds
+        // 3rd producer fails because 2nd is already connected
         response = getResponse();
-        assertEquals(response.getClass(), CommandProducerSuccess.class);
-        assertEquals(((CommandProducerSuccess) response).getRequestId(), 4);
+        assertEquals(response.getClass(), CommandError.class);
+        assertEquals(((CommandError) response).getRequestId(), 4);
 
         Thread.sleep(500);
 
@@ -1611,4 +1610,32 @@ public class ServerCnxTest {
 
         channel.finish();
     }
+
+    @Test
+    public void testDelayedClosedProducer() throws Exception {
+        resetChannel();
+        setChannelConnected();
+
+        CompletableFuture<Topic> delayFuture = new CompletableFuture<>();
+        
doReturn(delayFuture).when(brokerService).getOrCreateTopic(any(String.class));
+        // Create producer first time
+        int producerId = 1;
+        ByteBuf clientCommand = Commands.newProducer(successTopicName, 
producerId /* producer id */, 1 /* request id */,
+                "prod-name", Collections.emptyMap());
+        channel.writeInbound(clientCommand);
+
+        ByteBuf closeProducerCmd = Commands.newCloseProducer(producerId, 2);
+        channel.writeInbound(closeProducerCmd);
+
+        Topic topic = mock(Topic.class);
+        
doReturn(CompletableFuture.completedFuture(topic)).when(brokerService).getOrCreateTopic(any(String.class));
+        
doReturn(CompletableFuture.completedFuture(false)).when(topic).hasSchema();
+
+        clientCommand = Commands.newProducer(successTopicName, producerId /* 
producer id */, 1 /* request id */,
+                "prod-name", Collections.emptyMap());
+        channel.writeInbound(clientCommand);
+
+        Object response = getResponse();
+        assertTrue(response instanceof CommandSuccess);
+    }
 }

Reply via email to