This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 60200b44e2 ARTEMIS-4506 restore MQTT sub queue name syntax
60200b44e2 is described below
commit 60200b44e2d66adecebc2988091ba16bb5aed5a8
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Nov 15 17:04:38 2023 -0600
ARTEMIS-4506 restore MQTT sub queue name syntax
Starting with 2.28.0, the broker no longer translates the `/` character to
the configured wildcard delimiter (`.` by default) when naming the
subscription queues it creates for MQTT clients.
This commit fixes that regression and restores the proper translation.
---
.../core/protocol/mqtt/MQTTPublishManager.java | 2 +-
.../protocol/mqtt/MQTTRetainMessageManager.java | 4 +--
.../core/protocol/mqtt/MQTTSessionState.java | 2 +-
.../protocol/mqtt/MQTTSubscriptionManager.java | 38 ++++++++++++----------
.../artemis/core/protocol/mqtt/MQTTUtil.java | 34 +++++++++++++++++--
.../artemis/core/protocol/mqtt/MQTTUtilTest.java | 37 +++++++++++++++++++++
.../artemis/tests/integration/mqtt/MQTTTest.java | 6 ++--
.../artemis/tests/integration/mqtt5/MQTT5Test.java | 13 ++++++++
.../mqtt5/spec/controlpackets/PublishTests.java | 18 +++++-----
9 files changed, 117 insertions(+), 37 deletions(-)
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index eb31cccfb8..5c79a53b43 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -216,7 +216,7 @@ public class MQTTPublishManager {
}
}
}
- String coreAddress =
MQTTUtil.convertMqttTopicFilterToCoreAddress(topic,
session.getWildcardConfiguration());
+ String coreAddress = MQTTUtil.convertMqttTopicFilterToCore(topic,
session.getWildcardConfiguration());
SimpleString address = SimpleString.toSimpleString(coreAddress,
session.getCoreMessageObjectPools().getAddressStringSimpleStringPool());
Message serverMessage =
MQTTUtil.createServerMessageFromByteBuf(session, address, message);
int qos = message.fixedHeader().qosLevel().value();
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
index 1a21aa2c9f..26be4bc53f 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -48,7 +48,7 @@ public class MQTTRetainMessageManager {
* the retained queue and the previous retain message consumed to remove it
from the queue.
*/
void handleRetainedMessage(Message messageParameter, String address,
boolean reset, Transaction tx) throws Exception {
- SimpleString retainAddress = new
SimpleString(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
address, session.getWildcardConfiguration()));
+ SimpleString retainAddress = new
SimpleString(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
address, session.getWildcardConfiguration()));
Queue queue = session.getServer().locateQueue(retainAddress);
if (queue == null) {
@@ -65,7 +65,7 @@ public class MQTTRetainMessageManager {
void addRetainedMessagesToQueue(Queue queue, String address) throws
Exception {
// The address filter that matches all retained message queues.
- String retainAddress =
MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
address, session.getWildcardConfiguration());
+ String retainAddress =
MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
address, session.getWildcardConfiguration());
BindingQueryResult bindingQueryResult =
session.getServerSession().executeBindingQuery(new SimpleString(retainAddress));
// Iterate over all matching retain queues and add the queue
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 5c63f1c152..3cc08dcf63 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -204,7 +204,7 @@ public class MQTTSessionState {
public boolean addSubscription(MqttTopicSubscription subscription,
WildcardConfiguration wildcardConfiguration, Integer subscriptionIdentifier)
throws Exception {
// synchronized to prevent race with removeSubscription
synchronized (subscriptions) {
-
addressMessageMap.putIfAbsent(MQTTUtil.convertMqttTopicFilterToCoreAddress(subscription.topicName(),
wildcardConfiguration), new ConcurrentHashMap<>());
+
addressMessageMap.putIfAbsent(MQTTUtil.convertMqttTopicFilterToCore(subscription.topicName(),
wildcardConfiguration).toString(), new ConcurrentHashMap<>());
Pair<MqttTopicSubscription, Integer> existingSubscription =
subscriptions.get(subscription.topicName());
if (existingSubscription != null) {
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index dedacd55e9..5ca6679dc0 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -28,6 +28,7 @@ import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.FilterConstants;
+import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -104,11 +105,12 @@ public class MQTTSubscriptionManager {
private void addSubscription(MqttTopicSubscription subscription, Integer
subscriptionIdentifier, boolean initialStart) throws Exception {
String rawTopicName =
CompositeAddress.extractAddressName(subscription.topicName());
- String parsedTopicName = parseTopicName(rawTopicName);
+ String parsedTopicName =
MQTTUtil.decomposeSharedSubscriptionTopicFilter(rawTopicName).getB();
int qos = subscription.qualityOfService().value();
- String coreAddress =
MQTTUtil.convertMqttTopicFilterToCoreAddress(parsedTopicName,
session.getWildcardConfiguration());
+ String coreAddress =
MQTTUtil.convertMqttTopicFilterToCore(parsedTopicName,
session.getWildcardConfiguration());
+ String coreQueue = getQueueNameForTopic(rawTopicName).toString();
- Queue q = createQueueForSubscription(coreAddress,
getQueueNameForTopic(rawTopicName));
+ Queue q = createQueueForSubscription(coreAddress, coreQueue);
try {
if (initialStart) {
@@ -157,7 +159,7 @@ public class MQTTSubscriptionManager {
}
}
- private Queue createQueueForSubscription(String address, SimpleString
queueName) throws Exception {
+ private Queue createQueueForSubscription(String address, String queueName)
throws Exception {
// check to see if a subscription queue already exists.
Queue q = session.getServer().locateQueue(queueName);
@@ -175,9 +177,9 @@ public class MQTTSubscriptionManager {
AddressInfo addressInfo =
session.getServerSession().getAddress(sAddress);
if (addressInfo == null) {
if (!bindingQueryResult.isAutoCreateAddresses()) {
- throw
ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(address));
+ throw
ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(sAddress);
}
- addressInfo =
session.getServerSession().createAddress(SimpleString.toSimpleString(address),
+ addressInfo = session.getServerSession().createAddress(sAddress,
RoutingType.MULTICAST, true);
}
return findOrCreateQueue(bindingQueryResult, addressInfo, queueName);
@@ -185,7 +187,7 @@ public class MQTTSubscriptionManager {
return q;
}
- private Queue findOrCreateQueue(BindingQueryResult bindingQueryResult,
AddressInfo addressInfo, SimpleString queue) throws Exception {
+ private Queue findOrCreateQueue(BindingQueryResult bindingQueryResult,
AddressInfo addressInfo, String queue) throws Exception {
/*
* MQTT 3.1 and 3.1.1 clients using a clean session should have a
*non-durable* subscription queue. If the broker
* restarts the queue should be removed. This is due to [MQTT-3.1.2-6]
which states that the session (and any
@@ -237,7 +239,7 @@ public class MQTTSubscriptionManager {
// for noLocal support we use the MQTT *client id* rather than the
connection ID, but we still use the existing property name
ServerConsumer consumer = session.getServerSession().createConsumer(cid,
queue.getName(), noLocal ?
SimpleString.toSimpleString(CONNECTION_ID_PROPERTY_NAME_STRING + " <> '" +
session.getState().getClientId() + "'") : null, false, false, -1);
- ServerConsumer existingConsumer = consumers.put(parseTopicName(topic),
consumer);
+ ServerConsumer existingConsumer =
consumers.put(MQTTUtil.decomposeSharedSubscriptionTopicFilter(topic).getB(),
consumer);
if (existingConsumer != null) {
existingConsumer.setStarted(false);
existingConsumer.close(false);
@@ -264,18 +266,18 @@ public class MQTTSubscriptionManager {
try {
session.getState().removeSubscription(topics.get(i));
- ServerConsumer removed =
consumers.remove(parseTopicName(topics.get(i)));
+ ServerConsumer removed =
consumers.remove(MQTTUtil.decomposeSharedSubscriptionTopicFilter(topics.get(i)).getB());
if (removed != null) {
removed.close(false);
consumerQoSLevels.remove(removed.getID());
}
- SimpleString internalQueueName =
getQueueNameForTopic(topics.get(i));
+ SimpleString internalQueueName =
SimpleString.toSimpleString(getQueueNameForTopic(topics.get(i)));
Queue queue =
session.getServer().locateQueue(internalQueueName);
if (queue != null) {
if (queue.isConfigurationManaged()) {
queue.deleteAllReferences();
- } else if
(!topics.get(i).startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX) ||
(topics.get(i).startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX) &&
queue.getConsumerCount() == 0)) {
+ } else if (!MQTTUtil.isSharedSubscription(topics.get(i)) ||
(MQTTUtil.isSharedSubscription(topics.get(i)) && queue.getConsumerCount() ==
0)) {
session.getServerSession().deleteQueue(internalQueueName,
enforceSecurity);
}
}
@@ -294,15 +296,15 @@ public class MQTTSubscriptionManager {
return reasonCodes;
}
- private SimpleString getQueueNameForTopic(String topic) {
- if (topic.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) {
- int slashIndex = topic.indexOf(SLASH) + 1;
- String sharedSubscriptionName = topic.substring(slashIndex,
topic.indexOf(SLASH, slashIndex));
- String parsedTopicName = topic.substring(topic.indexOf(SLASH,
slashIndex) + 1);
- return new
SimpleString(sharedSubscriptionName).concat(".").concat(parsedTopicName);
+ private String getQueueNameForTopic(String topic) {
+ String queueName;
+ if (MQTTUtil.isSharedSubscription(topic)) {
+ Pair<String, String> decomposed =
MQTTUtil.decomposeSharedSubscriptionTopicFilter(topic);
+ queueName = decomposed.getA().concat(".").concat(decomposed.getB());
} else {
- return new
SimpleString(session.getState().getClientId()).concat(".").concat(topic);
+ queueName =
session.getState().getClientId().concat(".").concat(topic);
}
+ return MQTTUtil.convertMqttTopicFilterToCore(queueName,
session.getWildcardConfiguration());
}
/**
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 4b59ea2489..976e958005 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -44,6 +44,7 @@ import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
@@ -137,11 +138,11 @@ public class MQTTUtil {
public static final int DEFAULT_MAXIMUM_PACKET_SIZE = MAX_PACKET_SIZE;
- public static String convertMqttTopicFilterToCoreAddress(String filter,
WildcardConfiguration wildcardConfiguration) {
- return convertMqttTopicFilterToCoreAddress(null, filter,
wildcardConfiguration);
+ public static String convertMqttTopicFilterToCore(String filter,
WildcardConfiguration wildcardConfiguration) {
+ return convertMqttTopicFilterToCore(null, filter, wildcardConfiguration);
}
- public static String convertMqttTopicFilterToCoreAddress(String
prefixToAdd, String filter, WildcardConfiguration wildcardConfiguration) {
+ public static String convertMqttTopicFilterToCore(String prefixToAdd,
String filter, WildcardConfiguration wildcardConfiguration) {
if (filter == null) {
return "";
}
@@ -528,4 +529,31 @@ public class MQTTUtil {
return defaultReturnValue == null ? null : defaultReturnValue;
}
+
+
+
+ /*
+ * MQTT shared subscriptions are specified with the syntax from
+ *
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250:
+ * $share/<shareName>/<topicFilter>
+ * This method takes this syntax and returns the shareName and the
topicFilter.
+ */
+ public static Pair<String, String>
decomposeSharedSubscriptionTopicFilter(String topicFilter) {
+ if (isSharedSubscription(topicFilter)) {
+ int prefix = SHARED_SUBSCRIPTION_PREFIX.length();
+ String shareName = topicFilter.substring(prefix,
topicFilter.indexOf(SLASH, prefix));
+ String parsedTopicName =
topicFilter.substring(topicFilter.indexOf(SLASH, prefix) + 1);
+ return new Pair(shareName, parsedTopicName);
+ } else {
+ return new Pair(null, topicFilter);
+ }
+ }
+
+ public static boolean isSharedSubscription(String topicFilter) {
+ if (topicFilter.startsWith(SHARED_SUBSCRIPTION_PREFIX)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
}
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtilTest.java
b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtilTest.java
new file mode 100644
index 0000000000..910b24851e
--- /dev/null
+++
b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtilTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MQTTUtilTest {
+
+ @Test
+ public void testDecompose() {
+ String shareName = RandomUtil.randomString();
+ String topicFilter = RandomUtil.randomString();
+
+ Pair<String, String> decomposed =
MQTTUtil.decomposeSharedSubscriptionTopicFilter(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX
+ shareName + MQTTUtil.SLASH + topicFilter);
+ assertEquals(shareName, decomposed.getA());
+ assertEquals(topicFilter, decomposed.getB());
+ }
+}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
index 1a5e673a01..91ba3d732d 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
@@ -1903,7 +1903,7 @@ public class MQTTTest extends MQTTTestSupport {
Exception peerDisconnectedException = null;
try {
String clientId = "test.client";
- SimpleString coreAddress = new SimpleString("foo/bar");
+ String coreAddress = MQTTUtil.convertMqttTopicFilterToCore("foo/bar",
server.getConfiguration().getWildcardConfiguration());
Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar",
QoS.AT_LEAST_ONCE)};
getServer().createQueue(new QueueConfiguration(new
SimpleString(clientId + "." +
coreAddress)).setAddress(coreAddress).setRoutingType(RoutingType.MULTICAST).setDurable(false).setTemporary(true).setMaxConsumers(0));
@@ -2151,11 +2151,11 @@ public class MQTTTest extends MQTTTestSupport {
@Test(timeout = 60 * 1000)
public void testAutoDeleteRetainedQueue() throws Exception {
final String TOPIC = "/abc/123";
- final String RETAINED_QUEUE =
MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, server.getConfiguration().getWildcardConfiguration());
+ final String RETAINED_QUEUE =
MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, server.getConfiguration().getWildcardConfiguration());
final MQTTClientProvider publisher = getMQTTClientProvider();
final MQTTClientProvider subscriber = getMQTTClientProvider();
-
server.getAddressSettingsRepository().addMatch(MQTTUtil.convertMqttTopicFilterToCoreAddress("#",
server.getConfiguration().getWildcardConfiguration()), new
AddressSettings().setExpiryDelay(500L).setAutoDeleteQueues(true).setAutoDeleteAddresses(true));
+
server.getAddressSettingsRepository().addMatch(MQTTUtil.convertMqttTopicFilterToCore("#",
server.getConfiguration().getWildcardConfiguration()), new
AddressSettings().setExpiryDelay(500L).setAutoDeleteQueues(true).setAutoDeleteAddresses(true));
initializeConnection(publisher);
initializeConnection(subscriber);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index e806d2a9e4..b55e61e047 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -611,4 +611,17 @@ public class MQTT5Test extends MQTT5TestSupport {
}
client.close();
}
+
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testSubscriptionQueueName() throws Exception {
+ final String topic = "a/b";
+ final String clientID = "myClientID";
+
+ MqttClient client = createPahoClient(clientID);
+ client.connect();
+ client.subscribe(topic, 1);
+ Wait.assertTrue(() ->
server.locateQueue(SimpleString.toSimpleString(clientID.concat(".").concat(topic.replace('/',
'.')))) != null, 2000, 100);
+ client.disconnect();
+ client.close();
+ }
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
index de0b6a7abf..64629b5b8f 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
@@ -269,7 +269,7 @@ public class PublishTests extends MQTT5TestSupport {
// send first retained message
producer.publish(TOPIC, "retain1".getBytes(), 2, true);
- Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+ Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
// send second retained message; should *remove* the first
producer.publish(TOPIC, new byte[0], 2, true);
@@ -277,7 +277,7 @@ public class PublishTests extends MQTT5TestSupport {
producer.disconnect();
producer.close();
- Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 0, 2000, 100);
+ Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 0, 2000, 100);
final CountDownLatch latch = new CountDownLatch(1);
MqttClient consumer = createPahoClient(CONSUMER_ID);
@@ -311,7 +311,7 @@ public class PublishTests extends MQTT5TestSupport {
// send retained message
producer.publish(TOPIC, RETAINED_PAYLOAD.getBytes(), 2, true);
- Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 1000, 100);
+ Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 1000, 100);
// send an unretained message; should *not* remove the existing retained
message
producer.publish(TOPIC, UNRETAINED_PAYLOAD.getBytes(), 2, false);
@@ -319,7 +319,7 @@ public class PublishTests extends MQTT5TestSupport {
producer.disconnect();
producer.close();
- Wait.assertFalse(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() > 1, 1000, 100);
+ Wait.assertFalse(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() > 1, 1000, 100);
final CountDownLatch latch = new CountDownLatch(1);
MqttClient consumer = createPahoClient(CONSUMER_ID);
@@ -405,7 +405,7 @@ public class PublishTests extends MQTT5TestSupport {
for (int i = 0; i < SUBSCRIPTION_COUNT; i++) {
final String topicName = topicNames[i];
producer.publish(topicName, retainedPayloads[i].getBytes(), 2, true);
- Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
topicName,
server.getConfiguration().getWildcardConfiguration())).getMessageCount() == 1,
2000, 100);
+ Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
topicName,
server.getConfiguration().getWildcardConfiguration())).getMessageCount() == 1,
2000, 100);
}
producer.disconnect();
producer.close();
@@ -467,7 +467,7 @@ public class PublishTests extends MQTT5TestSupport {
producer.connect();
producer.publish(TOPIC, "retained".getBytes(), 2, true);
- Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC,
server.getConfiguration().getWildcardConfiguration())).getMessageCount() == 1,
2000, 100);
+ Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC,
server.getConfiguration().getWildcardConfiguration())).getMessageCount() == 1,
2000, 100);
producer.disconnect();
producer.close();
@@ -530,7 +530,7 @@ public class PublishTests extends MQTT5TestSupport {
MqttClient producer = createPahoClient("producer");
producer.connect();
producer.publish(TOPIC, "retained".getBytes(), 2, true);
- Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+ Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
producer.disconnect();
producer.close();
@@ -586,7 +586,7 @@ public class PublishTests extends MQTT5TestSupport {
MqttClient producer = createPahoClient("producer");
producer.connect();
producer.publish(TOPIC, "retained".getBytes(), 2, true);
- Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+ Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
producer.disconnect();
producer.close();
@@ -635,7 +635,7 @@ public class PublishTests extends MQTT5TestSupport {
MqttClient producer = createPahoClient("producer");
producer.connect();
producer.publish(TOPIC, "retained".getBytes(), 2, true);
- Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+ Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
producer.disconnect();
producer.close();