This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch mqtt_bug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ff418d20a53c59f3453e42fe67f977db8627c02f
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Jun 11 19:15:24 2025 +0800

    Remove ThreadLocal for Mqtt Client
---
 .../apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java |  4 ++--
 .../apache/iotdb/db/protocol/session/SessionManager.java | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index ad97dd310c3..86dc21631e6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -101,7 +101,7 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
           TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3,
           ClientVersion.V_1_0,
           useTableInsert ? IClientSession.SqlDialect.TABLE : IClientSession.SqlDialect.TREE);
-      sessionManager.registerSession(session);
+      sessionManager.registerSessionForMqtt(session);
       clientIdToSessionMap.put(msg.getClientID(), session);
     }
   }
@@ -110,7 +110,7 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
   public void onDisconnect(InterceptDisconnectMessage msg) {
     MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
     if (null != session) {
-      sessionManager.removeCurrSession();
+      sessionManager.removeCurrSessionForMqtt(session);
       sessionManager.closeSession(session, Coordinator.getInstance()::cleanupQueryExecution);
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
index 764d5e9fb25..4aa6c9db8a9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
@@ -334,6 +334,12 @@ public class SessionManager implements SessionManagerMBean {
     currSessionIdleTime.remove();
   }
 
+  public void removeCurrSessionForMqtt(MqttClientSession mqttClientSession) {
+    if (mqttClientSession != null) {
+      sessions.remove(mqttClientSession);
+    }
+  }
+
   /**
    * this method can be only used in client-thread model. Do not use this method in message-thread
    * model based service.
@@ -351,6 +357,16 @@ public class SessionManager implements SessionManagerMBean {
     return true;
   }
 
+  /**
+   * this method can be only used in mqtt model. Do not use this method in client-thread model based
+   * service.
+   *
+   * @return false if the session has been initialized.
+   */
+  public boolean registerSessionForMqtt(IClientSession session) {
+    return sessions.put(session, placeHolder) == null;
+  }
+
   /** must be called after registerSession()) will mark the session login. */
   public void supplySession(
       IClientSession session,

Reply via email to