gemmellr commented on code in PR #4710:
URL: https://github.com/apache/activemq-artemis/pull/4710#discussion_r1469405079
##########
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java:
##########
@@ -138,25 +138,71 @@ public class MQTTUtil {
public static final int DEFAULT_MAXIMUM_PACKET_SIZE = MAX_PACKET_SIZE;
- public static String convertMqttTopicFilterToCore(String filter,
WildcardConfiguration wildcardConfiguration) {
- return convertMqttTopicFilterToCore(null, filter, wildcardConfiguration);
- }
+ public static final WildcardConfiguration MQTT_WILDCARD = new
WildcardConfiguration().setDelimiter(SLASH).setAnyWords(HASH).setSingleWord(PLUS);
- public static String convertMqttTopicFilterToCore(String prefixToAdd,
String filter, WildcardConfiguration wildcardConfiguration) {
- if (filter == null) {
- return "";
+ /**
+ * This method takes the MQTT-related input and translates it into the
proper name for a core subscription queue. The
+ * {@code topicFilter} may be either for a shared subscription in the
format {@code $share/<shareName>/<topicFilter>}
+ * or a normal MQTT topic filter (e.g. {@code a/b/#}, {@code a/+/c}, {@code
a/b/c}, etc.).
+ *
+ * @param topicFilter the MQTT topic filter
+ * @param clientId the MQTT client ID, used for normal (i.e. non-shared)
subscriptions
+ * @param wildcardConfiguration the {@code WildcardConfiguration} governing
the core broker
+ * @return the name of the core subscription queue based on the input
+ */
+ public static String getCoreQueueFromMqttTopic(String topicFilter, String
clientId, WildcardConfiguration wildcardConfiguration) {
+ if (topicFilter == null || clientId == null || wildcardConfiguration ==
null) {
+ throw new IllegalArgumentException();
}
- String converted = MQTT_WILDCARD.convert(filter, wildcardConfiguration);
- if (prefixToAdd != null) {
- converted = prefixToAdd + converted;
+ if (isSharedSubscription(topicFilter)) {
+ Pair<String, String> decomposed =
decomposeSharedSubscriptionTopicFilter(topicFilter);
+ return new
StringBuilder().append(decomposed.getA()).append(".").append(getCoreAddressFromMqttTopic(decomposed.getB(),
wildcardConfiguration)).toString();
+ } else {
+ return new
StringBuilder().append(clientId).append(".").append(getCoreAddressFromMqttTopic(topicFilter,
wildcardConfiguration)).toString();
}
- return converted;
}
- public static String convertCoreAddressToMqttTopicFilter(String address,
WildcardConfiguration wildcardConfiguration) {
- if (address == null) {
- return "";
+ /**
+ * This method takes the MQTT-related input and translates it into the
proper name for a core address. The
+ * {@code topicFilter} must be normal (i.e. non-shared). It should not be
in the format
+ * {@code $share/<shareName>/<topicFilter>}.
+ *
+ * @param topicFilter the MQTT topic filter
+ * @param wildcardConfiguration the {@code WildcardConfiguration} governing
the core broker
+ * @return the name of the core addres based on the input
+ */
+ public static String getCoreAddressFromMqttTopic(String topicFilter,
WildcardConfiguration wildcardConfiguration) {
+ if (topicFilter == null || wildcardConfiguration == null) {
+ throw new IllegalArgumentException();
+ }
Review Comment:
Same as above comment.
##########
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java:
##########
@@ -138,25 +138,71 @@ public class MQTTUtil {
public static final int DEFAULT_MAXIMUM_PACKET_SIZE = MAX_PACKET_SIZE;
- public static String convertMqttTopicFilterToCore(String filter,
WildcardConfiguration wildcardConfiguration) {
- return convertMqttTopicFilterToCore(null, filter, wildcardConfiguration);
- }
+ public static final WildcardConfiguration MQTT_WILDCARD = new
WildcardConfiguration().setDelimiter(SLASH).setAnyWords(HASH).setSingleWord(PLUS);
- public static String convertMqttTopicFilterToCore(String prefixToAdd,
String filter, WildcardConfiguration wildcardConfiguration) {
- if (filter == null) {
- return "";
+ /**
+ * This method takes the MQTT-related input and translates it into the
proper name for a core subscription queue. The
+ * {@code topicFilter} may be either for a shared subscription in the
format {@code $share/<shareName>/<topicFilter>}
+ * or a normal MQTT topic filter (e.g. {@code a/b/#}, {@code a/+/c}, {@code
a/b/c}, etc.).
+ *
+ * @param topicFilter the MQTT topic filter
+ * @param clientId the MQTT client ID, used for normal (i.e. non-shared)
subscriptions
+ * @param wildcardConfiguration the {@code WildcardConfiguration} governing
the core broker
+ * @return the name of the core subscription queue based on the input
+ */
+ public static String getCoreQueueFromMqttTopic(String topicFilter, String
clientId, WildcardConfiguration wildcardConfiguration) {
+ if (topicFilter == null || clientId == null || wildcardConfiguration ==
null) {
+ throw new IllegalArgumentException();
Review Comment:
It might be nicer to be clearer about the specific problem so it's more easily
addressed if it does happen (which it seems it has, as some tests are
failing). E.g. switching to Objects.requireNonNull(T obj, String message) would
still take the same number of lines, but could make crystal clear which argument
caused the problem.
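A minimal sketch of that suggestion, not the PR's actual code: the class name, the method name `getCoreQueueName`, the `Object` stand-in for `WildcardConfiguration`, and the message strings are all hypothetical, and the real conversion logic is left out. Note that `Objects.requireNonNull` throws `NullPointerException` rather than `IllegalArgumentException`, so any caller or test relying on the exception type would need to be adjusted.

```java
import java.util.Objects;

class NullCheckSketch {

    // Hypothetical stand-in for the method under review; the third parameter
    // is a plain Object here so the sketch compiles without Artemis classes.
    static String getCoreQueueName(String topicFilter, String clientId, Object wildcardConfiguration) {
        // One check per argument, so the exception message names the culprit.
        Objects.requireNonNull(topicFilter, "topicFilter must not be null");
        Objects.requireNonNull(clientId, "clientId must not be null");
        Objects.requireNonNull(wildcardConfiguration, "wildcardConfiguration must not be null");

        // Placeholder for the real name-building logic.
        return clientId + "." + topicFilter;
    }

    public static void main(String[] args) {
        try {
            getCoreQueueName("a/b/c", null, new Object());
        } catch (NullPointerException e) {
            // The message identifies which argument was null.
            System.out.println(e.getMessage());
        }
    }
}
```

With three separate checks, a failing test reports the offending parameter directly instead of a bare exception with no message.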
##########
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java:
##########
@@ -138,25 +138,71 @@ public class MQTTUtil {
public static final int DEFAULT_MAXIMUM_PACKET_SIZE = MAX_PACKET_SIZE;
- public static String convertMqttTopicFilterToCore(String filter,
WildcardConfiguration wildcardConfiguration) {
- return convertMqttTopicFilterToCore(null, filter, wildcardConfiguration);
- }
+ public static final WildcardConfiguration MQTT_WILDCARD = new
WildcardConfiguration().setDelimiter(SLASH).setAnyWords(HASH).setSingleWord(PLUS);
- public static String convertMqttTopicFilterToCore(String prefixToAdd,
String filter, WildcardConfiguration wildcardConfiguration) {
- if (filter == null) {
- return "";
+ /**
+ * This method takes the MQTT-related input and translates it into the
proper name for a core subscription queue. The
+ * {@code topicFilter} may be either for a shared subscription in the
format {@code $share/<shareName>/<topicFilter>}
+ * or a normal MQTT topic filter (e.g. {@code a/b/#}, {@code a/+/c}, {@code
a/b/c}, etc.).
+ *
+ * @param topicFilter the MQTT topic filter
+ * @param clientId the MQTT client ID, used for normal (i.e. non-shared)
subscriptions
+ * @param wildcardConfiguration the {@code WildcardConfiguration} governing
the core broker
+ * @return the name of the core subscription queue based on the input
+ */
+ public static String getCoreQueueFromMqttTopic(String topicFilter, String
clientId, WildcardConfiguration wildcardConfiguration) {
+ if (topicFilter == null || clientId == null || wildcardConfiguration ==
null) {
+ throw new IllegalArgumentException();
}
- String converted = MQTT_WILDCARD.convert(filter, wildcardConfiguration);
- if (prefixToAdd != null) {
- converted = prefixToAdd + converted;
+ if (isSharedSubscription(topicFilter)) {
+ Pair<String, String> decomposed =
decomposeSharedSubscriptionTopicFilter(topicFilter);
+ return new
StringBuilder().append(decomposed.getA()).append(".").append(getCoreAddressFromMqttTopic(decomposed.getB(),
wildcardConfiguration)).toString();
+ } else {
+ return new
StringBuilder().append(clientId).append(".").append(getCoreAddressFromMqttTopic(topicFilter,
wildcardConfiguration)).toString();
}
- return converted;
}
- public static String convertCoreAddressToMqttTopicFilter(String address,
WildcardConfiguration wildcardConfiguration) {
- if (address == null) {
- return "";
+ /**
+ * This method takes the MQTT-related input and translates it into the
proper name for a core address. The
+ * {@code topicFilter} must be normal (i.e. non-shared). It should not be
in the format
+ * {@code $share/<shareName>/<topicFilter>}.
+ *
+ * @param topicFilter the MQTT topic filter
+ * @param wildcardConfiguration the {@code WildcardConfiguration} governing
the core broker
+ * @return the name of the core addres based on the input
+ */
+ public static String getCoreAddressFromMqttTopic(String topicFilter,
WildcardConfiguration wildcardConfiguration) {
+ if (topicFilter == null || wildcardConfiguration == null) {
+ throw new IllegalArgumentException();
+ }
+
+ return MQTT_WILDCARD.convert(topicFilter, wildcardConfiguration);
+ }
+
+ /**
+ * This is exactly the same as {@link #getCoreAddressFromMqttTopic(String,
WildcardConfiguration)} except that it
+ * also prefixes the return with
+ * {@link
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil#MQTT_RETAIN_ADDRESS_PREFIX}
+ *
+ * @param topicFilter the MQTT topic filter
+ * @param wildcardConfiguration the {@code WildcardConfiguration} governing
the core broker
+ * @return the name of the core address based on the input, stripping
+ * {@link
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil#MQTT_RETAIN_ADDRESS_PREFIX}
if it exists
+ */
+ public static String getCoreRetainAddressFromMqttTopic(String topicFilter,
WildcardConfiguration wildcardConfiguration) {
+ return MQTT_RETAIN_ADDRESS_PREFIX +
getCoreAddressFromMqttTopic(topicFilter, wildcardConfiguration);
+ }
+
+ /**
+ *
+ * @param address the core address
+ * @param wildcardConfiguration the {@code WildcardConfiguration} governing
the core broker
+ * @return the name of the MQTT topic based on the input
+ */
+ public static String getMqttTopicFromCoreAddress(String address,
WildcardConfiguration wildcardConfiguration) {
+ if (address == null || wildcardConfiguration == null) {
+ throw new IllegalArgumentException();
Review Comment:
Same as earlier comment.
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java:
##########
@@ -83,6 +83,29 @@ public void messageArrived(String topic, MqttMessage message) {
assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
}
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testTopicNameEscape() throws Exception {
+ final String topic = "foo1.0/bar/baz";
+
+ CountDownLatch latch = new CountDownLatch(1);
+ MqttClient subscriber = createPahoClient("subscriber");
+ subscriber.connect();
+ subscriber.setCallback(new DefaultMqttCallback() {
+ @Override
+ public void messageArrived(String t, MqttMessage message) {
+ logger.info("Message received from topic {}, message={}", topic, message);
+ assertEquals(topic, t);
Review Comment:
Would be better to remember the value here and then assert outside the
callback after the latch trips. That way the test will fail quicker and in a
more helpful manner if the value isn't as expected, with the assertion
directly causing the failure rather than e.g. just timing out after 500ms on
the latch (because it wasn't tripped) and requiring someone to go look at logs
and hope that there is a stack trace logged that might explain it. It would
also just be clearer what is really being tested; it took me a bit to even
notice what the test really cared about while doing the send+receive.
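A rough sketch of the pattern described above, written without Paho or JUnit so it stands alone. `deliverAsync` is a hypothetical stand-in for the client invoking its callback on another thread, and the class name, `receiveTopic`, and the 5-second timeout are assumptions for illustration. The callback only records the value and trips the latch; the comparison happens on the calling thread afterwards.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class CallbackAssertionSketch {

    // Hypothetical stand-in for a client delivering a message on its own thread.
    static void deliverAsync(String topic, Consumer<String> callback) {
        new Thread(() -> callback.accept(topic)).start();
    }

    // Waits for the callback and returns the topic it saw.
    static String receiveTopic(String publishedTopic) {
        AtomicReference<String> receivedTopic = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(1);

        deliverAsync(publishedTopic, t -> {
            receivedTopic.set(t); // only remember the value, no assertions here
            latch.countDown();    // always trips, whatever the value was
        });

        try {
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new AssertionError("no message arrived before the timeout");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("interrupted while waiting for the message", e);
        }
        return receivedTopic.get();
    }

    public static void main(String[] args) {
        String topic = "foo1.0/bar/baz";
        String received = receiveTopic(topic);
        // The assertion runs on the calling thread, so a mismatch fails here directly.
        if (!topic.equals(received)) {
            throw new AssertionError("expected " + topic + " but got " + received);
        }
        System.out.println("received topic matches: " + received);
    }
}
```

In the actual test, the final comparison would presumably be an `assertEquals(topic, receivedTopic.get())` placed after the `assertTrue(latch.await(...))` line.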