gemmellr commented on code in PR #4710:
URL: https://github.com/apache/activemq-artemis/pull/4710#discussion_r1469405079


##########
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java:
##########
@@ -138,25 +138,71 @@ public class MQTTUtil {
 
    public static final int DEFAULT_MAXIMUM_PACKET_SIZE = MAX_PACKET_SIZE;
 
-   public static String convertMqttTopicFilterToCore(String filter, 
WildcardConfiguration wildcardConfiguration) {
-      return convertMqttTopicFilterToCore(null, filter, wildcardConfiguration);
-   }
+   public static final WildcardConfiguration MQTT_WILDCARD = new 
WildcardConfiguration().setDelimiter(SLASH).setAnyWords(HASH).setSingleWord(PLUS);
 
-   public static String convertMqttTopicFilterToCore(String prefixToAdd, 
String filter, WildcardConfiguration wildcardConfiguration) {
-      if (filter == null) {
-         return "";
+   /**
+    * This method takes the MQTT-related input and translates it into the 
proper name for a core subscription queue. The
+    * {@code topicFilter} may be either for a shared subscription in the 
format {@code $share/<shareName>/<topicFilter>}
+    * or a normal MQTT topic filter (e.g. {@code a/b/#}, {@code a/+/c}, {@code 
a/b/c}, etc.).
+    *
+    * @param topicFilter the MQTT topic filter
+    * @param clientId the MQTT client ID, used for normal (i.e. non-shared) 
subscriptions
+    * @param wildcardConfiguration the {@code WildcardConfiguration} governing 
the core broker
+    * @return the name of the core subscription queue based on the input
+    */
+   public static String getCoreQueueFromMqttTopic(String topicFilter, String 
clientId, WildcardConfiguration wildcardConfiguration) {
+      if (topicFilter == null || clientId == null || wildcardConfiguration == 
null) {
+         throw new IllegalArgumentException();
       }
 
-      String converted = MQTT_WILDCARD.convert(filter, wildcardConfiguration);
-      if (prefixToAdd != null) {
-         converted = prefixToAdd + converted;
+      if (isSharedSubscription(topicFilter)) {
+         Pair<String, String> decomposed = 
decomposeSharedSubscriptionTopicFilter(topicFilter);
+         return new 
StringBuilder().append(decomposed.getA()).append(".").append(getCoreAddressFromMqttTopic(decomposed.getB(),
 wildcardConfiguration)).toString();
+      } else {
+         return new 
StringBuilder().append(clientId).append(".").append(getCoreAddressFromMqttTopic(topicFilter,
 wildcardConfiguration)).toString();
       }
-      return converted;
    }
 
-   public static String convertCoreAddressToMqttTopicFilter(String address, 
WildcardConfiguration wildcardConfiguration) {
-      if (address == null) {
-         return "";
+   /**
+    * This method takes the MQTT-related input and translates it into the 
proper name for a core address. The
+    * {@code topicFilter} must be normal (i.e. non-shared). It should not be 
in the format
+    * {@code $share/<shareName>/<topicFilter>}.
+    *
+    * @param topicFilter the MQTT topic filter
+    * @param wildcardConfiguration the {@code WildcardConfiguration} governing 
the core broker
+    * @return the name of the core addres based on the input
+    */
+   public static String getCoreAddressFromMqttTopic(String topicFilter, 
WildcardConfiguration wildcardConfiguration) {
+      if (topicFilter == null || wildcardConfiguration == null) {
+         throw new IllegalArgumentException();
+      }

Review Comment:
   Same as above comment.



##########
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java:
##########
@@ -138,25 +138,71 @@ public class MQTTUtil {
 
    public static final int DEFAULT_MAXIMUM_PACKET_SIZE = MAX_PACKET_SIZE;
 
-   public static String convertMqttTopicFilterToCore(String filter, 
WildcardConfiguration wildcardConfiguration) {
-      return convertMqttTopicFilterToCore(null, filter, wildcardConfiguration);
-   }
+   public static final WildcardConfiguration MQTT_WILDCARD = new 
WildcardConfiguration().setDelimiter(SLASH).setAnyWords(HASH).setSingleWord(PLUS);
 
-   public static String convertMqttTopicFilterToCore(String prefixToAdd, 
String filter, WildcardConfiguration wildcardConfiguration) {
-      if (filter == null) {
-         return "";
+   /**
+    * This method takes the MQTT-related input and translates it into the 
proper name for a core subscription queue. The
+    * {@code topicFilter} may be either for a shared subscription in the 
format {@code $share/<shareName>/<topicFilter>}
+    * or a normal MQTT topic filter (e.g. {@code a/b/#}, {@code a/+/c}, {@code 
a/b/c}, etc.).
+    *
+    * @param topicFilter the MQTT topic filter
+    * @param clientId the MQTT client ID, used for normal (i.e. non-shared) 
subscriptions
+    * @param wildcardConfiguration the {@code WildcardConfiguration} governing 
the core broker
+    * @return the name of the core subscription queue based on the input
+    */
+   public static String getCoreQueueFromMqttTopic(String topicFilter, String 
clientId, WildcardConfiguration wildcardConfiguration) {
+      if (topicFilter == null || clientId == null || wildcardConfiguration == 
null) {
+         throw new IllegalArgumentException();

Review Comment:
   It might be nicer to be clearer on the specific problem so its more easily 
addressed if it does happen (which it seems like it has, some tests are 
failing). E.g switching to Objects.requireNonNull(T obj, String message) would 
still take the same number of lines, but could make crystal clear which one has 
caused the problem.



##########
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java:
##########
@@ -138,25 +138,71 @@ public class MQTTUtil {
 
    public static final int DEFAULT_MAXIMUM_PACKET_SIZE = MAX_PACKET_SIZE;
 
-   public static String convertMqttTopicFilterToCore(String filter, 
WildcardConfiguration wildcardConfiguration) {
-      return convertMqttTopicFilterToCore(null, filter, wildcardConfiguration);
-   }
+   public static final WildcardConfiguration MQTT_WILDCARD = new 
WildcardConfiguration().setDelimiter(SLASH).setAnyWords(HASH).setSingleWord(PLUS);
 
-   public static String convertMqttTopicFilterToCore(String prefixToAdd, 
String filter, WildcardConfiguration wildcardConfiguration) {
-      if (filter == null) {
-         return "";
+   /**
+    * This method takes the MQTT-related input and translates it into the 
proper name for a core subscription queue. The
+    * {@code topicFilter} may be either for a shared subscription in the 
format {@code $share/<shareName>/<topicFilter>}
+    * or a normal MQTT topic filter (e.g. {@code a/b/#}, {@code a/+/c}, {@code 
a/b/c}, etc.).
+    *
+    * @param topicFilter the MQTT topic filter
+    * @param clientId the MQTT client ID, used for normal (i.e. non-shared) 
subscriptions
+    * @param wildcardConfiguration the {@code WildcardConfiguration} governing 
the core broker
+    * @return the name of the core subscription queue based on the input
+    */
+   public static String getCoreQueueFromMqttTopic(String topicFilter, String 
clientId, WildcardConfiguration wildcardConfiguration) {
+      if (topicFilter == null || clientId == null || wildcardConfiguration == 
null) {
+         throw new IllegalArgumentException();
       }
 
-      String converted = MQTT_WILDCARD.convert(filter, wildcardConfiguration);
-      if (prefixToAdd != null) {
-         converted = prefixToAdd + converted;
+      if (isSharedSubscription(topicFilter)) {
+         Pair<String, String> decomposed = 
decomposeSharedSubscriptionTopicFilter(topicFilter);
+         return new 
StringBuilder().append(decomposed.getA()).append(".").append(getCoreAddressFromMqttTopic(decomposed.getB(),
 wildcardConfiguration)).toString();
+      } else {
+         return new 
StringBuilder().append(clientId).append(".").append(getCoreAddressFromMqttTopic(topicFilter,
 wildcardConfiguration)).toString();
       }
-      return converted;
    }
 
-   public static String convertCoreAddressToMqttTopicFilter(String address, 
WildcardConfiguration wildcardConfiguration) {
-      if (address == null) {
-         return "";
+   /**
+    * This method takes the MQTT-related input and translates it into the 
proper name for a core address. The
+    * {@code topicFilter} must be normal (i.e. non-shared). It should not be 
in the format
+    * {@code $share/<shareName>/<topicFilter>}.
+    *
+    * @param topicFilter the MQTT topic filter
+    * @param wildcardConfiguration the {@code WildcardConfiguration} governing 
the core broker
+    * @return the name of the core addres based on the input
+    */
+   public static String getCoreAddressFromMqttTopic(String topicFilter, 
WildcardConfiguration wildcardConfiguration) {
+      if (topicFilter == null || wildcardConfiguration == null) {
+         throw new IllegalArgumentException();
+      }
+
+      return MQTT_WILDCARD.convert(topicFilter, wildcardConfiguration);
+   }
+
+   /**
+    * This is exactly the same as {@link #getCoreAddressFromMqttTopic(String, 
WildcardConfiguration)} except that it
+    * also prefixes the return with
+    * {@link 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil#MQTT_RETAIN_ADDRESS_PREFIX}
+    *
+    * @param topicFilter the MQTT topic filter
+    * @param wildcardConfiguration the {@code WildcardConfiguration} governing 
the core broker
+    * @return the name of the core address based on the input, stripping
+    *         {@link 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil#MQTT_RETAIN_ADDRESS_PREFIX}
 if it exists
+    */
+   public static String getCoreRetainAddressFromMqttTopic(String topicFilter, 
WildcardConfiguration wildcardConfiguration) {
+      return MQTT_RETAIN_ADDRESS_PREFIX + 
getCoreAddressFromMqttTopic(topicFilter, wildcardConfiguration);
+   }
+
+   /**
+    *
+    * @param address the core address
+    * @param wildcardConfiguration the {@code WildcardConfiguration} governing 
the core broker
+    * @return the name of the MQTT topic based on the input
+    */
+   public static String getMqttTopicFromCoreAddress(String address, 
WildcardConfiguration wildcardConfiguration) {
+      if (address == null || wildcardConfiguration == null) {
+         throw new IllegalArgumentException();

Review Comment:
   Same as earlier comment.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java:
##########
@@ -83,6 +83,29 @@ public void messageArrived(String topic, MqttMessage 
message) {
       assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
    }
 
+   @Test(timeout = DEFAULT_TIMEOUT)
+   public void testTopicNameEscape() throws Exception {
+      final String topic = "foo1.0/bar/baz";
+
+      CountDownLatch latch = new CountDownLatch(1);
+      MqttClient subscriber = createPahoClient("subscriber");
+      subscriber.connect();
+      subscriber.setCallback(new DefaultMqttCallback() {
+         @Override
+         public void messageArrived(String t, MqttMessage message) {
+            logger.info("Message received from topic {}, message={}", topic, 
message);
+            assertEquals(topic, t);

Review Comment:
   Would be better to remember the value here and then assert outside the 
callback after the latch trips. That way the test will fail quicker and in a 
more helpful manner if the value isnt as expected, having the assertion 
directly cause it rather than e.g just timing out after 500ms on the latch 
(because it wasnt tripped) and requiring someone go look at logs and hope that 
there is a stack logged that might explain it. It would also just be clearer 
what is really being tested, took me a bit to even notice what the test really 
cared about while doing the send+receive.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to