This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 479f067  [pulsar-broker] clean up producer/consumer result from connection-cache (#4145)
479f067 is described below

commit 479f067b9dc58c828560feec0f0061b50a20ef1e
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Fri Apr 26 15:56:36 2019 -0700

    [pulsar-broker] clean up producer/consumer result from connection-cache (#4145)
---
 .../main/java/org/apache/pulsar/broker/service/ServerCnx.java  | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index aebcb30..d2a5268 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -660,8 +660,13 @@ public class ServerCnx extends PulsarHandler {
                                 // creation request either complete or fails.
                                 log.warn("[{}][{}][{}] Consumer with id {} is 
already present on the connection", remoteAddress,
                                         topicName, subscriptionName, 
consumerId);
-                                ServerError error = 
!existingConsumerFuture.isDone() ? ServerError.ServiceNotReady
-                                        : getErrorCode(existingConsumerFuture);
+                                ServerError error = null;
+                                if(!existingConsumerFuture.isDone()) {
+                                    error = ServerError.ServiceNotReady;
+                                }else {
+                                    error = 
getErrorCode(existingConsumerFuture);
+                                    consumers.remove(consumerId, 
consumerFuture);
+                                }
                                 ctx.writeAndFlush(Commands.newError(requestId, 
error,
                                         "Consumer is already present on the 
connection"));
                                 return null;
@@ -892,6 +897,7 @@ public class ServerCnx extends PulsarHandler {
                                 String msg = String.format("Encryption is 
required in %s", topicName);
                                 log.warn("[{}] {}", remoteAddress, msg);
                                 ctx.writeAndFlush(Commands.newError(requestId, 
ServerError.MetadataError, msg));
+                                producers.remove(producerId, producerFuture);
                                 return;
                             }
 

Reply via email to