Repository: activemq Updated Branches: refs/heads/master 8ef44452a -> 5d6d42ce9
https://issues.apache.org/jira/browse/AMQ-5883 https://issues.apache.org/jira/browse/AMQ-5884 https://issues.apache.org/jira/browse/AMQ-5885 Add additional validation of Topic names used in subscribe and unsubscribe that test for spec compliance. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5d6d42ce Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5d6d42ce Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5d6d42ce Branch: refs/heads/master Commit: 5d6d42ce97a8a6cacd630574e54b330275d041a7 Parents: 8ef4445 Author: Timothy Bish <[email protected]> Authored: Wed Mar 2 12:30:54 2016 -0500 Committer: Timothy Bish <[email protected]> Committed: Wed Mar 2 12:30:54 2016 -0500 ---------------------------------------------------------------------- .../transport/mqtt/MQTTProtocolConverter.java | 12 +- .../transport/mqtt/MQTTProtocolSupport.java | 87 ++++++++ .../activemq/transport/mqtt/MQTTTest.java | 202 +++++++++++++++++++ 3 files changed, 298 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/5d6d42ce/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 97a74a9..154ec53 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -367,6 +367,7 @@ public class MQTTProtocolConverter { if (topics != null) { byte[] qos = new byte[topics.length]; for (int i = 0; i < topics.length; i++) { + 
MQTTProtocolSupport.validate(topics[i].name().toString()); try { qos[i] = findSubscriptionStrategy().onSubscribe(topics[i]); } catch (IOException e) { @@ -383,6 +384,7 @@ public class MQTTProtocolConverter { } } else { LOG.warn("No topics defined for Subscription " + command); + throw new MQTTProtocolException("SUBSCRIBE command received with no topic filter"); } } @@ -394,16 +396,20 @@ public class MQTTProtocolConverter { UTF8Buffer[] topics = command.topics(); if (topics != null) { for (UTF8Buffer topic : topics) { + MQTTProtocolSupport.validate(topic.toString()); try { findSubscriptionStrategy().onUnSubscribe(topic.toString()); } catch (IOException e) { throw new MQTTProtocolException("Failed to process unsubscribe request", true, e); } } + UNSUBACK ack = new UNSUBACK(); + ack.messageId(command.messageId()); + sendToMQTT(ack.encode()); + } else { + LOG.warn("No topics defined for Subscription " + command); + throw new MQTTProtocolException("UNSUBSCRIBE command received with no topic filter"); } - UNSUBACK ack = new UNSUBACK(); - ack.messageId(command.messageId()); - sendToMQTT(ack.encode()); } /** http://git-wip-us.apache.org/repos/asf/activemq/blob/5d6d42ce/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java index 90f8644..4a4d3c5 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolSupport.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.transport.mqtt; +import java.io.UnsupportedEncodingException; + import org.fusesource.mqtt.codec.CONNECT; import org.fusesource.mqtt.codec.DISCONNECT; import org.fusesource.mqtt.codec.PINGREQ; @@ -32,6 
+34,16 @@ import org.fusesource.mqtt.codec.UNSUBSCRIBE; */ public class MQTTProtocolSupport { + private static final int TOPIC_NAME_MIN_LENGTH = 1; + private static final int TOPIC_NAME_MAX_LENGTH = 65535; + + private static final String MULTI_LEVEL_WILDCARD = "#"; + private static final String SINGLE_LEVEL_WILDCARD = "+"; + + private static final char MULTI_LEVEL_WILDCARD_CHAR = '#'; + private static final char SINGLE_LEVEL_WILDCARD_CHAR = '+'; + private static final char TOPIC_LEVEL_SEPERATOR_CHAR = '/'; + /** * Converts an MQTT formatted Topic name into a suitable ActiveMQ Destination * name string. @@ -142,4 +154,79 @@ public class MQTTProtocolSupport { return "UNKNOWN"; } } + + /** + * Validate that the Topic names given by client commands are valid + * based on the MQTT protocol specification. + * + * @param topicName + * the given Topic name provided by the client. + * + * @throws MQTTProtocolException if the value given is invalid. + */ + public static void validate(String topicName) throws MQTTProtocolException { + int topicLen = 0; + try { + topicLen = topicName.getBytes("UTF-8").length; + } catch (UnsupportedEncodingException e) { + throw new MQTTProtocolException("Topic name contained invalid UTF-8 encoding."); + } + + // Spec: Unless stated otherwise all UTF-8 encoded strings can have any length in + // the range 0 to 65535 bytes. + if (topicLen < TOPIC_NAME_MIN_LENGTH || topicLen > TOPIC_NAME_MAX_LENGTH) { + throw new MQTTProtocolException("Topic name given had invliad length."); + } + + // 4.7.1.2 and 4.7.1.3 these can stand alone + if (MULTI_LEVEL_WILDCARD.equals(topicName) || SINGLE_LEVEL_WILDCARD.equals(topicName)) { + return; + } + + // Spec: 4.7.1.2 + // The multi-level wildcard character MUST be specified either on its own or following a + // topic level separator. In either case it MUST be the last character specified in the + // Topic Filter [MQTT-4.7.1-2]. 
+ int numWildCards = 0; + for (int i = 0; i < topicName.length(); ++i) { + if (topicName.charAt(i) == MULTI_LEVEL_WILDCARD_CHAR) { + numWildCards++; + + // If prev exists it must be a separator + if (i > 0 && topicName.charAt(i - 1) != TOPIC_LEVEL_SEPERATOR_CHAR) { + throw new MQTTProtocolException("The multi level wildcard must stand alone: " + topicName); + } + } + + if (numWildCards > 1) { + throw new MQTTProtocolException("Topic Filter can only have one multi-level filter: " + topicName); + } + } + + if (topicName.contains(MULTI_LEVEL_WILDCARD) && !topicName.endsWith(MULTI_LEVEL_WILDCARD)) { + throw new MQTTProtocolException("The multi-level filter must be at the end of the Topic name: " + topicName); + } + + // Spec: 4.7.1.3 + // The single-level wildcard can be used at any level in the Topic Filter, including + // first and last levels. Where it is used it MUST occupy an entire level of the filter + // + // [MQTT-4.7.1-3]. It can be used at more than one level in the Topic Filter and can be + // used in conjunction with the multilevel wildcard. 
+ for (int i = 0; i < topicName.length(); ++i) { + if (topicName.charAt(i) != SINGLE_LEVEL_WILDCARD_CHAR) { + continue; + } + + // If prev exists it must be a separator + if (i > 0 && topicName.charAt(i - 1) != TOPIC_LEVEL_SEPERATOR_CHAR) { + throw new MQTTProtocolException("The single level wildcard must stand alone: " + topicName); + } + + // If next exists it must be a separator + if (i < topicName.length() - 1 && topicName.charAt(i + 1) != TOPIC_LEVEL_SEPERATOR_CHAR) { + throw new MQTTProtocolException("The single level wildcard must stand alone: " + topicName); + } + } + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/5d6d42ce/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index c89c15e..fc401e9 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -1370,6 +1370,208 @@ public class MQTTTest extends MQTTTestSupport { } @Test(timeout = 30 * 10000) + public void testSubscribeWithZeroLengthTopic() throws Exception { + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("MQTT-Client"); + mqtt.setCleanSession(false); + + Topic topic = new Topic("", QoS.EXACTLY_ONCE); + + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + LOG.info("Trying to subscrobe to topic: {}", topic.name()); + + try { + connection.subscribe(new Topic[] { topic }); + fail("Should not be able to subscribe with invalid Topic"); + } catch (Exception ex) { + LOG.info("Caught expected error on subscribe"); + } + + assertTrue("Should have lost connection", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + 
return !connection.isConnected(); + } + })); + } + + @Test(timeout = 30 * 10000) + public void testUnsubscribeWithZeroLengthTopic() throws Exception { + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("MQTT-Client"); + mqtt.setCleanSession(false); + + Topic topic = new Topic("", QoS.EXACTLY_ONCE); + + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + LOG.info("Trying to subscrobe to topic: {}", topic.name()); + + try { + connection.unsubscribe(new String[] { topic.name().toString() }); + fail("Should not be able to subscribe with invalid Topic"); + } catch (Exception ex) { + LOG.info("Caught expected error on subscribe"); + } + + assertTrue("Should have lost connection", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return !connection.isConnected(); + } + })); + } + + @Test(timeout = 30 * 10000) + public void testSubscribeWithInvalidMultiLevelWildcards() throws Exception { + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("MQTT-Client"); + mqtt.setCleanSession(false); + + Topic[] topics = { new Topic("#/Foo", QoS.EXACTLY_ONCE), + new Topic("#/Foo/#", QoS.EXACTLY_ONCE), + new Topic("Foo/#/Level", QoS.EXACTLY_ONCE), + new Topic("Foo/X#", QoS.EXACTLY_ONCE) }; + + for (int i = 0; i < topics.length; ++i) { + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + LOG.info("Trying to subscrobe to topic: {}", topics[i].name()); + + try { + connection.subscribe(new Topic[] { topics[i] }); + fail("Should not be able to subscribe with invalid Topic"); + } catch (Exception ex) { + LOG.info("Caught expected error on subscribe"); + } + + assertTrue("Should have lost connection", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return !connection.isConnected(); + } + })); + } + } + + @Test(timeout = 30 * 10000) + public void testSubscribeWithInvalidSingleLevelWildcards() 
throws Exception { + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("MQTT-Client"); + mqtt.setCleanSession(false); + + Topic[] topics = { new Topic("Foo+", QoS.EXACTLY_ONCE), + new Topic("+Foo/#", QoS.EXACTLY_ONCE), + new Topic("+#", QoS.EXACTLY_ONCE), + new Topic("Foo/+X/Level", QoS.EXACTLY_ONCE), + new Topic("Foo/+F", QoS.EXACTLY_ONCE) }; + + for (int i = 0; i < topics.length; ++i) { + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + LOG.info("Trying to subscrobe to topic: {}", topics[i].name()); + + try { + connection.subscribe(new Topic[] { topics[i] }); + fail("Should not be able to subscribe with invalid Topic"); + } catch (Exception ex) { + LOG.info("Caught expected error on subscribe"); + } + + assertTrue("Should have lost connection", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return !connection.isConnected(); + } + })); + } + } + + @Test(timeout = 30 * 10000) + public void testUnsubscribeWithInvalidMultiLevelWildcards() throws Exception { + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("MQTT-Client"); + mqtt.setCleanSession(false); + + Topic[] topics = { new Topic("#/Foo", QoS.EXACTLY_ONCE), + new Topic("#/Foo/#", QoS.EXACTLY_ONCE), + new Topic("Foo/#/Level", QoS.EXACTLY_ONCE), + new Topic("Foo/X#", QoS.EXACTLY_ONCE) }; + + for (int i = 0; i < topics.length; ++i) { + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + LOG.info("Trying to subscrobe to topic: {}", topics[i].name()); + + try { + connection.unsubscribe(new String[] { topics[i].name().toString() }); + fail("Should not be able to unsubscribe with invalid Topic"); + } catch (Exception ex) { + LOG.info("Caught expected error on subscribe"); + } + + assertTrue("Should have lost connection", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return !connection.isConnected(); + } + 
})); + } + } + + @Test(timeout = 30 * 10000) + public void testUnsubscribeWithInvalidSingleLevelWildcards() throws Exception { + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("MQTT-Client"); + mqtt.setCleanSession(false); + + Topic[] topics = { new Topic("Foo+", QoS.EXACTLY_ONCE), + new Topic("+Foo/#", QoS.EXACTLY_ONCE), + new Topic("+#", QoS.EXACTLY_ONCE), + new Topic("Foo/+X/Level", QoS.EXACTLY_ONCE), + new Topic("Foo/+F", QoS.EXACTLY_ONCE) }; + + for (int i = 0; i < topics.length; ++i) { + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + LOG.info("Trying to subscrobe to topic: {}", topics[i].name()); + + try { + connection.unsubscribe(new String[] { topics[i].name().toString() }); + fail("Should not be able to unsubscribe with invalid Topic"); + } catch (Exception ex) { + LOG.info("Caught expected error on subscribe"); + } + + assertTrue("Should have lost connection", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return !connection.isConnected(); + } + })); + } + } + + @Test(timeout = 30 * 10000) public void testSubscribeMultipleTopics() throws Exception { byte[] payload = new byte[1024 * 32];
