This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch add_mqtt_server
in repository https://gitbox.apache.org/repos/asf/iotdb-extras.git

commit f9e66a13c165c141abd53cb864c44c2d7848acc8
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Oct 30 14:59:32 2025 +0800

    tmp save
---
 examples/mqtt-customize/README.md                  |  42 +++
 examples/mqtt-customize/pom.xml                    |  39 +++
 .../server/CustomizedJsonPayloadFormatter.java     |  74 +++++
 ....apache.iotdb.db.protocol.mqtt.PayloadFormatter |  20 ++
 examples/mqtt/README.md                            |  33 +++
 examples/mqtt/pom.xml                              |  37 +++
 .../java/org/apache/iotdb/mqtt/MQTTClient.java     | 112 +++++++
 examples/pom.xml                                   |   6 +
 mqtt-service/pom.xml                               |  36 +++
 .../java/org/apache/iotdb/protocol/mqtt/App.java   |  13 +
 .../iotdb/protocol/mqtt/BrokerAuthenticator.java   |  54 ++++
 .../iotdb/protocol/mqtt/MPPPublishHandler.java     | 326 +++++++++++++++++++++
 .../apache/iotdb/protocol/mqtt/MQTTService.java    | 124 ++++++++
 .../org/apache/iotdb/protocol/mqtt/MqttConfig.java | 237 +++++++++++++++
 .../apache/iotdb/protocol/mqtt/MqttConstant.java   |  33 +++
 .../protocol/mqtt/msg/JSONPayloadFormatter.java    | 150 ++++++++++
 .../protocol/mqtt/msg/LinePayloadFormatter.java    | 282 ++++++++++++++++++
 .../apache/iotdb/protocol/mqtt/msg/Message.java    |  33 +++
 .../protocol/mqtt/msg/PayloadFormatManager.java    | 134 +++++++++
 .../iotdb/protocol/mqtt/msg/PayloadFormatter.java  |  66 +++++
 .../iotdb/protocol/mqtt/msg/TableMessage.java      | 144 +++++++++
 .../iotdb/protocol/mqtt/msg/TreeMessage.java       |  77 +++++
 mqtt-service/src/main/resources/mqtt.properties    |  35 +++
 .../src/test/java/org/apache/iotdb/AppTest.java    |  28 ++
 .../org/apache/iotdb/BrokerAuthenticatorTest.java  |  49 ++++
 .../org/apache/iotdb/JSONPayloadFormatterTest.java | 134 +++++++++
 .../org/apache/iotdb/LinePayloadFormatterTest.java |  96 ++++++
 .../org/apache/iotdb/PayloadFormatManagerTest.java |  42 +++
 pom.xml                                            |   8 +-
 29 files changed, 2463 insertions(+), 1 deletion(-)

diff --git a/examples/mqtt-customize/README.md 
b/examples/mqtt-customize/README.md
new file mode 100644
index 0000000..9e53c96
--- /dev/null
+++ b/examples/mqtt-customize/README.md
@@ -0,0 +1,42 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+# Customized IoTDB-MQTT-Broker Example
+
+## Function
+```
+The example is to show how to customize your MQTT message format
+```
+
+## Usage
+
+* Define your implementation which implements `PayloadFormatter.java`
+* modify the file in 
`src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter`:
+  clean the file and put your implementation class name into the file
+* compile your implementation as a jar file
+
+  
+Then, in your mqtt-service: 
+* Create ${IOTDB_HOME}/ext/mqtt/ folder, and put the jar into this folder. 
+** Start an IoTDB MqttService (in the mqtt-service module).
+* Set the value of program parameter `-mqtt_payload_formatter` as the value of 
getName() in your implementation 
+* Launch the IoTDB server.
+* Now IoTDB will use your implementation to parse the MQTT message.
+
diff --git a/examples/mqtt-customize/pom.xml b/examples/mqtt-customize/pom.xml
new file mode 100644
index 0000000..9a530f4
--- /dev/null
+++ b/examples/mqtt-customize/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.iotdb</groupId>
+        <artifactId>iotdb-examples</artifactId>
+        <version>2.0.6-SNAPSHOT</version>
+    </parent>
+    <artifactId>customize-mqtt-example</artifactId>
+    <name>IoTDB: Example: Customized MQTT</name>
+    <dependencies>
+        <!-- used by the server-->
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/examples/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
 
b/examples/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
new file mode 100644
index 0000000..8c3a962
--- /dev/null
+++ 
b/examples/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mqtt.server;
+
+import org.apache.iotdb.db.protocol.mqtt.Message;
+import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
+import org.apache.iotdb.db.protocol.mqtt.TreeMessage;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.tsfile.external.commons.lang3.NotImplementedException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
+
+  @Override
+  public List<Message> format(String topic, ByteBuf payload) {
+    // Suppose the payload is a json format
+    if (payload == null) {
+      return Collections.emptyList();
+    }
+
+    // parse data from the json and generate Messages and put them into 
List<Message> ret
+    List<Message> ret = new ArrayList<>();
+    // this is just an example, so we just generate some Messages directly
+    for (int i = 0; i < 2; i++) {
+      long ts = i;
+      TreeMessage message = new TreeMessage();
+      message.setDevice("d" + i);
+      message.setTimestamp(ts);
+      message.setMeasurements(Arrays.asList("s1", "s2"));
+      message.setValues(Arrays.asList("4.0" + i, "5.0" + i));
+      ret.add(message);
+    }
+    return ret;
+  }
+
+  @Override
+  @Deprecated
+  public List<Message> format(ByteBuf payload) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public String getName() {
+    // set the value of mqtt_payload_formatter in iotdb-common.properties as 
the following string:
+    return "CustomizedJson";
+  }
+
+  @Override
+  public String getType() {
+    return PayloadFormatter.TREE_TYPE;
+  }
+}
diff --git 
a/examples/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
 
b/examples/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
new file mode 100644
index 0000000..b0b824b
--- /dev/null
+++ 
b/examples/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.iotdb.mqtt.server.CustomizedJsonPayloadFormatter
\ No newline at end of file
diff --git a/examples/mqtt/README.md b/examples/mqtt/README.md
new file mode 100644
index 0000000..b32adf8
--- /dev/null
+++ b/examples/mqtt/README.md
@@ -0,0 +1,33 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+# IoTDB-MQTT-Broker Example
+
+## Function
+```
+The example is to show how to send data to IoTDB from a mqtt client.
+```
+
+## Usage
+
+* Start an IoTDB MqttService (in the mqtt-service module).
+* Launch the IoTDB server.
+* Setup database `CREATE DATABASE root.sg` and create time timeseries `CREATE 
TIMESERIES root.sg.d1.s1 WITH DATATYPE=DOUBLE, ENCODING=PLAIN`.
+* Run `org.apache.iotdb.mqtt.MQTTClient` to run the mqtt client and send 
events to server.
diff --git a/examples/mqtt/pom.xml b/examples/mqtt/pom.xml
new file mode 100644
index 0000000..4e8aa19
--- /dev/null
+++ b/examples/mqtt/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.iotdb</groupId>
+        <artifactId>examples</artifactId>
+        <version>2.0.6-SNAPSHOT</version>
+    </parent>
+    <artifactId>mqtt-example</artifactId>
+    <name>IoTDB Extras: Example: MQTT</name>
+    <dependencies>
+        <dependency>
+            <groupId>org.fusesource.mqtt-client</groupId>
+            <artifactId>mqtt-client</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/examples/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java 
b/examples/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
new file mode 100644
index 0000000..ec15ad2
--- /dev/null
+++ b/examples/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.mqtt;
+
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+
+import java.util.Random;
+
+public class MQTTClient {
+
+  private static final String DATABASE = "myMqttTest";
+
+  public static void main(String[] args) throws Exception {
+    MQTT mqtt = new MQTT();
+    mqtt.setHost("127.0.0.1", 1883);
+    mqtt.setUserName("root");
+    mqtt.setPassword("root");
+    mqtt.setConnectAttemptsMax(3);
+    mqtt.setReconnectDelay(10);
+
+    BlockingConnection connection = mqtt.blockingConnection();
+    connection.connect();
+    // the config mqttPayloadFormatter must be tree-json
+    // jsonPayloadFormatter(connection);
+    // the config mqttPayloadFormatter must be table-line
+    linePayloadFormatter(connection);
+    connection.disconnect();
+  }
+
+  private static void jsonPayloadFormatter(BlockingConnection connection) 
throws Exception {
+    Random random = new Random();
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < 10; i++) {
+      String payload =
+          String.format(
+              "{\n"
+                  + "\"device\":\"root.sg.d1\",\n"
+                  + "\"timestamp\":%d,\n"
+                  + "\"measurements\":[\"s1\"],\n"
+                  + "\"values\":[%f]\n"
+                  + "}",
+              System.currentTimeMillis(), random.nextDouble());
+      sb.append(payload).append(",");
+
+      // publish a json object
+      Thread.sleep(1);
+      connection.publish("root.sg.d1.s1", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+    }
+    // publish a json array
+    sb.insert(0, "[");
+    sb.replace(sb.lastIndexOf(","), sb.length(), "]");
+    connection.publish("root.sg.d1.s1", sb.toString().getBytes(), 
QoS.AT_LEAST_ONCE, false);
+  }
+
+  // The database must be created in advance
+  private static void linePayloadFormatter(BlockingConnection connection) 
throws Exception {
+    // myTable,tag1=t1,tag2=t2 fieldKey1="1,2,3" 1740109006001
+    String payload = "myTable,tag1=t1,tag2=t2 fieldKey1=\"1,2,3\" 
1740109006001";
+    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+    Thread.sleep(10);
+
+    payload = "myTable,tag1=t1,tag2=t2 fieldKey1=\"1,2,3\" 1740109006002";
+    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+    Thread.sleep(10);
+
+    payload = "myTable,tag1=t1,tag2=t2 fieldKey1=\"1,2,3\" 1740109006003";
+    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+    Thread.sleep(10);
+    payload =
+        "test1,tag1=t1,tag2=t2 attr3=a5,attr4=a4 
field1=\"fieldValue1\",field2=1i,field3=1u 1";
+    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+    Thread.sleep(10);
+
+    payload = "test1,tag1=t1,tag2=t2  field4=2,field5=2i32,field6=2f 2";
+    connection.publish(DATABASE, payload.getBytes(), QoS.AT_LEAST_ONCE, false);
+    Thread.sleep(10);
+
+    payload =
+        "test1,tag1=t1,tag2=t2  field7=t,field8=T,field9=true 3 \n "
+            + "test1,tag1=t1,tag2=t2  field7=f,field8=F,field9=FALSE 4";
+    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+    Thread.sleep(10);
+
+    payload =
+        "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 
field1=\"fieldValue1\",field2=1i,field3=1u 4 \n "
+            + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 5";
+    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+    Thread.sleep(10);
+
+    payload = "# It's a remark\n " + "test1,tag1=t1,tag2=t2 
field4=2,field5=2i32,field6=2f 6";
+    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+    Thread.sleep(10);
+  }
+}
diff --git a/examples/pom.xml b/examples/pom.xml
index 13a72c5..ee75005 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -73,6 +73,12 @@
                 <module>spark-table</module>
             </modules>
         </profile>
+        <profile>
+          <id>with-mqtt</id>
+          <modules>
+            <module>mqtt-example</module>
+          </modules>
+        </profile>
     </profiles>
     <build>
         <pluginManagement>
diff --git a/mqtt-service/pom.xml b/mqtt-service/pom.xml
new file mode 100644
index 0000000..4c0ff84
--- /dev/null
+++ b/mqtt-service/pom.xml
@@ -0,0 +1,36 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.iotdb</groupId>
+    <artifactId>iotdb-extras-parent</artifactId>
+    <version>2.0.4-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>mqtt-service</artifactId>
+  <packaging>jar</packaging>
+
+  <name>untitled</name>
+  <url>http://maven.apache.org</url>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.iotdb</groupId>
+      <artifactId>iotdb-server</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tsfile</groupId>
+      <artifactId>common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/App.java 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/App.java
new file mode 100644
index 0000000..e755bae
--- /dev/null
+++ b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/App.java
@@ -0,0 +1,13 @@
+package org.apache.iotdb.protocol.mqtt;
+
+/**
+ * Hello world!
+ *
+ */
+public class App 
+{
+    public static void main( String[] args )
+    {
+        System.out.println( "Hello World!" );
+    }
+}
diff --git 
a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/BrokerAuthenticator.java
 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/BrokerAuthenticator.java
new file mode 100644
index 0000000..2a5c8d3
--- /dev/null
+++ 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/BrokerAuthenticator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.protocol.mqtt;
+
+import io.moquette.broker.security.IAuthenticator;
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.tsfile.external.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The MQTT broker authenticator. */
+public class BrokerAuthenticator implements IAuthenticator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BrokerAuthenticator.class);
+
+  private final MqttConfig config;
+
+  public BrokerAuthenticator(MqttConfig config) {
+    this.config = config;
+  }
+
+  @Override
+  public boolean checkValid(String clientId, String username, byte[] password) 
{
+    if (StringUtils.isBlank(username) || password == null) {
+      return false;
+    }
+
+    // validate by logging in IoTDB
+    try (ISession ignored = new Session(config.getIotdbHost(), 
config.getIotdbPort(),
+        username, new String(password))) {
+      return true;
+    } catch (IoTDBConnectionException e) {
+      LOG.warn("Failed to connect to IoTDB server.", e);
+      return false;
+    }
+  }
+}
diff --git 
a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MPPPublishHandler.java
 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MPPPublishHandler.java
new file mode 100644
index 0000000..e3c7aa0
--- /dev/null
+++ 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MPPPublishHandler.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.protocol.mqtt;
+
+import io.moquette.interception.AbstractInterceptHandler;
+import io.moquette.interception.messages.InterceptConnectMessage;
+import io.moquette.interception.messages.InterceptDisconnectMessage;
+import io.moquette.interception.messages.InterceptPublishMessage;
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.protocol.mqtt.msg.Message;
+import org.apache.iotdb.protocol.mqtt.msg.PayloadFormatManager;
+import org.apache.iotdb.protocol.mqtt.msg.PayloadFormatter;
+import org.apache.iotdb.protocol.mqtt.msg.TableMessage;
+import org.apache.iotdb.protocol.mqtt.msg.TreeMessage;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.MqttClientSession;
+import org.apache.iotdb.db.utils.CommonUtils;
+import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
+import org.apache.iotdb.session.TableSessionBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.BitMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** PublishHandler handle the messages from MQTT clients. */
+public class MPPPublishHandler extends AbstractInterceptHandler {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MPPPublishHandler.class);
+
+  private final MqttConfig config;
+  private final ConcurrentHashMap<String, ISession> clientIdToSessionMap =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, ITableSession> 
clientIdToTableSessionMap =
+      new ConcurrentHashMap<>();
+  private final PayloadFormatter payloadFormat;
+  private final boolean useTableInsert;
+
+  public MPPPublishHandler(MqttConfig config) {
+    this.config = config;
+    this.payloadFormat = 
config.getPayloadFormatManager().getPayloadFormat(config.getMqttPayloadFormatter());
+    useTableInsert = 
PayloadFormatter.TABLE_TYPE.equals(this.payloadFormat.getType());
+  }
+
+  @Override
+  public String getID() {
+    return "iotdb-mqtt-broker-listener";
+  }
+
+  @Override
+  public void onConnect(InterceptConnectMessage msg) {
+    if (msg.getClientID() == null || msg.getClientID().trim().isEmpty()) {
+      LOG.error(
+          "Connection refused: client_id is missing or empty. A valid 
client_id is required to establish a connection.");
+      return;
+    }
+
+    if (useTableInsert) {
+      clientIdToTableSessionMap.computeIfAbsent(msg.getClientID(), (cId) -> {
+        TableSessionBuilder sessionBuilder = new TableSessionBuilder();
+        
sessionBuilder.nodeUrls(Collections.singletonList(config.getIotdbHost() + ":" + 
config.getIotdbPort()))
+            .username(msg.getUsername()).password(new 
String(msg.getPassword()));
+        return sessionBuilder.build();
+      })
+    }
+    if (!clientIdToSessionMap.containsKey(msg.getClientID())) {
+      ISession session = new MqttClientSession(msg.getClientID());
+      sessionManager.login(
+          session,
+          msg.getUsername(),
+          new String(msg.getPassword()),
+          ZoneId.systemDefault().toString(),
+          TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3,
+          ClientVersion.V_1_0,
+          useTableInsert ? IClientSession.SqlDialect.TABLE : 
IClientSession.SqlDialect.TREE);
+      sessionManager.registerSessionForMqtt(session);
+      clientIdToSessionMap.put(msg.getClientID(), session);
+    }
+  }
+
+  @Override
+  public void onDisconnect(InterceptDisconnectMessage msg) {
+    MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
+    if (null != session) {
+      sessionManager.removeCurrSessionForMqtt(session);
+      sessionManager.closeSession(session, 
Coordinator.getInstance()::cleanupQueryExecution);
+    }
+  }
+
+  @Override
+  public void onPublish(InterceptPublishMessage msg) {
+    try {
+      String clientId = msg.getClientID();
+      if (!clientIdToSessionMap.containsKey(clientId)) {
+        return;
+      }
+      MqttClientSession session = clientIdToSessionMap.get(msg.getClientID());
+      ByteBuf payload = msg.getPayload();
+      String topic = msg.getTopicName();
+
+      if (LOG.isDebugEnabled()) {
+        String username = msg.getUsername();
+        MqttQoS qos = msg.getQos();
+        LOG.debug(
+            "Receive publish message. clientId: {}, username: {}, qos: {}, 
topic: {}, payload: {}",
+            clientId,
+            username,
+            qos,
+            topic,
+            payload);
+      }
+
+      List<org.apache.iotdb.protocol.mqtt.Message> messages = 
payloadFormat.format(topic, payload);
+      if (messages == null) {
+        return;
+      }
+
+      for (Message message : messages) {
+        if (message == null) {
+          continue;
+        }
+        if (useTableInsert) {
+          insertTable((org.apache.iotdb.protocol.mqtt.TableMessage) message, 
session);
+        } else {
+          insertTree((org.apache.iotdb.protocol.mqtt.TreeMessage) message, 
session);
+        }
+      }
+    } catch (Throwable t) {
+      LOG.warn("onPublish execution exception, msg is [{}], error is ", msg, 
t);
+    } finally {
+      // release the payload of the message
+      super.onPublish(msg);
+    }
+  }
+
+  /** Inserting table using tablet */
+  private void insertTable(TableMessage message, MqttClientSession session) {
+    TSStatus tsStatus = null;
+    try {
+      TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp());
+      InsertTabletStatement insertTabletStatement = 
constructInsertTabletStatement(message);
+      session.setDatabaseName(message.getDatabase().toLowerCase());
+      session.setSqlDialect(IClientSession.SqlDialect.TABLE);
+      long queryId = sessionManager.requestQueryId();
+      SqlParser relationSqlParser = new SqlParser();
+      Metadata metadata = LocalExecutionPlanner.getInstance().metadata;
+      ExecutionResult result =
+          Coordinator.getInstance()
+              .executeForTableModel(
+                  insertTabletStatement,
+                  relationSqlParser,
+                  session,
+                  queryId,
+                  sessionManager.getSessionInfo(session),
+                  "",
+                  metadata,
+                  config.getQueryTimeoutThreshold());
+
+      tsStatus = result.status;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("process result: {}", tsStatus);
+      }
+      if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          || tsStatus.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        LOG.warn("mqtt line insert error , message = {}", tsStatus.message);
+      }
+    } catch (Exception e) {
+      LOG.warn(
+          "meet error when inserting database {}, table {}, tags {}, 
attributes {}, fields {}, at time {}, because ",
+          message.getDatabase(),
+          message.getTable(),
+          message.getTagKeys(),
+          message.getAttributeKeys(),
+          message.getFields(),
+          message.getTimestamp(),
+          e);
+    }
+  }
+
+  private InsertTabletStatement constructInsertTabletStatement(TableMessage 
message) {
+    InsertTabletStatement insertStatement = new InsertTabletStatement();
+    insertStatement.setDevicePath(new PartialPath(message.getTable(), false));
+    List<String> measurements =
+        Stream.of(message.getFields(), message.getTagKeys(), 
message.getAttributeKeys())
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    insertStatement.setMeasurements(measurements.toArray(new String[0]));
+    long[] timestamps = new long[] {message.getTimestamp()};
+    insertStatement.setTimes(timestamps);
+    int columnSize = measurements.size();
+    int rowSize = 1;
+
+    BitMap[] bitMaps = new BitMap[columnSize];
+    Object[] columns =
+        Stream.of(message.getValues(), message.getTagValues(), 
message.getAttributeValues())
+            .flatMap(List::stream)
+            .toArray(Object[]::new);
+    insertStatement.setColumns(columns);
+    insertStatement.setBitMaps(bitMaps);
+    insertStatement.setRowCount(rowSize);
+    insertStatement.setAligned(false);
+    insertStatement.setWriteToTable(true);
+    TSDataType[] dataTypes = new TSDataType[measurements.size()];
+    TsTableColumnCategory[] columnCategories = new 
TsTableColumnCategory[measurements.size()];
+    for (int i = 0; i < message.getFields().size(); i++) {
+      dataTypes[i] = message.getDataTypes().get(i);
+      columnCategories[i] = TsTableColumnCategory.FIELD;
+    }
+    for (int i = message.getFields().size();
+        i < message.getFields().size() + message.getTagKeys().size();
+        i++) {
+      dataTypes[i] = TSDataType.STRING;
+      columnCategories[i] = TsTableColumnCategory.TAG;
+    }
+    for (int i = message.getFields().size() + message.getTagKeys().size();
+        i
+            < message.getFields().size()
+                + message.getTagKeys().size()
+                + message.getAttributeKeys().size();
+        i++) {
+      dataTypes[i] = TSDataType.STRING;
+      columnCategories[i] = TsTableColumnCategory.ATTRIBUTE;
+    }
+    insertStatement.setDataTypes(dataTypes);
+    insertStatement.setColumnCategories(columnCategories);
+
+    return insertStatement;
+  }
+
+  private void insertTree(TreeMessage message, MqttClientSession session) {
+    TSStatus tsStatus = null;
+    try {
+      InsertRowStatement statement = new InsertRowStatement();
+      statement.setDevicePath(
+          
DataNodeDevicePathCache.getInstance().getPartialPath(message.getDevice()));
+      TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp());
+      statement.setTime(message.getTimestamp());
+      statement.setMeasurements(message.getMeasurements().toArray(new 
String[0]));
+      if (message.getDataTypes() == null) {
+        statement.setDataTypes(new 
TSDataType[message.getMeasurements().size()]);
+        statement.setValues(message.getValues().toArray(new Object[0]));
+        statement.setNeedInferType(true);
+      } else {
+        List<TSDataType> dataTypes = message.getDataTypes();
+        List<String> values = message.getValues();
+        Object[] inferredValues = new Object[values.size()];
+        for (int i = 0; i < values.size(); ++i) {
+          inferredValues[i] = CommonUtils.parseValue(dataTypes.get(i), 
values.get(i));
+        }
+        statement.setDataTypes(dataTypes.toArray(new TSDataType[0]));
+        statement.setValues(inferredValues);
+      }
+      statement.setAligned(false);
+
+      tsStatus =
+          AuthorityChecker.checkAuthority(
+              statement,
+              new TreeAccessCheckContext(
+                  session.getUserId(), session.getUsername(), 
session.getClientAddress()));
+      if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        LOG.warn(tsStatus.message);
+      } else {
+        long queryId = sessionManager.requestQueryId();
+        ExecutionResult result =
+            Coordinator.getInstance()
+                .executeForTreeModel(
+                    statement,
+                    queryId,
+                    sessionManager.getSessionInfo(session),
+                    "",
+                    partitionFetcher,
+                    schemaFetcher,
+                    config.getQueryTimeoutThreshold(),
+                    false);
+        tsStatus = result.status;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("process result: {}", tsStatus);
+        }
+        if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+            || tsStatus.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+          LOG.warn("mqtt json insert error , message = {}", tsStatus.message);
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn(
+          "meet error when inserting device {}, measurements {}, at time {}, 
because ",
+          message.getDevice(),
+          message.getMeasurements(),
+          message.getTimestamp(),
+          e);
+    }
+  }
+
+  @Override
+  public void onSessionLoopError(Throwable throwable) {
+    // TODO: Implement something sensible here ...
+  }
+}
diff --git 
a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MQTTService.java 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MQTTService.java
new file mode 100644
index 0000000..3ec4d47
--- /dev/null
+++ b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MQTTService.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.protocol.mqtt;
+
+import io.moquette.BrokerConstants;
+import io.moquette.broker.Server;
+import io.moquette.broker.config.IConfig;
+import io.moquette.broker.config.MemoryConfig;
+import io.moquette.broker.security.IAuthenticator;
+import io.moquette.interception.InterceptHandler;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.mqtt.BrokerAuthenticator;
+import org.apache.iotdb.db.protocol.mqtt.MPPPublishHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The IoTDB MQTT Service. */
+public class MQTTService implements IService {
+  private static final Logger LOG = LoggerFactory.getLogger(MQTTService.class);
+  private final Server server = new Server();
+
+  private MQTTService() {}
+
+  @Override
+  public void start() {
+    startup();
+  }
+
+  @Override
+  public void stop() {
+    shutdown();
+  }
+
+  public void startup() {
+    IoTDBConfig iotDBConfig = IoTDBDescriptor.getInstance().getConfig();
+    IConfig config = createBrokerConfig(iotDBConfig);
+    List<InterceptHandler> handlers = new ArrayList<>(1);
+    handlers.add(new MPPPublishHandler(iotDBConfig));
+    IAuthenticator authenticator = new BrokerAuthenticator();
+
+    try {
+      server.startServer(config, handlers, null, authenticator, null);
+    } catch (IOException e) {
+      throw new RuntimeException("Exception while starting server", e);
+    }
+
+    LOG.info(
+        "Start MQTT service successfully, listening on ip {} port {}",
+        iotDBConfig.getMqttHost(),
+        iotDBConfig.getMqttPort());
+
+    Runtime.getRuntime()
+        .addShutdownHook(
+            new Thread(
+                () -> {
+                  LOG.info("Stopping IoTDB MQTT service...");
+                  shutdown();
+                  LOG.info("IoTDB MQTT service stopped.");
+                }));
+  }
+
+  private IConfig createBrokerConfig(IoTDBConfig iotDBConfig) {
+    Properties properties = new Properties();
+    properties.setProperty(BrokerConstants.HOST_PROPERTY_NAME, 
iotDBConfig.getMqttHost());
+    properties.setProperty(
+        BrokerConstants.PORT_PROPERTY_NAME, 
String.valueOf(iotDBConfig.getMqttPort()));
+    properties.setProperty(
+        BrokerConstants.BROKER_INTERCEPTOR_THREAD_POOL_SIZE,
+        String.valueOf(iotDBConfig.getMqttHandlerPoolSize()));
+    properties.setProperty(
+        BrokerConstants.DATA_PATH_PROPERTY_NAME, 
String.valueOf(iotDBConfig.getMqttDataPath()));
+    
properties.setProperty(BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME, 
"true");
+    properties.setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, 
"false");
+    
properties.setProperty(BrokerConstants.ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME, 
"false");
+    properties.setProperty(
+        BrokerConstants.NETTY_MAX_BYTES_PROPERTY_NAME,
+        String.valueOf(iotDBConfig.getMqttMaxMessageSize()));
+    return new MemoryConfig(properties);
+  }
+
+  public void shutdown() {
+    server.stopServer();
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.MQTT_SERVICE;
+  }
+
+  public static MQTTService getInstance() {
+    return MQTTServiceHolder.INSTANCE;
+  }
+
+  private static class MQTTServiceHolder {
+
+    private static final MQTTService INSTANCE = new MQTTService();
+
+    private MQTTServiceHolder() {}
+  }
+}
diff --git 
a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MqttConfig.java 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MqttConfig.java
new file mode 100644
index 0000000..ff542a3
--- /dev/null
+++ b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MqttConfig.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.protocol.mqtt;
+
+import static org.apache.iotdb.protocol.mqtt.MqttConstant.MQTT_FOLDER_NAME;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Properties;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.conf.TrimProperties;
+import org.apache.iotdb.protocol.mqtt.msg.PayloadFormatManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MqttConfig {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MqttConfig.class);
+  private static final String DEFAULT_CONFIG_FILE_PATH = "mqtt.properties";
+
+  /** The mqtt service binding host. */
+  private String mqttHost = "127.0.0.1";
+
+  /** The mqtt service binding port. */
+  private int mqttPort = 1883;
+
+  /** The target IoTDB binding host. */
+  private String iotdbHost = "127.0.0.1";
+
+  /** The target IoTDB binding port. */
+  private int iotdbPort = 6667;
+
+  /** The handler pool size for handing the mqtt messages. */
+  private int mqttHandlerPoolSize = Math.max(1, 
Runtime.getRuntime().availableProcessors() >> 1);
+
+  /** The mqtt message payload formatter. */
+  private String mqttPayloadFormatter = "json";
+
+  /** The mqtt save data path */
+  private String mqttDataPath = "data/";
+
+  /** Max mqtt message size. Unit: byte */
+  private int mqttMaxMessageSize = 1048576;
+
+  /** Trigger MQTT forward pool size */
+  private int triggerForwardMQTTPoolSize = 4;
+
+  /** External lib directory for MQTT, stores user-uploaded JAR files */
+  private String mqttDir =
+      IoTDBConstant.EXT_FOLDER_NAME + File.separator + MQTT_FOLDER_NAME;
+
+  private PayloadFormatManager payloadFormatManager;
+
+  private void init() {
+    this.payloadFormatManager = new PayloadFormatManager(this);
+  }
+
+  public PayloadFormatManager getPayloadFormatManager() {
+    return payloadFormatManager;
+  }
+
+  public String getMqttHost() {
+    return mqttHost;
+  }
+
+  public void setMqttHost(String mqttHost) {
+    this.mqttHost = mqttHost;
+  }
+
+  public int getMqttPort() {
+    return mqttPort;
+  }
+
+  public void setMqttPort(int mqttPort) {
+    this.mqttPort = mqttPort;
+  }
+
+  public int getMqttHandlerPoolSize() {
+    return mqttHandlerPoolSize;
+  }
+
+  public void setMqttHandlerPoolSize(int mqttHandlerPoolSize) {
+    this.mqttHandlerPoolSize = mqttHandlerPoolSize;
+  }
+
+  public String getMqttPayloadFormatter() {
+    return mqttPayloadFormatter;
+  }
+
+  public void setMqttPayloadFormatter(String mqttPayloadFormatter) {
+    this.mqttPayloadFormatter = mqttPayloadFormatter;
+  }
+
+  public String getMqttDataPath() {
+    return mqttDataPath;
+  }
+
+  public void setMqttDataPath(String mqttDataPath) {
+    this.mqttDataPath = mqttDataPath;
+  }
+
+  public int getMqttMaxMessageSize() {
+    return mqttMaxMessageSize;
+  }
+
+  public void setMqttMaxMessageSize(int mqttMaxMessageSize) {
+    this.mqttMaxMessageSize = mqttMaxMessageSize;
+  }
+
+  public int getTriggerForwardMQTTPoolSize() {
+    return triggerForwardMQTTPoolSize;
+  }
+
+  public void setTriggerForwardMQTTPoolSize(int triggerForwardMQTTPoolSize) {
+    this.triggerForwardMQTTPoolSize = triggerForwardMQTTPoolSize;
+  }
+
+  public String getMqttDir() {
+    return mqttDir;
+  }
+
+  public void setMqttDir(String mqttDir) {
+    this.mqttDir = mqttDir;
+  }
+
+  public String getIotdbHost() {
+    return iotdbHost;
+  }
+
+  public void setIotdbHost(String iotdbHost) {
+    this.iotdbHost = iotdbHost;
+  }
+
+  public int getIotdbPort() {
+    return iotdbPort;
+  }
+
+  public void setIotdbPort(int iotdbPort) {
+    this.iotdbPort = iotdbPort;
+  }
+
+  public static MqttConfig fromFile(String configFilePath) {
+    TrimProperties commonProperties = new TrimProperties();
+    if (configFilePath == null) {
+      LOGGER.warn("configFilePath not set, use the default config file.");
+      configFilePath = DEFAULT_CONFIG_FILE_PATH;
+    }
+
+    try (InputStream inputStream = 
Files.newInputStream(Paths.get(configFilePath))) {
+      LOGGER.info("Start to read config file {}", configFilePath);
+      Properties properties = new Properties();
+      properties.load(new InputStreamReader(inputStream, 
StandardCharsets.UTF_8));
+      commonProperties.putAll(properties);
+      return fromProperties(commonProperties);
+    } catch (FileNotFoundException e) {
+      LOGGER.error("Fail to find config file {}, reject DataNode startup.", 
configFilePath, e);
+      System.exit(-1);
+    } catch (IOException e) {
+      LOGGER.error("Cannot load config file, reject DataNode startup.", e);
+      System.exit(-1);
+    } catch (Exception e) {
+      LOGGER.error("Incorrect format in config file, reject DataNode 
startup.", e);
+      System.exit(-1);
+    }
+    return null;
+  }
+
+  public static MqttConfig fromProperties(TrimProperties properties) {
+    MqttConfig conf = new MqttConfig();
+    conf.setMqttDir(properties.getProperty("mqtt_root_dir", 
conf.getMqttDir()));
+
+    if (properties.getProperty(MqttConstant.MQTT_HOST_NAME) != null) {
+      conf.setMqttHost(properties.getProperty(MqttConstant.MQTT_HOST_NAME));
+    }
+
+    if (properties.getProperty(MqttConstant.MQTT_PORT_NAME) != null) {
+      conf.setMqttPort(
+          
Integer.parseInt(properties.getProperty(MqttConstant.MQTT_PORT_NAME)));
+    }
+
+    if (properties.getProperty(MqttConstant.MQTT_HANDLER_POOL_SIZE_NAME) != 
null) {
+      conf.setMqttHandlerPoolSize(
+          Integer.parseInt(
+              
properties.getProperty(MqttConstant.MQTT_HANDLER_POOL_SIZE_NAME)));
+    }
+
+    if (properties.getProperty(MqttConstant.MQTT_PAYLOAD_FORMATTER_NAME) != 
null) {
+      conf.setMqttPayloadFormatter(
+          properties.getProperty(MqttConstant.MQTT_PAYLOAD_FORMATTER_NAME));
+    }
+
+    if (properties.getProperty(MqttConstant.MQTT_DATA_PATH) != null) {
+      
conf.setMqttDataPath(properties.getProperty(MqttConstant.MQTT_DATA_PATH));
+    }
+
+    if (properties.getProperty(MqttConstant.MQTT_MAX_MESSAGE_SIZE) != null) {
+      conf.setMqttMaxMessageSize(
+          
Integer.parseInt(properties.getProperty(MqttConstant.MQTT_MAX_MESSAGE_SIZE)));
+    }
+
+    if (properties.getProperty(MqttConstant.IOTDB_HOST_NAME) != null) {
+      conf.setIotdbHost(
+          properties.getProperty(MqttConstant.IOTDB_HOST_NAME));
+    }
+
+    if (properties.getProperty(MqttConstant.IOTDB_CLIENT_RPC_PORT_NAME) != 
null) {
+      conf.setIotdbPort(
+          
Integer.parseInt(properties.getProperty(MqttConstant.IOTDB_CLIENT_RPC_PORT_NAME)));
+    }
+
+    conf.init();
+    return conf;
+  }
+}
diff --git 
a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MqttConstant.java 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MqttConstant.java
new file mode 100644
index 0000000..a5d176f
--- /dev/null
+++ 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MqttConstant.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.protocol.mqtt;
+
+public class MqttConstant {
+  // mqtt
+  public static final String MQTT_HOST_NAME = "mqtt_host";
+  public static final String MQTT_PORT_NAME = "mqtt_port";
+  public static final String IOTDB_HOST_NAME = "iotdb_host";
+  public static final String IOTDB_CLIENT_RPC_PORT_NAME = 
"iotdb_client_rpc_port";
+  public static final String MQTT_HANDLER_POOL_SIZE_NAME = 
"mqtt_handler_pool_size";
+  public static final String MQTT_PAYLOAD_FORMATTER_NAME = 
"mqtt_payload_formatter";
+  public static final String MQTT_DATA_PATH = "mqtt_data_path";
+  public static final String MQTT_MAX_MESSAGE_SIZE = "mqtt_max_message_size";
+  public static final String MQTT_FOLDER_NAME = "mqtt";
+}
diff --git 
a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/JSONPayloadFormatter.java
 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/JSONPayloadFormatter.java
new file mode 100644
index 0000000..059add6
--- /dev/null
+++ 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/JSONPayloadFormatter.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.protocol.mqtt.msg;
+
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.reflect.TypeToken;
+import io.netty.buffer.ByteBuf;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.protocol.mqtt.Message;
+import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
+import org.apache.iotdb.db.protocol.mqtt.TreeMessage;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.external.commons.lang3.NotImplementedException;
+
+/**
+ * The JSON payload formatter. two json format supported: { 
"device":"root.sg.d1",
+ * "timestamp":1586076045524, "measurements":["s1","s2"], 
"values":[0.530635,0.530635] }
+ *
+ * <p>{ "device":"root.sg.d1", "timestamps":[1586076045524,1586076065526],
+ * "measurements":["s1","s2"], "values":[[0.530635,0.530635], 
[0.530655,0.530695]] }
+ */
+public class JSONPayloadFormatter implements 
org.apache.iotdb.db.protocol.mqtt.PayloadFormatter {
+  private static final String JSON_KEY_DEVICE = "device";
+  private static final String JSON_KEY_TIMESTAMP = "timestamp";
+  private static final String JSON_KEY_TIMESTAMPS = "timestamps";
+  private static final String JSON_KEY_MEASUREMENTS = "measurements";
+  private static final String JSON_KEY_VALUES = "values";
+  private static final String JSON_KEY_DATATYPE = "datatypes";
+  private static final Gson GSON = new GsonBuilder().create();
+
+  @Override
+  public List<Message> format(String topic, ByteBuf payload) {
+    if (payload == null) {
+      return new ArrayList<>();
+    }
+    String txt = payload.toString(StandardCharsets.UTF_8);
+    JsonElement jsonElement = GSON.fromJson(txt, JsonElement.class);
+    if (jsonElement.isJsonObject()) {
+      JsonObject jsonObject = jsonElement.getAsJsonObject();
+      if (jsonObject.get(JSON_KEY_TIMESTAMP) != null) {
+        return formatJson(jsonObject);
+      }
+      if (jsonObject.get(JSON_KEY_TIMESTAMPS) != null) {
+        return formatBatchJson(jsonObject);
+      }
+    } else if (jsonElement.isJsonArray()) {
+      JsonArray jsonArray = jsonElement.getAsJsonArray();
+      List<Message> messages = new ArrayList<>();
+      for (JsonElement element : jsonArray) {
+        JsonObject jsonObject = element.getAsJsonObject();
+        if (jsonObject.get(JSON_KEY_TIMESTAMP) != null) {
+          messages.addAll(formatJson(jsonObject));
+        }
+        if (jsonObject.get(JSON_KEY_TIMESTAMPS) != null) {
+          messages.addAll(formatBatchJson(jsonObject));
+        }
+      }
+      return messages;
+    }
+    throw new JsonParseException("payload is invalidate");
+  }
+
+  @Override
+  @Deprecated
+  public List<Message> format(ByteBuf payload) {
+    throw new NotImplementedException();
+  }
+
+  private List<Message> formatJson(JsonObject jsonObject) {
+    org.apache.iotdb.db.protocol.mqtt.TreeMessage message = new 
org.apache.iotdb.db.protocol.mqtt.TreeMessage();
+    message.setDevice(jsonObject.get(JSON_KEY_DEVICE).getAsString());
+    message.setTimestamp(jsonObject.get(JSON_KEY_TIMESTAMP).getAsLong());
+    message.setMeasurements(
+        GSON.fromJson(
+            jsonObject.get(JSON_KEY_MEASUREMENTS), new 
TypeToken<List<String>>() {}.getType()));
+    message.setValues(
+        GSON.fromJson(jsonObject.get(JSON_KEY_VALUES), new 
TypeToken<List<String>>() {}.getType()));
+    if (jsonObject.has(JSON_KEY_DATATYPE)) {
+      message.setDataTypes(
+          GSON.fromJson(
+              jsonObject.get(JSON_KEY_DATATYPE), new 
TypeToken<List<TSDataType>>() {}.getType()));
+    }
+    return Lists.newArrayList(message);
+  }
+
+  private List<Message> formatBatchJson(JsonObject jsonObject) {
+    String device = jsonObject.get(JSON_KEY_DEVICE).getAsString();
+    List<String> measurements =
+        GSON.fromJson(
+            jsonObject.getAsJsonArray(JSON_KEY_MEASUREMENTS),
+            new TypeToken<List<String>>() {}.getType());
+    List<Long> timestamps =
+        GSON.fromJson(
+            jsonObject.get(JSON_KEY_TIMESTAMPS), new TypeToken<List<Long>>() 
{}.getType());
+    List<List<String>> values =
+        GSON.fromJson(
+            jsonObject.get(JSON_KEY_VALUES), new 
TypeToken<List<List<String>>>() {}.getType());
+    List<TSDataType> types =
+        jsonObject.has(JSON_KEY_DATATYPE)
+            ? GSON.fromJson(
+                jsonObject.get(JSON_KEY_DATATYPE), new 
TypeToken<List<TSDataType>>() {}.getType())
+            : null;
+
+    List<Message> ret = new ArrayList<>(timestamps.size());
+    for (int i = 0; i < timestamps.size(); i++) {
+      org.apache.iotdb.db.protocol.mqtt.TreeMessage message = new 
TreeMessage();
+      message.setDevice(device);
+      message.setTimestamp(timestamps.get(i));
+      message.setMeasurements(measurements);
+      message.setValues(values.get(i));
+      message.setDataTypes(types);
+      ret.add(message);
+    }
+    return ret;
+  }
+
+  @Override
+  public String getName() {
+    return "json";
+  }
+
+  @Override
+  public String getType() {
+    return PayloadFormatter.TREE_TYPE;
+  }
+}
diff --git 
a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/LinePayloadFormatter.java
 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/LinePayloadFormatter.java
new file mode 100644
index 0000000..5fd57f4
--- /dev/null
+++ 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/LinePayloadFormatter.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.protocol.mqtt.msg;
+
+import io.netty.buffer.ByteBuf;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+import org.apache.iotdb.db.protocol.mqtt.Message;
+import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
+import org.apache.iotdb.db.protocol.mqtt.TableMessage;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.external.commons.lang3.NotImplementedException;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Line payload formatter. myTable,tag1=value1,tag2=value2 
attr1=value1,attr2=value2
+ * fieldKey="fieldValue" 1740109006000 \n myTable,tag1=value1,tag2=value2 
fieldKey="fieldValue"
+ * 1740109006000
+ */
+public class LinePayloadFormatter implements 
org.apache.iotdb.db.protocol.mqtt.PayloadFormatter {
+
+  private static final Logger log = 
LoggerFactory.getLogger(LinePayloadFormatter.class);
+
+  /*
+  Regular expression matching line protocol ,the attributes field is not 
required
+  */
+  private static final String REGEX =
+      "(?<table>\\w+)(,(?<tags>[^ ]+))?(\\s+(?<attributes>[^ 
]+))?\\s+(?<fields>[^ ]+)\\s+(?<timestamp>\\d+)";
+  private static final String COMMA = ",";
+  private static final String WELL = "#";
+  private static final String LINE_BREAK = "\n";
+  private static final String EQUAL = "=";
+  private static final String TABLE = "table";
+  private static final String TAGS = "tags";
+  private static final String ATTRIBUTES = "attributes";
+  private static final String FIELDS = "fields";
+  private static final String TIMESTAMP = "timestamp";
+  private static final String NULL = "null";
+
+  private final Pattern pattern;
+
+  public LinePayloadFormatter() {
+    pattern = Pattern.compile(REGEX);
+  }
+
+  @Override
+  public List<org.apache.iotdb.db.protocol.mqtt.Message> format(String topic, 
ByteBuf payload) {
+    List<org.apache.iotdb.db.protocol.mqtt.Message> messages = new 
ArrayList<>();
+    if (payload == null) {
+      return messages;
+    }
+
+    String txt = payload.toString(StandardCharsets.UTF_8);
+    String[] lines = txt.split(LINE_BREAK);
+    // '/' previously defined as a database name
+    String database = !topic.contains("/") ? topic : topic.substring(0, 
topic.indexOf("/"));
+    for (String line : lines) {
+      if (line.trim().startsWith(WELL)) {
+        continue;
+      }
+      org.apache.iotdb.db.protocol.mqtt.TableMessage message = new 
org.apache.iotdb.db.protocol.mqtt.TableMessage();
+      try {
+        Matcher matcher = pattern.matcher(line.trim());
+        if (!matcher.matches()) {
+          log.warn("Invalid line protocol format ,line is {}", line);
+          continue;
+        }
+
+        // Parsing Database Name
+        message.setDatabase((database));
+
+        // Parsing Table Names
+        message.setTable(matcher.group(TABLE));
+
+        // Parsing Tags
+        if (!setTags(matcher, message)) {
+          log.warn("The tags is error , line is {}", line);
+          continue;
+        }
+
+        // Parsing Attributes
+        if (!setAttributes(matcher, message)) {
+          log.warn("The attributes is error , line is {}", line);
+          continue;
+        }
+
+        // Parsing Fields
+        if (!setFields(matcher, message)) {
+          log.warn("The fields is error , line is {}", line);
+          continue;
+        }
+
+        // Parsing timestamp
+        if (!setTimestamp(matcher, message)) {
+          log.warn("The timestamp is error , line is {}", line);
+          continue;
+        }
+
+        messages.add(message);
+      } catch (Exception e) {
+        log.warn(
+            "The line pattern parsing fails, and the failed line message is {} 
,exception is",
+            line,
+            e);
+      }
+    }
+    return messages;
+  }
+
+  @Override
+  @Deprecated
+  public List<Message> format(ByteBuf payload) {
+    throw new NotImplementedException();
+  }
+
+  private boolean setTags(Matcher matcher, 
org.apache.iotdb.db.protocol.mqtt.TableMessage message) {
+    List<String> tagKeys = new ArrayList<>();
+    List<Object> tagValues = new ArrayList<>();
+    String tagsGroup = matcher.group(TAGS);
+    if (tagsGroup != null && !tagsGroup.isEmpty()) {
+      String[] tagPairs = tagsGroup.split(COMMA);
+      for (String tagPair : tagPairs) {
+        if (!tagPair.isEmpty()) {
+          String[] keyValue = tagPair.split(EQUAL);
+          if (keyValue.length == 2 && !NULL.equals(keyValue[1])) {
+            tagKeys.add(keyValue[0]);
+            tagValues.add(new Binary[] {new 
Binary(keyValue[1].getBytes(StandardCharsets.UTF_8))});
+          }
+        }
+      }
+    }
+    if (!tagKeys.isEmpty() && !tagValues.isEmpty() && tagKeys.size() == 
tagValues.size()) {
+      message.setTagKeys(tagKeys);
+      message.setTagValues(tagValues);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  private boolean setAttributes(Matcher matcher, 
org.apache.iotdb.db.protocol.mqtt.TableMessage message) {
+    List<String> attributeKeys = new ArrayList<>();
+    List<Object> attributeValues = new ArrayList<>();
+    String attributesGroup = matcher.group(ATTRIBUTES);
+    if (attributesGroup != null && !attributesGroup.isEmpty()) {
+      String[] attributePairs = attributesGroup.split(COMMA);
+      for (String attributePair : attributePairs) {
+        if (!attributePair.isEmpty()) {
+          String[] keyValue = attributePair.split(EQUAL);
+          if (keyValue.length == 2 && !NULL.equals(keyValue[1])) {
+            attributeKeys.add(keyValue[0]);
+            attributeValues.add(
+                new Binary[] {new 
Binary(keyValue[1].getBytes(StandardCharsets.UTF_8))});
+          }
+        }
+      }
+    }
+    if (attributeKeys.size() == attributeValues.size()) {
+      message.setAttributeKeys(attributeKeys);
+      message.setAttributeValues(attributeValues);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  private boolean setFields(Matcher matcher, 
org.apache.iotdb.db.protocol.mqtt.TableMessage message) {
+    List<String> fields = new ArrayList<>();
+    List<TSDataType> dataTypes = new ArrayList<>();
+    List<Object> values = new ArrayList<>();
+    String fieldsGroup = matcher.group(FIELDS);
+    if (fieldsGroup != null && !fieldsGroup.isEmpty()) {
+      String[] fieldPairs = splitFieldPairs(fieldsGroup);
+      for (String fieldPair : fieldPairs) {
+        if (!fieldPair.isEmpty()) {
+          String[] keyValue = fieldPair.split(EQUAL);
+          if (keyValue.length == 2 && !NULL.equals(keyValue[1])) {
+            fields.add(keyValue[0]);
+            Pair<TSDataType, Object> typeAndValue = analyticValue(keyValue[1]);
+            values.add(typeAndValue.getRight());
+            dataTypes.add(typeAndValue.getLeft());
+          }
+        }
+      }
+    }
+    if (!fields.isEmpty() && !values.isEmpty() && fields.size() == 
values.size()) {
+      message.setFields(fields);
+      message.setDataTypes(dataTypes);
+      message.setValues(values);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  private String[] splitFieldPairs(String fieldsGroup) {
+
+    if (fieldsGroup == null || fieldsGroup.isEmpty()) return new String[0];
+    Matcher m = 
Pattern.compile("\\w+=\"[^\"]*\"|\\w+=[^,]*").matcher(fieldsGroup);
+    Stream.Builder<String> builder = Stream.builder();
+
+    while (m.find()) builder.add(m.group());
+    return builder.build().toArray(String[]::new);
+  }
+
+  private Pair<TSDataType, Object> analyticValue(String value) {
+    if (value.startsWith("\"") && value.endsWith("\"")) {
+      // String
+      return new Pair<>(
+          TSDataType.TEXT,
+          new Binary[] {
+            new Binary(value.substring(1, value.length() - 
1).getBytes(StandardCharsets.UTF_8))
+          });
+    } else if (value.equalsIgnoreCase("t")
+        || value.equalsIgnoreCase("true")
+        || value.equalsIgnoreCase("f")
+        || value.equalsIgnoreCase("false")) {
+      // boolean
+      return new Pair<>(
+          TSDataType.BOOLEAN,
+          new boolean[] {value.equalsIgnoreCase("t") || 
value.equalsIgnoreCase("true")});
+    } else if (value.endsWith("f")) {
+      // float
+      return new Pair<>(
+          TSDataType.FLOAT, new float[] {Float.parseFloat(value.substring(0, 
value.length() - 1))});
+    } else if (value.endsWith("i32")) {
+      // int
+      return new Pair<>(
+          TSDataType.INT32, new int[] {Integer.parseInt(value.substring(0, 
value.length() - 3))});
+    } else if (value.endsWith("u") || value.endsWith("i")) {
+      // long
+      return new Pair<>(
+          TSDataType.INT64, new long[] {Long.parseLong(value.substring(0, 
value.length() - 1))});
+    } else {
+      // double
+      return new Pair<>(TSDataType.DOUBLE, new double[] 
{Double.parseDouble(value)});
+    }
+  }
+
+  private boolean setTimestamp(Matcher matcher, TableMessage message) {
+    String timestampGroup = matcher.group(TIMESTAMP);
+    if (timestampGroup != null && !timestampGroup.isEmpty()) {
+      message.setTimestamp(Long.parseLong(timestampGroup));
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public String getName() {
+    return "line";
+  }
+
+  @Override
+  public String getType() {
+    return PayloadFormatter.TABLE_TYPE;
+  }
+}
diff --git 
a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/Message.java 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/Message.java
new file mode 100644
index 0000000..26f0618
--- /dev/null
+++ b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/Message.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.protocol.mqtt.msg;
+
+/** Generic parsing of messages */
+public class Message {
+
+  protected Long timestamp;
+
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(Long timestamp) {
+    this.timestamp = timestamp;
+  }
+}
diff --git 
a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/PayloadFormatManager.java
 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/PayloadFormatManager.java
new file mode 100644
index 0000000..cceb5f6
--- /dev/null
+++ 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/PayloadFormatManager.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.protocol.mqtt.msg;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.ServiceLoader;
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.protocol.mqtt.MqttConfig;
+import org.apache.tsfile.external.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** PayloadFormatManager loads payload formatter from SPI services. */
+public class PayloadFormatManager {
+
+
+  private static final Logger logger = 
LoggerFactory.getLogger(PayloadFormatManager.class);
+
+  // The dir saving MQTT payload plugin .jar files
+  private static String mqttDir;
+  // Map: formatterName => PayloadFormatter
+  private final Map<String, PayloadFormatter> mqttPayloadPluginMap = new 
HashMap<>();
+
+  private final MqttConfig config;
+  public PayloadFormatManager(MqttConfig config) {
+    this.config = config;
+    init();
+  }
+
+  private void init() {
+    mqttDir = config.getMqttDir();
+    logger.info("mqttDir: {}", mqttDir);
+
+    try {
+      makeMqttPluginDir();
+      buildMqttPluginMap();
+    } catch (IOException e) {
+      logger.error("MQTT PayloadFormatManager init() error.", e);
+    }
+  }
+
+  public PayloadFormatter getPayloadFormat(String name) {
+    PayloadFormatter formatter = mqttPayloadPluginMap.get(name);
+    Preconditions.checkArgument(formatter != null, "Unknown payload format 
named: " + name);
+    return formatter;
+  }
+
+  private void makeMqttPluginDir() throws IOException {
+    File file = SystemFileFactory.INSTANCE.getFile(mqttDir);
+    if (file.exists() && file.isDirectory()) {
+      return;
+    }
+    FileUtils.forceMkdir(file);
+  }
+
+  private void buildMqttPluginMap() throws IOException {
+    ServiceLoader<PayloadFormatter> payloadFormatters = ServiceLoader.load(
+        PayloadFormatter.class);
+    for (PayloadFormatter formatter : payloadFormatters) {
+      if (formatter == null) {
+        logger.error("PayloadFormatManager(), formatter is null.");
+        continue;
+      }
+
+      String pluginName = formatter.getName();
+      mqttPayloadPluginMap.put(pluginName, formatter);
+      logger.info("PayloadFormatManager(), find MQTT Payload Plugin {}.", 
pluginName);
+    }
+
+    URL[] jarURLs = getPluginJarURLs(mqttDir);
+    logger.debug("MQTT Plugin jarURLs: {}", Arrays.toString(jarURLs));
+
+    for (URL jarUrl : jarURLs) {
+      ClassLoader classLoader = new URLClassLoader(new URL[] {jarUrl});
+
+      // Use SPI to get all plugins' class
+      ServiceLoader<PayloadFormatter> payloadFormatters2 =
+          ServiceLoader.load(PayloadFormatter.class, classLoader);
+
+      for (PayloadFormatter formatter : payloadFormatters2) {
+        if (formatter == null) {
+          logger.error("PayloadFormatManager(), formatter is null.");
+          continue;
+        }
+
+        String pluginName = formatter.getName();
+        if (mqttPayloadPluginMap.containsKey(pluginName)) {
+          continue;
+        }
+        mqttPayloadPluginMap.put(pluginName, formatter);
+        logger.info("PayloadFormatManager(), find MQTT Payload Plugin {}.", 
pluginName);
+      }
+    }
+  }
+
+  /**
+   * get all jar files in the given folder
+   *
+   * @param folderPath
+   * @return all jar files' URL
+   * @throws IOException
+   */
+  private static URL[] getPluginJarURLs(String folderPath) throws IOException {
+    HashSet<File> fileSet =
+        new HashSet<>(
+            org.apache.tsfile.external.commons.io.FileUtils.listFiles(
+                SystemFileFactory.INSTANCE.getFile(folderPath), new String[] 
{"jar"}, true));
+    return 
org.apache.tsfile.external.commons.io.FileUtils.toURLs(fileSet.toArray(new 
File[0]));
+  }
+}
diff --git 
a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/PayloadFormatter.java
 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/PayloadFormatter.java
new file mode 100644
index 0000000..881beb0
--- /dev/null
+++ 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/PayloadFormatter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.protocol.mqtt.msg;
+
+import io.netty.buffer.ByteBuf;
+import java.util.List;
+import org.apache.iotdb.db.protocol.mqtt.JSONPayloadFormatter;
+import org.apache.iotdb.db.protocol.mqtt.Message;
+
+/**
+ * PayloadFormatter format the payload to the messages.
+ *
+ * <p>This is a SPI interface.
+ *
+ * @see JSONPayloadFormatter
+ */
+public interface PayloadFormatter {
+
+  public static final String TREE_TYPE = "tree";
+  public static final String TABLE_TYPE = "table";
+
+  /**
+   * format a payload to a list of messages
+   *
+   * @param payload
+   * @return
+   */
+  @Deprecated
+  List<Message> format(ByteBuf payload);
+
+  /**
+   * format a payload of a topic to a list of messages
+   *
+   * @param topic
+   * @param payload
+   * @return
+   */
+  default List<Message> format(String topic, ByteBuf payload) {
+    return format(payload);
+  }
+
+  /**
+   * get the formatter name
+   *
+   * @return
+   */
+  String getName();
+
+  String getType();
+}
diff --git 
a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/TableMessage.java
 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/TableMessage.java
new file mode 100644
index 0000000..90f2e32
--- /dev/null
+++ 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/TableMessage.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.protocol.mqtt.msg;
+
+import java.util.List;
+import org.apache.iotdb.db.protocol.mqtt.Message;
+import org.apache.tsfile.enums.TSDataType;
+
+/** Message parsing into a table */
+public class TableMessage extends Message {
+
+  private String database;
+
+  private String table;
+
+  private List<String> tagKeys;
+
+  private List<Object> tagValues;
+
+  private List<String> attributeKeys;
+
+  private List<Object> attributeValues;
+
+  private List<String> fields;
+
+  private List<TSDataType> dataTypes;
+
+  private List<Object> values;
+
+  public String getDatabase() {
+    return database;
+  }
+
+  public void setDatabase(String database) {
+    this.database = database;
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  public void setTable(String table) {
+    this.table = table;
+  }
+
+  public List<String> getTagKeys() {
+    return tagKeys;
+  }
+
+  public void setTagKeys(List<String> tagKeys) {
+    this.tagKeys = tagKeys;
+  }
+
+  public List<Object> getTagValues() {
+    return tagValues;
+  }
+
+  public void setTagValues(List<Object> tagValues) {
+    this.tagValues = tagValues;
+  }
+
+  public List<String> getAttributeKeys() {
+    return attributeKeys;
+  }
+
+  public void setAttributeKeys(List<String> attributeKeys) {
+    this.attributeKeys = attributeKeys;
+  }
+
+  public List<Object> getAttributeValues() {
+    return attributeValues;
+  }
+
+  public void setAttributeValues(List<Object> attributeValues) {
+    this.attributeValues = attributeValues;
+  }
+
+  public List<String> getFields() {
+    return fields;
+  }
+
+  public void setFields(List<String> fields) {
+    this.fields = fields;
+  }
+
+  public List<TSDataType> getDataTypes() {
+    return dataTypes;
+  }
+
+  public void setDataTypes(List<TSDataType> dataTypes) {
+    this.dataTypes = dataTypes;
+  }
+
+  public List<Object> getValues() {
+    return values;
+  }
+
+  public void setValues(List<Object> values) {
+    this.values = values;
+  }
+
+  @Override
+  public String toString() {
+    return "TableMessage{"
+        + "database='"
+        + database
+        + '\''
+        + ", table='"
+        + table
+        + '\''
+        + ", tagKeys="
+        + tagKeys
+        + ", tagValues="
+        + tagValues
+        + ", attributeKeys="
+        + attributeKeys
+        + ", attributeValues="
+        + attributeValues
+        + ", fields="
+        + fields
+        + ", dataTypes="
+        + dataTypes
+        + ", values="
+        + values
+        + ", timestamp="
+        + timestamp
+        + '}';
+  }
+}
diff --git 
a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/TreeMessage.java
 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/TreeMessage.java
new file mode 100644
index 0000000..ee514eb
--- /dev/null
+++ 
b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/TreeMessage.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.protocol.mqtt.msg;
+
+import java.util.List;
+import org.apache.iotdb.db.protocol.mqtt.Message;
+import org.apache.tsfile.enums.TSDataType;
+
+/** Message parsing into a tree */
+public class TreeMessage extends Message {
+  private String device;
+  private List<String> measurements;
+  private List<TSDataType> dataTypes;
+  private List<String> values;
+
+  public String getDevice() {
+    return device;
+  }
+
+  public void setDevice(String device) {
+    this.device = device;
+  }
+
+  public List<String> getMeasurements() {
+    return measurements;
+  }
+
+  public void setMeasurements(List<String> measurements) {
+    this.measurements = measurements;
+  }
+
+  public List<TSDataType> getDataTypes() {
+    return dataTypes;
+  }
+
+  public void setDataTypes(List<TSDataType> dataTypes) {
+    this.dataTypes = dataTypes;
+  }
+
+  public List<String> getValues() {
+    return values;
+  }
+
+  public void setValues(List<String> values) {
+    this.values = values;
+  }
+
+  @Override
+  public String toString() {
+    return "Message{"
+        + "device='"
+        + device
+        + '\''
+        + ", timestamp="
+        + super.timestamp
+        + ", measurements="
+        + measurements
+        + ", values="
+        + values
+        + '}';
+  }
+}
diff --git a/mqtt-service/src/main/resources/mqtt.properties 
b/mqtt-service/src/main/resources/mqtt.properties
new file mode 100644
index 0000000..a27a026
--- /dev/null
+++ b/mqtt-service/src/main/resources/mqtt.properties
@@ -0,0 +1,35 @@
+####################
+### MQTT Broker Configuration
+####################
+
+# whether to enable the mqtt service.
+# effectiveMode: restart
+# Datatype: boolean
+enable_mqtt_service=false
+
+# the mqtt service binding host.
+# effectiveMode: restart
+# Datatype: String
+mqtt_host=127.0.0.1
+
+# the mqtt service binding port.
+# effectiveMode: restart
+# Datatype: int
+mqtt_port=1883
+
+# the handler pool size for handing the mqtt messages.
+# effectiveMode: restart
+# Datatype: int
+mqtt_handler_pool_size=1
+
+# the mqtt message payload formatter.
+# effectiveMode: restart
+# Options: [json, line]
+# The built-in json only supports tree models, and the line only supports 
table models.
+# Datatype: String
+mqtt_payload_formatter=json
+
+# max length of mqtt message in byte
+# effectiveMode: restart
+# Datatype: int
+mqtt_max_message_size=1048576
\ No newline at end of file
diff --git a/mqtt-service/src/test/java/org/apache/iotdb/AppTest.java 
b/mqtt-service/src/test/java/org/apache/iotdb/AppTest.java
new file mode 100644
index 0000000..3ea537e
--- /dev/null
+++ b/mqtt-service/src/test/java/org/apache/iotdb/AppTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+/**
+ * Unit test for simple App.
+ */
+public class AppTest
+{
+
+}
diff --git 
a/mqtt-service/src/test/java/org/apache/iotdb/BrokerAuthenticatorTest.java 
b/mqtt-service/src/test/java/org/apache/iotdb/BrokerAuthenticatorTest.java
new file mode 100644
index 0000000..8e4ace8
--- /dev/null
+++ b/mqtt-service/src/test/java/org/apache/iotdb/BrokerAuthenticatorTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb;
+
+import java.io.IOException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BrokerAuthenticatorTest {
+
+  @Before
+  public void before() {
+    EnvironmentUtils.envSetUp();
+  }
+
+  @After
+  public void after() throws IOException, StorageEngineException {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void checkValid() {
+    // In the previous implementation, the datanode will init a root file,
+    // but now checkuser operation needs to link to confignode.
+    //    BrokerAuthenticator authenticator = new BrokerAuthenticator();
+    //    assertTrue(authenticator.checkValid(null, "root", 
"root".getBytes()));
+    //    assertFalse(authenticator.checkValid(null, "", "foo".getBytes()));
+    //    assertFalse(authenticator.checkValid(null, "root", null));
+    //    assertFalse(authenticator.checkValid(null, "foo", "foo".getBytes()));
+  }
+}
diff --git 
a/mqtt-service/src/test/java/org/apache/iotdb/JSONPayloadFormatterTest.java 
b/mqtt-service/src/test/java/org/apache/iotdb/JSONPayloadFormatterTest.java
new file mode 100644
index 0000000..324852c
--- /dev/null
+++ b/mqtt-service/src/test/java/org/apache/iotdb/JSONPayloadFormatterTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb;
+
+import static org.junit.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.nio.charset.StandardCharsets;
+import org.apache.iotdb.db.protocol.mqtt.JSONPayloadFormatter;
+import org.apache.iotdb.db.protocol.mqtt.TreeMessage;
+import org.junit.Test;
+
+public class JSONPayloadFormatterTest {
+
+  @Test
+  public void formatJson() {
+    String payload =
+        " {\n"
+            + "      \"device\":\"root.sg.d1\",\n"
+            + "      \"timestamp\":1586076045524,\n"
+            + "      \"measurements\":[\"s1\",\"s2\"],\n"
+            + "      \"values\":[0.530635,0.530635]\n"
+            + " }";
+
+    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+    String topic = "";
+
+    JSONPayloadFormatter formatter = new JSONPayloadFormatter();
+    TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(0);
+
+    assertEquals("root.sg.d1", message.getDevice());
+    assertEquals(Long.valueOf(1586076045524L), message.getTimestamp());
+    assertEquals("s1", message.getMeasurements().get(0));
+    assertEquals(0.530635D, Double.parseDouble(message.getValues().get(0)), 0);
+  }
+
+  @Test
+  public void formatBatchJson() {
+    String payload =
+        " {\n"
+            + "      \"device\":\"root.sg.d1\",\n"
+            + "      \"timestamps\":[1586076045524,1586076065526],\n"
+            + "      \"measurements\":[\"s1\",\"s2\"],\n"
+            + "      \"values\":[[0.530635,0.530635], [0.530655,0.530695]]\n"
+            + "  }";
+
+    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+    String topic = "";
+
+    JSONPayloadFormatter formatter = new JSONPayloadFormatter();
+    TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(1);
+
+    assertEquals("root.sg.d1", message.getDevice());
+    assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
+    assertEquals("s2", message.getMeasurements().get(1));
+    assertEquals(0.530695D, Double.parseDouble(message.getValues().get(1)), 0);
+  }
+
+  @Test
+  public void formatJsonArray() {
+    String payload =
+        " [\n"
+            + "  {\n"
+            + "      \"device\":\"root.sg.d1\",\n"
+            + "      \"timestamp\":1586076045524,\n"
+            + "      \"measurements\":[\"s1\",\"s2\"],\n"
+            + "      \"values\":[0.530635,0.530635]\n"
+            + "  },\n"
+            + "  {\n"
+            + "      \"device\":\"root.sg.d2\",\n"
+            + "      \"timestamp\":1586076065526,\n"
+            + "      \"measurements\":[\"s3\",\"s4\"],\n"
+            + "      \"values\":[0.530655,0.530655]\n"
+            + "  }\n"
+            + "]";
+
+    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+    String topic = "";
+
+    JSONPayloadFormatter formatter = new JSONPayloadFormatter();
+    TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(1);
+
+    assertEquals("root.sg.d2", message.getDevice());
+    assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
+    assertEquals("s3", message.getMeasurements().get(0));
+    assertEquals(0.530655D, Double.parseDouble(message.getValues().get(0)), 0);
+  }
+
+  @Test
+  public void formatBatchJsonArray() {
+    String payload =
+        "[\n"
+            + "  {\n"
+            + "      \"device\":\"root.sg.d1\",\n"
+            + "      \"timestamps\":[1586076045524,1586076065526],\n"
+            + "      \"measurements\":[\"s1\",\"s2\"],\n"
+            + "      \"values\":[[0.530635,0.530635], [0.530655,0.530695]]\n"
+            + "  },\n"
+            + "  {\n"
+            + "      \"device\":\"root.sg.d2\",\n"
+            + "      \"timestamps\":[1586076045524,1586076065526],\n"
+            + "      \"measurements\":[\"s3\",\"s4\"],\n"
+            + "      \"values\":[[0.530635,0.530635], [0.530655,0.530695]]\n"
+            + "  }"
+            + "]";
+
+    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+    String topic = "";
+
+    JSONPayloadFormatter formatter = new JSONPayloadFormatter();
+    TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(3);
+
+    assertEquals("root.sg.d2", message.getDevice());
+    assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
+    assertEquals("s4", message.getMeasurements().get(1));
+    assertEquals(0.530695D, Double.parseDouble(message.getValues().get(1)), 0);
+  }
+}
diff --git 
a/mqtt-service/src/test/java/org/apache/iotdb/LinePayloadFormatterTest.java 
b/mqtt-service/src/test/java/org/apache/iotdb/LinePayloadFormatterTest.java
new file mode 100644
index 0000000..96e055b
--- /dev/null
+++ b/mqtt-service/src/test/java/org/apache/iotdb/LinePayloadFormatterTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.iotdb.db.protocol.mqtt.LinePayloadFormatter;
+import org.apache.iotdb.db.protocol.mqtt.Message;
+import org.apache.iotdb.db.protocol.mqtt.TableMessage;
+import org.apache.tsfile.utils.Binary;
+import org.junit.Test;
+
+public class LinePayloadFormatterTest {
+
+  @Test
+  public void formatLine() {
+    String payload =
+        "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 
field1=\"value1\",field2=1i,field3=2u,field4=3i32,field5=t,field6=false,field7=4,field8=5f
 1";
+
+    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+    String topic = "";
+
+    LinePayloadFormatter formatter = new LinePayloadFormatter();
+    TableMessage message = (TableMessage) formatter.format(topic, buf).get(0);
+
+    assertEquals("test1", message.getTable());
+    assertEquals(Long.valueOf(1L), message.getTimestamp());
+    assertEquals("tag1", message.getTagKeys().get(0));
+    assertEquals("attr1", message.getAttributeKeys().get(0));
+    assertEquals(
+        "value1",
+        ((Binary[]) 
message.getValues().get(0))[0].getStringValue(StandardCharsets.UTF_8));
+    assertEquals(1L, ((long[]) message.getValues().get(1))[0], 0);
+    assertEquals(2L, ((long[]) message.getValues().get(2))[0], 0);
+    assertEquals(3L, ((int[]) message.getValues().get(3))[0], 0);
+    assertTrue(((boolean[]) message.getValues().get(4))[0]);
+    assertFalse(((boolean[]) message.getValues().get(5))[0]);
+    assertEquals(4d, ((double[]) message.getValues().get(6))[0], 0);
+    assertEquals(5f, ((float[]) message.getValues().get(7))[0], 0);
+  }
+
+  @Test
+  public void formatBatchLine() {
+    String payload =
+        "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 
field1=\"value1\",field2=1i,field3=1u 1 \n"
+            + "test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4 
field4=\"value4\",field5=10i,field6=10i32 2 ";
+
+    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+    String topic = "";
+
+    LinePayloadFormatter formatter = new LinePayloadFormatter();
+    TableMessage message = (TableMessage) formatter.format(topic, buf).get(1);
+
+    assertEquals("test2", message.getTable());
+    assertEquals(Long.valueOf(2L), message.getTimestamp());
+    assertEquals("tag3", message.getTagKeys().get(0));
+    assertEquals("attr3", message.getAttributeKeys().get(0));
+    assertEquals(10, ((int[]) message.getValues().get(2))[0], 0);
+  }
+
+  @Test
+  public void formatLineAnnotation() {
+    String payload =
+        "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 
field1=\"value1\",field2=1i,field3=1u 1 \n"
+            + " # test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4 
field4=\"value4\",field5=10i,field6=10i32 2 ";
+
+    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+    String topic = "";
+
+    LinePayloadFormatter formatter = new LinePayloadFormatter();
+    List<Message> message = formatter.format(topic, buf);
+
+    assertEquals(1, message.size());
+  }
+}
diff --git 
a/mqtt-service/src/test/java/org/apache/iotdb/PayloadFormatManagerTest.java 
b/mqtt-service/src/test/java/org/apache/iotdb/PayloadFormatManagerTest.java
new file mode 100644
index 0000000..17d2ed3
--- /dev/null
+++ b/mqtt-service/src/test/java/org/apache/iotdb/PayloadFormatManagerTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb;
+
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.iotdb.db.protocol.mqtt.PayloadFormatManager;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.junit.After;
+import org.junit.Test;
+
+public class PayloadFormatManagerTest {
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanAllDir();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void getPayloadFormat() {
+    PayloadFormatManager.getPayloadFormat("txt");
+  }
+
+  @Test
+  public void getDefaultPayloadFormat() {
+    assertNotNull(PayloadFormatManager.getPayloadFormat("json"));
+  }
+}
diff --git a/pom.xml b/pom.xml
index ca88f39..f5cb3dd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,6 +37,7 @@
         <module>distributions</module>
         <module>iotdb-collector</module>
         <module>mybatis-generator</module>
+        <module>mqtt-service</module>
     </modules>
     <properties>
         <!-- Explicitly set a variable used by all dependencies to the IoTDB 
dependencies, as the
@@ -183,7 +184,7 @@
         <thrift.version>0.14.1</thrift.version>
         <!-- This was the last version to support Java 8 -->
         <tomcat.version>9.0.86</tomcat.version>
-        <tsfile.version>2.1.1</tsfile.version>
+        <tsfile.version>2.2.0-251027-SNAPSHOT</tsfile.version>
         <xz.version>1.9</xz.version>
         <zeppelin.version>0.11.1</zeppelin.version>
         <zstd-jni.version>1.5.5-5</zstd-jni.version>
@@ -979,6 +980,11 @@
                 <artifactId>iotdb-session</artifactId>
                 <version>${iotdb.version}</version>
             </dependency>
+            <dependency>
+              <groupId>org.apache.iotdb</groupId>
+              <artifactId>iotdb-server</artifactId>
+              <version>${iotdb.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.kafka</groupId>
                 <artifactId>kafka-clients</artifactId>

Reply via email to