Repository: activemq-artemis Updated Branches: refs/heads/master d0219bea1 -> 53ace34b4
ARTEMIS-826 Fix MQTT protocol detection Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1c84bd39 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1c84bd39 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1c84bd39 Branch: refs/heads/master Commit: 1c84bd39c45a8f3cf91d7d60d212d693905c9ad8 Parents: d0219be Author: Martyn Taylor <[email protected]> Authored: Mon Apr 24 16:27:46 2017 +0100 Committer: Martyn Taylor <[email protected]> Committed: Tue Apr 25 13:53:21 2017 +0100 ---------------------------------------------------------------------- .../core/protocol/mqtt/MQTTProtocolManager.java | 48 +++++++++++++++----- .../artemis/core/protocol/ProtocolHandler.java | 3 +- .../integration/mqtt/imported/MQTTTest.java | 21 +++++++++ 3 files changed, 60 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c84bd39/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java index f4cba64..6118b0d 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java @@ -20,6 +20,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import 
io.netty.channel.ChannelPipeline; import io.netty.handler.codec.mqtt.MqttDecoder; import io.netty.handler.codec.mqtt.MqttEncoder; @@ -115,19 +117,43 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInter pipeline.addLast(new MQTTProtocolHandler(server, this)); } + /** + * The protocol handler passes us an 8 byte long array from the transport. We sniff these first 8 bytes to see + * if they match the first 8 bytes from MQTT Connect packet. In many other protocols the protocol name is the first + * thing sent on the wire. However, in MQTT the protocol name doesn't come until later on in the CONNECT packet. + * + * In order to fully identify MQTT protocol via protocol name, we need up to 12 bytes. However, we can use other + * information from the connect packet to infer that the MQTT protocol is being used. This is enough to identify MQTT + * and add the Netty codec in the pipeline. The Netty codec takes care of things from here. + * + * MQTT CONNECT PACKET: See MQTT 3.1.1 Spec for more info. + * + * Byte 1: Fixed Header Packet Type. 0b00010000 (16) = MQTT Connect + * Byte 2-[N]: Remaining length of the Connect Packet (encoded with 1-4 bytes). + * + * The next set of bytes represents the UTF8 encoded string MQTT (MQTT 3.1.1) or MQIsdp (MQTT 3.1) + * Byte N+1: UTF8 MSB must be 0 + * Byte N+2: UTF8 LSB must be (4(MQTT) or 6(MQIsdp)) + * Byte N+3: M (first char from the protocol name). + * + * Max number of bytes used in the sequence = 8. 
+ */ @Override public boolean isProtocol(byte[] array) { - boolean mqtt311 = array[4] == 77 && // M - array[5] == 81 && // Q - array[6] == 84 && // T - array[7] == 84; // T - - // FIXME The actual protocol name is 'MQIsdp' (However we are only passed the first 4 bytes of the protocol name) - boolean mqtt31 = array[4] == 77 && // M - array[5] == 81 && // Q - array[6] == 73 && // I - array[7] == 115; // s - return mqtt311 || mqtt31; + ByteBuf buf = Unpooled.wrappedBuffer(array); + + if (!(buf.readByte() == 16 && validateRemainingLength(buf) && buf.readByte() == (byte) 0)) return false; + byte b = buf.readByte(); + return ((b == 4 || b == 6) && (buf.readByte() == 77)); + } + + private boolean validateRemainingLength(ByteBuf buffer) { + byte msb = (byte) 0b10000000; + for (byte i = 0; i < 4; i++) { + if ((buffer.readByte() & msb) != msb) + return true; + } + return false; } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c84bd39/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java index ca78f29..327d870 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java @@ -135,7 +135,7 @@ public class ProtocolHandler { return; } - // Will use the first five bytes to detect a protocol. + // Will use the first N bytes to detect a protocol depending on the protocol. 
if (in.readableBytes() < 8) { return; } @@ -175,6 +175,7 @@ public class ProtocolHandler { protocolToUse = ActiveMQClient.DEFAULT_CORE_PROTOCOL; } } + ProtocolManager protocolManagerToUse = protocolMap.get(protocolToUse); ConnectionCreator channelHandler = nettyAcceptor.createConnectionCreator(); ChannelPipeline pipeline = ctx.pipeline(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1c84bd39/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 6b58fa2..28b7984 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -83,6 +83,27 @@ public class MQTTTest extends MQTTTestSupport { } + @Test + public void testConnectWithLargePassword() throws Exception { + for (String version : Arrays.asList("3.1", "3.1.1")) { + String longString = new String(new char[65535]); + + BlockingConnection connection = null; + try { + MQTT mqtt = createMQTTConnection("test-" + version, true); + mqtt.setUserName(longString); + mqtt.setPassword(longString); + mqtt.setConnectAttemptsMax(1); + mqtt.setVersion(version); + connection = mqtt.blockingConnection(); + connection.connect(); + assertTrue(connection.isConnected()); + } finally { + if (connection != null && connection.isConnected()) connection.disconnect(); + } + } + } + @Test(timeout = 60 * 1000) public void testSendAndReceiveMQTT() throws Exception { final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
