GitHub user clebertsuconic commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2466#discussion_r242541448
--- Diff:
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
---
@@ -194,12 +191,13 @@ private synchronized void removeSubscription(String address) throws Exception {
SimpleString internalQueueName = getQueueNameForTopic(internalAddress);
session.getSessionState().removeSubscription(address);
-
- ServerConsumer consumer = consumers.get(address);
+ Set<Consumer> queueConsumers = session.getServer().queueConsumersQuery(internalQueueName);
--- End diff --
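I think this is wrong. You should only check for consumers on this Session.
Your logic closes every consumer on the same address, including consumers
that belong to other sessions. If you keep multiple connections open, removing
a subscription on one of them will cause failures on the others.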
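
To illustrate the point, here is a minimal sketch of session-scoped removal. It does not use the Artemis API: `SketchConsumer`, `closeConsumersForSession`, and the session IDs are hypothetical stand-ins, and the real fix would depend on how the session tracks its own consumers.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionScopedRemovalSketch {

    // Hypothetical stand-in for a server-side consumer; NOT the Artemis ServerConsumer.
    static final class SketchConsumer {
        final String sessionId;
        final String queueName;
        boolean closed;

        SketchConsumer(String sessionId, String queueName) {
            this.sessionId = sessionId;
            this.queueName = queueName;
        }

        void close() {
            closed = true;
        }
    }

    // Closes only the consumers that belong to the given session and
    // returns how many were closed. Consumers of other sessions are left alone.
    static int closeConsumersForSession(List<SketchConsumer> queueConsumers, String sessionId) {
        int closedCount = 0;
        for (SketchConsumer consumer : queueConsumers) {
            if (consumer.sessionId.equals(sessionId) && !consumer.closed) {
                consumer.close();
                closedCount++;
            }
        }
        return closedCount;
    }

    public static void main(String[] args) {
        // Two connections (sessions) consuming from the same queue.
        List<SketchConsumer> onQueue = new ArrayList<>();
        SketchConsumer a = new SketchConsumer("session-A", "topic.queue");
        SketchConsumer b = new SketchConsumer("session-B", "topic.queue");
        onQueue.add(a);
        onQueue.add(b);

        int closed = closeConsumersForSession(onQueue, "session-A");
        System.out.println(closed + " " + a.closed + " " + b.closed); // prints "1 true false"
    }
}
```

Closing everything returned by a queue-wide query would also have closed the `session-B` consumer here, which is the failure described above.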
---