This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 0607406574 [To rel/0.13][IOTDB-4343] Fix session manager in MQTT module. (#7249)
0607406574 is described below
commit 060740657447d03736fb9ed22ec1b110534a4a85
Author: ZhangHongYin <[email protected]>
AuthorDate: Wed Sep 7 17:56:42 2022 +0800
[To rel/0.13][IOTDB-4343] Fix session manager in MQTT module. (#7249)
---
.../iotdb/db/protocol/mqtt/PublishHandler.java | 36 ++++++++++++++--------
.../iotdb/db/protocol/mqtt/PublishHandlerTest.java | 7 +++--
2 files changed, 27 insertions(+), 16 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
index a4e707a6d5..892e17cde5 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
@@ -40,12 +40,13 @@ import org.slf4j.LoggerFactory;
import java.time.ZoneId;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
/** PublishHandler handle the messages from MQTT clients. */
public class PublishHandler extends AbstractInterceptHandler {
private final ServiceProvider serviceProvider = IoTDB.serviceProvider;
- private long sessionId;
+ private final ConcurrentHashMap<String, Long> clientIdToSessionIdMap = new ConcurrentHashMap<>();
private static final boolean isEnableOperationSync =
IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
private static final Logger LOG =
LoggerFactory.getLogger(PublishHandler.class);
@@ -62,32 +63,41 @@ public class PublishHandler extends AbstractInterceptHandler {
@Override
public String getID() {
- return "iotdb-mqtt-broker-listener-" + sessionId;
+ return "iotdb-mqtt-broker-listener";
}
@Override
public void onConnect(InterceptConnectMessage msg) {
- try {
- BasicOpenSessionResp basicOpenSessionResp =
- serviceProvider.openSession(
- msg.getUsername(),
- new String(msg.getPassword()),
- ZoneId.systemDefault().toString(),
- TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
- sessionId = basicOpenSessionResp.getSessionId();
- } catch (TException e) {
- throw new RuntimeException(e);
+ if (!clientIdToSessionIdMap.containsKey(msg.getClientID())) {
+ try {
+ BasicOpenSessionResp basicOpenSessionResp =
+ serviceProvider.openSession(
+ msg.getUsername(),
+ new String(msg.getPassword()),
+ ZoneId.systemDefault().toString(),
+ TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+ clientIdToSessionIdMap.put(msg.getClientID(), basicOpenSessionResp.getSessionId());
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
}
}
@Override
public void onDisconnect(InterceptDisconnectMessage msg) {
- serviceProvider.closeSession(sessionId);
+ Long sessionId = clientIdToSessionIdMap.remove(msg.getClientID());
+ if (null != sessionId) {
+ serviceProvider.closeSession(sessionId);
+ }
}
@Override
public void onPublish(InterceptPublishMessage msg) {
String clientId = msg.getClientID();
+ if (!clientIdToSessionIdMap.containsKey(clientId)) {
+ return;
+ }
+ long sessionId = clientIdToSessionIdMap.get(clientId);
ByteBuf payload = msg.getPayload();
String topic = msg.getTopicName();
String username = msg.getUsername();
diff --git a/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/PublishHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/PublishHandlerTest.java
index 5715a3b61f..ba307a872c 100644
--- a/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/PublishHandlerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/PublishHandlerTest.java
@@ -53,6 +53,7 @@ public class PublishHandlerTest {
public void onPublish() throws ClassNotFoundException {
PayloadFormatter payloadFormat =
PayloadFormatManager.getPayloadFormat("json");
PublishHandler handler = new PublishHandler(payloadFormat);
+ String clientId = "clientId";
String payload =
"{\n"
@@ -66,7 +67,7 @@ public class PublishHandlerTest {
// connect
MqttConnectPayload mqttConnectPayload =
- new MqttConnectPayload(null, null, "test", "root", "root");
+ new MqttConnectPayload(clientId, null, "test", "root", "root");
MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(null, null, mqttConnectPayload);
InterceptConnectMessage interceptConnectMessage =
new InterceptConnectMessage(mqttConnectMessage);
@@ -77,12 +78,12 @@ public class PublishHandlerTest {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBLISH, false,
MqttQoS.AT_LEAST_ONCE, false, 1);
MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader,
variableHeader, buf);
- InterceptPublishMessage message = new InterceptPublishMessage(publishMessage, null, null);
+ InterceptPublishMessage message = new InterceptPublishMessage(publishMessage, clientId, null);
handler.onPublish(message);
// disconnect
InterceptDisconnectMessage interceptDisconnectMessage =
- new InterceptDisconnectMessage(null, null);
+ new InterceptDisconnectMessage(clientId, null);
handler.onDisconnect(interceptDisconnectMessage);
String[] retArray = new String[] {"1586076045524,0.530635,"};