This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch mqtt-1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 31c971331197913108a0dcd22168e4f325c44123
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Jun 17 18:36:08 2025 +0800

    Avoiding error log by removing ThreadLocal for Mqtt Client

    (cherry picked from commit 8a8a7bedb5b00137dc6c3897ff73b2cc07cfd13e)
---
 .../apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java |  2 ++
 .../apache/iotdb/db/protocol/session/SessionManager.java | 14 ++++++++++++++
 2 files changed, 16 insertions(+)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index 772f3672a14..d678fef937b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -88,6 +88,7 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
           ZoneId.systemDefault().toString(),
           TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3,
           ClientVersion.V_1_0);
+      sessionManager.registerSessionForMqtt(session);
       clientIdToSessionMap.put(msg.getClientID(), session);
     }
   }
@@ -96,6 +97,7 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
   public void onDisconnect(InterceptDisconnectMessage msg) {
     MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
     if (null != session) {
+      sessionManager.removeCurrSessionForMqtt(session);
       sessionManager.closeSession(session, Coordinator.getInstance()::cleanupQueryExecution);
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
index 1656a536995..e32034c9ab0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
@@ -316,6 +316,12 @@ public class SessionManager implements SessionManagerMBean {
     currSessionIdleTime.remove();
   }
 
+  public void removeCurrSessionForMqtt(MqttClientSession mqttClientSession) {
+    if (mqttClientSession != null) {
+      sessions.remove(mqttClientSession);
+    }
+  }
+
   /**
    * this method can be only used in client-thread model. Do not use this method in message-thread
    * model based service.
@@ -333,6 +339,14 @@ public class SessionManager implements SessionManagerMBean {
     return true;
   }
 
+  /**
+   * this method can be only used in mqtt model. Do not use this method in client-thread model based
+   * service.
+   */
+  public void registerSessionForMqtt(IClientSession session) {
+    sessions.put(session, placeHolder);
+  }
+
   /** must be called after registerSession()) will mark the session login. */
   public void supplySession(
       IClientSession session,
