This is an automated email from the ASF dual-hosted git repository.

critas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c1723158ebc [To Master] Adds extensibility to MQTT's table (#14976)
c1723158ebc is described below

commit c1723158ebc9540e0d0d8bc3b372e0eac78ba189
Author: ppppoooo <[email protected]>
AuthorDate: Fri Feb 28 10:34:44 2025 +0800

    [To Master] Adds extensibility to MQTT's table (#14976)
    
    * topic
    
    * it
    
    * config
    
    * 新增注释
    
    * error
    
    * example
    
    * example
    
    * 去掉tree
    
    ---------
    
    Co-authored-by: xz m <[email protected]>
---
 .../server/CustomizedJsonPayloadFormatter.java     |  8 ++-
 .../org/apache/iotdb/mqtt/server/MyMessage.java    | 80 ----------------------
 .../java/org/apache/iotdb/mqtt/MQTTClient.java     |  2 +-
 .../relational/it/mqtt/IoTDBMQTTServiceIT.java     |  2 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../db/protocol/mqtt/JSONPayloadFormatter.java     |  7 +-
 .../db/protocol/mqtt/LinePayloadFormatter.java     |  7 +-
 .../iotdb/db/protocol/mqtt/MPPPublishHandler.java  | 11 ++-
 .../iotdb/db/protocol/mqtt/PayloadFormatter.java   |  6 ++
 .../db/protocol/mqtt/PayloadFormatManagerTest.java |  2 +-
 .../conf/iotdb-system.properties.template          |  5 +-
 11 files changed, 40 insertions(+), 92 deletions(-)

diff --git 
a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
 
b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
index 171074d62a6..fec5b97b6fb 100644
--- 
a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
+++ 
b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.mqtt.server;
 
 import org.apache.iotdb.db.protocol.mqtt.Message;
 import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
+import org.apache.iotdb.db.protocol.mqtt.TreeMessage;
 
 import io.netty.buffer.ByteBuf;
 
@@ -43,7 +44,7 @@ public class CustomizedJsonPayloadFormatter implements 
PayloadFormatter {
     // this is just an example, so we just generate some Messages directly
     for (int i = 0; i < 2; i++) {
       long ts = i;
-      MyMessage message = new MyMessage();
+      TreeMessage message = new TreeMessage();
       message.setDevice("d" + i);
       message.setTimestamp(ts);
       message.setMeasurements(Arrays.asList("s1", "s2"));
@@ -58,4 +59,9 @@ public class CustomizedJsonPayloadFormatter implements 
PayloadFormatter {
     // set the value of mqtt_payload_formatter in iotdb-common.properties as 
the following string:
     return "CustomizedJson";
   }
+
+  @Override
+  public String getType() {
+    return PayloadFormatter.TREE_TYPE;
+  }
 }
diff --git 
a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/MyMessage.java
 
b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/MyMessage.java
deleted file mode 100644
index d0ab7e08b4e..00000000000
--- 
a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/MyMessage.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.mqtt.server;
-
-import org.apache.iotdb.db.protocol.mqtt.Message;
-
-import org.apache.tsfile.enums.TSDataType;
-
-import java.util.List;
-
-public class MyMessage extends Message {
-
-  private String device;
-  private List<String> measurements;
-  private List<TSDataType> dataTypes;
-  private List<String> values;
-
-  public String getDevice() {
-    return device;
-  }
-
-  public void setDevice(String device) {
-    this.device = device;
-  }
-
-  public List<String> getMeasurements() {
-    return measurements;
-  }
-
-  public void setMeasurements(List<String> measurements) {
-    this.measurements = measurements;
-  }
-
-  public List<TSDataType> getDataTypes() {
-    return dataTypes;
-  }
-
-  public void setDataTypes(List<TSDataType> dataTypes) {
-    this.dataTypes = dataTypes;
-  }
-
-  public List<String> getValues() {
-    return values;
-  }
-
-  public void setValues(List<String> values) {
-    this.values = values;
-  }
-
-  @Override
-  public String toString() {
-    return "Message{"
-        + "device='"
-        + device
-        + '\''
-        + ", timestamp="
-        + super.timestamp
-        + ", measurements="
-        + measurements
-        + ", values="
-        + values
-        + '}';
-  }
-}
diff --git a/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java 
b/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
index fad59224e6f..84815a85a48 100644
--- a/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
+++ b/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
@@ -79,7 +79,7 @@ public class MQTTClient {
     Thread.sleep(10);
 
     payload = "test1,tag1=t1,tag2=t2  field4=2,field5=2i32,field6=2f 2";
-    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+    connection.publish(DATABASE, payload.getBytes(), QoS.AT_LEAST_ONCE, false);
     Thread.sleep(10);
 
     payload =
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
index fcf8af41327..118554d8aad 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
@@ -50,7 +50,7 @@ public class IoTDBMQTTServiceIT {
   private static final String USER = System.getProperty("RemoteUser", "root");
   private static final String PASSWORD = System.getProperty("RemotePassword", 
"root");
   private static final String DATABASE = "mqtttest";
-  public static final String FORMATTER = "table-line";
+  public static final String FORMATTER = "line";
 
   @Before
   public void setUp() throws Exception {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index fd42c57ca1c..72264ef26f7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -113,7 +113,7 @@ public class IoTDBConfig {
   private int mqttHandlerPoolSize = 1;
 
   /** The mqtt message payload formatter. */
-  private String mqttPayloadFormatter = "tree-json";
+  private String mqttPayloadFormatter = "json";
 
   /** The mqtt save data path */
   private String mqttDataPath = "data/";
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
index 9d0af6af6c5..bbc9e62f389 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
@@ -118,6 +118,11 @@ public class JSONPayloadFormatter implements 
PayloadFormatter {
 
   @Override
   public String getName() {
-    return "tree-json";
+    return "json";
+  }
+
+  @Override
+  public String getType() {
+    return PayloadFormatter.TREE_TYPE;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
index 62481e0fcb0..8e389f78760 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
@@ -247,6 +247,11 @@ public class LinePayloadFormatter implements 
PayloadFormatter {
 
   @Override
   public String getName() {
-    return "table-line";
+    return "line";
+  }
+
+  @Override
+  public String getType() {
+    return PayloadFormatter.TABLE_TYPE;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index b7311f0c0a0..ac3a82d9730 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -81,7 +81,7 @@ public class MPPPublishHandler extends 
AbstractInterceptHandler {
     this.payloadFormat = 
PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
     partitionFetcher = ClusterPartitionFetcher.getInstance();
     schemaFetcher = ClusterSchemaFetcher.getInstance();
-    useTableInsert = (payloadFormat instanceof LinePayloadFormatter);
+    useTableInsert = 
PayloadFormatter.TABLE_TYPE.equals(this.payloadFormat.getType());
   }
 
   @Override
@@ -148,13 +148,18 @@ public class MPPPublishHandler extends 
AbstractInterceptHandler {
         if (useTableInsert) {
           TableMessage tableMessage = (TableMessage) message;
           // '/' previously defined as a database name
-          tableMessage.setDatabase(
-              msg.getTopicName().substring(0, 
msg.getTopicName().indexOf("/")));
+          String database =
+              !msg.getTopicName().contains("/")
+                  ? msg.getTopicName()
+                  : msg.getTopicName().substring(0, 
msg.getTopicName().indexOf("/"));
+          tableMessage.setDatabase(database);
           insertTable(tableMessage, session);
         } else {
           insertTree((TreeMessage) message, session);
         }
       }
+    } catch (Throwable t) {
+      LOG.warn("onPublish execution exception, msg is [{}], error is ", msg, 
t);
     } finally {
       // release the payload of the message
       super.onPublish(msg);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
index 5f6527be997..278d6eb3743 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
@@ -30,6 +30,10 @@ import java.util.List;
  * @see JSONPayloadFormatter
  */
 public interface PayloadFormatter {
+
+  public static final String TREE_TYPE = "tree";
+  public static final String TABLE_TYPE = "table";
+
   /**
    * format a payload to a list of messages
    *
@@ -44,4 +48,6 @@ public interface PayloadFormatter {
    * @return
    */
   String getName();
+
+  String getType();
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
index 6731e2efad5..096f5d0d90d 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
@@ -37,6 +37,6 @@ public class PayloadFormatManagerTest {
 
   @Test
   public void getDefaultPayloadFormat() {
-    assertNotNull(PayloadFormatManager.getPayloadFormat("tree-json"));
+    assertNotNull(PayloadFormatManager.getPayloadFormat("json"));
   }
 }
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 5ab621677cc..14af9eda6f0 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1948,9 +1948,10 @@ mqtt_handler_pool_size=1
 
 # the mqtt message payload formatter.
 # effectiveMode: restart
-# Options: [tree-json, table-line]
+# Options: [json, line]
+# The built-in json only supports tree models, and the line only supports 
table models.
 # Datatype: String
-mqtt_payload_formatter=tree-json
+mqtt_payload_formatter=json
 
 # max length of mqtt message in byte
 # effectiveMode: restart

Reply via email to