Repository: activemq Updated Branches: refs/heads/master 012e4d0a1 -> 1b38b27ed
https://issues.apache.org/jira/browse/AMQ-5882 Fail to publish if the topic name in the publish packet contains a wild card character as per the MQTT V3.1 and V3.1.1 spec. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1b38b27e Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1b38b27e Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1b38b27e Branch: refs/heads/master Commit: 1b38b27ed23a5ac604e5a4dd509dd854afdfde07 Parents: 012e4d0 Author: Christopher L. Shannon (cshannon) <[email protected]> Authored: Sun Feb 28 17:54:23 2016 +0000 Committer: Christopher L. Shannon (cshannon) <[email protected]> Committed: Sun Feb 28 17:54:23 2016 +0000 ---------------------------------------------------------------------- .../transport/mqtt/MQTTProtocolConverter.java | 14 +++++++++ .../activemq/transport/mqtt/MQTTTest.java | 31 ++++++++++++++++++++ 2 files changed, 45 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/1b38b27e/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 8e83ed2..97a74a9 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -91,6 +91,9 @@ public class MQTTProtocolConverter { public static final int V3_1 = 3; public static final int V3_1_1 = 4; + public static final String SINGLE_LEVEL_WILDCARD = "+"; + public static final String MULTI_LEVEL_WILDCARD = "#"; + private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD = 0.5; @@ -458,6 +461,12 @@ public class MQTTProtocolConverter { checkConnected(); LOG.trace("MQTT Rcv PUBLISH message:{} client:{} connection:{}", command.messageId(), clientId, connectionInfo.getConnectionId()); + //Both version 3.1 and 3.1.1 do not allow the topic name to contain a wildcard in the publish packet + if (containsMqttWildcard(command.topicName().toString())) { + // [MQTT-3.3.2-2]: The Topic Name in the PUBLISH Packet MUST NOT contain wildcard characters + getMQTTTransport().onException(IOExceptionSupport.create("The topic name must not contain wildcard characters.", null)); + return; + } ActiveMQMessage message = convertMessage(command); message.setProducerId(producerId); message.onSend(); @@ -820,6 +829,11 @@ public class MQTTProtocolConverter { return clientId; } + protected boolean containsMqttWildcard(String value) { + return value != null && (value.contains(SINGLE_LEVEL_WILDCARD) || + value.contains(MULTI_LEVEL_WILDCARD)); + } + protected MQTTSubscriptionStrategy findSubscriptionStrategy() throws IOException { if (subsciptionStrategy == null) { synchronized (STRATAGY_FINDER) { http://git-wip-us.apache.org/repos/asf/activemq/blob/1b38b27e/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 3dd3348..a6e72e7 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -1164,6 +1164,37 @@ public class MQTTTest extends MQTTTestSupport { } @Test(timeout = 60 * 1000) + public void testPublishWildcard31() throws Exception { + testPublishWildcard("3.1"); + } + + @Test(timeout = 60 * 1000) + public void testPublishWildcard311() throws Exception { + testPublishWildcard("3.1.1"); + } + + private void testPublishWildcard(String version) throws Exception { + MQTT mqttPub = createMQTTConnection("MQTTPub-Client", true); + mqttPub.setVersion(version); + BlockingConnection blockingConnection = mqttPub.blockingConnection(); + blockingConnection.connect(); + String payload = "Test Message"; + try { + blockingConnection.publish("foo/#", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + fail("Should not be able to publish with wildcard in topic."); + } catch (Exception ex) { + LOG.info("Exception expected on publish with wildcard in topic name"); + } + try { + blockingConnection.publish("foo/+", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + fail("Should not be able to publish with wildcard in topic."); + } catch (Exception ex) { + LOG.info("Exception expected on publish with wildcard in topic name"); + } + blockingConnection.disconnect(); + } + + @Test(timeout = 60 * 1000) public void testDuplicateClientId() throws Exception { // test link stealing enabled by default final String clientId = "duplicateClient";
