This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch mqtt_bug in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ff418d20a53c59f3453e42fe67f977db8627c02f
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Jun 11 19:15:24 2025 +0800

    Remove ThreadLocal for Mqtt Client
---
 .../apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java |  4 ++--
 .../apache/iotdb/db/protocol/session/SessionManager.java | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index ad97dd310c3..86dc21631e6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -101,7 +101,7 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
           TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3,
           ClientVersion.V_1_0,
           useTableInsert ? IClientSession.SqlDialect.TABLE : IClientSession.SqlDialect.TREE);
-      sessionManager.registerSession(session);
+      sessionManager.registerSessionForMqtt(session);
       clientIdToSessionMap.put(msg.getClientID(), session);
     }
   }
@@ -110,7 +110,7 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
   public void onDisconnect(InterceptDisconnectMessage msg) {
     MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
     if (null != session) {
-      sessionManager.removeCurrSession();
+      sessionManager.removeCurrSessionForMqtt(session);
       sessionManager.closeSession(session, Coordinator.getInstance()::cleanupQueryExecution);
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
index 764d5e9fb25..4aa6c9db8a9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
@@ -334,6 +334,12 @@ public class SessionManager implements SessionManagerMBean {
     currSessionIdleTime.remove();
   }
 
+  public void removeCurrSessionForMqtt(MqttClientSession mqttClientSession) {
+    if (mqttClientSession != null) {
+      sessions.remove(mqttClientSession);
+    }
+  }
+
   /**
    * this method can be only used in client-thread model. Do not use this method in message-thread
    * model based service.
@@ -351,6 +357,16 @@ public class SessionManager implements SessionManagerMBean {
     return true;
   }
 
+  /**
+   * this method can be only used in mqtt model. Do not use this method in client-thread model based
+   * service.
+   *
+   * @return false if the session has been initialized.
+   */
+  public boolean registerSessionForMqtt(IClientSession session) {
+    return sessions.put(session, placeHolder) == null;
+  }
+
   /** must be called after registerSession()) will mark the session login. */
   public void supplySession(
       IClientSession session,
