This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 479f067 [pulsar-broker] clean up producer/consumer result from connection-cache (#4145)
479f067 is described below
commit 479f067b9dc58c828560feec0f0061b50a20ef1e
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Fri Apr 26 15:56:36 2019 -0700
[pulsar-broker] clean up producer/consumer result from connection-cache (#4145)
---
.../main/java/org/apache/pulsar/broker/service/ServerCnx.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index aebcb30..d2a5268 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -660,8 +660,13 @@ public class ServerCnx extends PulsarHandler {
// creation request either complete or fails.
log.warn("[{}][{}][{}] Consumer with id {} is
already present on the connection", remoteAddress,
topicName, subscriptionName,
consumerId);
- ServerError error =
!existingConsumerFuture.isDone() ? ServerError.ServiceNotReady
- : getErrorCode(existingConsumerFuture);
+ ServerError error = null;
+ if(!existingConsumerFuture.isDone()) {
+ error = ServerError.ServiceNotReady;
+ }else {
+ error =
getErrorCode(existingConsumerFuture);
+ consumers.remove(consumerId,
consumerFuture);
+ }
ctx.writeAndFlush(Commands.newError(requestId,
error,
"Consumer is already present on the
connection"));
return null;
@@ -892,6 +897,7 @@ public class ServerCnx extends PulsarHandler {
String msg = String.format("Encryption is
required in %s", topicName);
log.warn("[{}] {}", remoteAddress, msg);
ctx.writeAndFlush(Commands.newError(requestId,
ServerError.MetadataError, msg));
+ producers.remove(producerId, producerFuture);
return;
}