This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 60200b44e2 ARTEMIS-4506 restore MQTT sub queue name syntax
60200b44e2 is described below

commit 60200b44e2d66adecebc2988091ba16bb5aed5a8
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Nov 15 17:04:38 2023 -0600

    ARTEMIS-4506 restore MQTT sub queue name syntax
    
    Starting with 2.28.0, the broker doesn't translate the character `/` to
    the configured wildcard delimiter (i.e. `.` by default) when creating
    subscription queues for MQTT clients.
    
    This commit fixes that regression and restores the proper translation.
---
 .../core/protocol/mqtt/MQTTPublishManager.java     |  2 +-
 .../protocol/mqtt/MQTTRetainMessageManager.java    |  4 +--
 .../core/protocol/mqtt/MQTTSessionState.java       |  2 +-
 .../protocol/mqtt/MQTTSubscriptionManager.java     | 38 ++++++++++++----------
 .../artemis/core/protocol/mqtt/MQTTUtil.java       | 34 +++++++++++++++++--
 .../artemis/core/protocol/mqtt/MQTTUtilTest.java   | 37 +++++++++++++++++++++
 .../artemis/tests/integration/mqtt/MQTTTest.java   |  6 ++--
 .../artemis/tests/integration/mqtt5/MQTT5Test.java | 13 ++++++++
 .../mqtt5/spec/controlpackets/PublishTests.java    | 18 +++++-----
 9 files changed, 117 insertions(+), 37 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index eb31cccfb8..5c79a53b43 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -216,7 +216,7 @@ public class MQTTPublishManager {
                }
             }
          }
-         String coreAddress = 
MQTTUtil.convertMqttTopicFilterToCoreAddress(topic, 
session.getWildcardConfiguration());
+         String coreAddress = MQTTUtil.convertMqttTopicFilterToCore(topic, 
session.getWildcardConfiguration());
          SimpleString address = SimpleString.toSimpleString(coreAddress, 
session.getCoreMessageObjectPools().getAddressStringSimpleStringPool());
          Message serverMessage = 
MQTTUtil.createServerMessageFromByteBuf(session, address, message);
          int qos = message.fixedHeader().qosLevel().value();
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
index 1a21aa2c9f..26be4bc53f 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -48,7 +48,7 @@ public class MQTTRetainMessageManager {
     * the retained queue and the previous retain message consumed to remove it 
from the queue.
     */
    void handleRetainedMessage(Message messageParameter, String address, 
boolean reset, Transaction tx) throws Exception {
-      SimpleString retainAddress = new 
SimpleString(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 address, session.getWildcardConfiguration()));
+      SimpleString retainAddress = new 
SimpleString(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 address, session.getWildcardConfiguration()));
 
       Queue queue = session.getServer().locateQueue(retainAddress);
       if (queue == null) {
@@ -65,7 +65,7 @@ public class MQTTRetainMessageManager {
 
    void addRetainedMessagesToQueue(Queue queue, String address) throws 
Exception {
       // The address filter that matches all retained message queues.
-      String retainAddress = 
MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 address, session.getWildcardConfiguration());
+      String retainAddress = 
MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, 
address, session.getWildcardConfiguration());
       BindingQueryResult bindingQueryResult = 
session.getServerSession().executeBindingQuery(new SimpleString(retainAddress));
 
       // Iterate over all matching retain queues and add the queue
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 5c63f1c152..3cc08dcf63 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -204,7 +204,7 @@ public class MQTTSessionState {
    public boolean addSubscription(MqttTopicSubscription subscription, 
WildcardConfiguration wildcardConfiguration, Integer subscriptionIdentifier) 
throws Exception {
       // synchronized to prevent race with removeSubscription
       synchronized (subscriptions) {
-         
addressMessageMap.putIfAbsent(MQTTUtil.convertMqttTopicFilterToCoreAddress(subscription.topicName(),
 wildcardConfiguration), new ConcurrentHashMap<>());
+         
addressMessageMap.putIfAbsent(MQTTUtil.convertMqttTopicFilterToCore(subscription.topicName(),
 wildcardConfiguration).toString(), new ConcurrentHashMap<>());
 
          Pair<MqttTopicSubscription, Integer> existingSubscription = 
subscriptions.get(subscription.topicName());
          if (existingSubscription != null) {
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index dedacd55e9..5ca6679dc0 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -28,6 +28,7 @@ import io.netty.handler.codec.mqtt.MqttTopicSubscription;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.FilterConstants;
+import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -104,11 +105,12 @@ public class MQTTSubscriptionManager {
 
    private void addSubscription(MqttTopicSubscription subscription, Integer 
subscriptionIdentifier, boolean initialStart) throws Exception {
       String rawTopicName = 
CompositeAddress.extractAddressName(subscription.topicName());
-      String parsedTopicName = parseTopicName(rawTopicName);
+      String parsedTopicName = 
MQTTUtil.decomposeSharedSubscriptionTopicFilter(rawTopicName).getB();
       int qos = subscription.qualityOfService().value();
-      String coreAddress = 
MQTTUtil.convertMqttTopicFilterToCoreAddress(parsedTopicName, 
session.getWildcardConfiguration());
+      String coreAddress = 
MQTTUtil.convertMqttTopicFilterToCore(parsedTopicName, 
session.getWildcardConfiguration());
+      String coreQueue = getQueueNameForTopic(rawTopicName).toString();
 
-      Queue q = createQueueForSubscription(coreAddress, 
getQueueNameForTopic(rawTopicName));
+      Queue q = createQueueForSubscription(coreAddress, coreQueue);
 
       try {
          if (initialStart) {
@@ -157,7 +159,7 @@ public class MQTTSubscriptionManager {
       }
    }
 
-   private Queue createQueueForSubscription(String address, SimpleString 
queueName) throws Exception {
+   private Queue createQueueForSubscription(String address, String queueName) 
throws Exception {
       // check to see if a subscription queue already exists.
       Queue q = session.getServer().locateQueue(queueName);
 
@@ -175,9 +177,9 @@ public class MQTTSubscriptionManager {
          AddressInfo addressInfo = 
session.getServerSession().getAddress(sAddress);
          if (addressInfo == null) {
             if (!bindingQueryResult.isAutoCreateAddresses()) {
-               throw 
ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(address));
+               throw 
ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(sAddress);
             }
-            addressInfo = 
session.getServerSession().createAddress(SimpleString.toSimpleString(address),
+            addressInfo = session.getServerSession().createAddress(sAddress,
                                                                    
RoutingType.MULTICAST, true);
          }
          return findOrCreateQueue(bindingQueryResult, addressInfo, queueName);
@@ -185,7 +187,7 @@ public class MQTTSubscriptionManager {
       return q;
    }
 
-   private Queue findOrCreateQueue(BindingQueryResult bindingQueryResult, 
AddressInfo addressInfo, SimpleString queue) throws Exception {
+   private Queue findOrCreateQueue(BindingQueryResult bindingQueryResult, 
AddressInfo addressInfo, String queue) throws Exception {
       /*
        * MQTT 3.1 and 3.1.1 clients using a clean session should have a 
*non-durable* subscription queue. If the broker
        * restarts the queue should be removed. This is due to [MQTT-3.1.2-6] 
which states that the session (and any
@@ -237,7 +239,7 @@ public class MQTTSubscriptionManager {
       // for noLocal support we use the MQTT *client id* rather than the 
connection ID, but we still use the existing property name
       ServerConsumer consumer = session.getServerSession().createConsumer(cid, 
queue.getName(), noLocal ? 
SimpleString.toSimpleString(CONNECTION_ID_PROPERTY_NAME_STRING + " <> '" + 
session.getState().getClientId() + "'") : null, false, false, -1);
 
-      ServerConsumer existingConsumer = consumers.put(parseTopicName(topic), 
consumer);
+      ServerConsumer existingConsumer = 
consumers.put(MQTTUtil.decomposeSharedSubscriptionTopicFilter(topic).getB(), 
consumer);
       if (existingConsumer != null) {
          existingConsumer.setStarted(false);
          existingConsumer.close(false);
@@ -264,18 +266,18 @@ public class MQTTSubscriptionManager {
 
             try {
                session.getState().removeSubscription(topics.get(i));
-               ServerConsumer removed = 
consumers.remove(parseTopicName(topics.get(i)));
+               ServerConsumer removed = 
consumers.remove(MQTTUtil.decomposeSharedSubscriptionTopicFilter(topics.get(i)).getB());
                if (removed != null) {
                   removed.close(false);
                   consumerQoSLevels.remove(removed.getID());
                }
 
-               SimpleString internalQueueName = 
getQueueNameForTopic(topics.get(i));
+               SimpleString internalQueueName = 
SimpleString.toSimpleString(getQueueNameForTopic(topics.get(i)));
                Queue queue = 
session.getServer().locateQueue(internalQueueName);
                if (queue != null) {
                   if (queue.isConfigurationManaged()) {
                      queue.deleteAllReferences();
-                  } else if 
(!topics.get(i).startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX) || 
(topics.get(i).startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX) && 
queue.getConsumerCount() == 0)) {
+                  } else if (!MQTTUtil.isSharedSubscription(topics.get(i)) || 
(MQTTUtil.isSharedSubscription(topics.get(i)) && queue.getConsumerCount() == 
0)) {
                      session.getServerSession().deleteQueue(internalQueueName, 
enforceSecurity);
                   }
                }
@@ -294,15 +296,15 @@ public class MQTTSubscriptionManager {
       return reasonCodes;
    }
 
-   private SimpleString getQueueNameForTopic(String topic) {
-      if (topic.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) {
-         int slashIndex = topic.indexOf(SLASH) + 1;
-         String sharedSubscriptionName = topic.substring(slashIndex, 
topic.indexOf(SLASH, slashIndex));
-         String parsedTopicName = topic.substring(topic.indexOf(SLASH, 
slashIndex) + 1);
-         return new 
SimpleString(sharedSubscriptionName).concat(".").concat(parsedTopicName);
+   private String getQueueNameForTopic(String topic) {
+      String queueName;
+      if (MQTTUtil.isSharedSubscription(topic)) {
+         Pair<String, String> decomposed = 
MQTTUtil.decomposeSharedSubscriptionTopicFilter(topic);
+         queueName = decomposed.getA().concat(".").concat(decomposed.getB());
       } else {
-         return new 
SimpleString(session.getState().getClientId()).concat(".").concat(topic);
+         queueName = 
session.getState().getClientId().concat(".").concat(topic);
       }
+      return MQTTUtil.convertMqttTopicFilterToCore(queueName, 
session.getWildcardConfiguration());
    }
 
    /**
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 4b59ea2489..976e958005 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -44,6 +44,7 @@ import io.netty.handler.codec.mqtt.MqttTopicSubscription;
 import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
@@ -137,11 +138,11 @@ public class MQTTUtil {
 
    public static final int DEFAULT_MAXIMUM_PACKET_SIZE = MAX_PACKET_SIZE;
 
-   public static String convertMqttTopicFilterToCoreAddress(String filter, 
WildcardConfiguration wildcardConfiguration) {
-      return convertMqttTopicFilterToCoreAddress(null, filter, 
wildcardConfiguration);
+   public static String convertMqttTopicFilterToCore(String filter, 
WildcardConfiguration wildcardConfiguration) {
+      return convertMqttTopicFilterToCore(null, filter, wildcardConfiguration);
    }
 
-   public static String convertMqttTopicFilterToCoreAddress(String 
prefixToAdd, String filter, WildcardConfiguration wildcardConfiguration) {
+   public static String convertMqttTopicFilterToCore(String prefixToAdd, 
String filter, WildcardConfiguration wildcardConfiguration) {
       if (filter == null) {
          return "";
       }
@@ -528,4 +529,31 @@ public class MQTTUtil {
 
       return defaultReturnValue == null ? null : defaultReturnValue;
    }
+
+
+
+   /*
+    * MQTT shared subscriptions are specified with the syntax from
+    * 
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250:
+    *   $share/<shareName>/<topicFilter>
+    * This method takes this syntax and returns the shareName and the 
topicFilter.
+    */
+   public static Pair<String, String> 
decomposeSharedSubscriptionTopicFilter(String topicFilter) {
+      if (isSharedSubscription(topicFilter)) {
+         int prefix = SHARED_SUBSCRIPTION_PREFIX.length();
+         String shareName = topicFilter.substring(prefix, 
topicFilter.indexOf(SLASH, prefix));
+         String parsedTopicName = 
topicFilter.substring(topicFilter.indexOf(SLASH, prefix) + 1);
+         return new Pair(shareName, parsedTopicName);
+      } else {
+         return new Pair(null, topicFilter);
+      }
+   }
+
+   public static boolean isSharedSubscription(String topicFilter) {
+      if (topicFilter.startsWith(SHARED_SUBSCRIPTION_PREFIX)) {
+         return true;
+      } else {
+         return false;
+      }
+   }
 }
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtilTest.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtilTest.java
new file mode 100644
index 0000000000..910b24851e
--- /dev/null
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtilTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MQTTUtilTest {
+
+   @Test
+   public void testDecompose() {
+      String shareName = RandomUtil.randomString();
+      String topicFilter = RandomUtil.randomString();
+
+      Pair<String, String> decomposed = 
MQTTUtil.decomposeSharedSubscriptionTopicFilter(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX
 + shareName + MQTTUtil.SLASH + topicFilter);
+      assertEquals(shareName, decomposed.getA());
+      assertEquals(topicFilter, decomposed.getB());
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
index 1a5e673a01..91ba3d732d 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
@@ -1903,7 +1903,7 @@ public class MQTTTest extends MQTTTestSupport {
       Exception peerDisconnectedException = null;
       try {
          String clientId = "test.client";
-         SimpleString coreAddress = new SimpleString("foo/bar");
+         String coreAddress = MQTTUtil.convertMqttTopicFilterToCore("foo/bar", 
server.getConfiguration().getWildcardConfiguration());
          Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", 
QoS.AT_LEAST_ONCE)};
 
          getServer().createQueue(new QueueConfiguration(new 
SimpleString(clientId + "." + 
coreAddress)).setAddress(coreAddress).setRoutingType(RoutingType.MULTICAST).setDurable(false).setTemporary(true).setMaxConsumers(0));
@@ -2151,11 +2151,11 @@ public class MQTTTest extends MQTTTestSupport {
    @Test(timeout = 60 * 1000)
    public void testAutoDeleteRetainedQueue() throws Exception {
       final String TOPIC = "/abc/123";
-      final String RETAINED_QUEUE = 
MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, server.getConfiguration().getWildcardConfiguration());
+      final String RETAINED_QUEUE = 
MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, 
TOPIC, server.getConfiguration().getWildcardConfiguration());
       final MQTTClientProvider publisher = getMQTTClientProvider();
       final MQTTClientProvider subscriber = getMQTTClientProvider();
 
-      
server.getAddressSettingsRepository().addMatch(MQTTUtil.convertMqttTopicFilterToCoreAddress("#",
 server.getConfiguration().getWildcardConfiguration()), new 
AddressSettings().setExpiryDelay(500L).setAutoDeleteQueues(true).setAutoDeleteAddresses(true));
+      
server.getAddressSettingsRepository().addMatch(MQTTUtil.convertMqttTopicFilterToCore("#",
 server.getConfiguration().getWildcardConfiguration()), new 
AddressSettings().setExpiryDelay(500L).setAutoDeleteQueues(true).setAutoDeleteAddresses(true));
 
       initializeConnection(publisher);
       initializeConnection(subscriber);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index e806d2a9e4..b55e61e047 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -611,4 +611,17 @@ public class MQTT5Test extends MQTT5TestSupport {
       }
       client.close();
    }
+
+   @Test(timeout = DEFAULT_TIMEOUT)
+   public void testSubscriptionQueueName() throws Exception {
+      final String topic = "a/b";
+      final String clientID = "myClientID";
+
+      MqttClient client = createPahoClient(clientID);
+      client.connect();
+      client.subscribe(topic, 1);
+      Wait.assertTrue(() -> 
server.locateQueue(SimpleString.toSimpleString(clientID.concat(".").concat(topic.replace('/',
 '.')))) != null, 2000, 100);
+      client.disconnect();
+      client.close();
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
index de0b6a7abf..64629b5b8f 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
@@ -269,7 +269,7 @@ public class PublishTests extends MQTT5TestSupport {
       // send first retained message
       producer.publish(TOPIC, "retain1".getBytes(), 2, true);
 
-      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
 
       // send second retained message; should *remove* the first
       producer.publish(TOPIC, new byte[0], 2, true);
@@ -277,7 +277,7 @@ public class PublishTests extends MQTT5TestSupport {
       producer.disconnect();
       producer.close();
 
-      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 0, 2000, 100);
+      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 0, 2000, 100);
 
       final CountDownLatch latch = new CountDownLatch(1);
       MqttClient consumer = createPahoClient(CONSUMER_ID);
@@ -311,7 +311,7 @@ public class PublishTests extends MQTT5TestSupport {
 
       // send retained message
       producer.publish(TOPIC, RETAINED_PAYLOAD.getBytes(), 2, true);
-      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 1000, 100);
+      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 1000, 100);
 
       // send an unretained message; should *not* remove the existing retained 
message
       producer.publish(TOPIC, UNRETAINED_PAYLOAD.getBytes(), 2, false);
@@ -319,7 +319,7 @@ public class PublishTests extends MQTT5TestSupport {
       producer.disconnect();
       producer.close();
 
-      Wait.assertFalse(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() > 1, 1000, 100);
+      Wait.assertFalse(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() > 1, 1000, 100);
 
       final CountDownLatch latch = new CountDownLatch(1);
       MqttClient consumer = createPahoClient(CONSUMER_ID);
@@ -405,7 +405,7 @@ public class PublishTests extends MQTT5TestSupport {
       for (int i = 0; i < SUBSCRIPTION_COUNT; i++) {
          final String topicName = topicNames[i];
          producer.publish(topicName, retainedPayloads[i].getBytes(), 2, true);
-         Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 topicName, 
server.getConfiguration().getWildcardConfiguration())).getMessageCount() == 1, 
2000, 100);
+         Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 topicName, 
server.getConfiguration().getWildcardConfiguration())).getMessageCount() == 1, 
2000, 100);
       }
       producer.disconnect();
       producer.close();
@@ -467,7 +467,7 @@ public class PublishTests extends MQTT5TestSupport {
       producer.connect();
       producer.publish(TOPIC, "retained".getBytes(), 2, true);
 
-      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, 
server.getConfiguration().getWildcardConfiguration())).getMessageCount() == 1, 
2000, 100);
+      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, 
server.getConfiguration().getWildcardConfiguration())).getMessageCount() == 1, 
2000, 100);
       producer.disconnect();
       producer.close();
 
@@ -530,7 +530,7 @@ public class PublishTests extends MQTT5TestSupport {
       MqttClient producer = createPahoClient("producer");
       producer.connect();
       producer.publish(TOPIC, "retained".getBytes(), 2, true);
-      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
       producer.disconnect();
       producer.close();
 
@@ -586,7 +586,7 @@ public class PublishTests extends MQTT5TestSupport {
       MqttClient producer = createPahoClient("producer");
       producer.connect();
       producer.publish(TOPIC, "retained".getBytes(), 2, true);
-      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
       producer.disconnect();
       producer.close();
 
@@ -635,7 +635,7 @@ public class PublishTests extends MQTT5TestSupport {
       MqttClient producer = createPahoClient("producer");
       producer.connect();
       producer.publish(TOPIC, "retained".getBytes(), 2, true);
-      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
       producer.disconnect();
       producer.close();
 

Reply via email to