This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 513b7826a4 ARTEMIS-4532 MQTT-to-core wildcard conversion is broken
513b7826a4 is described below
commit 513b7826a440eecbfa0f3d7000360186e35bc43d
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Dec 13 10:36:11 2023 -0600
ARTEMIS-4532 MQTT-to-core wildcard conversion is broken
Currently when an MQTT topic filter contains characters from the
configured wildcard syntax the conversion to/from this syntax breaks.
For example, when using the default wildcard syntax, if an MQTT topic
filter contains a `.` then converting it from the MQTT wildcard syntax to
the core wildcard syntax and back will result in the `.` being replaced
with a `/`.
This commit fixes that plus a few other things...
- Implements proper conversion from one WildcardConfiguration to
another.
- Refactors the MQTT code which invokes these conversion methods. This
includes simplifying a lot of test code.
- Adds lots of tests for everything.
- Clarifies some variable naming to better distinguish between core and
MQTT.
---
.../core/protocol/mqtt/MQTTPublishManager.java | 16 +--
.../protocol/mqtt/MQTTRetainMessageManager.java | 4 +-
.../core/protocol/mqtt/MQTTSessionState.java | 4 +-
.../protocol/mqtt/MQTTSubscriptionManager.java | 51 +++-----
.../artemis/core/protocol/mqtt/MQTTUtil.java | 110 +++++++++++------
.../artemis/core/protocol/mqtt/MQTTUtilTest.java | 110 ++++++++++++++++-
.../core/protocol/openwire/util/OpenWireUtil.java | 10 +-
.../artemis/core/config/WildcardConfiguration.java | 130 ++++++++++++++++++---
.../core/config/WildcardConfigurationTest.java | 124 ++++++++++++++++++++
docs/user-manual/mqtt.adoc | 20 +++-
docs/user-manual/versions.adoc | 22 ++++
.../artemis/tests/integration/mqtt/MQTTTest.java | 8 +-
.../tests/integration/mqtt/MQTTTestSupport.java | 3 +-
.../mqtt/MqttWildCardSubAutoCreateTest.java | 6 +-
.../integration/mqtt/PahoMQTTQOS2SecurityTest.java | 3 +-
.../artemis/tests/integration/mqtt5/MQTT5Test.java | 40 +++++--
.../tests/integration/mqtt5/MQTT5TestSupport.java | 41 ++-----
.../mqtt5/spec/ControlPacketFormatTests.java | 25 ++--
.../mqtt5/spec/MessageReceiptTests.java | 5 +-
.../tests/integration/mqtt5/spec/QoSTests.java | 60 +++++-----
.../mqtt5/spec/controlpackets/ConnectTests.java | 4 +-
.../mqtt5/spec/controlpackets/PublishTests.java | 64 ++++++----
.../controlpackets/PublishTestsWithSecurity.java | 6 +-
.../controlpackets/SubscribeTestsWithSecurity.java | 6 +-
.../ssl/CertificateAuthenticationSslTests.java | 9 +-
25 files changed, 641 insertions(+), 240 deletions(-)
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 5c79a53b43..eec146287d 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -216,7 +216,7 @@ public class MQTTPublishManager {
}
}
}
- String coreAddress = MQTTUtil.convertMqttTopicFilterToCore(topic,
session.getWildcardConfiguration());
+ String coreAddress = MQTTUtil.getCoreAddressFromMqttTopic(topic,
session.getWildcardConfiguration());
SimpleString address = SimpleString.toSimpleString(coreAddress,
session.getCoreMessageObjectPools().getAddressStringSimpleStringPool());
Message serverMessage =
MQTTUtil.createServerMessageFromByteBuf(session, address, message);
int qos = message.fixedHeader().qosLevel().value();
@@ -392,7 +392,7 @@ public class MQTTPublishManager {
}
private boolean publishToClient(int messageId, ICoreMessage message, int
deliveryCount, int qos, long consumerId) throws Exception {
- String address =
MQTTUtil.convertCoreAddressToMqttTopicFilter(message.getAddress() == null ? ""
: message.getAddress(), session.getWildcardConfiguration());
+ String topic = MQTTUtil.getMqttTopicFromCoreAddress(message.getAddress()
== null ? "" : message.getAddress(), session.getWildcardConfiguration());
ByteBuf payload;
switch (message.getType()) {
@@ -418,29 +418,29 @@ public class MQTTPublishManager {
if (session.getVersion() == MQTTVersion.MQTT_5) {
if (!isRetain && message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY))
{
- MqttTopicSubscription sub =
session.getState().getSubscription(message.getAddress());
+ MqttTopicSubscription sub =
session.getState().getSubscription(topic);
if (sub != null && sub.option().isRetainAsPublished()) {
isRetain = true;
}
}
if (session.getState().getClientTopicAliasMaximum() != null) {
- Integer alias = session.getState().getServerTopicAlias(address);
+ Integer alias = session.getState().getServerTopicAlias(topic);
if (alias == null) {
- alias = session.getState().addServerTopicAlias(address);
+ alias = session.getState().addServerTopicAlias(topic);
if (alias != null) {
mqttProperties.add(new
MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), alias));
}
} else {
mqttProperties.add(new
MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), alias));
- address = "";
+ topic = "";
}
}
}
- int remainingLength = MQTTUtil.calculateRemainingLength(address,
mqttProperties, payload);
+ int remainingLength = MQTTUtil.calculateRemainingLength(topic,
mqttProperties, payload);
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH,
redelivery, MqttQoS.valueOf(qos), isRetain, remainingLength);
- MqttPublishVariableHeader varHeader = new
MqttPublishVariableHeader(address, messageId, mqttProperties);
+ MqttPublishVariableHeader varHeader = new
MqttPublishVariableHeader(topic, messageId, mqttProperties);
MqttPublishMessage publish = new MqttPublishMessage(header, varHeader,
payload);
int maxSize = session.getState().getClientMaxPacketSize();
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
index 26be4bc53f..b04f09f784 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -48,7 +48,7 @@ public class MQTTRetainMessageManager {
* the retained queue and the previous retain message consumed to remove it
from the queue.
*/
void handleRetainedMessage(Message messageParameter, String address,
boolean reset, Transaction tx) throws Exception {
- SimpleString retainAddress = new
SimpleString(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
address, session.getWildcardConfiguration()));
+ String retainAddress =
MQTTUtil.getCoreRetainAddressFromMqttTopic(address,
session.getWildcardConfiguration());
Queue queue = session.getServer().locateQueue(retainAddress);
if (queue == null) {
@@ -65,7 +65,7 @@ public class MQTTRetainMessageManager {
void addRetainedMessagesToQueue(Queue queue, String address) throws
Exception {
// The address filter that matches all retained message queues.
- String retainAddress =
MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
address, session.getWildcardConfiguration());
+ String retainAddress =
MQTTUtil.getCoreRetainAddressFromMqttTopic(address,
session.getWildcardConfiguration());
BindingQueryResult bindingQueryResult =
session.getServerSession().executeBindingQuery(new SimpleString(retainAddress));
// Iterate over all matching retain queues and add the queue
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 2570cabcd6..ad354345ec 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -199,7 +199,7 @@ public class MQTTSessionState {
public boolean addSubscription(MqttTopicSubscription subscription,
WildcardConfiguration wildcardConfiguration, Integer subscriptionIdentifier)
throws Exception {
// synchronized to prevent race with removeSubscription
synchronized (subscriptions) {
-
addressMessageMap.putIfAbsent(MQTTUtil.convertMqttTopicFilterToCore(subscription.topicName(),
wildcardConfiguration).toString(), new ConcurrentHashMap<>());
+
addressMessageMap.putIfAbsent(MQTTUtil.getCoreAddressFromMqttTopic(subscription.topicName(),
wildcardConfiguration), new ConcurrentHashMap<>());
Pair<MqttTopicSubscription, Integer> existingSubscription =
subscriptions.get(subscription.topicName());
if (existingSubscription != null) {
@@ -237,7 +237,7 @@ public class MQTTSessionState {
}
public List<Integer> getMatchingSubscriptionIdentifiers(String address) {
- address = MQTTUtil.convertCoreAddressToMqttTopicFilter(address,
session.getServer().getConfiguration().getWildcardConfiguration());
+ address = MQTTUtil.getMqttTopicFromCoreAddress(address,
session.getServer().getConfiguration().getWildcardConfiguration());
List<Integer> result = null;
for (Pair<MqttTopicSubscription, Integer> pair : subscriptions.values())
{
Pattern pattern = Match.createPattern(pair.getA().topicName(),
MQTTUtil.MQTT_WILDCARD, true);
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 5ca6679dc0..e66c880d19 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -28,7 +28,6 @@ import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.FilterConstants;
-import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -40,7 +39,6 @@ import
org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.utils.CompositeAddress;
import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.DOLLAR;
-import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.SLASH;
import static
org.apache.activemq.artemis.reader.MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING;
public class MQTTSubscriptionManager {
@@ -106,11 +104,10 @@ public class MQTTSubscriptionManager {
private void addSubscription(MqttTopicSubscription subscription, Integer
subscriptionIdentifier, boolean initialStart) throws Exception {
String rawTopicName =
CompositeAddress.extractAddressName(subscription.topicName());
String parsedTopicName =
MQTTUtil.decomposeSharedSubscriptionTopicFilter(rawTopicName).getB();
- int qos = subscription.qualityOfService().value();
- String coreAddress =
MQTTUtil.convertMqttTopicFilterToCore(parsedTopicName,
session.getWildcardConfiguration());
- String coreQueue = getQueueNameForTopic(rawTopicName).toString();
- Queue q = createQueueForSubscription(coreAddress, coreQueue);
+ Queue q = createQueueForSubscription(rawTopicName, parsedTopicName);
+
+ int qos = subscription.qualityOfService().value();
try {
if (initialStart) {
@@ -140,16 +137,6 @@ public class MQTTSubscriptionManager {
}
}
- private String parseTopicName(String rawTopicName) {
- String parsedTopicName = rawTopicName;
-
- // if using a shared subscription then parse
- if (rawTopicName.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) {
- parsedTopicName = rawTopicName.substring(rawTopicName.indexOf(SLASH,
rawTopicName.indexOf(SLASH) + 1) + 1);
- }
- return parsedTopicName;
- }
-
synchronized void stop() throws Exception {
for (ServerConsumer consumer : consumers.values()) {
consumer.setStarted(false);
@@ -159,13 +146,16 @@ public class MQTTSubscriptionManager {
}
}
- private Queue createQueueForSubscription(String address, String queueName)
throws Exception {
+ private Queue createQueueForSubscription(String rawTopicName, String
parsedTopicName) throws Exception {
+ String coreAddress =
MQTTUtil.getCoreAddressFromMqttTopic(parsedTopicName,
session.getWildcardConfiguration());
+ String coreQueue = MQTTUtil.getCoreQueueFromMqttTopic(rawTopicName,
session.getState().getClientId(), session.getWildcardConfiguration());
+
// check to see if a subscription queue already exists.
- Queue q = session.getServer().locateQueue(queueName);
+ Queue q = session.getServer().locateQueue(coreQueue);
// The queue does not exist so we need to create it.
if (q == null) {
- SimpleString sAddress = SimpleString.toSimpleString(address);
+ SimpleString sAddress = SimpleString.toSimpleString(coreAddress);
// Check we can auto create queues.
BindingQueryResult bindingQueryResult =
session.getServerSession().executeBindingQuery(sAddress);
@@ -182,7 +172,7 @@ public class MQTTSubscriptionManager {
addressInfo = session.getServerSession().createAddress(sAddress,
RoutingType.MULTICAST, true);
}
- return findOrCreateQueue(bindingQueryResult, addressInfo, queueName);
+ return findOrCreateQueue(bindingQueryResult, addressInfo, coreQueue);
}
return q;
}
@@ -233,13 +223,13 @@ public class MQTTSubscriptionManager {
}
}
- private void createConsumerForSubscriptionQueue(Queue queue, String topic,
int qos, boolean noLocal, Long existingConsumerId) throws Exception {
+ private void createConsumerForSubscriptionQueue(Queue queue, String
topicFilter, int qos, boolean noLocal, Long existingConsumerId) throws
Exception {
long cid = existingConsumerId != null ? existingConsumerId :
session.getServer().getStorageManager().generateID();
// for noLocal support we use the MQTT *client id* rather than the
connection ID, but we still use the existing property name
ServerConsumer consumer = session.getServerSession().createConsumer(cid,
queue.getName(), noLocal ?
SimpleString.toSimpleString(CONNECTION_ID_PROPERTY_NAME_STRING + " <> '" +
session.getState().getClientId() + "'") : null, false, false, -1);
- ServerConsumer existingConsumer =
consumers.put(MQTTUtil.decomposeSharedSubscriptionTopicFilter(topic).getB(),
consumer);
+ ServerConsumer existingConsumer = consumers.put(topicFilter, consumer);
if (existingConsumer != null) {
existingConsumer.setStarted(false);
existingConsumer.close(false);
@@ -257,7 +247,7 @@ public class MQTTSubscriptionManager {
synchronized (state) {
reasonCodes = new short[topics.size()];
for (int i = 0; i < topics.size(); i++) {
- if (session.getState().getSubscription(topics.get(i)) == null) {
+ if (state.getSubscription(topics.get(i)) == null) {
reasonCodes[i] = MQTTReasonCodes.NO_SUBSCRIPTION_EXISTED;
continue;
}
@@ -265,14 +255,14 @@ public class MQTTSubscriptionManager {
short reasonCode = MQTTReasonCodes.SUCCESS;
try {
- session.getState().removeSubscription(topics.get(i));
+ state.removeSubscription(topics.get(i));
ServerConsumer removed =
consumers.remove(MQTTUtil.decomposeSharedSubscriptionTopicFilter(topics.get(i)).getB());
if (removed != null) {
removed.close(false);
consumerQoSLevels.remove(removed.getID());
}
- SimpleString internalQueueName =
SimpleString.toSimpleString(getQueueNameForTopic(topics.get(i)));
+ SimpleString internalQueueName =
SimpleString.toSimpleString(MQTTUtil.getCoreQueueFromMqttTopic(topics.get(i),
state.getClientId(),
session.getServer().getConfiguration().getWildcardConfiguration()));
Queue queue =
session.getServer().locateQueue(internalQueueName);
if (queue != null) {
if (queue.isConfigurationManaged()) {
@@ -296,17 +286,6 @@ public class MQTTSubscriptionManager {
return reasonCodes;
}
- private String getQueueNameForTopic(String topic) {
- String queueName;
- if (MQTTUtil.isSharedSubscription(topic)) {
- Pair<String, String> decomposed =
MQTTUtil.decomposeSharedSubscriptionTopicFilter(topic);
- queueName = decomposed.getA().concat(".").concat(decomposed.getB());
- } else {
- queueName =
session.getState().getClientId().concat(".").concat(topic);
- }
- return MQTTUtil.convertMqttTopicFilterToCore(queueName,
session.getWildcardConfiguration());
- }
-
/**
* As per MQTT Spec. Subscribes this client to a number of MQTT topics.
*
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 976e958005..73bdd8d06a 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -53,6 +53,7 @@ import org.apache.commons.text.CaseUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
+import java.util.Objects;
import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE;
import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CORRELATION_DATA;
@@ -123,7 +124,7 @@ public class MQTTUtil {
public static final int TWO_BYTE_INT_MAX = Integer.decode("0xFFFF"); //
65_535
- //
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901011
+ //
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901011
public static final int VARIABLE_BYTE_INT_MAX = 268_435_455;
public static final int MAX_PACKET_SIZE = VARIABLE_BYTE_INT_MAX;
@@ -138,26 +139,70 @@ public class MQTTUtil {
public static final int DEFAULT_MAXIMUM_PACKET_SIZE = MAX_PACKET_SIZE;
- public static String convertMqttTopicFilterToCore(String filter,
WildcardConfiguration wildcardConfiguration) {
- return convertMqttTopicFilterToCore(null, filter, wildcardConfiguration);
- }
+ public static final WildcardConfiguration MQTT_WILDCARD = new
WildcardConfiguration().setDelimiter(SLASH).setAnyWords(HASH).setSingleWord(PLUS);
- public static String convertMqttTopicFilterToCore(String prefixToAdd,
String filter, WildcardConfiguration wildcardConfiguration) {
- if (filter == null) {
- return "";
- }
+ /**
+ * This method takes the MQTT-related input and translates it into the
proper name for a core subscription queue. The
+ * {@code topicFilter} may be either for a shared subscription in the
format {@code $share/<shareName>/<topicFilter>}
+ * or a normal MQTT topic filter (e.g. {@code a/b/#}, {@code a/+/c}, {@code
a/b/c}, etc.).
+ *
+ * @param topicFilter the MQTT topic filter
+ * @param clientId the MQTT client ID, used for normal (i.e. non-shared)
subscriptions
+ * @param wildcardConfiguration the {@code WildcardConfiguration} governing
the core broker
+ * @return the name of the core subscription queue based on the input
+ */
+ public static String getCoreQueueFromMqttTopic(String topicFilter, String
clientId, WildcardConfiguration wildcardConfiguration) {
+ Objects.requireNonNull(topicFilter, "MQTT topic filter must not be
null");
+ Objects.requireNonNull(wildcardConfiguration, "Broker wildcard
configuration must not be null");
- String converted = MQTT_WILDCARD.convert(filter, wildcardConfiguration);
- if (prefixToAdd != null) {
- converted = prefixToAdd + converted;
+ if (isSharedSubscription(topicFilter)) {
+ Pair<String, String> decomposed =
decomposeSharedSubscriptionTopicFilter(topicFilter);
+ return new
StringBuilder().append(decomposed.getA()).append(".").append(getCoreAddressFromMqttTopic(decomposed.getB(),
wildcardConfiguration)).toString();
+ } else {
+ Objects.requireNonNull(clientId, "MQTT client ID must not be null");
+ return new
StringBuilder().append(clientId).append(".").append(getCoreAddressFromMqttTopic(topicFilter,
wildcardConfiguration)).toString();
}
- return converted;
}
- public static String convertCoreAddressToMqttTopicFilter(String address,
WildcardConfiguration wildcardConfiguration) {
- if (address == null) {
- return "";
- }
+ /**
+ * This method takes the MQTT-related input and translates it into the
proper name for a core address. The
+ * {@code topicFilter} must be normal (i.e. non-shared). It should not be
in the format
+ * {@code $share/<shareName>/<topicFilter>}.
+ *
+ * @param topicFilter the MQTT topic filter
+ * @param wildcardConfiguration the {@code WildcardConfiguration} governing
the core broker
+ * @return the name of the core address based on the input
+ */
+ public static String getCoreAddressFromMqttTopic(String topicFilter,
WildcardConfiguration wildcardConfiguration) {
+ Objects.requireNonNull(topicFilter, "MQTT topic filter must not be
null");
+ Objects.requireNonNull(wildcardConfiguration, "Broker wildcard
configuration must not be null");
+
+ return MQTT_WILDCARD.convert(topicFilter, wildcardConfiguration);
+ }
+
+ /**
+ * This is exactly the same as {@link #getCoreAddressFromMqttTopic(String,
WildcardConfiguration)} except that it
+ * also prefixes the return with
+ * {@link
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil#MQTT_RETAIN_ADDRESS_PREFIX}
+ *
+ * @param topicFilter the MQTT topic filter
+ * @param wildcardConfiguration the {@code WildcardConfiguration} governing
the core broker
+ * @return the name of the core address based on the input, prefixed with
+ * {@link
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil#MQTT_RETAIN_ADDRESS_PREFIX}
+ */
+ public static String getCoreRetainAddressFromMqttTopic(String topicFilter,
WildcardConfiguration wildcardConfiguration) {
+ return MQTT_RETAIN_ADDRESS_PREFIX +
getCoreAddressFromMqttTopic(topicFilter, wildcardConfiguration);
+ }
+
+ /**
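+ * Translates a core address into an MQTT topic, first stripping {@code MQTT_RETAIN_ADDRESS_PREFIX} if the address starts with it.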
+ * @param address the core address
+ * @param wildcardConfiguration the {@code WildcardConfiguration} governing
the core broker
+ * @return the name of the MQTT topic based on the input
+ */
+ public static String getMqttTopicFromCoreAddress(String address,
WildcardConfiguration wildcardConfiguration) {
+ Objects.requireNonNull(address, "Address must not be null");
+ Objects.requireNonNull(wildcardConfiguration, "Broker wildcard
configuration must not be null");
if (address.startsWith(MQTT_RETAIN_ADDRESS_PREFIX)) {
address = address.substring(MQTT_RETAIN_ADDRESS_PREFIX.length());
@@ -166,16 +211,6 @@ public class MQTTUtil {
return wildcardConfiguration.convert(address, MQTT_WILDCARD);
}
- public static class MQTTWildcardConfiguration extends WildcardConfiguration
{
- public MQTTWildcardConfiguration() {
- setDelimiter(SLASH);
- setSingleWord(PLUS);
- setAnyWords(HASH);
- }
- }
-
- public static final WildcardConfiguration MQTT_WILDCARD = new
MQTTWildcardConfiguration();
-
private static ICoreMessage createServerMessage(MQTTSession session,
SimpleString address, MqttPublishMessage mqttPublishMessage) {
long id = session.getServer().getStorageManager().generateID();
@@ -530,25 +565,30 @@ public class MQTTUtil {
return defaultReturnValue == null ? null : defaultReturnValue;
}
-
-
- /*
- * MQTT shared subscriptions are specified with the syntax from
- *
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250:
- * $share/<shareName>/<topicFilter>
- * This method takes this syntax and returns the shareName and the
topicFilter.
+ /**
+ * MQTT shared subscriptions are specified with
+ * <a
href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250">this
syntax</a>.
+ *
+ * @param topicFilter String in the format {@code
$share/<shareName>/<topicFilter>}
+ * @return {@code Pair<String, String>} with {@code shareName} and {@code
topicFilter} respectively or {@code null}
+ * and {@code topicFilter} if not in the shared-subscription format.
*/
public static Pair<String, String>
decomposeSharedSubscriptionTopicFilter(String topicFilter) {
if (isSharedSubscription(topicFilter)) {
int prefix = SHARED_SUBSCRIPTION_PREFIX.length();
String shareName = topicFilter.substring(prefix,
topicFilter.indexOf(SLASH, prefix));
String parsedTopicName =
topicFilter.substring(topicFilter.indexOf(SLASH, prefix) + 1);
- return new Pair(shareName, parsedTopicName);
+ return new Pair<>(shareName, parsedTopicName);
} else {
- return new Pair(null, topicFilter);
+ return new Pair<>(null, topicFilter);
}
}
+ /**
+ *
+ * @param topicFilter the topic filter
+ * @return {@code true} if the input starts with {@code $share/}, {@code
false} otherwise
+ */
public static boolean isSharedSubscription(String topicFilter) {
if (topicFilter.startsWith(SHARED_SUBSCRIPTION_PREFIX)) {
return true;
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtilTest.java
b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtilTest.java
index 910b24851e..d24666e842 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtilTest.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtilTest.java
@@ -18,13 +18,14 @@
package org.apache.activemq.artemis.core.protocol.mqtt;
import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
public class MQTTUtilTest {
-
@Test
public void testDecompose() {
String shareName = RandomUtil.randomString();
@@ -34,4 +35,111 @@ public class MQTTUtilTest {
assertEquals(shareName, decomposed.getA());
assertEquals(topicFilter, decomposed.getB());
}
+
+ @Test
+ public void testGetCoreQueueFromMqttTopic() {
+ assertThrows(NullPointerException.class, () ->
MQTTUtil.getCoreQueueFromMqttTopic(null, null, null));
+ assertThrows(NullPointerException.class, () ->
MQTTUtil.getCoreQueueFromMqttTopic(null, null, new WildcardConfiguration()));
+
+ assertThrows(NullPointerException.class, () ->
MQTTUtil.getCoreQueueFromMqttTopic("", null, null));
+ assertThrows(NullPointerException.class, () ->
MQTTUtil.getCoreQueueFromMqttTopic("", null, new WildcardConfiguration()));
+
+ assertThrows(NullPointerException.class, () ->
MQTTUtil.getCoreQueueFromMqttTopic("", "", null));
+
+ assertThrows(NullPointerException.class, () ->
MQTTUtil.getCoreQueueFromMqttTopic(null, "", null));
+ assertThrows(NullPointerException.class, () ->
MQTTUtil.getCoreQueueFromMqttTopic(null, "", new WildcardConfiguration()));
+
+ final String clientId = RandomUtil.randomString().replace("-", "");
+
+ WildcardConfiguration defaultWildCardConfig = new
WildcardConfiguration();
+ assertEquals(clientId + ".a.b.c",
MQTTUtil.getCoreQueueFromMqttTopic("a/b/c", clientId, defaultWildCardConfig));
+ assertEquals(clientId + ".a.*.c",
MQTTUtil.getCoreQueueFromMqttTopic("a/+/c", clientId, defaultWildCardConfig));
+ assertEquals(clientId + ".a.*.#",
MQTTUtil.getCoreQueueFromMqttTopic("a/+/#", clientId, defaultWildCardConfig));
+ assertEquals(clientId + ".1\\.0.device",
MQTTUtil.getCoreQueueFromMqttTopic("1.0/device", clientId,
defaultWildCardConfig));
+ assertEquals(clientId + ".*", MQTTUtil.getCoreQueueFromMqttTopic("+",
clientId, defaultWildCardConfig));
+ assertEquals(clientId + "..", MQTTUtil.getCoreQueueFromMqttTopic("/",
clientId, defaultWildCardConfig));
+ assertEquals(clientId + ".#", MQTTUtil.getCoreQueueFromMqttTopic("#",
clientId, defaultWildCardConfig));
+
+ WildcardConfiguration customWildCardConfig = new
WildcardConfiguration().setDelimiter('|').setSingleWord('$').setAnyWords('!');
+ assertEquals(clientId + ".a|b|c",
MQTTUtil.getCoreQueueFromMqttTopic("a/b/c", clientId, customWildCardConfig));
+ assertEquals(clientId + ".a|$|c",
MQTTUtil.getCoreQueueFromMqttTopic("a/+/c", clientId, customWildCardConfig));
+ assertEquals(clientId + ".a|$|!",
MQTTUtil.getCoreQueueFromMqttTopic("a/+/#", clientId, customWildCardConfig));
+ assertEquals(clientId + ".1.0|device",
MQTTUtil.getCoreQueueFromMqttTopic("1.0/device", clientId,
customWildCardConfig));
+ assertEquals(clientId + ".$", MQTTUtil.getCoreQueueFromMqttTopic("+",
clientId, customWildCardConfig));
+ assertEquals(clientId + ".|", MQTTUtil.getCoreQueueFromMqttTopic("/",
clientId, customWildCardConfig));
+ assertEquals(clientId + ".!", MQTTUtil.getCoreQueueFromMqttTopic("#",
clientId, customWildCardConfig));
+ }
+
+ @Test
+ public void testGetCoreQueueFromMqttTopicWithSharedSubscription() {
+ final String clientId = RandomUtil.randomString().replace("-", "");
+
+ WildcardConfiguration defaultWildCardConfig = new
WildcardConfiguration();
+ assertEquals("shareName.a.b.c",
MQTTUtil.getCoreQueueFromMqttTopic("$share/shareName/a/b/c", clientId,
defaultWildCardConfig));
+
+ WildcardConfiguration customWildCardConfig = new
WildcardConfiguration().setDelimiter('|').setSingleWord('$').setAnyWords('!');
+ assertEquals("shareName.a|b|c",
MQTTUtil.getCoreQueueFromMqttTopic("$share/shareName/a/b/c", clientId,
customWildCardConfig));
+
+ }
+
+ @Test
+ public void testGetCoreAddressFromMqttTopic() {
+ assertThrows(NullPointerException.class, () ->
MQTTUtil.getCoreAddressFromMqttTopic(null, null));
+ assertThrows(NullPointerException.class, () ->
MQTTUtil.getCoreAddressFromMqttTopic(null, new WildcardConfiguration()));
+ assertThrows(NullPointerException.class, () ->
MQTTUtil.getCoreAddressFromMqttTopic("", null));
+
+ WildcardConfiguration defaultWildCardConfig = new
WildcardConfiguration();
+ assertEquals("a.b.c", MQTTUtil.getCoreAddressFromMqttTopic("a/b/c",
defaultWildCardConfig));
+ assertEquals("a.*.c", MQTTUtil.getCoreAddressFromMqttTopic("a/+/c",
defaultWildCardConfig));
+ assertEquals("a.*.#", MQTTUtil.getCoreAddressFromMqttTopic("a/+/#",
defaultWildCardConfig));
+ assertEquals("1\\.0.device",
MQTTUtil.getCoreAddressFromMqttTopic("1.0/device", defaultWildCardConfig));
+ assertEquals("*", MQTTUtil.getCoreAddressFromMqttTopic("+",
defaultWildCardConfig));
+ assertEquals(".", MQTTUtil.getCoreAddressFromMqttTopic("/",
defaultWildCardConfig));
+ assertEquals("#", MQTTUtil.getCoreAddressFromMqttTopic("#",
defaultWildCardConfig));
+
+ WildcardConfiguration customWildCardConfig = new
WildcardConfiguration().setDelimiter('|').setSingleWord('$').setAnyWords('!');
+ assertEquals("a|b|c", MQTTUtil.getCoreAddressFromMqttTopic("a/b/c",
customWildCardConfig));
+ assertEquals("a|$|c", MQTTUtil.getCoreAddressFromMqttTopic("a/+/c",
customWildCardConfig));
+ assertEquals("a|$|!", MQTTUtil.getCoreAddressFromMqttTopic("a/+/#",
customWildCardConfig));
+ assertEquals("1.0|device",
MQTTUtil.getCoreAddressFromMqttTopic("1.0/device", customWildCardConfig));
+ assertEquals("$", MQTTUtil.getCoreAddressFromMqttTopic("+",
customWildCardConfig));
+ assertEquals("|", MQTTUtil.getCoreAddressFromMqttTopic("/",
customWildCardConfig));
+ assertEquals("!", MQTTUtil.getCoreAddressFromMqttTopic("#",
customWildCardConfig));
+ }
+
+ @Test
+ public void testGetCoreRetainAddressFromMqttTopic() {
+ assertThrows(NullPointerException.class, () ->
MQTTUtil.getCoreRetainAddressFromMqttTopic(null, null));
+ assertThrows(NullPointerException.class, () ->
MQTTUtil.getCoreRetainAddressFromMqttTopic(null, new WildcardConfiguration()));
+ assertThrows(NullPointerException.class, () ->
MQTTUtil.getCoreRetainAddressFromMqttTopic("", null));
+
+ final String retainPrefix = "$sys.mqtt.retain.";
+ WildcardConfiguration defaultWildCardConfig = new
WildcardConfiguration();
+ assertEquals(retainPrefix + "a.b.c",
MQTTUtil.getCoreRetainAddressFromMqttTopic("a/b/c", defaultWildCardConfig));
+ }
+
+ @Test
+ public void testGetMqttTopicFromCoreAddress() {
+ assertThrows(NullPointerException.class, () ->
MQTTUtil.getMqttTopicFromCoreAddress(null, null));
+ assertThrows(NullPointerException.class, () ->
MQTTUtil.getMqttTopicFromCoreAddress(null, new WildcardConfiguration()));
+ assertThrows(NullPointerException.class, () ->
MQTTUtil.getMqttTopicFromCoreAddress("", null));
+
+ WildcardConfiguration defaultWildCardConfig = new
WildcardConfiguration();
+ assertEquals("a/b/c", MQTTUtil.getMqttTopicFromCoreAddress("a.b.c",
defaultWildCardConfig));
+ assertEquals("a/+/c", MQTTUtil.getMqttTopicFromCoreAddress("a.*.c",
defaultWildCardConfig));
+ assertEquals("a/+/#", MQTTUtil.getMqttTopicFromCoreAddress("a.*.#",
defaultWildCardConfig));
+ assertEquals("1.0/device",
MQTTUtil.getMqttTopicFromCoreAddress("1\\.0.device", defaultWildCardConfig));
+ assertEquals("+", MQTTUtil.getMqttTopicFromCoreAddress("*",
defaultWildCardConfig));
+ assertEquals("/", MQTTUtil.getMqttTopicFromCoreAddress(".",
defaultWildCardConfig));
+ assertEquals("#", MQTTUtil.getMqttTopicFromCoreAddress("#",
defaultWildCardConfig));
+
+ WildcardConfiguration customWildCardConfig = new
WildcardConfiguration().setDelimiter('|').setSingleWord('$').setAnyWords('!');
+ assertEquals("a/b/c", MQTTUtil.getMqttTopicFromCoreAddress("a|b|c",
customWildCardConfig));
+ assertEquals("a/+/c", MQTTUtil.getMqttTopicFromCoreAddress("a|$|c",
customWildCardConfig));
+ assertEquals("a/+/#", MQTTUtil.getMqttTopicFromCoreAddress("a|$|!",
customWildCardConfig));
+ assertEquals("1.0/device",
MQTTUtil.getMqttTopicFromCoreAddress("1.0|device", customWildCardConfig));
+ assertEquals("+", MQTTUtil.getMqttTopicFromCoreAddress("$",
customWildCardConfig));
+ assertEquals("/", MQTTUtil.getMqttTopicFromCoreAddress("|",
customWildCardConfig));
+ assertEquals("#", MQTTUtil.getMqttTopicFromCoreAddress("!",
customWildCardConfig));
+ }
}
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
index 197e130c67..074e729dd0 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
@@ -28,15 +28,7 @@ import org.apache.activemq.command.XATransactionId;
public class OpenWireUtil {
- public static class OpenWireWildcardConfiguration extends
WildcardConfiguration {
- public OpenWireWildcardConfiguration() {
- setDelimiter('.');
- setSingleWord('*');
- setAnyWords('>');
- }
- }
-
- public static final WildcardConfiguration OPENWIRE_WILDCARD = new
OpenWireWildcardConfiguration();
+ public static final WildcardConfiguration OPENWIRE_WILDCARD = new
WildcardConfiguration().setDelimiter('.').setAnyWords('>').setSingleWord('*');
public static final String SELECTOR_AWARE_OPTION = "selectorAware";
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java
index bdaf36ef17..3ee2f7b686 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java
@@ -28,6 +28,8 @@ public class WildcardConfiguration implements Serializable {
static final char DELIMITER = '.';
+ static final char ESCAPE = '\\';
+
boolean routingEnabled = true;
char singleWord = SINGLE_WORD;
@@ -42,19 +44,33 @@ public class WildcardConfiguration implements Serializable {
String delimiterString = String.valueOf(delimiter);
+ String escapeString = String.valueOf(ESCAPE);
+
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof WildcardConfiguration)) return false;
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof WildcardConfiguration)) {
+ return false;
+ }
WildcardConfiguration that = (WildcardConfiguration) o;
- if (routingEnabled != that.routingEnabled) return false;
- if (singleWord != that.singleWord) return false;
- if (anyWords != that.anyWords) return false;
- return delimiter == that.delimiter;
-
+ if (routingEnabled != that.routingEnabled) {
+ return false;
+ }
+ if (singleWord != that.singleWord) {
+ return false;
+ }
+ if (anyWords != that.anyWords) {
+ return false;
+ }
+ if (delimiter != that.delimiter) {
+ return false;
+ }
+ return true;
}
@Override
@@ -80,8 +96,9 @@ public class WildcardConfiguration implements Serializable {
return routingEnabled;
}
- public void setRoutingEnabled(boolean routingEnabled) {
+ public WildcardConfiguration setRoutingEnabled(boolean routingEnabled) {
this.routingEnabled = routingEnabled;
+ return this;
}
public char getAnyWords() {
@@ -93,9 +110,10 @@ public class WildcardConfiguration implements Serializable {
}
- public void setAnyWords(char anyWords) {
+ public WildcardConfiguration setAnyWords(char anyWords) {
this.anyWords = anyWords;
this.anyWordsString = String.valueOf(anyWords);
+ return this;
}
public char getDelimiter() {
@@ -106,9 +124,10 @@ public class WildcardConfiguration implements Serializable
{
return delimiterString;
}
- public void setDelimiter(char delimiter) {
+ public WildcardConfiguration setDelimiter(char delimiter) {
this.delimiter = delimiter;
this.delimiterString = String.valueOf(delimiter);
+ return this;
}
public char getSingleWord() {
@@ -119,19 +138,94 @@ public class WildcardConfiguration implements
Serializable {
return singleWordString;
}
- public void setSingleWord(char singleWord) {
+ public WildcardConfiguration setSingleWord(char singleWord) {
this.singleWord = singleWord;
this.singleWordString = String.valueOf(singleWord);
+ return this;
}
- public String convert(String filter, WildcardConfiguration to) {
- if (this.equals(to)) {
- return filter;
+ /**
+ * Convert the input from this WildcardConfiguration into the specified
WildcardConfiguration.
+ *
+ * If the input already contains characters defined in the target
WildcardConfiguration then those characters will
+ * be escaped and preserved as such in the returned String. That said,
wildcard characters which are the same
+ * between the two configurations will not be escaped.
+ *
+ * If the input already contains escaped characters defined in this
WildcardConfiguration then those characters will
+ * be unescaped after conversion and restored in the returned String.
+ *
+ * @param input the String to convert
+ * @param target the WildcardConfiguration to convert the input into
+ * @return the converted String
+ */
+ public String convert(final String input, final WildcardConfiguration
target) {
+ if (this.equals(target)) {
+ return input;
} else {
- return filter
- .replace(getDelimiter(), to.getDelimiter())
- .replace(getSingleWord(), to.getSingleWord())
- .replace(getAnyWords(), to.getAnyWords());
+ boolean escaped = isEscaped(input);
+ StringBuilder result;
+ if (!escaped) {
+ result = new StringBuilder(target.escape(input, this));
+ } else {
+ result = new StringBuilder(input);
+ }
+ replaceChar(result, getDelimiter(), target.getDelimiter());
+ replaceChar(result, getSingleWord(), target.getSingleWord());
+ replaceChar(result, getAnyWords(), target.getAnyWords());
+ if (escaped) {
+ return unescape(result.toString());
+ } else {
+ return result.toString();
+ }
+ }
+ }
+
+ private String escape(final String input, WildcardConfiguration from) {
+ String result = input.replace(escapeString, escapeString + escapeString);
+ if (delimiter != from.getDelimiter()) {
+ result = result.replace(getDelimiterString(), escapeString +
getDelimiterString());
+ }
+ if (singleWord != from.getSingleWord()) {
+ result = result.replace(getSingleWordString(), escapeString +
getSingleWordString());
+ }
+ if (anyWords != from.getAnyWords()) {
+ result = result.replace(getAnyWordsString(), escapeString +
getAnyWordsString());
+ }
+ return result;
+ }
+
+ private String unescape(final String input) {
+ return input
+ .replace(escapeString + escapeString, escapeString)
+ .replace(ESCAPE + getDelimiterString(), getDelimiterString())
+ .replace(ESCAPE + getSingleWordString(), getSingleWordString())
+ .replace(ESCAPE + getAnyWordsString(), getAnyWordsString());
+ }
+
+ private boolean isEscaped(final String input) {
+ for (int i = 0; i < input.length() - 1; i++) {
+ if (input.charAt(i) == ESCAPE && (input.charAt(i + 1) ==
getDelimiter() || input.charAt(i + 1) == getSingleWord() || input.charAt(i + 1)
== getAnyWords())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * This will replace one character with another while ignoring escaped
characters (i.e. those preceded by '\').
+ *
+ * @param result the final result of the replacement
+ * @param replace the character to replace
+ * @param replacement the replacement character to use
+ */
+ private void replaceChar(StringBuilder result, char replace, char
replacement) {
+ if (replace == replacement) {
+ return;
+ }
+ for (int i = 0; i < result.length(); i++) {
+ if (result.charAt(i) == replace && (i == 0 || result.charAt(i - 1) !=
ESCAPE)) {
+ result.setCharAt(i, replacement);
+ }
}
}
}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/WildcardConfigurationTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/WildcardConfigurationTest.java
new file mode 100644
index 0000000000..394541b558
--- /dev/null
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/WildcardConfigurationTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.config;
+
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class WildcardConfigurationTest extends Assert {
+
+ private static final WildcardConfiguration MQTT_WILDCARD = new
WildcardConfiguration().setDelimiter('/').setAnyWords('#').setSingleWord('+');
+ private static final WildcardConfiguration DEFAULT_WILDCARD = new
WildcardConfiguration();
+
+ @Test
+ public void testDefaultWildcard() {
+ assertEquals('.', DEFAULT_WILDCARD.getDelimiter());
+ assertEquals('*', DEFAULT_WILDCARD.getSingleWord());
+ assertEquals('#', DEFAULT_WILDCARD.getAnyWords());
+ }
+
+ @Test
+ public void testToFromCoreMQTT() {
+ testToFromCoreMQTT("foo.foo", "foo/foo");
+ testToFromCoreMQTT("foo.*.foo", "foo/+/foo");
+ testToFromCoreMQTT("foo.#", "foo/#");
+ testToFromCoreMQTT("foo.*.foo.#", "foo/+/foo/#");
+ testToFromCoreMQTT("foo\\.foo.foo", "foo.foo/foo");
+ }
+
+ private void testToFromCoreMQTT(String coreAddress, String mqttTopicFilter)
{
+ assertEquals(coreAddress, MQTT_WILDCARD.convert(mqttTopicFilter,
DEFAULT_WILDCARD));
+ assertEquals(mqttTopicFilter, DEFAULT_WILDCARD.convert(coreAddress,
MQTT_WILDCARD));
+ }
+
+ @Test
+ public void testEquality() {
+ WildcardConfiguration a = new
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c');
+ WildcardConfiguration b = new
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c');
+
+ assertEquals(a, b);
+ assertEquals(b, a);
+ assertEquals(a.hashCode(), b.hashCode());
+
+ String toConvert = RandomUtil.randomString();
+ assertSame(toConvert, a.convert(toConvert, b));
+ assertSame(toConvert, a.convert(toConvert, a));
+ }
+
+ @Test
+ public void testEqualityNegative() {
+ WildcardConfiguration a;
+ WildcardConfiguration b;
+
+ // none equal
+ a = new
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c');
+ b = new
WildcardConfiguration().setDelimiter('x').setAnyWords('y').setSingleWord('z');
+
+ assertNotEquals(a, b);
+ assertNotEquals(b, a);
+ assertNotEquals(a.hashCode(), b.hashCode());
+
+ // only delimiter equal
+ a = new
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c');
+ b = new
WildcardConfiguration().setDelimiter('a').setAnyWords('y').setSingleWord('z');
+
+ assertNotEquals(a, b);
+ assertNotEquals(b, a);
+ assertNotEquals(a.hashCode(), b.hashCode());
+
+ // only anyWords equal
+ a = new
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c');
+ b = new
WildcardConfiguration().setDelimiter('x').setAnyWords('b').setSingleWord('z');
+
+ assertNotEquals(a, b);
+ assertNotEquals(b, a);
+ assertNotEquals(a.hashCode(), b.hashCode());
+
+ // only singleWord equal
+ a = new
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c');
+ b = new
WildcardConfiguration().setDelimiter('x').setAnyWords('y').setSingleWord('c');
+
+ assertNotEquals(a, b);
+ assertNotEquals(b, a);
+ assertNotEquals(a.hashCode(), b.hashCode());
+
+ // only delimiter not equal
+ a = new
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c');
+ b = new
WildcardConfiguration().setDelimiter('x').setAnyWords('b').setSingleWord('c');
+
+ assertNotEquals(a, b);
+ assertNotEquals(b, a);
+ assertNotEquals(a.hashCode(), b.hashCode());
+
+ // only anyWords not equal
+ a = new
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c');
+ b = new
WildcardConfiguration().setDelimiter('a').setAnyWords('y').setSingleWord('c');
+
+ assertNotEquals(a, b);
+ assertNotEquals(b, a);
+ assertNotEquals(a.hashCode(), b.hashCode());
+
+ // only singleWord not equal
+ a = new
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c');
+ b = new
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('z');
+
+ assertNotEquals(a, b);
+ assertNotEquals(b, a);
+ assertNotEquals(a.hashCode(), b.hashCode());
+ }
+}
\ No newline at end of file
diff --git a/docs/user-manual/mqtt.adoc b/docs/user-manual/mqtt.adoc
index 0c02c47969..8b74af49bc 100644
--- a/docs/user-manual/mqtt.adoc
+++ b/docs/user-manual/mqtt.adoc
@@ -129,7 +129,8 @@ If you perform some custom validation of the client ID you
can reject the client
== Wildcard subscriptions
-MQTT addresses are hierarchical much like a file system, and they use a
special character (i.e. `/` by default) to separate hierarchical levels.
+MQTT defines a special wildcard syntax for topic filters. This definition is
found in section 4.7.1 of both the
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718107[3.1.1]
and
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901242[5]
specs.
+MQTT topics are hierarchical much like a file system, and they use a special
character (i.e. `/` by default) to separate hierarchical levels.
Subscribers are able to subscribe to specific topics or to whole branches of a
hierarchy.
To subscribe to branches of an address hierarchy a subscriber can use wild
cards.
@@ -147,9 +148,20 @@ This can be useful, but should be done so with care since
it has significant per
Matches a single level in the address hierarchy.
For example `/uk/+/stores` would match `/uk/newcastle/stores` but not
`/uk/cities/newcastle/stores`.
-These MQTT-specific wildcards are automatically _translated_ into the wildcard
syntax used by ActiveMQ Artemis.
-These wildcards are configurable.
-See the xref:wildcard-syntax.adoc#customizing-the-syntax[Wildcard Syntax]
chapter for details about how to configure custom wildcards.
+This is _close_ to the default
xref:wildcard-syntax.adoc#wildcard-syntax[wildcard syntax], but not exactly the
same.
+Therefore, some conversion is necessary.
+This conversion isn't free so *if you want the best MQTT performance* use
`broker.xml` to configure the wildcard syntax to match MQTT's, e.g.:
+
+[,xml]
+----
+<wildcard-addresses>
+ <delimiter>/</delimiter>
+ <any-words>#</any-words>
+ <single-word>+</single-word>
+</wildcard-addresses>
+----
+
+Of course, changing the default syntax also means that clients on other
protocols will need to follow this same syntax, as will the `match` values of
your `address-setting` configuration elements.
== Web Sockets
diff --git a/docs/user-manual/versions.adoc b/docs/user-manual/versions.adoc
index e0be84f7c3..610d681c38 100644
--- a/docs/user-manual/versions.adoc
+++ b/docs/user-manual/versions.adoc
@@ -12,6 +12,28 @@ NOTE: If the upgrade spans multiple versions then the steps
from *each* version
NOTE: Follow the general upgrade procedure outlined in the
xref:upgrading.adoc#upgrading-the-broker[Upgrading the Broker] chapter in
addition to any version-specific upgrade instructions outlined here.
+== 2.33.0
+
+https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=...[Full
release notes]
+
+=== Highlights
+
+* highlight 1
+* highlight 2
+
+=== Upgrading from 2.32.0
+
+* Due to https://issues.apache.org/jira/browse/ARTEMIS-4532[ARTEMIS-4532] the
names of addresses and queues related to MQTT topics and subscriptions
respectively may change.
+This will only impact you if *both* of the following are true:
++
+. The broker is configured to use a xref:wildcard-syntax.adoc[wildcard syntax]
which _doesn't match_ the xref:mqtt.adoc#wildcard-subscriptions[MQTT wildcard syntax]
(e.g. the default wildcard syntax).
+. You are using characters from the broker's wildcard syntax in your MQTT
topic name or filter.
+For example, suppose you are using the default wildcard syntax and an MQTT
topic named `1.0/group/device`.
+The dot (`.`) character here is part of the broker's wildcard syntax, and it
is being used in the name of an MQTT topic.
++
+In this case the characters from the broker's wildcard syntax that do not
match the characters in the MQTT wildcard syntax will be escaped with a
backslash (i.e. `\`).
+To avoid this conversion you can configure the broker to use the MQTT wildcard
syntax or change the MQTT topic name or filter.
+
== 2.32.0
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=12353769[Full
release notes]
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
index 91ba3d732d..4575e29aa7 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
@@ -1903,10 +1903,10 @@ public class MQTTTest extends MQTTTestSupport {
Exception peerDisconnectedException = null;
try {
String clientId = "test.client";
- String coreAddress = MQTTUtil.convertMqttTopicFilterToCore("foo/bar",
server.getConfiguration().getWildcardConfiguration());
+ String coreAddress = MQTTUtil.getCoreAddressFromMqttTopic("foo/bar",
server.getConfiguration().getWildcardConfiguration());
Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar",
QoS.AT_LEAST_ONCE)};
- getServer().createQueue(new QueueConfiguration(new
SimpleString(clientId + "." +
coreAddress)).setAddress(coreAddress).setRoutingType(RoutingType.MULTICAST).setDurable(false).setTemporary(true).setMaxConsumers(0));
+ getServer().createQueue(new
QueueConfiguration(MQTTUtil.getCoreQueueFromMqttTopic("foo/bar", clientId,
server.getConfiguration().getWildcardConfiguration())).setAddress(coreAddress).setRoutingType(RoutingType.MULTICAST).setDurable(false).setTemporary(true).setMaxConsumers(0));
MQTT mqtt = createMQTTConnection();
mqtt.setClientId(clientId);
@@ -2151,11 +2151,11 @@ public class MQTTTest extends MQTTTestSupport {
@Test(timeout = 60 * 1000)
public void testAutoDeleteRetainedQueue() throws Exception {
final String TOPIC = "/abc/123";
- final String RETAINED_QUEUE =
MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, server.getConfiguration().getWildcardConfiguration());
+ final String RETAINED_QUEUE =
MQTTUtil.getCoreRetainAddressFromMqttTopic(TOPIC,
server.getConfiguration().getWildcardConfiguration());
final MQTTClientProvider publisher = getMQTTClientProvider();
final MQTTClientProvider subscriber = getMQTTClientProvider();
-
server.getAddressSettingsRepository().addMatch(MQTTUtil.convertMqttTopicFilterToCore("#",
server.getConfiguration().getWildcardConfiguration()), new
AddressSettings().setExpiryDelay(500L).setAutoDeleteQueues(true).setAutoDeleteAddresses(true));
+
server.getAddressSettingsRepository().addMatch(MQTTUtil.getCoreAddressFromMqttTopic("#",
server.getConfiguration().getWildcardConfiguration()), new
AddressSettings().setExpiryDelay(500L).setAutoDeleteQueues(true).setAutoDeleteAddresses(true));
initializeConnection(publisher);
initializeConnection(subscriber);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java
index 491c065fe3..a15360f115 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java
@@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
@@ -201,7 +202,7 @@ public class MQTTTestSupport extends ActiveMQTestBase {
value.add(new Role("browser", false, false, false, false, false,
false, false, true, false, false));
value.add(new Role("guest", false, true, false, false, false, false,
false, true, false, false));
value.add(new Role("full", true, true, true, true, true, true, true,
true, true, true));
- securityRepository.addMatch(getQueueName(), value);
+
securityRepository.addMatch(MQTTUtil.getCoreAddressFromMqttTopic(getQueueName(),
server.getConfiguration().getWildcardConfiguration()), value);
server.getConfiguration().setSecurityEnabled(true);
} else {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java
index 8a6341a8b0..01d2e63b7f 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java
@@ -81,8 +81,8 @@ public class MqttWildCardSubAutoCreateTest extends
MQTTTestSupport {
String subscriberId = UUID.randomUUID().toString();
String senderId = UUID.randomUUID().toString();
- String subscribeTo = "A.*";
- String publishTo = "A.a";
+ String subscribeTo = "A/+";
+ String publishTo = "A/a";
subscriber = createMqttClient(subscriberId);
subscriber.subscribe(subscribeTo, 2);
@@ -93,7 +93,7 @@ public class MqttWildCardSubAutoCreateTest extends
MQTTTestSupport {
sender.publish(publishTo, UUID.randomUUID().toString().getBytes(), 2,
false);
sender.publish(publishTo, UUID.randomUUID().toString().getBytes(), 2,
false);
- assertTrue(server.getPagingManager().getPageStore(new
SimpleString(subscribeTo)).isPaging());
+ assertTrue(server.getPagingManager().getPageStore(new
SimpleString(MQTTUtil.getCoreAddressFromMqttTopic(subscribeTo,
server.getConfiguration().getWildcardConfiguration()))).isPaging());
subscriber = createMqttClient(subscriberId);
subscriber.subscribe(subscribeTo, 2);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
index b13b2b5ef4..0af994e89c 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.mqtt;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -53,7 +54,7 @@ public class PahoMQTTQOS2SecurityTest extends MQTTTestSupport
{
HashSet<Role> value = new HashSet<>();
value.add(new Role("addressOnly", true, true, true, true, false, false,
false, false, true, true));
- securityRepository.addMatch(getQueueName(), value);
+
securityRepository.addMatch(MQTTUtil.getCoreAddressFromMqttTopic(getQueueName(),
server.getConfiguration().getWildcardConfiguration()), value);
}
@Override
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 247fdb68ae..05f7eb9b31 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -83,6 +84,27 @@ public class MQTT5Test extends MQTT5TestSupport {
assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
}
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testTopicNameEscape() throws Exception {
+ final String topic = "foo1.0/bar/baz";
+ AtomicReference<String> receivedTopic = new AtomicReference<>();
+
+ MqttClient subscriber = createPahoClient("subscriber");
+ subscriber.connect();
+ subscriber.setCallback(new DefaultMqttCallback() {
+ @Override
+ public void messageArrived(String t, MqttMessage message) {
+ receivedTopic.set(t);
+ }
+ });
+ subscriber.subscribe(topic, AT_LEAST_ONCE);
+
+ MqttClient producer = createPahoClient("producer");
+ producer.connect();
+ producer.publish(topic, "myMessage".getBytes(StandardCharsets.UTF_8), 1,
false);
+ Wait.assertEquals(topic, receivedTopic::get, 500, 50);
+ }
+
/*
* Ensure that the broker adds a timestamp on the message when sending via
MQTT
*/
@@ -333,7 +355,7 @@ public class MQTT5Test extends MQTT5TestSupport {
consumer1.subscribe(SHARED_SUB1, 1);
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC1)));
- Queue q1 = getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME);
+ Queue q1 = getSharedSubscriptionQueue(SHARED_SUB1);
assertNotNull(q1);
assertEquals(TOPIC1, q1.getAddress().toString());
assertEquals(1, q1.getConsumerCount());
@@ -344,7 +366,7 @@ public class MQTT5Test extends MQTT5TestSupport {
consumer2.subscribe(SHARED_SUB2, 1);
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC2)));
- Queue q2 = getSubscriptionQueue(TOPIC2, "consumer2", SUB_NAME);
+ Queue q2 = getSharedSubscriptionQueue(SHARED_SUB2);
assertNotNull(q2);
assertEquals(TOPIC2, q2.getAddress().toString());
assertEquals(1, q2.getConsumerCount());
@@ -360,10 +382,10 @@ public class MQTT5Test extends MQTT5TestSupport {
assertTrue(ackLatch2.await(2, TimeUnit.SECONDS));
consumer1.unsubscribe(SHARED_SUB1);
- assertNull(getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME));
+ assertNull(getSharedSubscriptionQueue(SHARED_SUB1));
consumer2.unsubscribe(SHARED_SUB2);
- assertNull(getSubscriptionQueue(TOPIC2, "consumer2", SUB_NAME));
+ assertNull(getSharedSubscriptionQueue(SHARED_SUB2));
consumer1.disconnect();
consumer1.close();
@@ -388,13 +410,13 @@ public class MQTT5Test extends MQTT5TestSupport {
consumer.subscribe(SHARED_SUBS, new int[]{1, 1});
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC1)));
- Queue q1 = getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME);
+ Queue q1 = getSharedSubscriptionQueue(SHARED_SUBS[0]);
assertNotNull(q1);
assertEquals(TOPIC1, q1.getAddress().toString());
assertEquals(1, q1.getConsumerCount());
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC2)));
- Queue q2 = getSubscriptionQueue(TOPIC2, "consumer1", SUB_NAME);
+ Queue q2 = getSharedSubscriptionQueue(SHARED_SUBS[1]);
assertNotNull(q2);
assertEquals(TOPIC2, q2.getAddress().toString());
assertEquals(1, q2.getConsumerCount());
@@ -409,8 +431,8 @@ public class MQTT5Test extends MQTT5TestSupport {
assertTrue(ackLatch.await(2, TimeUnit.SECONDS));
consumer.unsubscribe(SHARED_SUBS);
- assertNull(getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME));
- assertNull(getSubscriptionQueue(TOPIC2, "consumer1", SUB_NAME));
+ assertNull(getSharedSubscriptionQueue(SHARED_SUBS[0]));
+ assertNull(getSharedSubscriptionQueue(SHARED_SUBS[1]));
consumer.disconnect();
consumer.close();
@@ -644,7 +666,7 @@ public class MQTT5Test extends MQTT5TestSupport {
MqttClient client = createPahoClient(clientID);
client.connect();
client.subscribe(topic, 1);
- Wait.assertTrue(() ->
server.locateQueue(SimpleString.toSimpleString(clientID.concat(".").concat(topic.replace('/',
'.')))) != null, 2000, 100);
+ Wait.assertTrue(() -> getSubscriptionQueue(topic, clientID) != null,
2000, 100);
client.disconnect();
client.close();
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
index a4f106a2fd..e2cd5e66b9 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
@@ -37,11 +37,10 @@ import
org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.postoffice.Binding;
-import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
@@ -107,7 +106,7 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
return new MqttAsyncClient(TCP + "://localhost:" + (isUseSsl() ?
getSslPort() : getPort()), clientId, new MemoryPersistence());
}
- private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ protected static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected static final long DEFAULT_TIMEOUT = 300000;
protected ActiveMQServer server;
@@ -345,40 +344,16 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
return null;
}
- protected Queue getSubscriptionQueue(String TOPIC) {
- try {
- Object[] array =
server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(TOPIC)).getBindings().toArray();
- if (array.length == 0) {
- return null;
- } else {
- return ((LocalQueueBinding)array[0]).getQueue();
- }
- } catch (Exception e) {
- e.printStackTrace();
- return null;
- }
+ protected Queue getSharedSubscriptionQueue(String mqttTopicFilter) {
+ return getSubscriptionQueue(mqttTopicFilter, null);
}
- protected Queue getSubscriptionQueue(String TOPIC, String clientId) {
- return getSubscriptionQueue(TOPIC, clientId, null);
+ protected Queue getSubscriptionQueue(String mqttTopicFilter, String
clientId) {
+ return
server.locateQueue(MQTTUtil.getCoreQueueFromMqttTopic(mqttTopicFilter,
clientId, server.getConfiguration().getWildcardConfiguration()));
}
- protected Queue getSubscriptionQueue(String TOPIC, String clientId, String
sharedSubscriptionName) {
- try {
- for (Binding b :
server.getPostOffice().getMatchingBindings(SimpleString.toSimpleString(TOPIC)))
{
- if (sharedSubscriptionName != null) {
- if
(((LocalQueueBinding)b).getQueue().getName().startsWith(SimpleString.toSimpleString(sharedSubscriptionName)))
{
- return ((LocalQueueBinding)b).getQueue();
- }
- } else if
(((LocalQueueBinding)b).getQueue().getName().startsWith(SimpleString.toSimpleString(clientId)))
{
- return ((LocalQueueBinding)b).getQueue();
- }
- }
- return null;
- } catch (Exception e) {
- e.printStackTrace();
- return null;
- }
+ protected Queue getRetainedMessageQueue(String mqttTopicFilter) {
+ return
server.locateQueue(MQTTUtil.getCoreRetainAddressFromMqttTopic(mqttTopicFilter,
server.getConfiguration().getWildcardConfiguration()));
}
protected void setAcceptorProperty(String property) throws Exception {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/ControlPacketFormatTests.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/ControlPacketFormatTests.java
index 3340df263d..2113fb194a 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/ControlPacketFormatTests.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/ControlPacketFormatTests.java
@@ -55,10 +55,11 @@ public class ControlPacketFormatTests extends
MQTT5TestSupport {
@Test(timeout = DEFAULT_TIMEOUT)
public void testPacketIdQoSZero() throws Exception {
final String TOPIC = this.getTopicName();
+ final String CONSUMER_CLIENT_ID = "consumer";
final int MESSAGE_COUNT = 100;
final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
- MqttClient consumer = createPahoClient("consumer");
+ MqttClient consumer = createPahoClient(CONSUMER_CLIENT_ID);
consumer.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws
Exception {
@@ -75,7 +76,7 @@ public class ControlPacketFormatTests extends
MQTT5TestSupport {
for (int i = 0; i < MESSAGE_COUNT; i++) {
producer.publish(TOPIC, ("foo" + i).getBytes(), 0, false);
}
- Wait.assertEquals(MESSAGE_COUNT, () ->
getSubscriptionQueue(TOPIC).getMessagesAdded());
+ Wait.assertEquals(MESSAGE_COUNT, () -> getSubscriptionQueue(TOPIC,
CONSUMER_CLIENT_ID).getMessagesAdded());
producer.disconnect();
producer.close();
@@ -111,15 +112,15 @@ public class ControlPacketFormatTests extends
MQTT5TestSupport {
});
consumer.connect();
consumer.subscribe(TOPIC, 2);
- Wait.assertTrue(() -> getSubscriptionQueue(TOPIC) != null);
- Wait.assertEquals(1, () ->
getSubscriptionQueue(TOPIC).getConsumerCount());
+ Wait.assertTrue(() -> getSubscriptionQueue(TOPIC, CONSUMER_ID) != null);
+ Wait.assertEquals(1, () -> getSubscriptionQueue(TOPIC,
CONSUMER_ID).getConsumerCount());
MqttClient producer = createPahoClient("producer");
producer.connect();
for (int i = 0; i < MESSAGE_COUNT; i++) {
producer.publish(TOPIC, ("foo" + i).getBytes(),
(RandomUtil.randomPositiveInt() % 2) + 1, false);
}
- Wait.assertEquals(MESSAGE_COUNT, () ->
getSubscriptionQueue(TOPIC).getMessagesAdded());
+ Wait.assertEquals(MESSAGE_COUNT, () -> getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessagesAdded());
producer.disconnect();
producer.close();
@@ -173,7 +174,8 @@ public class ControlPacketFormatTests extends
MQTT5TestSupport {
final String TOPIC = this.getTopicName();
final CountDownLatch latch = new CountDownLatch(1);
- MqttClient consumer = createPahoClient("consumer");
+ final String CONSUMER_ID = "consumer";
+ MqttClient consumer = createPahoClient(CONSUMER_ID);
consumer.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws
Exception {
@@ -186,11 +188,11 @@ public class ControlPacketFormatTests extends
MQTT5TestSupport {
MqttClient producer = createPahoClient("producer");
producer.connect();
producer.publish(TOPIC, "foo".getBytes(StandardCharsets.UTF_8), 2,
false);
- Wait.assertEquals((long) 1, () ->
getSubscriptionQueue(TOPIC).getMessagesAdded(), 2000, 100);
+ Wait.assertEquals((long) 1, () -> getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessagesAdded(), 2000, 100);
producer.disconnect();
producer.close();
- Wait.assertEquals(1L, () ->
getSubscriptionQueue(TOPIC).getMessagesAcknowledged(), 15000, 100);
+ Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessagesAcknowledged(), 15000, 100);
assertTrue(latch.await(15, TimeUnit.SECONDS));
Wait.assertFalse(() -> failed.get(), 2000, 100);
Wait.assertEquals(8, () -> packetCount.get());
@@ -243,7 +245,8 @@ public class ControlPacketFormatTests extends
MQTT5TestSupport {
final String TOPIC = this.getTopicName();
final CountDownLatch latch = new CountDownLatch(1);
- MqttClient consumer = createPahoClient("consumer");
+ final String CONSUMER_ID = "consumer";
+ MqttClient consumer = createPahoClient(CONSUMER_ID);
consumer.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws
Exception {
@@ -256,11 +259,11 @@ public class ControlPacketFormatTests extends
MQTT5TestSupport {
MqttClient producer = createPahoClient("producer");
producer.connect();
producer.publish(TOPIC, "foo".getBytes(StandardCharsets.UTF_8), 1,
false);
- Wait.assertEquals((long) 1, () ->
getSubscriptionQueue(TOPIC).getMessagesAdded(), 2000, 100);
+ Wait.assertEquals((long) 1, () -> getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessagesAdded(), 2000, 100);
producer.disconnect();
producer.close();
- Wait.assertEquals(1L, () ->
getSubscriptionQueue(TOPIC).getMessagesAcknowledged(), 15000, 100);
+ Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessagesAcknowledged(), 15000, 100);
assertTrue(latch.await(15, TimeUnit.SECONDS));
Wait.assertFalse(() -> failed.get(), 2000, 100);
Wait.assertEquals(4, () -> packetCount.get());
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java
index 35b1dabeed..3798edf8a1 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java
@@ -46,12 +46,13 @@ public class MessageReceiptTests extends MQTT5TestSupport {
@Test(timeout = DEFAULT_TIMEOUT)
public void testMessageReceipt() throws Exception {
final String TOPIC = RandomUtil.randomString();
+ final String CONSUMER_ID = "consumer";
final int CONSUMER_COUNT = 25;
final MqttClient[] consumers = new MqttClient[CONSUMER_COUNT];
final CountDownLatch latch = new CountDownLatch(CONSUMER_COUNT);
for (int i = 0; i < CONSUMER_COUNT; i++) {
- MqttClient consumer = createPahoClient(RandomUtil.randomString());
+ MqttClient consumer = createPahoClient(CONSUMER_ID + i);
consumers[i] = consumer;
consumer.connect();
int finalI = i;
@@ -75,7 +76,7 @@ public class MessageReceiptTests extends MQTT5TestSupport {
Wait.assertEquals((long) CONSUMER_COUNT, () -> {
int totalMessagesAdded = 0;
for (int i = 0; i < CONSUMER_COUNT; i++) {
- totalMessagesAdded += getSubscriptionQueue(TOPIC +
i).getMessagesAdded();
+ totalMessagesAdded += getSubscriptionQueue(TOPIC + i, CONSUMER_ID
+ i).getMessagesAdded();
}
return totalMessagesAdded;
}, 2000, 100);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java
index e28c6cf54c..9e307b2de4 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java
@@ -101,14 +101,15 @@ public class QoSTests extends MQTT5TestSupport {
@Test(timeout = DEFAULT_TIMEOUT)
public void testQoS1PubAck() throws Exception {
final String TOPIC = RandomUtil.randomString();
+ final String CONSUMER_ID = "consumer";
final CountDownLatch ackLatch = new CountDownLatch(1);
final AtomicInteger packetId = new AtomicInteger();
MQTTInterceptor incomingInterceptor = (packet, connection) -> {
if (packet.fixedHeader().messageType() == MqttMessageType.PUBACK) {
// ensure the message is still in the queue before we get the ack
from the client
- assertEquals(1, getSubscriptionQueue(TOPIC).getMessageCount());
- assertEquals(1, getSubscriptionQueue(TOPIC).getDeliveringCount());
+ assertEquals(1, getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessageCount());
+ assertEquals(1, getSubscriptionQueue(TOPIC,
CONSUMER_ID).getDeliveringCount());
// ensure the ids match so we know this is the "corresponding"
PUBACK for the previous PUBLISH
assertEquals(packetId.get(),
((MqttPubReplyMessageVariableHeader)packet.variableHeader()).messageId());
@@ -129,7 +130,7 @@ public class QoSTests extends MQTT5TestSupport {
server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor);
final CountDownLatch latch = new CountDownLatch(1);
- MqttClient consumer = createPahoClient("consumer");
+ MqttClient consumer = createPahoClient(CONSUMER_ID);
consumer.connect();
consumer.setCallback(new LatchedMqttCallback(latch));
consumer.subscribe(TOPIC, 1);
@@ -142,8 +143,8 @@ public class QoSTests extends MQTT5TestSupport {
assertTrue(ackLatch.await(2, TimeUnit.SECONDS));
assertTrue(latch.await(2, TimeUnit.SECONDS));
- assertEquals(0, getSubscriptionQueue(TOPIC).getMessageCount());
- assertEquals(0, getSubscriptionQueue(TOPIC).getDeliveringCount());
+ assertEquals(0, getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessageCount());
+ assertEquals(0, getSubscriptionQueue(TOPIC,
CONSUMER_ID).getDeliveringCount());
consumer.disconnect();
consumer.close();
}
@@ -241,14 +242,15 @@ public class QoSTests extends MQTT5TestSupport {
@Test(timeout = DEFAULT_TIMEOUT)
public void testQoS2PubRec() throws Exception {
final String TOPIC = RandomUtil.randomString();
+ final String CONSUMER_ID = "consumer";
final CountDownLatch ackLatch = new CountDownLatch(1);
final AtomicInteger packetId = new AtomicInteger();
MQTTInterceptor incomingInterceptor = (packet, connection) -> {
if (packet.fixedHeader().messageType() == MqttMessageType.PUBREC) {
// ensure the message is still in the queue before we get the ack
from the client
- assertEquals(1, getSubscriptionQueue(TOPIC).getMessageCount());
- assertEquals(1, getSubscriptionQueue(TOPIC).getDeliveringCount());
+ assertEquals(1, getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessageCount());
+ assertEquals(1, getSubscriptionQueue(TOPIC,
CONSUMER_ID).getDeliveringCount());
// ensure the ids match so we know this is the "corresponding"
PUBREC for the previous PUBLISH
assertEquals(packetId.get(),
((MqttPubReplyMessageVariableHeader)packet.variableHeader()).messageId());
@@ -269,7 +271,7 @@ public class QoSTests extends MQTT5TestSupport {
server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor);
final CountDownLatch latch = new CountDownLatch(1);
- MqttClient consumer = createPahoClient("consumer");
+ MqttClient consumer = createPahoClient(CONSUMER_ID);
consumer.connect();
consumer.setCallback(new LatchedMqttCallback(latch));
consumer.subscribe(TOPIC, 2);
@@ -282,8 +284,8 @@ public class QoSTests extends MQTT5TestSupport {
assertTrue(ackLatch.await(2, TimeUnit.SECONDS));
assertTrue(latch.await(2, TimeUnit.SECONDS));
- assertEquals(0, getSubscriptionQueue(TOPIC).getMessageCount());
- assertEquals(0, getSubscriptionQueue(TOPIC).getDeliveringCount());
+ assertEquals(0, getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessageCount());
+ assertEquals(0, getSubscriptionQueue(TOPIC,
CONSUMER_ID).getDeliveringCount());
consumer.disconnect();
consumer.close();
}
@@ -348,7 +350,7 @@ public class QoSTests extends MQTT5TestSupport {
@Test(timeout = DEFAULT_TIMEOUT)
public void testQoS2PubRel() throws Exception {
final String TOPIC = RandomUtil.randomString();
- final String CONSUMER_CLIENT_ID = "consumer";
+ final String CONSUMER_ID = "consumer";
final CountDownLatch ackLatch = new CountDownLatch(1);
final AtomicInteger packetId = new AtomicInteger();
@@ -356,8 +358,8 @@ public class QoSTests extends MQTT5TestSupport {
if (packet.fixedHeader().messageType() == MqttMessageType.PUBCOMP) {
try {
// ensure the message is still in the management queue before
we get the PUBCOMP from the client
- Wait.assertEquals(1L, () ->
server.locateQueue(MQTTUtil.MANAGEMENT_QUEUE_PREFIX +
CONSUMER_CLIENT_ID).getMessageCount(), 2000, 100);
- Wait.assertEquals(1L, () ->
server.locateQueue(MQTTUtil.MANAGEMENT_QUEUE_PREFIX +
CONSUMER_CLIENT_ID).getDeliveringCount(), 2000, 100);
+ Wait.assertEquals(1L, () ->
server.locateQueue(MQTTUtil.MANAGEMENT_QUEUE_PREFIX +
CONSUMER_ID).getMessageCount(), 2000, 100);
+ Wait.assertEquals(1L, () ->
server.locateQueue(MQTTUtil.MANAGEMENT_QUEUE_PREFIX +
CONSUMER_ID).getDeliveringCount(), 2000, 100);
} catch (Exception e) {
return false;
}
@@ -381,7 +383,7 @@ public class QoSTests extends MQTT5TestSupport {
server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor);
final CountDownLatch latch = new CountDownLatch(1);
- MqttClient consumer = createPahoClient(CONSUMER_CLIENT_ID);
+ MqttClient consumer = createPahoClient(CONSUMER_ID);
consumer.connect();
consumer.setCallback(new LatchedMqttCallback(latch));
consumer.subscribe(TOPIC, 2);
@@ -394,8 +396,8 @@ public class QoSTests extends MQTT5TestSupport {
assertTrue(ackLatch.await(2, TimeUnit.SECONDS));
assertTrue(latch.await(2, TimeUnit.SECONDS));
- assertEquals(0, getSubscriptionQueue(TOPIC).getMessageCount());
- assertEquals(0, getSubscriptionQueue(TOPIC).getDeliveringCount());
+ assertEquals(0, getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessageCount());
+ assertEquals(0, getSubscriptionQueue(TOPIC,
CONSUMER_ID).getDeliveringCount());
consumer.disconnect();
consumer.close();
}
@@ -412,6 +414,7 @@ public class QoSTests extends MQTT5TestSupport {
@Test(timeout = DEFAULT_TIMEOUT)
public void testQoS2WithExpiration() throws Exception {
final String TOPIC = "myTopic";
+ final String CONSUMER_ID = "consumer";
final CountDownLatch ackLatch = new CountDownLatch(1);
final CountDownLatch expireRefsLatch = new CountDownLatch(1);
final long messageExpiryInterval = 2;
@@ -419,12 +422,12 @@ public class QoSTests extends MQTT5TestSupport {
MQTTInterceptor incomingInterceptor = (packet, connection) -> {
if (packet.fixedHeader().messageType() == MqttMessageType.PUBREC) {
// ensure the message is still in the queue before we get the
PUBREC from the client
- assertEquals(1, getSubscriptionQueue(TOPIC).getMessageCount());
- assertEquals(1, getSubscriptionQueue(TOPIC).getDeliveringCount());
+ assertEquals(1, getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessageCount());
+ assertEquals(1, getSubscriptionQueue(TOPIC,
CONSUMER_ID).getDeliveringCount());
try {
// ensure enough time has passed for the message to expire
Thread.sleep(messageExpiryInterval * 1500);
-
getSubscriptionQueue(TOPIC).expireReferences(expireRefsLatch::countDown);
+ getSubscriptionQueue(TOPIC,
CONSUMER_ID).expireReferences(expireRefsLatch::countDown);
assertTrue(expireRefsLatch.await(2, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
@@ -438,7 +441,7 @@ public class QoSTests extends MQTT5TestSupport {
server.getRemotingService().addIncomingInterceptor(incomingInterceptor);
final CountDownLatch latch = new CountDownLatch(1);
- MqttClient consumer = createPahoClient("consumer");
+ MqttClient consumer = createPahoClient(CONSUMER_ID);
consumer.connect();
consumer.setCallback(new DefaultMqttCallback() {
@Override
@@ -462,9 +465,9 @@ public class QoSTests extends MQTT5TestSupport {
assertTrue(ackLatch.await(messageExpiryInterval * 2, TimeUnit.SECONDS));
assertTrue(latch.await(messageExpiryInterval * 2, TimeUnit.SECONDS));
- Wait.assertEquals(0, () ->
getSubscriptionQueue(TOPIC).getMessageCount());
- Wait.assertEquals(0, () ->
getSubscriptionQueue(TOPIC).getDeliveringCount());
- Wait.assertEquals(0, () ->
getSubscriptionQueue(TOPIC).getMessagesExpired());
+ Wait.assertEquals(0, () -> getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessageCount());
+ Wait.assertEquals(0, () -> getSubscriptionQueue(TOPIC,
CONSUMER_ID).getDeliveringCount());
+ Wait.assertEquals(0, () -> getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessagesExpired());
consumer.disconnect();
consumer.close();
}
@@ -628,7 +631,10 @@ public class QoSTests extends MQTT5TestSupport {
@Test(timeout = DEFAULT_TIMEOUT)
public void testQoS2WithExpiration2() throws Exception {
final String TOPIC = "myTopic";
- server.createQueue(new
QueueConfiguration(RandomUtil.randomString()).setAddress(TOPIC).setRoutingType(RoutingType.MULTICAST));
+ final String CONSUMER_ID = "consumer";
+ server.createQueue(new
QueueConfiguration(MQTTUtil.getCoreQueueFromMqttTopic(TOPIC, CONSUMER_ID,
server.getConfiguration().getWildcardConfiguration()))
+
.setAddress(MQTTUtil.getCoreAddressFromMqttTopic(TOPIC,
server.getConfiguration().getWildcardConfiguration()))
+ .setRoutingType(RoutingType.MULTICAST));
final CountDownLatch ackLatch = new CountDownLatch(1);
final CountDownLatch expireRefsLatch = new CountDownLatch(1);
final long messageExpiryInterval = 1;
@@ -636,11 +642,11 @@ public class QoSTests extends MQTT5TestSupport {
MQTTInterceptor outgoingInterceptor = (packet, connection) -> {
if (packet.fixedHeader().messageType() == MqttMessageType.PUBREC) {
// ensure the message is in the queue before trying to expire
- Wait.assertTrue(() ->
getSubscriptionQueue(TOPIC).getMessageCount() == 1, 2000, 100);
+ Wait.assertTrue(() -> getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessageCount() == 1, 2000, 100);
try {
// ensure enough time has passed for the message to expire
Thread.sleep(messageExpiryInterval * 1500);
-
getSubscriptionQueue(TOPIC).expireReferences(expireRefsLatch::countDown);
+ getSubscriptionQueue(TOPIC,
CONSUMER_ID).expireReferences(expireRefsLatch::countDown);
assertTrue(expireRefsLatch.await(2, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
@@ -666,6 +672,6 @@ public class QoSTests extends MQTT5TestSupport {
producer.close();
assertTrue(ackLatch.await(messageExpiryInterval * 2, TimeUnit.SECONDS));
- Wait.assertEquals(1, () ->
getSubscriptionQueue(TOPIC).getMessagesExpired());
+ Wait.assertEquals(1, () -> getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessagesExpired());
}
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java
index 6cea9dd86d..612882525d 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java
@@ -480,13 +480,13 @@ public class ConnectTests extends MQTT5TestSupport {
producer.publish(TOPIC, bytes, 2, false);
producer.disconnect();
producer.close();
- Wait.assertEquals(1L, () ->
getSubscriptionQueue(TOPIC).getMessagesAdded(), 2000, 100);
+ Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessagesAdded(), 2000, 100);
// the client should *not* receive the message
assertFalse(latch.await(2, TimeUnit.SECONDS));
// the broker should acknowledge the message since it exceeded the
client's max packet size
- Wait.assertEquals(1L, () ->
getSubscriptionQueue(TOPIC).getMessagesAcknowledged(), 2000, 100);
+ Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessagesAcknowledged(), 2000, 100);
consumer.disconnect();
consumer.close();
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
index 655d5495ed..caa34d47c7 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
@@ -16,6 +16,7 @@
*/
package
org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets;
+import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -29,7 +30,6 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
-import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
@@ -48,7 +48,6 @@ import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
/**
* Fulfilled by client or Netty codec (i.e. not tested here):
@@ -260,12 +259,14 @@ public class PublishTests extends MQTT5TestSupport {
final String CONSUMER_ID = RandomUtil.randomString();
final String TOPIC = this.getTopicName();
+ assertNull(getRetainedMessageQueue(TOPIC));
+
MqttClient producer = createPahoClient("producer");
producer.connect();
// send first retained message
producer.publish(TOPIC, "retain1".getBytes(), 2, true);
- Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+ Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount()
== 1, 2000, 100);
// send second retained message; should *remove* the first
producer.publish(TOPIC, new byte[0], 2, true);
@@ -273,7 +274,7 @@ public class PublishTests extends MQTT5TestSupport {
producer.disconnect();
producer.close();
- Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 0, 2000, 100);
+ Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount()
== 0, 2000, 100);
final CountDownLatch latch = new CountDownLatch(1);
MqttClient consumer = createPahoClient(CONSUMER_ID);
@@ -302,12 +303,14 @@ public class PublishTests extends MQTT5TestSupport {
final String RETAINED_PAYLOAD = RandomUtil.randomString();
final String UNRETAINED_PAYLOAD = RandomUtil.randomString();
+ assertNull(getRetainedMessageQueue(TOPIC));
+
MqttClient producer = createPahoClient("producer");
producer.connect();
// send retained message
producer.publish(TOPIC, RETAINED_PAYLOAD.getBytes(), 2, true);
- Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 1000, 100);
+ Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount()
== 1, 1000, 100);
// send an unretained message; should *not* remove the existing retained
message
producer.publish(TOPIC, UNRETAINED_PAYLOAD.getBytes(), 2, false);
@@ -315,7 +318,7 @@ public class PublishTests extends MQTT5TestSupport {
producer.disconnect();
producer.close();
- Wait.assertFalse(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() > 1, 1000, 100);
+ Wait.assertFalse(() -> getRetainedMessageQueue(TOPIC).getMessageCount()
> 1, 1000, 100);
final CountDownLatch latch = new CountDownLatch(1);
MqttClient consumer = createPahoClient(CONSUMER_ID);
@@ -395,13 +398,17 @@ public class PublishTests extends MQTT5TestSupport {
retainedPayloads[i] = RandomUtil.randomString();
}
+ for (int i = 0; i < SUBSCRIPTION_COUNT; i++) {
+ assertNull(getRetainedMessageQueue(topicNames[i]));
+ }
+
// send retained messages
MqttClient producer = createPahoClient("producer");
producer.connect();
for (int i = 0; i < SUBSCRIPTION_COUNT; i++) {
final String topicName = topicNames[i];
producer.publish(topicName, retainedPayloads[i].getBytes(), 2, true);
- Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
topicName,
server.getConfiguration().getWildcardConfiguration())).getMessageCount() == 1,
2000, 100);
+ Wait.assertTrue(() ->
getRetainedMessageQueue(topicName).getMessageCount() == 1, 2000, 100);
}
producer.disconnect();
producer.close();
@@ -458,12 +465,14 @@ public class PublishTests extends MQTT5TestSupport {
final String CONSUMER_ID = RandomUtil.randomString();
final String TOPIC = this.getTopicName();
+ assertNull(getRetainedMessageQueue(TOPIC));
+
// send retained messages
MqttClient producer = createPahoClient("producer");
producer.connect();
producer.publish(TOPIC, "retained".getBytes(), 2, true);
- Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC,
server.getConfiguration().getWildcardConfiguration())).getMessageCount() == 1,
2000, 100);
+ Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount()
== 1, 2000, 100);
producer.disconnect();
producer.close();
@@ -491,7 +500,7 @@ public class PublishTests extends MQTT5TestSupport {
assertTrue(latch.await(2, TimeUnit.SECONDS));
// ensure the retained message has been successfully acknowledge and
removed from the subscription queue
- Wait.assertTrue(() -> getSubscriptionQueue(TOPIC).getMessageCount() ==
0, 2000, 100);
+ Wait.assertTrue(() -> getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessageCount() == 0, 2000, 100);
consumer.disconnect();
@@ -522,11 +531,13 @@ public class PublishTests extends MQTT5TestSupport {
final String CONSUMER_ID = RandomUtil.randomString();
final String TOPIC = this.getTopicName();
+ assertNull(getRetainedMessageQueue(TOPIC));
+
// send first retained message
MqttClient producer = createPahoClient("producer");
producer.connect();
producer.publish(TOPIC, "retained".getBytes(), 2, true);
- Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+ Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount()
== 1, 2000, 100);
producer.disconnect();
producer.close();
@@ -578,11 +589,13 @@ public class PublishTests extends MQTT5TestSupport {
subscription.setRetainAsPublished(false);
consumer.subscribe(new MqttSubscription[]{subscription});
+ assertNull(getRetainedMessageQueue(TOPIC));
+
// send retained message
MqttClient producer = createPahoClient("producer");
producer.connect();
producer.publish(TOPIC, "retained".getBytes(), 2, true);
- Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+ Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount()
== 1, 2000, 100);
producer.disconnect();
producer.close();
@@ -627,11 +640,13 @@ public class PublishTests extends MQTT5TestSupport {
subscription.setRetainAsPublished(true);
consumer.subscribe(new MqttSubscription[]{subscription});
+ assertNull(getRetainedMessageQueue(TOPIC));
+
// send retained message
MqttClient producer = createPahoClient("producer");
producer.connect();
producer.publish(TOPIC, "retained".getBytes(), 2, true);
- Wait.assertTrue(() ->
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+ Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount()
== 1, 2000, 100);
producer.disconnect();
producer.close();
@@ -821,9 +836,9 @@ public class PublishTests extends MQTT5TestSupport {
producer.disconnect();
producer.close();
- Wait.assertEquals(1L, () ->
getSubscriptionQueue(TOPIC).getMessageCount(), 1000, 100);
+ Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessageCount(), 1000, 100);
Wait.assertEquals(1L, () ->
server.locateQueue("EXPIRY").getMessageCount(), 3000, 100);
- Wait.assertEquals(0L, () ->
getSubscriptionQueue(TOPIC).getMessageCount(), 1000, 100);
+ Wait.assertEquals(0L, () -> getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessageCount(), 1000, 100);
consumer.connect(options);
assertFalse(latch.await(1, TimeUnit.SECONDS));
@@ -874,7 +889,7 @@ public class PublishTests extends MQTT5TestSupport {
producer.disconnect();
producer.close();
- Wait.assertEquals(1L, () ->
getSubscriptionQueue(TOPIC).getMessageCount(), 500, 100);
+ Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessageCount(), 500, 100);
Thread.sleep(SLEEP);
@@ -1571,7 +1586,8 @@ public class PublishTests extends MQTT5TestSupport {
final String TOPIC = this.getTopicName();
final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
- MqttAsyncClient consumer =
createAsyncPahoClient(RandomUtil.randomString());
+ final String CONSUMER_ID = "consumer";
+ MqttAsyncClient consumer = createAsyncPahoClient(CONSUMER_ID);
MqttConnectionOptions options = new MqttConnectionOptions();
options.setReceiveMaximum(RECEIVE_MAXIMUM);
consumer.connect(options).waitForCompletion();
@@ -1589,11 +1605,11 @@ public class PublishTests extends MQTT5TestSupport {
for (int i = 0; i < MESSAGE_COUNT; i++) {
producer.publish(TOPIC, "foo".getBytes(StandardCharsets.UTF_8),
(RandomUtil.randomPositiveInt() % 2) + 1, false);
}
- Wait.assertEquals((long) MESSAGE_COUNT, () ->
getSubscriptionQueue(TOPIC).getMessagesAdded(), 2000, 100);
+ Wait.assertEquals((long) MESSAGE_COUNT, () ->
getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessagesAdded(), 2000, 100);
producer.disconnect();
producer.close();
- Wait.assertEquals(0L, () ->
getSubscriptionQueue(TOPIC).getMessageCount(), 15000, 100);
+ Wait.assertEquals(0L, () -> getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessageCount(), 15000, 100);
assertTrue(latch.await(15, TimeUnit.SECONDS));
assertFalse(failed.get());
consumer.disconnect();
@@ -1633,7 +1649,8 @@ public class PublishTests extends MQTT5TestSupport {
server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor);
final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
- MqttAsyncClient consumer =
createAsyncPahoClient(RandomUtil.randomString());
+ final String CONSUMER_ID = "consumer";
+ MqttAsyncClient consumer = createAsyncPahoClient(CONSUMER_ID);
MqttConnectionOptions options = new MqttConnectionOptions();
options.setReceiveMaximum(RECEIVE_MAXIMUM);
consumer.connect(options).waitForCompletion();
@@ -1651,11 +1668,11 @@ public class PublishTests extends MQTT5TestSupport {
for (int i = 0; i < MESSAGE_COUNT; i++) {
producer.publish(TOPIC, ("foo" + i).getBytes(StandardCharsets.UTF_8),
0, false);
}
- Wait.assertEquals((long) MESSAGE_COUNT, () ->
getSubscriptionQueue(TOPIC).getMessagesAdded(), 2000, 100);
+ Wait.assertEquals((long) MESSAGE_COUNT, () ->
getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessagesAdded(), 2000, 100);
producer.disconnect();
producer.close();
- Wait.assertEquals(0L, () ->
getSubscriptionQueue(TOPIC).getMessageCount(), 8000, 100);
+ Wait.assertEquals(0L, () -> getSubscriptionQueue(TOPIC,
CONSUMER_ID).getMessageCount(), 8000, 100);
assertTrue(latch.await(8, TimeUnit.SECONDS));
assertTrue(succeeded.get());
consumer.disconnect();
@@ -1675,6 +1692,7 @@ public class PublishTests extends MQTT5TestSupport {
final int MESSAGE_COUNT = 2;
final int RECEIVE_MAXIMUM = 1;
final String TOPIC = this.getTopicName();
+ final String CONSUMER_ID = "consumer";
final AtomicBoolean messageArrived = new AtomicBoolean(false);
MQTTInterceptor outgoingInterceptor = (packet, connection) -> {
@@ -1686,7 +1704,7 @@ public class PublishTests extends MQTT5TestSupport {
server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor);
final CountDownLatch latch = new CountDownLatch(1);
- MqttClient consumer = createPahoClient("consumer");
+ MqttClient consumer = createPahoClient(CONSUMER_ID);
MqttConnectionOptions options = new MqttConnectionOptions();
options.setReceiveMaximum(RECEIVE_MAXIMUM);
options.setKeepAliveInterval(2);
@@ -1705,7 +1723,7 @@ public class PublishTests extends MQTT5TestSupport {
for (int i = 0; i < MESSAGE_COUNT; i++) {
producer.publish(TOPIC, "foo".getBytes(StandardCharsets.UTF_8), 2,
false);
}
- Wait.assertEquals((long) MESSAGE_COUNT, () ->
getSubscriptionQueue(TOPIC).getMessagesAdded(), 2000, 100);
+ Wait.assertEquals((long) MESSAGE_COUNT, () ->
getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessagesAdded(), 2000, 100);
producer.disconnect();
producer.close();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
index a00c30a9f1..e2e6961250 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil;
@@ -76,6 +77,7 @@ public class PublishTestsWithSecurity extends MQTT5TestSupport {
@Test(timeout = DEFAULT_TIMEOUT)
public void testSendAuthorizationFailure() throws Exception {
final String CLIENT_ID = "publisher";
+ final String TOPIC = "/foo";
final CountDownLatch latch = new CountDownLatch(1);
MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
.username(createAddressUser)
@@ -91,7 +93,7 @@ public class PublishTestsWithSecurity extends MQTT5TestSupport {
});
try {
- client.publish("/foo", new byte[0], 2, false);
+ client.publish(TOPIC, new byte[0], 2, false);
fail("Publishing should have failed with a security problem");
} catch (MqttException e) {
assertEquals(MQTTReasonCodes.NOT_AUTHORIZED, (byte)
e.getReasonCode());
@@ -103,7 +105,7 @@ public class PublishTestsWithSecurity extends MQTT5TestSupport {
assertFalse(client.isConnected());
- Wait.assertTrue(() ->
server.getAddressInfo(SimpleString.toSimpleString(".foo")) != null, 2000, 100);
+ Wait.assertTrue(() ->
server.getAddressInfo(SimpleString.toSimpleString(MQTTUtil.getCoreAddressFromMqttTopic(TOPIC,
server.getConfiguration().getWildcardConfiguration()))) != null, 2000, 100);
}
@Test(timeout = DEFAULT_TIMEOUT)
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java
index 9eaad18e9b..14d7de37b0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java
@@ -92,17 +92,17 @@ public class SubscribeTestsWithSecurity extends MQTT5TestSupport {
@Test(timeout = DEFAULT_TIMEOUT)
public void testSubscriptionQueueRemoved() throws Exception {
- final String CLIENT_ID = "consumer";
+ final String CONSUMER_ID = "consumer";
MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
.username(noDeleteUser)
.password(noDeletePass.getBytes(StandardCharsets.UTF_8))
.build();
- MqttClient client = createPahoClient(CLIENT_ID);
+ MqttClient client = createPahoClient(CONSUMER_ID);
client.connect(options);
client.subscribe(getTopicName(), 0).waitForCompletion();
client.disconnect();
- Wait.assertTrue(() -> getSubscriptionQueue(getTopicName()) == null,
2000, 100);
+ Wait.assertTrue(() -> getSubscriptionQueue(getTopicName(), CONSUMER_ID)
== null, 2000, 100);
}
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/ssl/CertificateAuthenticationSslTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/ssl/CertificateAuthenticationSslTests.java
index d1514b0076..2f06512d2d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/ssl/CertificateAuthenticationSslTests.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/ssl/CertificateAuthenticationSslTests.java
@@ -92,11 +92,12 @@ public class CertificateAuthenticationSslTests extends MQTT5TestSupport {
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testSimpleSendReceive() throws Exception {
- String topic = RandomUtil.randomString();
+ final String topic = RandomUtil.randomString();
+ final String clientId = "subscriber";
byte[] body = RandomUtil.randomBytes(32);
CountDownLatch latch = new CountDownLatch(1);
- MqttClient subscriber = createPahoClient(protocol,"subscriber");
+ MqttClient subscriber = createPahoClient(protocol, clientId);
subscriber.connect(getSslMqttConnectOptions());
subscriber.setCallback(new DefaultMqttCallback() {
@Override
@@ -107,8 +108,8 @@ public class CertificateAuthenticationSslTests extends MQTT5TestSupport {
});
subscriber.subscribe(topic, AT_LEAST_ONCE);
- Wait.assertTrue(() -> getSubscriptionQueue(topic) != null, 2000, 100);
- Wait.assertEquals(1, () ->
getSubscriptionQueue(topic).getConsumerCount(), 2000, 100);
+ Wait.assertTrue(() -> getSubscriptionQueue(topic, clientId) != null,
2000, 100);
+ Wait.assertEquals(1, () -> getSubscriptionQueue(topic,
clientId).getConsumerCount(), 2000, 100);
MqttClient producer = createPahoClient(protocol,"producer");
producer.connect(getSslMqttConnectOptions());