This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new 0607406574 [To rel/0.13][IOTDB-4343] Fix session manager in MQTT 
module.  (#7249)
0607406574 is described below

commit 060740657447d03736fb9ed22ec1b110534a4a85
Author: ZhangHongYin <[email protected]>
AuthorDate: Wed Sep 7 17:56:42 2022 +0800

    [To rel/0.13][IOTDB-4343] Fix session manager in MQTT module.  (#7249)
---
 .../iotdb/db/protocol/mqtt/PublishHandler.java     | 36 ++++++++++++++--------
 .../iotdb/db/protocol/mqtt/PublishHandlerTest.java |  7 +++--
 2 files changed, 27 insertions(+), 16 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java 
b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
index a4e707a6d5..892e17cde5 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
@@ -40,12 +40,13 @@ import org.slf4j.LoggerFactory;
 
 import java.time.ZoneId;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
 /** PublishHandler handle the messages from MQTT clients. */
 public class PublishHandler extends AbstractInterceptHandler {
 
   private final ServiceProvider serviceProvider = IoTDB.serviceProvider;
-  private long sessionId;
+  private final ConcurrentHashMap<String, Long> clientIdToSessionIdMap = new 
ConcurrentHashMap<>();
   private static final boolean isEnableOperationSync =
       IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
   private static final Logger LOG = 
LoggerFactory.getLogger(PublishHandler.class);
@@ -62,32 +63,41 @@ public class PublishHandler extends 
AbstractInterceptHandler {
 
   @Override
   public String getID() {
-    return "iotdb-mqtt-broker-listener-" + sessionId;
+    return "iotdb-mqtt-broker-listener";
   }
 
   @Override
   public void onConnect(InterceptConnectMessage msg) {
-    try {
-      BasicOpenSessionResp basicOpenSessionResp =
-          serviceProvider.openSession(
-              msg.getUsername(),
-              new String(msg.getPassword()),
-              ZoneId.systemDefault().toString(),
-              TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
-      sessionId = basicOpenSessionResp.getSessionId();
-    } catch (TException e) {
-      throw new RuntimeException(e);
+    if (!clientIdToSessionIdMap.containsKey(msg.getClientID())) {
+      try {
+        BasicOpenSessionResp basicOpenSessionResp =
+            serviceProvider.openSession(
+                msg.getUsername(),
+                new String(msg.getPassword()),
+                ZoneId.systemDefault().toString(),
+                TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+        clientIdToSessionIdMap.put(msg.getClientID(), 
basicOpenSessionResp.getSessionId());
+      } catch (TException e) {
+        throw new RuntimeException(e);
+      }
     }
   }
 
   @Override
   public void onDisconnect(InterceptDisconnectMessage msg) {
-    serviceProvider.closeSession(sessionId);
+    Long sessionId = clientIdToSessionIdMap.remove(msg.getClientID());
+    if (null != sessionId) {
+      serviceProvider.closeSession(sessionId);
+    }
   }
 
   @Override
   public void onPublish(InterceptPublishMessage msg) {
     String clientId = msg.getClientID();
+    if (!clientIdToSessionIdMap.containsKey(clientId)) {
+      return;
+    }
+    long sessionId = clientIdToSessionIdMap.get(clientId);
     ByteBuf payload = msg.getPayload();
     String topic = msg.getTopicName();
     String username = msg.getUsername();
diff --git 
a/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/PublishHandlerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/PublishHandlerTest.java
index 5715a3b61f..ba307a872c 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/PublishHandlerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/PublishHandlerTest.java
@@ -53,6 +53,7 @@ public class PublishHandlerTest {
   public void onPublish() throws ClassNotFoundException {
     PayloadFormatter payloadFormat = 
PayloadFormatManager.getPayloadFormat("json");
     PublishHandler handler = new PublishHandler(payloadFormat);
+    String clientId = "clientId";
 
     String payload =
         "{\n"
@@ -66,7 +67,7 @@ public class PublishHandlerTest {
 
     // connect
     MqttConnectPayload mqttConnectPayload =
-        new MqttConnectPayload(null, null, "test", "root", "root");
+        new MqttConnectPayload(clientId, null, "test", "root", "root");
     MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(null, null, 
mqttConnectPayload);
     InterceptConnectMessage interceptConnectMessage =
         new InterceptConnectMessage(mqttConnectMessage);
@@ -77,12 +78,12 @@ public class PublishHandlerTest {
     MqttFixedHeader fixedHeader =
         new MqttFixedHeader(MqttMessageType.PUBLISH, false, 
MqttQoS.AT_LEAST_ONCE, false, 1);
     MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader, 
variableHeader, buf);
-    InterceptPublishMessage message = new 
InterceptPublishMessage(publishMessage, null, null);
+    InterceptPublishMessage message = new 
InterceptPublishMessage(publishMessage, clientId, null);
     handler.onPublish(message);
 
     // disconnect
     InterceptDisconnectMessage interceptDisconnectMessage =
-        new InterceptDisconnectMessage(null, null);
+        new InterceptDisconnectMessage(clientId, null);
     handler.onDisconnect(interceptDisconnectMessage);
 
     String[] retArray = new String[] {"1586076045524,0.530635,"};

Reply via email to