This is an automated email from the ASF dual-hosted git repository. guangning pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c6b7665ac050bbf2dae0e18bc8360838c8f8505d Author: Rajan Dhabalia <[email protected]> AuthorDate: Wed Jan 8 22:19:37 2020 -0800 [pulsar-broker] Clean up closed producer to avoid publish-time for producer (#5988) * [pulsar-broker] Clean up closed producer to avoid publish-time for producer * fix test cases --- .../apache/pulsar/broker/service/ServerCnx.java | 5 +- .../pulsar/broker/service/ServerCnxTest.java | 53 ++++++++++++++++------ 2 files changed, 43 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index ec60a8d..2a78508 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1210,14 +1210,15 @@ public class ServerCnx extends PulsarHandler { if (!producerFuture.isDone() && producerFuture .completeExceptionally(new IllegalStateException("Closed producer before creation was complete"))) { // We have received a request to close the producer before it was actually completed, we have marked the - // producer future as failed and we can tell the client the close operation was successful. When the actual - // create operation will complete, the new producer will be discarded. + // producer future as failed and we can tell the client the close operation was successful. log.info("[{}] Closed producer {} before its creation was completed", remoteAddress, producerId); ctx.writeAndFlush(Commands.newSuccess(requestId)); + producers.remove(producerId, producerFuture); return; } else if (producerFuture.isCompletedExceptionally()) { log.info("[{}] Closed producer {} that already failed to be created", remoteAddress, producerId); ctx.writeAndFlush(Commands.newSuccess(requestId)); + producers.remove(producerId, producerFuture); return; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 7fdbd3f..1a4231e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -77,6 +77,7 @@ import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.ServerCnx.State; +import org.apache.pulsar.broker.service.persistent.MessageDeduplication; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService; import org.apache.pulsar.broker.service.utils.ClientChannelHelper; @@ -789,7 +790,7 @@ public class ServerCnxTest { producerName, Collections.emptyMap()); channel.writeInbound(createProducer2); - // Complete the topic opening + // Complete the topic opening: It will make 2nd producer creation successful openTopicFuture.get().run(); // Close succeeds @@ -797,13 +798,11 @@ public class ServerCnxTest { assertEquals(response.getClass(), CommandSuccess.class); assertEquals(((CommandSuccess) response).getRequestId(), 2); - // 2nd producer fails to create + // 2nd producer will be successfully created as topic is open by then response = getResponse(); - assertEquals(response.getClass(), CommandError.class); - assertEquals(((CommandError) response).getRequestId(), 3); + assertEquals(response.getClass(), CommandProducerSuccess.class); + assertEquals(((CommandProducerSuccess) response).getRequestId(), 3); - // We should not receive response for 1st producer, since it was cancelled by the close - assertTrue(channel.outboundMessages().isEmpty()); assertTrue(channel.isActive()); channel.finish(); @@ -927,7 +926,7 @@ public class ServerCnxTest { producerName, Collections.emptyMap()); channel.writeInbound(createProducer2); - // Now the topic gets opened + // Now the topic gets opened.. It will make 2nd producer creation successful openFailedTopic.get().run(); // Close succeeds @@ -935,10 +934,10 @@ public class ServerCnxTest { assertEquals(response.getClass(), CommandSuccess.class); assertEquals(((CommandSuccess) response).getRequestId(), 2); - // 2nd producer fails + // 2nd producer success as topic is opened response = getResponse(); - assertEquals(response.getClass(), CommandError.class); - assertEquals(((CommandError) response).getRequestId(), 3); + assertEquals(response.getClass(), CommandProducerSuccess.class); + assertEquals(((CommandProducerSuccess) response).getRequestId(), 3); // Wait till the failtopic timeout interval Thread.sleep(500); @@ -946,10 +945,10 @@ public class ServerCnxTest { producerName, Collections.emptyMap()); channel.writeInbound(createProducer3); - // 3rd producer succeeds + // 3rd producer fails because 2nd is already connected response = getResponse(); - assertEquals(response.getClass(), CommandProducerSuccess.class); - assertEquals(((CommandProducerSuccess) response).getRequestId(), 4); + assertEquals(response.getClass(), CommandError.class); + assertEquals(((CommandError) response).getRequestId(), 4); Thread.sleep(500); @@ -1611,4 +1610,32 @@ public class ServerCnxTest { channel.finish(); } + + @Test + public void testDelayedClosedProducer() throws Exception { + resetChannel(); + setChannelConnected(); + + CompletableFuture<Topic> delayFuture = new CompletableFuture<>(); + doReturn(delayFuture).when(brokerService).getOrCreateTopic(any(String.class)); + // Create producer first time + int producerId = 1; + ByteBuf clientCommand = Commands.newProducer(successTopicName, producerId /* producer id */, 1 /* request id */, + "prod-name", Collections.emptyMap()); + channel.writeInbound(clientCommand); + + ByteBuf closeProducerCmd = Commands.newCloseProducer(producerId, 2); + channel.writeInbound(closeProducerCmd); + + Topic topic = mock(Topic.class); + doReturn(CompletableFuture.completedFuture(topic)).when(brokerService).getOrCreateTopic(any(String.class)); + doReturn(CompletableFuture.completedFuture(false)).when(topic).hasSchema(); + + clientCommand = Commands.newProducer(successTopicName, producerId /* producer id */, 1 /* request id */, + "prod-name", Collections.emptyMap()); + channel.writeInbound(clientCommand); + + Object response = getResponse(); + assertTrue(response instanceof CommandSuccess); + } }
