This is an automated email from the ASF dual-hosted git repository.
critas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c1723158ebc [To Master] Adds extensibility to MQTT's table (#14976)
c1723158ebc is described below
commit c1723158ebc9540e0d0d8bc3b372e0eac78ba189
Author: ppppoooo <[email protected]>
AuthorDate: Fri Feb 28 10:34:44 2025 +0800
[To Master] Adds extensibility to MQTT's table (#14976)
* topic
* it
* config
* add comments
* error
* example
* example
* remove tree
---------
Co-authored-by: xz m <[email protected]>
---
.../server/CustomizedJsonPayloadFormatter.java | 8 ++-
.../org/apache/iotdb/mqtt/server/MyMessage.java | 80 ----------------------
.../java/org/apache/iotdb/mqtt/MQTTClient.java | 2 +-
.../relational/it/mqtt/IoTDBMQTTServiceIT.java | 2 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../db/protocol/mqtt/JSONPayloadFormatter.java | 7 +-
.../db/protocol/mqtt/LinePayloadFormatter.java | 7 +-
.../iotdb/db/protocol/mqtt/MPPPublishHandler.java | 11 ++-
.../iotdb/db/protocol/mqtt/PayloadFormatter.java | 6 ++
.../db/protocol/mqtt/PayloadFormatManagerTest.java | 2 +-
.../conf/iotdb-system.properties.template | 5 +-
11 files changed, 40 insertions(+), 92 deletions(-)
diff --git
a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
index 171074d62a6..fec5b97b6fb 100644
---
a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
+++
b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.mqtt.server;
import org.apache.iotdb.db.protocol.mqtt.Message;
import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
+import org.apache.iotdb.db.protocol.mqtt.TreeMessage;
import io.netty.buffer.ByteBuf;
@@ -43,7 +44,7 @@ public class CustomizedJsonPayloadFormatter implements
PayloadFormatter {
// this is just an example, so we just generate some Messages directly
for (int i = 0; i < 2; i++) {
long ts = i;
- MyMessage message = new MyMessage();
+ TreeMessage message = new TreeMessage();
message.setDevice("d" + i);
message.setTimestamp(ts);
message.setMeasurements(Arrays.asList("s1", "s2"));
@@ -58,4 +59,9 @@ public class CustomizedJsonPayloadFormatter implements
PayloadFormatter {
// set the value of mqtt_payload_formatter in iotdb-common.properties as
the following string:
return "CustomizedJson";
}
+
+ @Override
+ public String getType() {
+ return PayloadFormatter.TREE_TYPE;
+ }
}
diff --git
a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/MyMessage.java
b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/MyMessage.java
deleted file mode 100644
index d0ab7e08b4e..00000000000
---
a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/MyMessage.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.mqtt.server;
-
-import org.apache.iotdb.db.protocol.mqtt.Message;
-
-import org.apache.tsfile.enums.TSDataType;
-
-import java.util.List;
-
-public class MyMessage extends Message {
-
- private String device;
- private List<String> measurements;
- private List<TSDataType> dataTypes;
- private List<String> values;
-
- public String getDevice() {
- return device;
- }
-
- public void setDevice(String device) {
- this.device = device;
- }
-
- public List<String> getMeasurements() {
- return measurements;
- }
-
- public void setMeasurements(List<String> measurements) {
- this.measurements = measurements;
- }
-
- public List<TSDataType> getDataTypes() {
- return dataTypes;
- }
-
- public void setDataTypes(List<TSDataType> dataTypes) {
- this.dataTypes = dataTypes;
- }
-
- public List<String> getValues() {
- return values;
- }
-
- public void setValues(List<String> values) {
- this.values = values;
- }
-
- @Override
- public String toString() {
- return "Message{"
- + "device='"
- + device
- + '\''
- + ", timestamp="
- + super.timestamp
- + ", measurements="
- + measurements
- + ", values="
- + values
- + '}';
- }
-}
diff --git a/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
b/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
index fad59224e6f..84815a85a48 100644
--- a/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
+++ b/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
@@ -79,7 +79,7 @@ public class MQTTClient {
Thread.sleep(10);
payload = "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 2";
- connection.publish(DATABASE + "/myTopic", payload.getBytes(),
QoS.AT_LEAST_ONCE, false);
+ connection.publish(DATABASE, payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);
payload =
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
index fcf8af41327..118554d8aad 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
@@ -50,7 +50,7 @@ public class IoTDBMQTTServiceIT {
private static final String USER = System.getProperty("RemoteUser", "root");
private static final String PASSWORD = System.getProperty("RemotePassword",
"root");
private static final String DATABASE = "mqtttest";
- public static final String FORMATTER = "table-line";
+ public static final String FORMATTER = "line";
@Before
public void setUp() throws Exception {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index fd42c57ca1c..72264ef26f7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -113,7 +113,7 @@ public class IoTDBConfig {
private int mqttHandlerPoolSize = 1;
/** The mqtt message payload formatter. */
- private String mqttPayloadFormatter = "tree-json";
+ private String mqttPayloadFormatter = "json";
/** The mqtt save data path */
private String mqttDataPath = "data/";
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
index 9d0af6af6c5..bbc9e62f389 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
@@ -118,6 +118,11 @@ public class JSONPayloadFormatter implements
PayloadFormatter {
@Override
public String getName() {
- return "tree-json";
+ return "json";
+ }
+
+ @Override
+ public String getType() {
+ return PayloadFormatter.TREE_TYPE;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
index 62481e0fcb0..8e389f78760 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
@@ -247,6 +247,11 @@ public class LinePayloadFormatter implements
PayloadFormatter {
@Override
public String getName() {
- return "table-line";
+ return "line";
+ }
+
+ @Override
+ public String getType() {
+ return PayloadFormatter.TABLE_TYPE;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index b7311f0c0a0..ac3a82d9730 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -81,7 +81,7 @@ public class MPPPublishHandler extends
AbstractInterceptHandler {
this.payloadFormat =
PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
partitionFetcher = ClusterPartitionFetcher.getInstance();
schemaFetcher = ClusterSchemaFetcher.getInstance();
- useTableInsert = (payloadFormat instanceof LinePayloadFormatter);
+ useTableInsert =
PayloadFormatter.TABLE_TYPE.equals(this.payloadFormat.getType());
}
@Override
@@ -148,13 +148,18 @@ public class MPPPublishHandler extends
AbstractInterceptHandler {
if (useTableInsert) {
TableMessage tableMessage = (TableMessage) message;
// '/' previously defined as a database name
- tableMessage.setDatabase(
- msg.getTopicName().substring(0,
msg.getTopicName().indexOf("/")));
+ String database =
+ !msg.getTopicName().contains("/")
+ ? msg.getTopicName()
+ : msg.getTopicName().substring(0,
msg.getTopicName().indexOf("/"));
+ tableMessage.setDatabase(database);
insertTable(tableMessage, session);
} else {
insertTree((TreeMessage) message, session);
}
}
+ } catch (Throwable t) {
+ LOG.warn("onPublish execution exception, msg is [{}], error is ", msg,
t);
} finally {
// release the payload of the message
super.onPublish(msg);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
index 5f6527be997..278d6eb3743 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
@@ -30,6 +30,10 @@ import java.util.List;
* @see JSONPayloadFormatter
*/
public interface PayloadFormatter {
+
+ public static final String TREE_TYPE = "tree";
+ public static final String TABLE_TYPE = "table";
+
/**
* format a payload to a list of messages
*
@@ -44,4 +48,6 @@ public interface PayloadFormatter {
* @return
*/
String getName();
+
+ String getType();
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
index 6731e2efad5..096f5d0d90d 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
@@ -37,6 +37,6 @@ public class PayloadFormatManagerTest {
@Test
public void getDefaultPayloadFormat() {
- assertNotNull(PayloadFormatManager.getPayloadFormat("tree-json"));
+ assertNotNull(PayloadFormatManager.getPayloadFormat("json"));
}
}
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 5ab621677cc..14af9eda6f0 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1948,9 +1948,10 @@ mqtt_handler_pool_size=1
# the mqtt message payload formatter.
# effectiveMode: restart
-# Options: [tree-json, table-line]
+# Options: [json, line]
+# The built-in json formatter supports only the tree model, and the built-in line formatter supports only the
table model.
# Datatype: String
-mqtt_payload_formatter=tree-json
+mqtt_payload_formatter=json
# max length of mqtt message in byte
# effectiveMode: restart