Repository: activemq Updated Branches: refs/heads/trunk f158e7da6 -> 145b64ac2
Fix test name so it gets included in the surefire test runs. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/145b64ac Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/145b64ac Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/145b64ac Branch: refs/heads/trunk Commit: 145b64ac2ae6a700ee4fec1909e9aef366ca1633 Parents: f158e7d Author: Timothy Bish <[email protected]> Authored: Fri Aug 8 17:41:50 2014 -0400 Committer: Timothy Bish <[email protected]> Committed: Fri Aug 8 17:41:50 2014 -0400 ---------------------------------------------------------------------- .../activemq/transport/mqtt/MQTTAuthTest.java | 294 +++++++++++++++++++ .../activemq/transport/mqtt/MQTTAuthTests.java | 294 ------------------- 2 files changed, 294 insertions(+), 294 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/145b64ac/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java new file mode 100644 index 0000000..a8ced02 --- /dev/null +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +import java.net.ProtocolException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.InvalidClientIDException; +import javax.security.auth.login.CredentialException; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerFilter; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ConnectionInfo; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.Message; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; +import org.fusesource.mqtt.client.Tracer; +import org.fusesource.mqtt.codec.CONNACK; +import org.fusesource.mqtt.codec.MQTTFrame; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests various use cases that require authentication or authorization over MQTT + */ +@RunWith(Parameterized.class) +public class MQTTAuthTest extends MQTTAuthTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(MQTTAuthTest.class); + + @Parameters(name="{0}") + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { + {"mqtt", false}, + {"mqtt+ssl", true}, + {"mqtt+nio", false}, + {"mqtt+nio+ssl", true} + }); + } + + public MQTTAuthTest(String connectorScheme, boolean useSSL) { + super(connectorScheme, useSSL); + } + + @Test(timeout = 60 * 1000) + public void testAnonymousUserConnect() throws Exception { + MQTT mqtt = createMQTTConnection(); + mqtt.setCleanSession(true); + mqtt.setUserName((String)null); + mqtt.setPassword((String)null); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + LOG.info("Connected as anonymous client"); + connection.disconnect(); + } + + @Test(timeout = 60 * 1000) + public void testBadUserNameOrPasswordGetsConnAckWithErrorCode() throws Exception { + MQTT mqttPub = createMQTTConnection("pub", true); + mqttPub.setUserName("foo"); + mqttPub.setPassword("bar"); + + final AtomicBoolean failed = new AtomicBoolean(); + + mqttPub.setTracer(new Tracer() { + @Override + public void onReceive(MQTTFrame frame) { + LOG.info("Client received: {}", frame); + if (frame.messageType() == CONNACK.TYPE) { + CONNACK connAck = new CONNACK(); + try { + connAck.decode(frame); + LOG.info("{}", connAck); + assertEquals(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED, connAck.code()); + } catch (ProtocolException e) { + failed.set(true); + fail("Error decoding publish " + e.getMessage()); + } catch (Throwable err) { + failed.set(true); + throw err; + } + } + } + + @Override + public void onSend(MQTTFrame frame) { + LOG.info("Client sent: {}", frame); + } + }); + + BlockingConnection connectionPub = mqttPub.blockingConnection(); + try { + connectionPub.connect(); + fail("Should not be able to connect."); + } catch (Exception e) { + } + + assertFalse("connection should have failed.", failed.get()); + } + + @Test(timeout = 60 * 1000) + public void testFailedSubscription() throws Exception { + final String ANONYMOUS = "anonymous"; + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("foo"); + mqtt.setKeepAlive((short) 2); + + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + final String NAMED = "named"; + byte[] qos = connection.subscribe(new Topic[] { new Topic(NAMED, QoS.AT_MOST_ONCE), new Topic(ANONYMOUS, QoS.EXACTLY_ONCE) }); + assertEquals((byte) 0x80, qos[0]); + assertEquals((byte) QoS.EXACTLY_ONCE.ordinal(), qos[1]); + + // validate the subscription by sending a retained message + connection.publish(ANONYMOUS, ANONYMOUS.getBytes(), QoS.AT_MOST_ONCE, true); + Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); + assertNotNull(msg); + assertEquals(ANONYMOUS, new String(msg.getPayload())); + msg.ack(); + + connection.unsubscribe(new String[] { ANONYMOUS }); + qos = connection.subscribe(new Topic[] { new Topic(ANONYMOUS, QoS.AT_LEAST_ONCE) }); + assertEquals((byte) QoS.AT_LEAST_ONCE.ordinal(), qos[0]); + + msg = connection.receive(1000, TimeUnit.MILLISECONDS); + assertNotNull(msg); + assertEquals(ANONYMOUS, new String(msg.getPayload())); + msg.ack(); + + connection.disconnect(); + } + + @Test(timeout = 60 * 1000) + public void testWildcardRetainedSubscription() throws Exception { + MQTT mqttPub = createMQTTConnection("pub", true); + mqttPub.setUserName("admin"); + mqttPub.setPassword("admin"); + + BlockingConnection connectionPub = mqttPub.blockingConnection(); + connectionPub.connect(); + connectionPub.publish("one", "test".getBytes(), QoS.AT_LEAST_ONCE, true); + + MQTT mqttSub = createMQTTConnection("sub", true); + mqttSub.setUserName("user"); + mqttSub.setPassword("password"); + BlockingConnection connectionSub = mqttSub.blockingConnection(); + connectionSub.connect(); + connectionSub.subscribe(new Topic[]{new Topic("#", QoS.AT_LEAST_ONCE)}); + Message msg = connectionSub.receive(1, TimeUnit.SECONDS); + assertNull("Shouldn't receive the message", msg); + } + + @Test(timeout = 60 * 1000) + public void testInvalidClientIdGetCorrectErrorCode() throws Exception { + MQTT mqttPub = createMQTTConnection("invalid", true); + + final AtomicInteger errorCode = new AtomicInteger(); + + mqttPub.setTracer(new Tracer() { + @Override + public void onReceive(MQTTFrame frame) { + LOG.info("Client received: {}", frame); + if (frame.messageType() == CONNACK.TYPE) { + CONNACK connAck = new CONNACK(); + try { + connAck.decode(frame); + LOG.info("{}", connAck); + errorCode.set(connAck.code().ordinal()); + assertEquals(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED, connAck.code()); + } catch (ProtocolException e) { + fail("Error decoding publish " + e.getMessage()); + } + } + } + + @Override + public void onSend(MQTTFrame frame) { + LOG.info("Client sent: {}", frame); + } + }); + + BlockingConnection connectionPub = mqttPub.blockingConnection(); + try { + connectionPub.connect(); + fail("Should not be able to connect."); + } catch (Exception e) { + } + + assertEquals(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED.ordinal(), errorCode.get()); + } + + @Test(timeout = 60 * 1000) + public void testBadCredentialExceptionGetsCorrectErrorCode() throws Exception { + MQTT mqttPub = createMQTTConnection("bad-credential", true); + mqttPub.setUserName("admin"); + mqttPub.setPassword("admin"); + + final AtomicInteger errorCode = new AtomicInteger(); + + mqttPub.setTracer(new Tracer() { + @Override + public void onReceive(MQTTFrame frame) { + LOG.info("Client received: {}", frame); + if (frame.messageType() == CONNACK.TYPE) { + CONNACK connAck = new CONNACK(); + try { + connAck.decode(frame); + LOG.info("{}", connAck); + errorCode.set(connAck.code().ordinal()); + assertEquals(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD, connAck.code()); + } catch (ProtocolException e) { + fail("Error decoding publish " + e.getMessage()); + } + } + } + + @Override + public void onSend(MQTTFrame frame) { + LOG.info("Client sent: {}", frame); + } + }); + + BlockingConnection connectionPub = mqttPub.blockingConnection(); + try { + connectionPub.connect(); + fail("Should not be able to connect."); + } catch (Exception e) { + } + + assertEquals(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD.ordinal(), errorCode.get()); + } + + @Override + protected void createPlugins(List<BrokerPlugin> plugins) throws Exception { + BrokerPlugin failOnSpecificConditionsPlugin = new BrokerPlugin() { + @Override + public Broker installPlugin(Broker broker) throws Exception { + return new BrokerFilter(broker) { + @Override + public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { + String clientId = info.getClientId(); + if (clientId != null && !clientId.isEmpty()) { + if (clientId.equalsIgnoreCase("invalid")) { + LOG.info("Client ID was invalid"); + throw new InvalidClientIDException("Bad client Id"); + } else if (clientId.equalsIgnoreCase("bad-credential")) { + LOG.info("User Name was invalid"); + throw new CredentialException("Unknwon User Name."); + } + } + } + }; + } + }; + + plugins.add(failOnSpecificConditionsPlugin); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/145b64ac/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java deleted file mode 100644 index 574d554..0000000 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java +++ /dev/null @@ -1,294 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.mqtt; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; - -import java.net.ProtocolException; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.InvalidClientIDException; -import javax.security.auth.login.CredentialException; - -import org.apache.activemq.broker.Broker; -import org.apache.activemq.broker.BrokerFilter; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ConnectionInfo; -import org.fusesource.mqtt.client.BlockingConnection; -import org.fusesource.mqtt.client.MQTT; -import org.fusesource.mqtt.client.Message; -import org.fusesource.mqtt.client.QoS; -import org.fusesource.mqtt.client.Topic; -import org.fusesource.mqtt.client.Tracer; -import org.fusesource.mqtt.codec.CONNACK; -import org.fusesource.mqtt.codec.MQTTFrame; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Tests various use cases that require authentication or authorization over MQTT - */ -@RunWith(Parameterized.class) -public class MQTTAuthTests extends MQTTAuthTestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(MQTTAuthTests.class); - - @Parameters(name= "{index}: scheme({0})") - public static Collection<Object[]> data() { - return Arrays.asList(new Object[][] { - {"mqtt", false}, - {"mqtt+ssl", true}, - {"mqtt+nio", false}, - {"mqtt+nio+ssl", true} - }); - } - - public MQTTAuthTests(String connectorScheme, boolean useSSL) { - super(connectorScheme, useSSL); - } - - @Test(timeout = 60 * 1000) - public void testAnonymousUserConnect() throws Exception { - MQTT mqtt = createMQTTConnection(); - mqtt.setCleanSession(true); - mqtt.setUserName((String)null); - mqtt.setPassword((String)null); - final BlockingConnection connection = mqtt.blockingConnection(); - connection.connect(); - LOG.info("Connected as anonymous client"); - connection.disconnect(); - } - - @Test(timeout = 60 * 1000) - public void testBadUserNameOrPasswordGetsConnAckWithErrorCode() throws Exception { - MQTT mqttPub = createMQTTConnection("pub", true); - mqttPub.setUserName("foo"); - mqttPub.setPassword("bar"); - - final AtomicBoolean failed = new AtomicBoolean(); - - mqttPub.setTracer(new Tracer() { - @Override - public void onReceive(MQTTFrame frame) { - LOG.info("Client received: {}", frame); - if (frame.messageType() == CONNACK.TYPE) { - CONNACK connAck = new CONNACK(); - try { - connAck.decode(frame); - LOG.info("{}", connAck); - assertEquals(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED, connAck.code()); - } catch (ProtocolException e) { - failed.set(true); - fail("Error decoding publish " + e.getMessage()); - } catch (Throwable err) { - failed.set(true); - throw err; - } - } - } - - @Override - public void onSend(MQTTFrame frame) { - LOG.info("Client sent: {}", frame); - } - }); - - BlockingConnection connectionPub = mqttPub.blockingConnection(); - try { - connectionPub.connect(); - fail("Should not be able to connect."); - } catch (Exception e) { - } - - assertFalse("connection should have failed.", failed.get()); - } - - @Test(timeout = 60 * 1000) - public void testFailedSubscription() throws Exception { - final String ANONYMOUS = "anonymous"; - - MQTT mqtt = createMQTTConnection(); - mqtt.setClientId("foo"); - mqtt.setKeepAlive((short) 2); - - final BlockingConnection connection = mqtt.blockingConnection(); - connection.connect(); - - final String NAMED = "named"; - byte[] qos = connection.subscribe(new Topic[] { new Topic(NAMED, QoS.AT_MOST_ONCE), new Topic(ANONYMOUS, QoS.EXACTLY_ONCE) }); - assertEquals((byte) 0x80, qos[0]); - assertEquals((byte) QoS.EXACTLY_ONCE.ordinal(), qos[1]); - - // validate the subscription by sending a retained message - connection.publish(ANONYMOUS, ANONYMOUS.getBytes(), QoS.AT_MOST_ONCE, true); - Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); - assertNotNull(msg); - assertEquals(ANONYMOUS, new String(msg.getPayload())); - msg.ack(); - - connection.unsubscribe(new String[] { ANONYMOUS }); - qos = connection.subscribe(new Topic[] { new Topic(ANONYMOUS, QoS.AT_LEAST_ONCE) }); - assertEquals((byte) QoS.AT_LEAST_ONCE.ordinal(), qos[0]); - - msg = connection.receive(1000, TimeUnit.MILLISECONDS); - assertNotNull(msg); - assertEquals(ANONYMOUS, new String(msg.getPayload())); - msg.ack(); - - connection.disconnect(); - } - - @Test(timeout = 60 * 1000) - public void testWildcardRetainedSubscription() throws Exception { - MQTT mqttPub = createMQTTConnection("pub", true); - mqttPub.setUserName("admin"); - mqttPub.setPassword("admin"); - - BlockingConnection connectionPub = mqttPub.blockingConnection(); - connectionPub.connect(); - connectionPub.publish("one", "test".getBytes(), QoS.AT_LEAST_ONCE, true); - - MQTT mqttSub = createMQTTConnection("sub", true); - mqttSub.setUserName("user"); - mqttSub.setPassword("password"); - BlockingConnection connectionSub = mqttSub.blockingConnection(); - connectionSub.connect(); - connectionSub.subscribe(new Topic[]{new Topic("#", QoS.AT_LEAST_ONCE)}); - Message msg = connectionSub.receive(1, TimeUnit.SECONDS); - assertNull("Shouldn't receive the message", msg); - } - - @Test(timeout = 60 * 1000) - public void testInvalidClientIdGetCorrectErrorCode() throws Exception { - MQTT mqttPub = createMQTTConnection("invalid", true); - - final AtomicInteger errorCode = new AtomicInteger(); - - mqttPub.setTracer(new Tracer() { - @Override - public void onReceive(MQTTFrame frame) { - LOG.info("Client received: {}", frame); - if (frame.messageType() == CONNACK.TYPE) { - CONNACK connAck = new CONNACK(); - try { - connAck.decode(frame); - LOG.info("{}", connAck); - errorCode.set(connAck.code().ordinal()); - assertEquals(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED, connAck.code()); - } catch (ProtocolException e) { - fail("Error decoding publish " + e.getMessage()); - } - } - } - - @Override - public void onSend(MQTTFrame frame) { - LOG.info("Client sent: {}", frame); - } - }); - - BlockingConnection connectionPub = mqttPub.blockingConnection(); - try { - connectionPub.connect(); - fail("Should not be able to connect."); - } catch (Exception e) { - } - - assertEquals(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED.ordinal(), errorCode.get()); - } - - @Test(timeout = 60 * 1000) - public void testBadCredentialExceptionGetsCorrectErrorCode() throws Exception { - MQTT mqttPub = createMQTTConnection("bad-credential", true); - mqttPub.setUserName("admin"); - mqttPub.setPassword("admin"); - - final AtomicInteger errorCode = new AtomicInteger(); - - mqttPub.setTracer(new Tracer() { - @Override - public void onReceive(MQTTFrame frame) { - LOG.info("Client received: {}", frame); - if (frame.messageType() == CONNACK.TYPE) { - CONNACK connAck = new CONNACK(); - try { - connAck.decode(frame); - LOG.info("{}", connAck); - errorCode.set(connAck.code().ordinal()); - assertEquals(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD, connAck.code()); - } catch (ProtocolException e) { - fail("Error decoding publish " + e.getMessage()); - } - } - } - - @Override - public void onSend(MQTTFrame frame) { - LOG.info("Client sent: {}", frame); - } - }); - - BlockingConnection connectionPub = mqttPub.blockingConnection(); - try { - connectionPub.connect(); - fail("Should not be able to connect."); - } catch (Exception e) { - } - - assertEquals(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD.ordinal(), errorCode.get()); - } - - @Override - protected void createPlugins(List<BrokerPlugin> plugins) throws Exception { - BrokerPlugin failOnSpecificConditionsPlugin = new BrokerPlugin() { - @Override - public Broker installPlugin(Broker broker) throws Exception { - return new BrokerFilter(broker) { - @Override - public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { - String clientId = info.getClientId(); - if (clientId != null && !clientId.isEmpty()) { - if (clientId.equalsIgnoreCase("invalid")) { - LOG.info("Client ID was invalid"); - throw new InvalidClientIDException("Bad client Id"); - } else if (clientId.equalsIgnoreCase("bad-credential")) { - LOG.info("User Name was invalid"); - throw new CredentialException("Unknwon User Name."); - } - } - } - }; - } - }; - - plugins.add(failOnSpecificConditionsPlugin); - } -}
