This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new f68b0e8c8bf [To dev/1.3] Avoiding error log by removing ThreadLocal for Mqtt Client
f68b0e8c8bf is described below
commit f68b0e8c8bffe31dad0543c1915b922790f906a8
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Jun 17 21:39:47 2025 +0800
[To dev/1.3] Avoiding error log by removing ThreadLocal for Mqtt Client
(cherry picked from commit 8a8a7bedb5b00137dc6c3897ff73b2cc07cfd13e)
---
.../apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java | 2 ++
.../apache/iotdb/db/protocol/session/SessionManager.java | 14 ++++++++++++++
2 files changed, 16 insertions(+)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index 772f3672a14..d678fef937b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -88,6 +88,7 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
ZoneId.systemDefault().toString(),
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3,
ClientVersion.V_1_0);
+ sessionManager.registerSessionForMqtt(session);
clientIdToSessionMap.put(msg.getClientID(), session);
}
}
@@ -96,6 +97,7 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
public void onDisconnect(InterceptDisconnectMessage msg) {
MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
if (null != session) {
+ sessionManager.removeCurrSessionForMqtt(session);
sessionManager.closeSession(session, Coordinator.getInstance()::cleanupQueryExecution);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
index 1656a536995..e32034c9ab0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
@@ -316,6 +316,12 @@ public class SessionManager implements SessionManagerMBean {
currSessionIdleTime.remove();
}
+ public void removeCurrSessionForMqtt(MqttClientSession mqttClientSession) {
+ if (mqttClientSession != null) {
+ sessions.remove(mqttClientSession);
+ }
+ }
+
/**
* this method can be only used in client-thread model. Do not use this method in message-thread
* model based service.
@@ -333,6 +339,14 @@ public class SessionManager implements SessionManagerMBean {
return true;
}
+ /**
+ * this method can be only used in mqtt model. Do not use this method in client-thread model based
+ * service.
+ */
+ public void registerSessionForMqtt(IClientSession session) {
+ sessions.put(session, placeHolder);
+ }
+
/** must be called after registerSession()) will mark the session login. */
public void supplySession(
IClientSession session,