Repository: activemq Updated Branches: refs/heads/master c02bc6484 -> 6dacef1c9
https://issues.apache.org/jira/browse/AMQ-6345 The MQTT transport will now throw an exception if a PINGREQ is sent to the broker before a CONNECT packet has been received, because the spec says CONNECT must be the first packet sent. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6dacef1c Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6dacef1c Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6dacef1c Branch: refs/heads/master Commit: 6dacef1c9552edbad656c31d784179c2cd179b2e Parents: c02bc64 Author: Christopher L. Shannon (cshannon) <[email protected]> Authored: Wed Jul 6 15:59:25 2016 +0000 Committer: Christopher L. Shannon (cshannon) <[email protected]> Committed: Wed Jul 6 15:59:25 2016 +0000 ---------------------------------------------------------------------- .../transport/mqtt/MQTTProtocolConverter.java | 1 + .../activemq/transport/mqtt/MQTTTest.java | 122 +++++++++++++++++++ 2 files changed, 123 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/6dacef1c/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index e693327..23ca5fa 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -196,6 +196,7 @@ public class MQTTProtocolConverter { switch (frame.messageType()) { case PINGREQ.TYPE: LOG.debug("Received a ping from client: " + getClientId()); + checkConnected(); sendToMQTT(PING_RESP_FRAME); LOG.debug("Sent Ping Response to " + 
getClientId()); break; http://git-wip-us.apache.org/repos/asf/activemq/blob/6dacef1c/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 5e28b2a..227ade6 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -25,6 +25,8 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.EOFException; +import java.lang.reflect.Method; import java.net.ProtocolException; import java.util.ArrayList; import java.util.Arrays; @@ -56,13 +58,21 @@ import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.Wait; +import org.apache.activemq.util.Wait.Condition; +import org.fusesource.hawtdispatch.transport.Transport; import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.Callback; +import org.fusesource.mqtt.client.CallbackConnection; +import org.fusesource.mqtt.client.FutureConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Message; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; import org.fusesource.mqtt.client.Tracer; import org.fusesource.mqtt.codec.MQTTFrame; +import org.fusesource.mqtt.codec.MQTTProtocolCodec; +import org.fusesource.mqtt.codec.PINGREQ; +import org.fusesource.mqtt.codec.PINGRESP; import org.fusesource.mqtt.codec.PUBLISH; import org.junit.Test; import org.slf4j.Logger; @@ -1961,4 +1971,116 @@ public class MQTTTest extends MQTTTestSupport { connection.disconnect(); } + + 
@Test(timeout = 15 * 1000, expected=EOFException.class) + public void testPingReqWithoutConnectFail31() throws Exception { + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("clientId"); + mqtt.setVersion("3.1"); + testPingReqWithoutConnectFail(mqtt); + } + + @Test(timeout = 15 * 1000, expected=EOFException.class) + public void testPingReqWithoutConnectFail311() throws Exception { + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("clientId"); + mqtt.setVersion("3.1.1"); + testPingReqWithoutConnectFail(mqtt); + } + + @Test(timeout = 15 * 1000) + public void testPingReqConnectSuccess31() throws Exception { + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("clientId"); + mqtt.setVersion("3.1"); + testPingReqConnectSuccess(mqtt); + } + + @Test(timeout = 15 * 1000) + public void testPingReqConnectSuccess311() throws Exception { + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("clientId"); + mqtt.setVersion("3.1.1"); + testPingReqConnectSuccess(mqtt); + } + + private void testPingReqWithoutConnectFail(final MQTT mqtt) throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference<Transport> transport = new AtomicReference<>(); + final AtomicReference<Throwable> error = new AtomicReference<>(); + final Callback<Transport> con = new Callback<Transport>() { + + @Override + public void onSuccess(Transport value) { + transport.set(value); + latch.countDown(); + } + + @Override + public void onFailure(Throwable value) { + error.set(value); + latch.countDown(); + } + }; + + //Connect to the transport by using the createTransport method with a custom callback + //This will ensure that we connect without sending a CONNECT packet for testing + //and that we won't receive automatically + CallbackConnection connection = new CallbackConnection(mqtt); + Method createTransportMethod = connection.getClass().getDeclaredMethod("createTransport", Callback.class); + createTransportMethod.setAccessible(true); + 
createTransportMethod.invoke(connection, con); + latch.await(); + + //Make sure no error on connect + if (error.get() != null) { + LOG.error(error.get().getMessage(), error.get()); + fail(error.get().getMessage()); + } + + //Send a PINGREQ without a connect packet first + final MQTTProtocolCodec codec = new MQTTProtocolCodec(); + codec.setTransport(transport.get()); + transport.get().offer(new PINGREQ().encode()); + + //Protocol should throw an exception since we never sent a CONNECT + Wait.waitFor(new Condition() { + + @Override + public boolean isSatisified() throws Exception { + //Wait for exception to be thrown + codec.read(); + return false; + } + }, 5000, 100); + + } + + private void testPingReqConnectSuccess(final MQTT mqtt) throws Exception { + final CountDownLatch pingRespReceived = new CountDownLatch(1); + //Tracer to assert we received the response by waiting for it + mqtt.setTracer(new Tracer() { + + @Override + public void onReceive(MQTTFrame frame) { + if (frame.messageType() == PINGRESP.TYPE) { + pingRespReceived.countDown(); + } + } + + }); + CallbackConnection callbackConnection = new CallbackConnection(mqtt); + BlockingConnection connection = new BlockingConnection(new FutureConnection(callbackConnection)); + connection.connect(); + Transport transport = callbackConnection.transport(); + + //SEND a PINGREQ and wait for the response + final MQTTProtocolCodec codec = new MQTTProtocolCodec(); + codec.setTransport(transport); + transport.offer(new PINGREQ().encode()); + + //Wait for the response + assertTrue(pingRespReceived.await(5, TimeUnit.SECONDS)); + } + }
