This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new c361704  ARTEMIS-2686 Fix MQTT connect message rejection
     new 598c263  This closes #3058
c361704 is described below

commit c36170477d4ead2e3c4704fb71e0f4a9796b6f8b
Author: brusdev <[email protected]>
AuthorDate: Thu Apr 2 11:17:37 2020 +0200

    ARTEMIS-2686 Fix MQTT connect message rejection
    
    Initialize the session state with a default value to fix a NPE, when an incoming
    MQTT interceptor rejects a MqttConnectMessage.
---
 .../artemis/core/protocol/mqtt/MQTTSession.java    |  2 ++
 .../core/protocol/mqtt/MQTTSessionState.java       |  2 ++
 .../imported/MQTTRejectingInterceptorTest.java     | 39 ++++++++++++++++++++++
 3 files changed, 43 insertions(+)

diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index b74622e..b06d1f5 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -77,6 +77,8 @@ public class MQTTSession {
       subscriptionManager = new MQTTSubscriptionManager(this);
       retainMessageManager = new MQTTRetainMessageManager(this);
 
+      state = MQTTSessionState.DEFAULT;
+
       log.debug("SESSION CREATED: " + id);
    }
 
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 6c94c15..204f8d5 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -32,6 +32,8 @@ import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 
 public class MQTTSessionState {
 
+   public static final MQTTSessionState DEFAULT = new MQTTSessionState(null);
+
    private String clientId;
 
    private final ConcurrentMap<String, MqttTopicSubscription> subscriptions = new ConcurrentHashMap<>();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTRejectingInterceptorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTRejectingInterceptorTest.java
index cda06c0..1ae2d98 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTRejectingInterceptorTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTRejectingInterceptorTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.artemis.tests.integration.mqtt.imported;
 
+import java.util.concurrent.CountDownLatch;
+
+import io.netty.handler.codec.mqtt.MqttConnectMessage;
 import io.netty.handler.codec.mqtt.MqttMessage;
 import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -61,4 +64,40 @@ public class MQTTRejectingInterceptorTest extends MQTTTestSupport {
       subscribeProvider.disconnect();
       publishProvider.disconnect();
    }
+
+   @Test(timeout = 60000)
+   public void testRejectedMqttConnectMessage() throws Exception {
+      CountDownLatch publishThreadReady = new CountDownLatch(1);
+
+      server.getRemotingService().addIncomingInterceptor((MQTTInterceptor) (packet, connection) -> {
+         if (packet.getClass() == MqttConnectMessage.class) {
+            return false;
+         } else {
+            return true;
+         }
+      });
+
+      Thread publishThread = new Thread(() -> {
+         MQTTClientProvider publishProvider = getMQTTClientProvider();
+
+         publishThreadReady.countDown();
+
+         try {
+            initializeConnection(publishProvider);
+            publishProvider.disconnect();
+            fail("The connection should be rejected!");
+         } catch (Exception ignore) {
+         }
+      });
+
+      publishThread.start();
+
+      publishThreadReady.await();
+
+      publishThread.join(3000);
+
+      if (publishThread.isAlive()) {
+         fail("The connection is stuck!");
+      }
+   }
 }

Reply via email to