This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch mqtt-1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 31c971331197913108a0dcd22168e4f325c44123
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Jun 17 18:36:08 2025 +0800

    Avoiding error log by removing ThreadLocal for Mqtt Client
    
    (cherry picked from commit 8a8a7bedb5b00137dc6c3897ff73b2cc07cfd13e)
---
 .../apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java   |  2 ++
 .../apache/iotdb/db/protocol/session/SessionManager.java   | 14 ++++++++++++++
 2 files changed, 16 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index 772f3672a14..d678fef937b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -88,6 +88,7 @@ public class MPPPublishHandler extends 
AbstractInterceptHandler {
           ZoneId.systemDefault().toString(),
           TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3,
           ClientVersion.V_1_0);
+      sessionManager.registerSessionForMqtt(session);
       clientIdToSessionMap.put(msg.getClientID(), session);
     }
   }
@@ -96,6 +97,7 @@ public class MPPPublishHandler extends 
AbstractInterceptHandler {
   public void onDisconnect(InterceptDisconnectMessage msg) {
     MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
     if (null != session) {
+      sessionManager.removeCurrSessionForMqtt(session);
       sessionManager.closeSession(session, 
Coordinator.getInstance()::cleanupQueryExecution);
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
index 1656a536995..e32034c9ab0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
@@ -316,6 +316,12 @@ public class SessionManager implements SessionManagerMBean 
{
     currSessionIdleTime.remove();
   }
 
+  public void removeCurrSessionForMqtt(MqttClientSession mqttClientSession) {
+    if (mqttClientSession != null) {
+      sessions.remove(mqttClientSession);
+    }
+  }
+
   /**
    * this method can be only used in client-thread model. Do not use this 
method in message-thread
    * model based service.
@@ -333,6 +339,14 @@ public class SessionManager implements SessionManagerMBean 
{
     return true;
   }
 
+  /**
+   * this method can be only used in mqtt model. Do not use this method in 
client-thread model based
+   * service.
+   */
+  public void registerSessionForMqtt(IClientSession session) {
+    sessions.put(session, placeHolder);
+  }
+
   /** must be called after registerSession()) will mark the session login. */
   public void supplySession(
       IClientSession session,

Reply via email to