Repository: activemq Updated Branches: refs/heads/master 310090904 -> 789eb9abf
https://issues.apache.org/jira/browse/AMQ-5834 Ensure that a publish receives an ACK even when the user is not authorized to write to the target destination Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/789eb9ab Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/789eb9ab Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/789eb9ab Branch: refs/heads/master Commit: 789eb9abf9f6c01e58a6e65dc72006e778272660 Parents: 3100909 Author: Timothy Bish <[email protected]> Authored: Wed Jun 10 14:59:02 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Wed Jun 10 14:59:02 2015 -0400 ---------------------------------------------------------------------- .../transport/mqtt/MQTTProtocolConverter.java | 67 +++++++++----------- .../activemq/transport/mqtt/MQTTAuthTest.java | 39 ++++++++++++ 2 files changed, 70 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/789eb9ab/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 39e9b84..5bd1a32 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -700,43 +700,38 @@ public class MQTTProtocolConverter { ResponseHandler createResponseHandler(final PUBLISH command) { if (command != null) { - switch (command.qos()) { - case AT_LEAST_ONCE: - return new ResponseHandler() { - @Override - public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { - if (response.isException()) { - LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException()); - } else { - PUBACK ack = new PUBACK(); - ack.messageId(command.messageId()); - LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}", - command.messageId(), clientId, connectionInfo.getConnectionId()); - converter.getMQTTTransport().sendToMQTT(ack.encode()); - } - } - }; - case EXACTLY_ONCE: - return new ResponseHandler() { - @Override - public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { - if (response.isException()) { - LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException()); - } else { - PUBREC ack = new PUBREC(); - ack.messageId(command.messageId()); - synchronized (publisherRecs) { - publisherRecs.put(command.messageId(), ack); - } - LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}", - command.messageId(), clientId, connectionInfo.getConnectionId()); - converter.getMQTTTransport().sendToMQTT(ack.encode()); + return new ResponseHandler() { + @Override + public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { + if (response.isException()) { + Throwable error = ((ExceptionResponse) response).getException(); + LOG.warn("Failed to send MQTT Publish: ", command, error.getMessage()); + LOG.trace("Error trace: {}", error); + } + + switch (command.qos()) { + case AT_LEAST_ONCE: + PUBACK ack = new PUBACK(); + ack.messageId(command.messageId()); + LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}", + command.messageId(), clientId, connectionInfo.getConnectionId()); + converter.getMQTTTransport().sendToMQTT(ack.encode()); + break; + case EXACTLY_ONCE: + PUBREC req = new PUBREC(); + req.messageId(command.messageId()); + synchronized (publisherRecs) { + publisherRecs.put(command.messageId(), req); } - } - }; - case AT_MOST_ONCE: - break; - } + LOG.trace("MQTT Snd PUBREC message:{} client:{} connection:{}", + command.messageId(), clientId, connectionInfo.getConnectionId()); + converter.getMQTTTransport().sendToMQTT(req.encode()); + break; + default: + break; + } + } + }; } return null; } http://git-wip-us.apache.org/repos/asf/activemq/blob/789eb9ab/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java index 77942a0..7ffb3e8 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java @@ -196,6 +196,45 @@ public class MQTTAuthTest extends MQTTAuthTestSupport { assertNull(msg); } + @Test(timeout = 30 * 1000) + public void testPublishWhenNotAuthorizedDoesNotStall() throws Exception { + + getProxyToBroker().addTopic("USERS.foo"); + + MQTT mqtt = null; + BlockingConnection connection = null; + + // Test 3.1 functionality + mqtt = createMQTTConnection("pub", true); + mqtt.setUserName("guest"); + mqtt.setPassword("password"); + mqtt.setVersion("3.1"); + + connection = mqtt.blockingConnection(); + connection.connect(); + connection.publish("USERS.foo", "test-AT_MOST_ONCE".getBytes(), QoS.AT_MOST_ONCE, true); + connection.publish("USERS.foo", "test-AT_LEAST_ONCE".getBytes(), QoS.AT_LEAST_ONCE, true); + connection.publish("USERS.foo", "test-EXACTLY_ONCE".getBytes(), QoS.EXACTLY_ONCE, true); + connection.disconnect(); + + assertEquals(0, getProxyToTopic("USERS.foo").getEnqueueCount()); + + // Test 3.1.1 functionality + mqtt = createMQTTConnection("pub", true); + mqtt.setUserName("guest"); + mqtt.setPassword("password"); + mqtt.setVersion("3.1.1"); + + connection = mqtt.blockingConnection(); + connection.connect(); + connection.publish("USERS.foo", "test-AT_MOST_ONCE".getBytes(), QoS.AT_MOST_ONCE, true); + connection.publish("USERS.foo", "test-AT_LEAST_ONCE".getBytes(), QoS.AT_LEAST_ONCE, true); + connection.publish("USERS.foo", "test-EXACTLY_ONCE".getBytes(), QoS.EXACTLY_ONCE, true); + connection.disconnect(); + + assertEquals(0, getProxyToTopic("USERS.foo").getEnqueueCount()); + } + @Test(timeout = 60 * 1000) public void testWildcardRetainedSubscription() throws Exception { MQTT mqttPub = createMQTTConnection("pub", true);
