This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new c361704 ARTEMIS-2686 Fix MQTT connect message rejection
new 598c263 This closes #3058
c361704 is described below
commit c36170477d4ead2e3c4704fb71e0f4a9796b6f8b
Author: brusdev <[email protected]>
AuthorDate: Thu Apr 2 11:17:37 2020 +0200
ARTEMIS-2686 Fix MQTT connect message rejection
Initialize the session state with a default value to fix a NPE, when an
incoming
MQTT interceptor rejects a MqttConnectMessage.
---
.../artemis/core/protocol/mqtt/MQTTSession.java | 2 ++
.../core/protocol/mqtt/MQTTSessionState.java | 2 ++
.../imported/MQTTRejectingInterceptorTest.java | 39 ++++++++++++++++++++++
3 files changed, 43 insertions(+)
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index b74622e..b06d1f5 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -77,6 +77,8 @@ public class MQTTSession {
subscriptionManager = new MQTTSubscriptionManager(this);
retainMessageManager = new MQTTRetainMessageManager(this);
+ state = MQTTSessionState.DEFAULT;
+
log.debug("SESSION CREATED: " + id);
}
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 6c94c15..204f8d5 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -32,6 +32,8 @@ import
org.apache.activemq.artemis.core.config.WildcardConfiguration;
public class MQTTSessionState {
+ public static final MQTTSessionState DEFAULT = new MQTTSessionState(null);
+
private String clientId;
private final ConcurrentMap<String, MqttTopicSubscription> subscriptions =
new ConcurrentHashMap<>();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTRejectingInterceptorTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTRejectingInterceptorTest.java
index cda06c0..1ae2d98 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTRejectingInterceptorTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTRejectingInterceptorTest.java
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+import java.util.concurrent.CountDownLatch;
+
+import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -61,4 +64,40 @@ public class MQTTRejectingInterceptorTest extends
MQTTTestSupport {
subscribeProvider.disconnect();
publishProvider.disconnect();
}
+
+ @Test(timeout = 60000)
+ public void testRejectedMqttConnectMessage() throws Exception {
+ CountDownLatch publishThreadReady = new CountDownLatch(1);
+
+ server.getRemotingService().addIncomingInterceptor((MQTTInterceptor)
(packet, connection) -> {
+ if (packet.getClass() == MqttConnectMessage.class) {
+ return false;
+ } else {
+ return true;
+ }
+ });
+
+ Thread publishThread = new Thread(() -> {
+ MQTTClientProvider publishProvider = getMQTTClientProvider();
+
+ publishThreadReady.countDown();
+
+ try {
+ initializeConnection(publishProvider);
+ publishProvider.disconnect();
+ fail("The connection should be rejected!");
+ } catch (Exception ignore) {
+ }
+ });
+
+ publishThread.start();
+
+ publishThreadReady.await();
+
+ publishThread.join(3000);
+
+ if (publishThread.isAlive()) {
+ fail("The connection is stuck!");
+ }
+ }
}