This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch remove_mqtt_server
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1ce6236e1bf22a0a7b81d0a70173845ffba7e4d7
Author: Tian Jiang <[email protected]>
AuthorDate: Tue Oct 28 15:37:23 2025 +0800

    tmp save
---
 Code Summary.md                                    |   1 -
 LICENSE-binary                                     |   1 -
 dependencies.json                                  |   2 -
 example/mqtt-customize/README.md                   |  42 ---
 example/mqtt-customize/pom.xml                     |  39 ---
 .../server/CustomizedJsonPayloadFormatter.java     |  74 -----
 ....apache.iotdb.db.protocol.mqtt.PayloadFormatter |  20 --
 example/mqtt/README.md                             |  33 --
 example/mqtt/pom.xml                               |  37 ---
 .../java/org/apache/iotdb/mqtt/MQTTClient.java     | 112 -------
 example/pom.xml                                    |   2 -
 integration-test/import-control.xml                |   2 -
 integration-test/pom.xml                           |   4 -
 .../iotdb/it/env/cluster/ClusterConstant.java      |   3 -
 .../it/env/cluster/config/MppCommonConfig.java     |  12 -
 .../it/env/cluster/config/MppDataNodeConfig.java   |  12 -
 .../env/cluster/config/MppSharedCommonConfig.java  |  14 -
 .../iotdb/it/env/cluster/env/AbstractEnv.java      |   7 -
 .../it/env/cluster/node/AbstractNodeWrapper.java   |   4 -
 .../iotdb/it/env/cluster/node/DataNodeWrapper.java |  15 +-
 .../it/env/remote/config/RemoteCommonConfig.java   |  10 -
 .../it/env/remote/config/RemoteDataNodeConfig.java |  10 -
 .../iotdb/it/env/remote/env/RemoteServerEnv.java   |   5 -
 .../java/org/apache/iotdb/itbase/env/BaseEnv.java  |   2 -
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   4 -
 .../apache/iotdb/itbase/env/DataNodeConfig.java    |   4 -
 .../relational/it/mqtt/IoTDBMQTTServiceIT.java     | 161 ----------
 .../src/test/resources/logback-test.xml            |   1 -
 iotdb-core/datanode/pom.xml                        |   4 -
 .../assembly/resources/conf/logback-datanode.xml   |   1 -
 .../apache/iotdb/db/conf/DataNodeStartupCheck.java |   1 -
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 101 -------
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  50 ----
 .../db/protocol/mqtt/BrokerAuthenticator.java      |  42 ---
 .../db/protocol/mqtt/JSONPayloadFormatter.java     | 148 ---------
 .../db/protocol/mqtt/LinePayloadFormatter.java     | 280 -----------------
 .../iotdb/db/protocol/mqtt/MPPPublishHandler.java  | 331 ---------------------
 .../org/apache/iotdb/db/protocol/mqtt/Message.java |  33 --
 .../db/protocol/mqtt/PayloadFormatManager.java     | 134 ---------
 .../iotdb/db/protocol/mqtt/PayloadFormatter.java   |  65 ----
 .../iotdb/db/protocol/mqtt/TableMessage.java       | 144 ---------
 .../apache/iotdb/db/protocol/mqtt/TreeMessage.java |  77 -----
 .../iotdb/db/protocol/session/IClientSession.java  |   3 +-
 .../db/protocol/session/MqttClientSession.java     |  79 -----
 .../iotdb/db/protocol/session/SessionManager.java  |  15 -
 .../java/org/apache/iotdb/db/service/DataNode.java |   3 -
 .../org/apache/iotdb/db/service/MQTTService.java   | 125 --------
 ....apache.iotdb.db.protocol.mqtt.PayloadFormatter |   3 -
 .../db/protocol/mqtt/BrokerAuthenticatorTest.java  |  51 ----
 .../db/protocol/mqtt/JSONPayloadFormatterTest.java | 133 ---------
 .../db/protocol/mqtt/LinePayloadFormatterTest.java |  94 ------
 .../db/protocol/mqtt/PayloadFormatManagerTest.java |  42 ---
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   5 -
 .../src/test/resources/datanode1conf/logback.xml   |   1 -
 .../src/test/resources/datanode2conf/logback.xml   |   1 -
 .../src/test/resources/datanode3conf/logback.xml   |   1 -
 .../datanode/src/test/resources/logback-test.xml   |   2 -
 .../conf/iotdb-system.properties.template          |  36 ---
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |  10 -
 .../apache/iotdb/commons/service/ServiceType.java  |   1 -
 .../thrift-datanode/src/main/thrift/client.thrift  |   2 +-
 pom.xml                                            |   6 -
 62 files changed, 3 insertions(+), 2654 deletions(-)

diff --git a/Code Summary.md b/Code Summary.md
index 65fd1ddbc06..56b9b7a5199 100644
--- a/Code Summary.md   
+++ b/Code Summary.md   
@@ -49,7 +49,6 @@ Next, it performs following activities and registers the 
essential services:
   * **StorageEngine**
   * **RPCService**
   * **MetricsService**
-  * **MQTTService**
   * **SyncServerManager**
   * **UpgradeSevice**
   * **MergeManager**
diff --git a/LICENSE-binary b/LICENSE-binary
index aaa7a93867f..9f3a1db3f9f 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -247,7 +247,6 @@ com.github.moquette-io.moquette:moquette-broker:0.18
 io.netty:netty-buffer:4.1.126.Final
 io.netty:netty-codec:4.1.126.Final
 io.netty:netty-codec-http:4.1.126.Final
-io.netty:netty-codec-mqtt:4.1.126.Final
 io.netty:netty-common:4.1.126.Final
 io.netty:netty-handler:4.1.126.Final
 io.netty:netty-resolver:4.1.126.Final
diff --git a/dependencies.json b/dependencies.json
index 40421228542..f6513e8b4ed 100644
--- a/dependencies.json
+++ b/dependencies.json
@@ -56,7 +56,6 @@
     "io.netty:netty-codec-dns",
     "io.netty:netty-codec-http",
     "io.netty:netty-codec-http2",
-    "io.netty:netty-codec-mqtt",
     "io.netty:netty-codec-socks",
     "io.netty:netty-common",
     "io.netty:netty-handler",
@@ -134,7 +133,6 @@
     "org.fusesource.hawtbuf:hawtbuf",
     "org.fusesource.hawtdispatch:hawtdispatch",
     "org.fusesource.hawtdispatch:hawtdispatch-transport",
-    "org.fusesource.mqtt-client:mqtt-client",
     "org.glassfish.hk2:hk2-api",
     "org.glassfish.hk2:hk2-locator",
     "org.glassfish.hk2:hk2-utils",
diff --git a/example/mqtt-customize/README.md b/example/mqtt-customize/README.md
deleted file mode 100644
index 9fa224c4dab..00000000000
--- a/example/mqtt-customize/README.md
+++ /dev/null
@@ -1,42 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-        http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-# Customized IoTDB-MQTT-Broker Example
-
-## Function
-```
-The example is to show how to customize your MQTT message format
-```
-
-## Usage
-
-* Define your implementation which implements `PayloadFormatter.java`
-* modify the file in 
`src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter`:
-  clean the file and put your implementation class name into the file
-* compile your implementation as a jar file
-
-  
-Then, in your server: 
-* Create ${IOTDB_HOME}/ext/mqtt/ folder, and put the jar into this folder. 
-* Update configuration to enable MQTT service. (`enable_mqtt_service=true` in 
iotdb-datanode.properties)
-* Set the value of `mqtt_payload_formatter` in 
`conf/iotdb-datanode.properties` as the value of getName() in your 
implementation 
-* Launch the IoTDB server.
-* Now IoTDB will use your implementation to parse the MQTT message.
-
diff --git a/example/mqtt-customize/pom.xml b/example/mqtt-customize/pom.xml
deleted file mode 100644
index 9a530f49fd9..00000000000
--- a/example/mqtt-customize/pom.xml
+++ /dev/null
@@ -1,39 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-        http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.iotdb</groupId>
-        <artifactId>iotdb-examples</artifactId>
-        <version>2.0.6-SNAPSHOT</version>
-    </parent>
-    <artifactId>customize-mqtt-example</artifactId>
-    <name>IoTDB: Example: Customized MQTT</name>
-    <dependencies>
-        <!-- used by the server-->
-        <dependency>
-            <groupId>org.apache.iotdb</groupId>
-            <artifactId>iotdb-server</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-    </dependencies>
-</project>
diff --git 
a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
 
b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
deleted file mode 100644
index 8c3a9621739..00000000000
--- 
a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.mqtt.server;
-
-import org.apache.iotdb.db.protocol.mqtt.Message;
-import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
-import org.apache.iotdb.db.protocol.mqtt.TreeMessage;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.tsfile.external.commons.lang3.NotImplementedException;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
-
-  @Override
-  public List<Message> format(String topic, ByteBuf payload) {
-    // Suppose the payload is a json format
-    if (payload == null) {
-      return Collections.emptyList();
-    }
-
-    // parse data from the json and generate Messages and put them into 
List<Message> ret
-    List<Message> ret = new ArrayList<>();
-    // this is just an example, so we just generate some Messages directly
-    for (int i = 0; i < 2; i++) {
-      long ts = i;
-      TreeMessage message = new TreeMessage();
-      message.setDevice("d" + i);
-      message.setTimestamp(ts);
-      message.setMeasurements(Arrays.asList("s1", "s2"));
-      message.setValues(Arrays.asList("4.0" + i, "5.0" + i));
-      ret.add(message);
-    }
-    return ret;
-  }
-
-  @Override
-  @Deprecated
-  public List<Message> format(ByteBuf payload) {
-    throw new NotImplementedException();
-  }
-
-  @Override
-  public String getName() {
-    // set the value of mqtt_payload_formatter in iotdb-common.properties as 
the following string:
-    return "CustomizedJson";
-  }
-
-  @Override
-  public String getType() {
-    return PayloadFormatter.TREE_TYPE;
-  }
-}
diff --git 
a/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
 
b/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
deleted file mode 100644
index b0b824ba784..00000000000
--- 
a/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-org.apache.iotdb.mqtt.server.CustomizedJsonPayloadFormatter
\ No newline at end of file
diff --git a/example/mqtt/README.md b/example/mqtt/README.md
deleted file mode 100644
index 421dcd8b726..00000000000
--- a/example/mqtt/README.md
+++ /dev/null
@@ -1,33 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-        http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-# IoTDB-MQTT-Broker Example
-
-## Function
-```
-The example is to show how to send data to IoTDB from a mqtt client.
-```
-
-## Usage
-
-* Update configuration to enable MQTT service. (`enable_mqtt_service=true` in 
iotdb-datanode.properties)
-* Launch the IoTDB server.
-* Setup database `CREATE DATABASE root.sg` and create time timeseries `CREATE 
TIMESERIES root.sg.d1.s1 WITH DATATYPE=DOUBLE, ENCODING=PLAIN`.
-* Run `org.apache.iotdb.mqtt.MQTTClient` to run the mqtt client and send 
events to server.
diff --git a/example/mqtt/pom.xml b/example/mqtt/pom.xml
deleted file mode 100644
index b3e7e0c238d..00000000000
--- a/example/mqtt/pom.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-        http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.iotdb</groupId>
-        <artifactId>iotdb-examples</artifactId>
-        <version>2.0.6-SNAPSHOT</version>
-    </parent>
-    <artifactId>mqtt-example</artifactId>
-    <name>IoTDB: Example: MQTT</name>
-    <dependencies>
-        <dependency>
-            <groupId>org.fusesource.mqtt-client</groupId>
-            <artifactId>mqtt-client</artifactId>
-        </dependency>
-    </dependencies>
-</project>
diff --git a/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java 
b/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
deleted file mode 100644
index ec15ad23567..00000000000
--- a/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.iotdb.mqtt;
-
-import org.fusesource.mqtt.client.BlockingConnection;
-import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.QoS;
-
-import java.util.Random;
-
-public class MQTTClient {
-
-  private static final String DATABASE = "myMqttTest";
-
-  public static void main(String[] args) throws Exception {
-    MQTT mqtt = new MQTT();
-    mqtt.setHost("127.0.0.1", 1883);
-    mqtt.setUserName("root");
-    mqtt.setPassword("root");
-    mqtt.setConnectAttemptsMax(3);
-    mqtt.setReconnectDelay(10);
-
-    BlockingConnection connection = mqtt.blockingConnection();
-    connection.connect();
-    // the config mqttPayloadFormatter must be tree-json
-    // jsonPayloadFormatter(connection);
-    // the config mqttPayloadFormatter must be table-line
-    linePayloadFormatter(connection);
-    connection.disconnect();
-  }
-
-  private static void jsonPayloadFormatter(BlockingConnection connection) 
throws Exception {
-    Random random = new Random();
-    StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < 10; i++) {
-      String payload =
-          String.format(
-              "{\n"
-                  + "\"device\":\"root.sg.d1\",\n"
-                  + "\"timestamp\":%d,\n"
-                  + "\"measurements\":[\"s1\"],\n"
-                  + "\"values\":[%f]\n"
-                  + "}",
-              System.currentTimeMillis(), random.nextDouble());
-      sb.append(payload).append(",");
-
-      // publish a json object
-      Thread.sleep(1);
-      connection.publish("root.sg.d1.s1", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
-    }
-    // publish a json array
-    sb.insert(0, "[");
-    sb.replace(sb.lastIndexOf(","), sb.length(), "]");
-    connection.publish("root.sg.d1.s1", sb.toString().getBytes(), 
QoS.AT_LEAST_ONCE, false);
-  }
-
-  // The database must be created in advance
-  private static void linePayloadFormatter(BlockingConnection connection) 
throws Exception {
-    // myTable,tag1=t1,tag2=t2 fieldKey1="1,2,3" 1740109006001
-    String payload = "myTable,tag1=t1,tag2=t2 fieldKey1=\"1,2,3\" 
1740109006001";
-    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
-    Thread.sleep(10);
-
-    payload = "myTable,tag1=t1,tag2=t2 fieldKey1=\"1,2,3\" 1740109006002";
-    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
-    Thread.sleep(10);
-
-    payload = "myTable,tag1=t1,tag2=t2 fieldKey1=\"1,2,3\" 1740109006003";
-    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
-    Thread.sleep(10);
-    payload =
-        "test1,tag1=t1,tag2=t2 attr3=a5,attr4=a4 
field1=\"fieldValue1\",field2=1i,field3=1u 1";
-    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
-    Thread.sleep(10);
-
-    payload = "test1,tag1=t1,tag2=t2  field4=2,field5=2i32,field6=2f 2";
-    connection.publish(DATABASE, payload.getBytes(), QoS.AT_LEAST_ONCE, false);
-    Thread.sleep(10);
-
-    payload =
-        "test1,tag1=t1,tag2=t2  field7=t,field8=T,field9=true 3 \n "
-            + "test1,tag1=t1,tag2=t2  field7=f,field8=F,field9=FALSE 4";
-    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
-    Thread.sleep(10);
-
-    payload =
-        "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 
field1=\"fieldValue1\",field2=1i,field3=1u 4 \n "
-            + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 5";
-    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
-    Thread.sleep(10);
-
-    payload = "# It's a remark\n " + "test1,tag1=t1,tag2=t2 
field4=2,field5=2i32,field6=2f 6";
-    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
-    Thread.sleep(10);
-  }
-}
diff --git a/example/pom.xml b/example/pom.xml
index cce52b920c4..80ba0e423ca 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -31,8 +31,6 @@
     <name>IoTDB: Examples</name>
     <modules>
         <module>jdbc</module>
-        <module>mqtt</module>
-        <module>mqtt-customize</module>
         <module>pipe-count-point-processor</module>
         <module>pipe-opc-ua-sink</module>
         <module>rest-java-example</module>
diff --git a/integration-test/import-control.xml 
b/integration-test/import-control.xml
index 08cc61008c6..2f427c3954c 100644
--- a/integration-test/import-control.xml
+++ b/integration-test/import-control.xml
@@ -39,7 +39,6 @@
         <allow class="org.apache.iotdb.db.utils.constant.TestConstant"/>
         <allow class="org.apache.iotdb.db.utils.MathUtils"/>
         <allow 
class="org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode"/>
-        <allow class="org.fusesource.mqtt.client.QoS"/>
         <allow class="org.apache.iotdb.commons.path.PartialPath"/>
         <allow pkg="java.text"/>
         <allow pkg="org.apache.iotdb.db.it.utils"/>
@@ -53,7 +52,6 @@
         <allow pkg="org\.apache\.iotdb\.tsfile\.read.*" regex="true"/>
         <allow pkg="org\.apache\.iotdb\.tsfile\.utils.*" regex="true"/>
         <allow pkg="org\.apache\.iotdb\.tsfile\.write.*" regex="true"/>
-        <allow pkg="org\.apache\.iotdb\.db\.engine\.trigger\.sink\.mqtt.*" 
regex="true"/>
     </subpackage>
     <subpackage name="confignode.it">
         <allow class="java.nio.ByteBuffer"/>
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index 203f51415dc..38690def05b 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -213,10 +213,6 @@
             <artifactId>jcip-annotations</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.fusesource.mqtt-client</groupId>
-            <artifactId>mqtt-client</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
index 82ed7976959..3c5be59d16a 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
@@ -162,9 +162,6 @@ public class ClusterConstant {
   public static final String SCHEMA_REPLICATION_FACTOR = 
"schema_replication_factor";
   public static final String DATA_REPLICATION_FACTOR = 
"data_replication_factor";
 
-  public static final String MQTT_HOST = "mqtt_host";
-  public static final String MQTT_PORT = "mqtt_port";
-  public static final String MQTT_DATA_PATH = "mqtt_data_path";
   public static final String UDF_LIB_DIR = "udf_lib_dir";
   public static final String TRIGGER_LIB_DIR = "trigger_lib_dir";
   public static final String PIPE_LIB_DIR = "pipe_lib_dir";
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 1df43c16206..ce99f8010e1 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -309,18 +309,6 @@ public class MppCommonConfig extends MppBaseConfig 
implements CommonConfig {
     return this;
   }
 
-  @Override
-  public CommonConfig setEnableMQTTService(boolean enableMQTTService) {
-    setProperty("enable_mqtt_service", String.valueOf(enableMQTTService));
-    return this;
-  }
-
-  @Override
-  public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
-    setProperty("mqtt_payload_formatter", 
String.valueOf(mqttPayloadFormatter));
-    return this;
-  }
-
   @Override
   public CommonConfig setSchemaEngineMode(String schemaEngineMode) {
     setProperty("schema_engine_mode", schemaEngineMode);
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 357c15c7856..a99d00bd732 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -96,18 +96,6 @@ public class MppDataNodeConfig extends MppBaseConfig 
implements DataNodeConfig {
     return this;
   }
 
-  @Override
-  public DataNodeConfig setEnableMQTTService(boolean enableMQTTService) {
-    setProperty("enable_mqtt_service", String.valueOf(enableMQTTService));
-    return this;
-  }
-
-  @Override
-  public DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
-    setProperty("mqtt_payload_formatter", 
String.valueOf(mqttPayloadFormatter));
-    return this;
-  }
-
   @Override
   public DataNodeConfig setLoadLastCacheStrategy(String strategyName) {
     setProperty("last_cache_operation_on_load", strategyName);
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 504b1ae60e2..7c7ac1b370c 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -309,20 +309,6 @@ public class MppSharedCommonConfig implements CommonConfig 
{
     return this;
   }
 
-  @Override
-  public CommonConfig setEnableMQTTService(boolean enableMQTTService) {
-    cnConfig.setEnableMQTTService(enableMQTTService);
-    dnConfig.setEnableMQTTService(enableMQTTService);
-    return this;
-  }
-
-  @Override
-  public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
-    cnConfig.setMqttPayloadFormatter(mqttPayloadFormatter);
-    dnConfig.setMqttPayloadFormatter(mqttPayloadFormatter);
-    return this;
-  }
-
   @Override
   public CommonConfig setSchemaEngineMode(String schemaEngineMode) {
     cnConfig.setSchemaEngineMode(schemaEngineMode);
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 8308225a9af..12ec3d48fb5 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -1456,13 +1456,6 @@ public abstract class AbstractEnv implements BaseEnv {
     throw new IllegalStateException(lastException);
   }
 
-  @Override
-  public int getMqttPort() {
-    return dataNodeWrapperList
-        .get(new 
Random(System.currentTimeMillis()).nextInt(dataNodeWrapperList.size()))
-        .getMqttPort();
-  }
-
   @Override
   public String getIP() {
     return dataNodeWrapperList.get(0).getIp();
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index 1a349034469..2d866daf66b 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -88,8 +88,6 @@ import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STAND
 import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STANDALONE_MODE_DATA_REGION_REPLICA_NUM;
 import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STANDALONE_MODE_SCHEMA_REGION_CONSENSUS;
 import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STANDALONE_MODE_SCHEMA_REGION_REPLICA_NUM;
-import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_HOST;
-import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_PORT;
 import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.PIPE_CONSENSUS_BATCH_MODE;
 import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.PIPE_CONSENSUS_BATCH_MODE_CONFIG_NODE_CONSENSUS;
 import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.PIPE_CONSENSUS_BATCH_MODE_DATA_REGION_CONSENSUS;
@@ -188,8 +186,6 @@ public abstract class AbstractNodeWrapper implements 
BaseNodeWrapper {
     immutableCommonProperties.setProperty(UDF_LIB_DIR, 
MppBaseConfig.NULL_VALUE);
     immutableCommonProperties.setProperty(TRIGGER_LIB_DIR, 
MppBaseConfig.NULL_VALUE);
     immutableCommonProperties.setProperty(PIPE_LIB_DIR, 
MppBaseConfig.NULL_VALUE);
-    immutableCommonProperties.setProperty(MQTT_HOST, MppBaseConfig.NULL_VALUE);
-    immutableCommonProperties.setProperty(MQTT_PORT, MppBaseConfig.NULL_VALUE);
     immutableCommonProperties.setProperty(REST_SERVICE_PORT, 
MppBaseConfig.NULL_VALUE);
     immutableCommonProperties.setProperty(INFLUXDB_RPC_PORT, 
MppBaseConfig.NULL_VALUE);
     this.jvmConfig = initVMConfig();
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
index f08c085bc6c..24a5d01ae58 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
@@ -58,9 +58,6 @@ import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.DN_WAL_DIRS;
 import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.IOTDB_SYSTEM_PROPERTIES_FILE;
 import static org.apache.iotdb.it.env.cluster.ClusterConstant.MAIN_CLASS_NAME;
 import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.MAX_TSBLOCK_SIZE_IN_BYTES;
-import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_DATA_PATH;
-import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_HOST;
-import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_PORT;
 import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.PAGE_SIZE_IN_BYTE;
 import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.PIPE_AIR_GAP_RECEIVER_PORT;
 import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.REST_SERVICE_PORT;
@@ -77,7 +74,6 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
   private final String internalAddress;
   private final int dataRegionConsensusPort;
   private final int schemaRegionConsensusPort;
-  private final int mqttPort;
   private final int restServicePort;
   private final int pipeAirGapReceiverPort;
 
@@ -101,7 +97,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
     this.internalPort = portList[2];
     this.dataRegionConsensusPort = portList[3];
     this.schemaRegionConsensusPort = portList[4];
-    this.mqttPort = portList[5];
+    // deprecated: this.mqttPort = portList[5];
     this.pipeAirGapReceiverPort = portList[6];
     this.restServicePort = portList[10] + 6000;
     this.defaultNodePropertiesFile =
@@ -113,11 +109,6 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
     reloadMutableFields();
 
     // Initialize immutable properties
-    // Override mqtt properties of super class
-    immutableCommonProperties.setProperty(MQTT_HOST, super.getIp());
-    immutableCommonProperties.setProperty(MQTT_PORT, 
String.valueOf(this.mqttPort));
-    immutableCommonProperties.setProperty(
-        MQTT_DATA_PATH, getNodePath() + File.separator + "mqttData");
     immutableCommonProperties.setProperty(
         PIPE_AIR_GAP_RECEIVER_PORT, 
String.valueOf(this.pipeAirGapReceiverPort));
 
@@ -292,10 +283,6 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
     return schemaRegionConsensusPort;
   }
 
-  public int getMqttPort() {
-    return mqttPort;
-  }
-
   public int getPipeAirGapReceiverPort() {
     return pipeAirGapReceiverPort;
   }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index fdcff20dbc8..c33c4662557 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -216,16 +216,6 @@ public class RemoteCommonConfig implements CommonConfig {
     return this;
   }
 
-  @Override
-  public CommonConfig setEnableMQTTService(boolean enableMQTTService) {
-    return this;
-  }
-
-  @Override
-  public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
-    return this;
-  }
-
   @Override
   public CommonConfig setSchemaEngineMode(String schemaEngineMode) {
     return this;
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index 1af7cb8f613..af7f38cef7a 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -59,16 +59,6 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
     return this;
   }
 
-  @Override
-  public DataNodeConfig setEnableMQTTService(boolean enableMQTTService) {
-    return this;
-  }
-
-  @Override
-  public DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
-    return this;
-  }
-
   @Override
   public DataNodeConfig setLoadLastCacheStrategy(String strategyName) {
     return this;
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
index 586eff60494..6e47dbc4c09 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
@@ -475,11 +475,6 @@ public class RemoteServerEnv implements BaseEnv {
     throw new UnsupportedOperationException();
   }
 
-  @Override
-  public int getMqttPort() {
-    throw new UnsupportedOperationException();
-  }
-
   @Override
   public String getIP() {
     return ip_addr;
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 8b32deb3ea8..5744e50a523 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -330,8 +330,6 @@ public interface BaseEnv {
   /** Shutdown forcibly all existed DataNodes. */
   void shutdownForciblyAllDataNodes();
 
-  int getMqttPort();
-
   String getIP();
 
   String getPort();
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 9767e1c089e..3fd5db3988a 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -100,10 +100,6 @@ public interface CommonConfig {
 
   CommonConfig setMaxDegreeOfIndexNode(int maxDegreeOfIndexNode);
 
-  CommonConfig setEnableMQTTService(boolean enableMQTTService);
-
-  CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter);
-
   CommonConfig setSchemaEngineMode(String schemaEngineMode);
 
   CommonConfig setSelectIntoInsertTabletPlanRowLimit(int 
selectIntoInsertTabletPlanRowLimit);
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index 0ae46ffc70f..6259f4e4bdc 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -38,10 +38,6 @@ public interface DataNodeConfig {
 
   DataNodeConfig setCompactionScheduleInterval(long 
compactionScheduleInterval);
 
-  DataNodeConfig setEnableMQTTService(boolean enableMQTTService);
-
-  DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter);
-
   DataNodeConfig setLoadLastCacheStrategy(String strategyName);
 
   DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
deleted file mode 100644
index afc202ae68d..00000000000
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.iotdb.relational.it.mqtt;
-
-import org.apache.iotdb.isession.ITableSession;
-import org.apache.iotdb.isession.SessionDataSet;
-import org.apache.iotdb.it.env.EnvFactory;
-import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
-import org.apache.iotdb.it.framework.IoTDBTestRunner;
-import org.apache.iotdb.itbase.category.TableClusterIT;
-import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
-import org.apache.iotdb.itbase.env.BaseEnv;
-import org.apache.iotdb.rpc.StatementExecutionException;
-
-import org.apache.tsfile.read.common.Field;
-import org.awaitility.Awaitility;
-import org.fusesource.mqtt.client.BlockingConnection;
-import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.QoS;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-@RunWith(IoTDBTestRunner.class)
-@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
-public class IoTDBMQTTServiceIT {
-  private BlockingConnection connection;
-  private static final String IP = System.getProperty("RemoteIp", "127.0.0.1");
-  private static final String USER = System.getProperty("RemoteUser", "root");
-  private static final String PASSWORD = System.getProperty("RemotePassword", 
"root");
-  private static final String DATABASE = "mqtttest";
-  public static final String FORMATTER = "line";
-
-  @Before
-  public void setUp() throws Exception {
-    BaseEnv baseEnv = EnvFactory.getEnv();
-    baseEnv.getConfig().getDataNodeConfig().setEnableMQTTService(true);
-    baseEnv.getConfig().getDataNodeConfig().setMqttPayloadFormatter(FORMATTER);
-    baseEnv.initClusterEnvironment();
-    DataNodeWrapper portConflictDataNodeWrapper = 
EnvFactory.getEnv().getDataNodeWrapper(0);
-    int port = portConflictDataNodeWrapper.getMqttPort();
-    MQTT mqtt = new MQTT();
-    mqtt.setHost(IP, port);
-    mqtt.setUserName(USER);
-    mqtt.setPassword(PASSWORD);
-    mqtt.setConnectAttemptsMax(3);
-    mqtt.setReconnectDelay(10);
-
-    connection = mqtt.blockingConnection();
-    connection.connect();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    try {
-      if (connection != null) {
-        connection.disconnect();
-      }
-    } catch (IOException e) {
-      e.printStackTrace();
-      fail(e.getMessage());
-    }
-    EnvFactory.getEnv().cleanClusterEnvironment();
-  }
-
-  @Test
-  public void testNoAttr() throws Exception {
-    try (final ITableSession session =
-        EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE)) {
-      session.executeNonQueryStatement("CREATE DATABASE " + DATABASE);
-      String payload1 = "test1,tag1=t1,tag2=t2 field1=1,field2=1f,field3=1i32 
1";
-      Awaitility.await()
-          .atMost(3, TimeUnit.MINUTES)
-          .pollInterval(1, TimeUnit.SECONDS)
-          .until(
-              () -> {
-                connection.publish(
-                    DATABASE + "/myTopic", payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
-                try (final SessionDataSet dataSet =
-                    session.executeQueryStatement(
-                        "select tag1,tag2,field1,field2,field3 from test1 
where time = 1")) {
-                  assertEquals(5, dataSet.getColumnNames().size());
-                  List<Field> fields = dataSet.next().getFields();
-                  assertEquals("t1", fields.get(0).getStringValue());
-                  assertEquals("t2", fields.get(1).getStringValue());
-                  assertEquals(1d, fields.get(2).getDoubleV(), 0);
-                  assertEquals(1f, fields.get(3).getFloatV(), 0);
-                  assertEquals(1, fields.get(4).getIntV(), 0);
-                  return true;
-                } catch (StatementExecutionException e) {
-                  if (e.getMessage() != null && e.getMessage().contains("does 
not exist")) {
-                    return false;
-                  } else {
-                    throw e;
-                  }
-                }
-              });
-    }
-  }
-
-  @Test
-  public void testWithAttr() throws Exception {
-    try (final ITableSession session =
-        EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE)) {
-      session.executeNonQueryStatement("CREATE DATABASE " + DATABASE);
-      String payload1 = "test2,tag1=t1,tag2=t2 attr3=a3,attr4=a4 
field1=1,field2=1f,field3=1i32 1";
-      Awaitility.await()
-          .atMost(3, TimeUnit.MINUTES)
-          .pollInterval(1, TimeUnit.SECONDS)
-          .until(
-              () -> {
-                connection.publish(
-                    DATABASE + "/myTopic", payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
-                try (final SessionDataSet dataSet =
-                    session.executeQueryStatement(
-                        "select tag1,tag2,attr3,attr4,field1,field2,field3 
from test2 where time = 1")) {
-                  assertEquals(7, dataSet.getColumnNames().size());
-                  List<Field> fields = dataSet.next().getFields();
-                  assertEquals("t1", fields.get(0).getStringValue());
-                  assertEquals("t2", fields.get(1).getStringValue());
-                  assertEquals("a3", fields.get(2).getStringValue());
-                  assertEquals("a4", fields.get(3).getStringValue());
-                  assertEquals(1d, fields.get(4).getDoubleV(), 0);
-                  assertEquals(1f, fields.get(5).getFloatV(), 0);
-                  assertEquals(1, fields.get(6).getIntV(), 0);
-                  return true;
-                } catch (StatementExecutionException e) {
-                  if (e.getMessage() != null && e.getMessage().contains("does 
not exist")) {
-                    return false;
-                  } else {
-                    throw e;
-                  }
-                }
-              });
-    }
-  }
-}
diff --git a/integration-test/src/test/resources/logback-test.xml 
b/integration-test/src/test/resources/logback-test.xml
index ba595ff9752..9b4f3af3e4b 100644
--- a/integration-test/src/test/resources/logback-test.xml
+++ b/integration-test/src/test/resources/logback-test.xml
@@ -50,7 +50,6 @@
     <logger name="org.apache.iotdb.commons.service.RegisterManager" 
level="INFO"/>
     <logger name="org.apache.iotdb.db.service.DataNode" level="WARN"/>
     <logger name="org.apache.iotdb.db.service.ExternalRPCService" 
level="INFO"/>
-    <logger name="org.apache.iotdb.db.service.MQTTService" level="INFO"/>
     <logger name="org.apache.iotdb.db.conf.IoTDBDescriptor" level="WARN"/>
     <logger name="org.apache.tsfile.common.conf.TSFileDescriptor" 
level="WARN"/>
     <logger name="DETAILED_FAILURE_QUERY_TRACE" level="ERROR"/>
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index 626d8265eed..c8ad7d48177 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -195,10 +195,6 @@
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-http</artifactId>
         </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-codec-mqtt</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-pool2</artifactId>
diff --git 
a/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml 
b/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml
index 557fd8b160f..299b5e114d9 100644
--- a/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml
+++ b/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml
@@ -245,7 +245,6 @@
         <appender-ref ref="FILEALL"/>
         <appender-ref ref="stdout"/>
     </root>
-    <logger level="OFF" name="io.moquette.broker.metrics.MQTTMessageLogger"/>
     <logger level="info" name="org.apache.iotdb.db.service"/>
     <logger level="info" name="org.apache.iotdb.db.conf"/>
     <logger level="info" name="org.apache.iotdb.db.cost.statistic">
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeStartupCheck.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeStartupCheck.java
index 7291469f676..1368e4d3b39 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeStartupCheck.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeStartupCheck.java
@@ -45,7 +45,6 @@ public class DataNodeStartupCheck extends StartupChecks {
   private void checkDataNodePortUnique() throws StartupException {
     Set<Integer> portSet = new HashSet<>();
     portSet.add(config.getInternalPort());
-    portSet.add(config.getMqttPort());
     portSet.add(config.getRpcPort());
     portSet.add(config.getMppDataExchangePort());
     portSet.add(config.getDataRegionConsensusPort());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 7db1b0f0225..25d4f0d2a12 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -103,27 +103,6 @@ public class IoTDBConfig {
 
   public static final Pattern NODE_PATTERN = Pattern.compile(NODE_MATCHER);
 
-  /** Whether to enable the mqtt service. */
-  private boolean enableMQTTService = false;
-
-  /** The mqtt service binding host. */
-  private String mqttHost = "127.0.0.1";
-
-  /** The mqtt service binding port. */
-  private int mqttPort = 1883;
-
-  /** The handler pool size for handing the mqtt messages. */
-  private int mqttHandlerPoolSize = Math.max(1, 
Runtime.getRuntime().availableProcessors() >> 1);
-
-  /** The mqtt message payload formatter. */
-  private String mqttPayloadFormatter = "json";
-
-  /** The mqtt save data path */
-  private String mqttDataPath = "data/";
-
-  /** Max mqtt message size. Unit: byte */
-  private int mqttMaxMessageSize = 1048576;
-
   /** Rpc binding address. */
   private String rpcAddress = "0.0.0.0";
 
@@ -283,10 +262,6 @@ public class IoTDBConfig {
 
   private int pipeTaskThreadCount = 5;
 
-  /** External lib directory for MQTT, stores user-uploaded JAR files */
-  private String mqttDir =
-      IoTDBConstant.EXT_FOLDER_NAME + File.separator + 
IoTDBConstant.MQTT_FOLDER_NAME;
-
   /** Tiered data directories. It can be settled as dataDirs = {{"data1"}, 
{"data2", "data3"}}; */
   private String[][] tierDataDirs = {
     {IoTDBConstant.DN_DEFAULT_DATA_DIR + File.separator + 
IoTDBConstant.DATA_FOLDER_NAME}
@@ -965,9 +940,6 @@ public class IoTDBConfig {
   /** Trigger HTTP forward pool max connection for per route */
   private int triggerForwardHTTPPOOLMaxPerRoute = 20;
 
-  /** Trigger MQTT forward pool size */
-  private int triggerForwardMQTTPoolSize = 4;
-
   /** How many times will we retry to find an instance of stateful trigger */
   private int retryNumToFindStatefulTrigger = 3;
 
@@ -1343,7 +1315,6 @@ public class IoTDBConfig {
       iotConsensusV2ReceiverFileDirs[i] = 
addDataHomeDir(iotConsensusV2ReceiverFileDirs[i]);
     }
     iotConsensusV2DeletionFileDir = 
addDataHomeDir(iotConsensusV2DeletionFileDir);
-    mqttDir = addDataHomeDir(mqttDir);
     extPipeDir = addDataHomeDir(extPipeDir);
     queryDir = addDataHomeDir(queryDir);
     sortTmpDir = addDataHomeDir(sortTmpDir);
@@ -1672,14 +1643,6 @@ public class IoTDBConfig {
     this.pipeTemporaryLibDir = pipeDir + File.separator + 
IoTDBConstant.TMP_FOLDER_NAME;
   }
 
-  public String getMqttDir() {
-    return mqttDir;
-  }
-
-  public void setMqttDir(String mqttDir) {
-    this.mqttDir = mqttDir;
-  }
-
   public String getMultiDirStrategyClassName() {
     return multiDirStrategyClassName;
   }
@@ -2488,62 +2451,6 @@ public class IoTDBConfig {
     this.thriftServerAwaitTimeForStopService = 
thriftServerAwaitTimeForStopService;
   }
 
-  public boolean isEnableMQTTService() {
-    return enableMQTTService;
-  }
-
-  public void setEnableMQTTService(boolean enableMQTTService) {
-    this.enableMQTTService = enableMQTTService;
-  }
-
-  public String getMqttHost() {
-    return mqttHost;
-  }
-
-  public void setMqttHost(String mqttHost) {
-    this.mqttHost = mqttHost;
-  }
-
-  public int getMqttPort() {
-    return mqttPort;
-  }
-
-  public void setMqttPort(int mqttPort) {
-    this.mqttPort = mqttPort;
-  }
-
-  public int getMqttHandlerPoolSize() {
-    return mqttHandlerPoolSize;
-  }
-
-  public void setMqttHandlerPoolSize(int mqttHandlerPoolSize) {
-    this.mqttHandlerPoolSize = mqttHandlerPoolSize;
-  }
-
-  public String getMqttPayloadFormatter() {
-    return mqttPayloadFormatter;
-  }
-
-  public void setMqttPayloadFormatter(String mqttPayloadFormatter) {
-    this.mqttPayloadFormatter = mqttPayloadFormatter;
-  }
-
-  public String getMqttDataPath() {
-    return mqttDataPath;
-  }
-
-  public void setMqttDataPath(String mqttDataPath) {
-    this.mqttDataPath = mqttDataPath;
-  }
-
-  public int getMqttMaxMessageSize() {
-    return mqttMaxMessageSize;
-  }
-
-  public void setMqttMaxMessageSize(int mqttMaxMessageSize) {
-    this.mqttMaxMessageSize = mqttMaxMessageSize;
-  }
-
   public int getTagAttributeFlushInterval() {
     return tagAttributeFlushInterval;
   }
@@ -3225,14 +3132,6 @@ public class IoTDBConfig {
     this.triggerForwardHTTPPOOLMaxPerRoute = triggerForwardHTTPPOOLMaxPerRoute;
   }
 
-  public int getTriggerForwardMQTTPoolSize() {
-    return triggerForwardMQTTPoolSize;
-  }
-
-  public void setTriggerForwardMQTTPoolSize(int triggerForwardMQTTPoolSize) {
-    this.triggerForwardMQTTPoolSize = triggerForwardMQTTPoolSize;
-  }
-
   public int getRetryNumToFindStatefulTrigger() {
     return retryNumToFindStatefulTrigger;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 59b59e130c5..f67894c706f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -876,9 +876,6 @@ public class IoTDBDescriptor {
                 "max_measurement_num_of_internal_request",
                 
String.valueOf(conf.getMaxMeasurementNumOfInternalRequest()))));
 
-    // mqtt
-    loadMqttProps(properties);
-
     conf.setIntoOperationBufferSizeInByte(
         Long.parseLong(
             properties.getProperty(
@@ -1873,48 +1870,6 @@ public class IoTDBDescriptor {
     }
   }
 
-  // Mqtt related
-  private void loadMqttProps(TrimProperties properties) {
-    conf.setMqttDir(properties.getProperty("mqtt_root_dir", 
conf.getMqttDir()));
-
-    if (properties.getProperty(IoTDBConstant.MQTT_HOST_NAME) != null) {
-      
conf.setMqttHost(properties.getProperty(IoTDBConstant.MQTT_HOST_NAME).trim());
-    } else {
-      LOGGER.info("MQTT host is not configured, will use dn_rpc_address.");
-      conf.setMqttHost(properties.getProperty(IoTDBConstant.DN_RPC_ADDRESS, 
conf.getRpcAddress()));
-    }
-
-    if (properties.getProperty(IoTDBConstant.MQTT_PORT_NAME) != null) {
-      conf.setMqttPort(
-          
Integer.parseInt(properties.getProperty(IoTDBConstant.MQTT_PORT_NAME).trim()));
-    }
-
-    if (properties.getProperty(IoTDBConstant.MQTT_HANDLER_POOL_SIZE_NAME) != 
null) {
-      conf.setMqttHandlerPoolSize(
-          Integer.parseInt(
-              
properties.getProperty(IoTDBConstant.MQTT_HANDLER_POOL_SIZE_NAME).trim()));
-    }
-
-    if (properties.getProperty(IoTDBConstant.MQTT_PAYLOAD_FORMATTER_NAME) != 
null) {
-      conf.setMqttPayloadFormatter(
-          
properties.getProperty(IoTDBConstant.MQTT_PAYLOAD_FORMATTER_NAME).trim());
-    }
-
-    if (properties.getProperty(IoTDBConstant.MQTT_DATA_PATH) != null) {
-      
conf.setMqttDataPath(properties.getProperty(IoTDBConstant.MQTT_DATA_PATH).trim());
-    }
-
-    if (properties.getProperty(IoTDBConstant.ENABLE_MQTT) != null) {
-      conf.setEnableMQTTService(
-          
Boolean.parseBoolean(properties.getProperty(IoTDBConstant.ENABLE_MQTT).trim()));
-    }
-
-    if (properties.getProperty(IoTDBConstant.MQTT_MAX_MESSAGE_SIZE) != null) {
-      conf.setMqttMaxMessageSize(
-          
Integer.parseInt(properties.getProperty(IoTDBConstant.MQTT_MAX_MESSAGE_SIZE).trim()));
-    }
-  }
-
   // timed flush memtable
   private void loadTimedService(TrimProperties properties) throws IOException {
     conf.setEnableTimedFlushSeqMemtable(
@@ -2542,11 +2497,6 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "trigger_forward_http_pool_max_per_route",
                 
Integer.toString(conf.getTriggerForwardHTTPPOOLMaxPerRoute()))));
-    conf.setTriggerForwardMQTTPoolSize(
-        Integer.parseInt(
-            properties.getProperty(
-                "trigger_forward_mqtt_pool_size",
-                Integer.toString(conf.getTriggerForwardMQTTPoolSize()))));
   }
 
   private void loadPipeProps(TrimProperties properties) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticator.java
deleted file mode 100644
index a05c8264b82..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticator.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.iotdb.db.protocol.mqtt;
-
-import org.apache.iotdb.db.auth.AuthorityChecker;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import io.moquette.broker.security.IAuthenticator;
-import org.apache.tsfile.external.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** The MQTT broker authenticator. */
-public class BrokerAuthenticator implements IAuthenticator {
-  private static final Logger LOG = 
LoggerFactory.getLogger(BrokerAuthenticator.class);
-
-  @Override
-  public boolean checkValid(String clientId, String username, byte[] password) 
{
-    if (StringUtils.isBlank(username) || password == null) {
-      return false;
-    }
-
-    return (AuthorityChecker.checkUser(username, new 
String(password)).getCode()
-        == TSStatusCode.SUCCESS_STATUS.getStatusCode());
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
deleted file mode 100644
index cc857b7295c..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.iotdb.db.protocol.mqtt;
-
-import com.google.common.collect.Lists;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.reflect.TypeToken;
-import io.netty.buffer.ByteBuf;
-import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.external.commons.lang3.NotImplementedException;
-
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * The JSON payload formatter. two json format supported: { 
"device":"root.sg.d1",
- * "timestamp":1586076045524, "measurements":["s1","s2"], 
"values":[0.530635,0.530635] }
- *
- * <p>{ "device":"root.sg.d1", "timestamps":[1586076045524,1586076065526],
- * "measurements":["s1","s2"], "values":[[0.530635,0.530635], 
[0.530655,0.530695]] }
- */
-public class JSONPayloadFormatter implements PayloadFormatter {
-  private static final String JSON_KEY_DEVICE = "device";
-  private static final String JSON_KEY_TIMESTAMP = "timestamp";
-  private static final String JSON_KEY_TIMESTAMPS = "timestamps";
-  private static final String JSON_KEY_MEASUREMENTS = "measurements";
-  private static final String JSON_KEY_VALUES = "values";
-  private static final String JSON_KEY_DATATYPE = "datatypes";
-  private static final Gson GSON = new GsonBuilder().create();
-
-  @Override
-  public List<Message> format(String topic, ByteBuf payload) {
-    if (payload == null) {
-      return new ArrayList<>();
-    }
-    String txt = payload.toString(StandardCharsets.UTF_8);
-    JsonElement jsonElement = GSON.fromJson(txt, JsonElement.class);
-    if (jsonElement.isJsonObject()) {
-      JsonObject jsonObject = jsonElement.getAsJsonObject();
-      if (jsonObject.get(JSON_KEY_TIMESTAMP) != null) {
-        return formatJson(jsonObject);
-      }
-      if (jsonObject.get(JSON_KEY_TIMESTAMPS) != null) {
-        return formatBatchJson(jsonObject);
-      }
-    } else if (jsonElement.isJsonArray()) {
-      JsonArray jsonArray = jsonElement.getAsJsonArray();
-      List<Message> messages = new ArrayList<>();
-      for (JsonElement element : jsonArray) {
-        JsonObject jsonObject = element.getAsJsonObject();
-        if (jsonObject.get(JSON_KEY_TIMESTAMP) != null) {
-          messages.addAll(formatJson(jsonObject));
-        }
-        if (jsonObject.get(JSON_KEY_TIMESTAMPS) != null) {
-          messages.addAll(formatBatchJson(jsonObject));
-        }
-      }
-      return messages;
-    }
-    throw new JsonParseException("payload is invalidate");
-  }
-
-  @Override
-  @Deprecated
-  public List<Message> format(ByteBuf payload) {
-    throw new NotImplementedException();
-  }
-
-  private List<Message> formatJson(JsonObject jsonObject) {
-    TreeMessage message = new TreeMessage();
-    message.setDevice(jsonObject.get(JSON_KEY_DEVICE).getAsString());
-    message.setTimestamp(jsonObject.get(JSON_KEY_TIMESTAMP).getAsLong());
-    message.setMeasurements(
-        GSON.fromJson(
-            jsonObject.get(JSON_KEY_MEASUREMENTS), new 
TypeToken<List<String>>() {}.getType()));
-    message.setValues(
-        GSON.fromJson(jsonObject.get(JSON_KEY_VALUES), new 
TypeToken<List<String>>() {}.getType()));
-    if (jsonObject.has(JSON_KEY_DATATYPE)) {
-      message.setDataTypes(
-          GSON.fromJson(
-              jsonObject.get(JSON_KEY_DATATYPE), new 
TypeToken<List<TSDataType>>() {}.getType()));
-    }
-    return Lists.newArrayList(message);
-  }
-
-  private List<Message> formatBatchJson(JsonObject jsonObject) {
-    String device = jsonObject.get(JSON_KEY_DEVICE).getAsString();
-    List<String> measurements =
-        GSON.fromJson(
-            jsonObject.getAsJsonArray(JSON_KEY_MEASUREMENTS),
-            new TypeToken<List<String>>() {}.getType());
-    List<Long> timestamps =
-        GSON.fromJson(
-            jsonObject.get(JSON_KEY_TIMESTAMPS), new TypeToken<List<Long>>() 
{}.getType());
-    List<List<String>> values =
-        GSON.fromJson(
-            jsonObject.get(JSON_KEY_VALUES), new 
TypeToken<List<List<String>>>() {}.getType());
-    List<TSDataType> types =
-        jsonObject.has(JSON_KEY_DATATYPE)
-            ? GSON.fromJson(
-                jsonObject.get(JSON_KEY_DATATYPE), new 
TypeToken<List<TSDataType>>() {}.getType())
-            : null;
-
-    List<Message> ret = new ArrayList<>(timestamps.size());
-    for (int i = 0; i < timestamps.size(); i++) {
-      TreeMessage message = new TreeMessage();
-      message.setDevice(device);
-      message.setTimestamp(timestamps.get(i));
-      message.setMeasurements(measurements);
-      message.setValues(values.get(i));
-      message.setDataTypes(types);
-      ret.add(message);
-    }
-    return ret;
-  }
-
-  @Override
-  public String getName() {
-    return "json";
-  }
-
-  @Override
-  public String getType() {
-    return PayloadFormatter.TREE_TYPE;
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
deleted file mode 100644
index 8b596ee2c89..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.iotdb.db.protocol.mqtt;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.external.commons.lang3.NotImplementedException;
-import org.apache.tsfile.utils.Binary;
-import org.apache.tsfile.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Stream;
-
-/**
- * The Line payload formatter. myTable,tag1=value1,tag2=value2 
attr1=value1,attr2=value2
- * fieldKey="fieldValue" 1740109006000 \n myTable,tag1=value1,tag2=value2 
fieldKey="fieldValue"
- * 1740109006000
- */
-public class LinePayloadFormatter implements PayloadFormatter {
-
-  private static final Logger log = 
LoggerFactory.getLogger(LinePayloadFormatter.class);
-
-  /*
-  Regular expression matching line protocol ,the attributes field is not 
required
-  */
-  private static final String REGEX =
-      "(?<table>\\w+)(,(?<tags>[^ ]+))?(\\s+(?<attributes>[^ 
]+))?\\s+(?<fields>[^ ]+)\\s+(?<timestamp>\\d+)";
-  private static final String COMMA = ",";
-  private static final String WELL = "#";
-  private static final String LINE_BREAK = "\n";
-  private static final String EQUAL = "=";
-  private static final String TABLE = "table";
-  private static final String TAGS = "tags";
-  private static final String ATTRIBUTES = "attributes";
-  private static final String FIELDS = "fields";
-  private static final String TIMESTAMP = "timestamp";
-  private static final String NULL = "null";
-
-  private final Pattern pattern;
-
-  public LinePayloadFormatter() {
-    pattern = Pattern.compile(REGEX);
-  }
-
-  @Override
-  public List<Message> format(String topic, ByteBuf payload) {
-    List<Message> messages = new ArrayList<>();
-    if (payload == null) {
-      return messages;
-    }
-
-    String txt = payload.toString(StandardCharsets.UTF_8);
-    String[] lines = txt.split(LINE_BREAK);
-    // '/' previously defined as a database name
-    String database = !topic.contains("/") ? topic : topic.substring(0, 
topic.indexOf("/"));
-    for (String line : lines) {
-      if (line.trim().startsWith(WELL)) {
-        continue;
-      }
-      TableMessage message = new TableMessage();
-      try {
-        Matcher matcher = pattern.matcher(line.trim());
-        if (!matcher.matches()) {
-          log.warn("Invalid line protocol format ,line is {}", line);
-          continue;
-        }
-
-        // Parsing Database Name
-        message.setDatabase((database));
-
-        // Parsing Table Names
-        message.setTable(matcher.group(TABLE));
-
-        // Parsing Tags
-        if (!setTags(matcher, message)) {
-          log.warn("The tags is error , line is {}", line);
-          continue;
-        }
-
-        // Parsing Attributes
-        if (!setAttributes(matcher, message)) {
-          log.warn("The attributes is error , line is {}", line);
-          continue;
-        }
-
-        // Parsing Fields
-        if (!setFields(matcher, message)) {
-          log.warn("The fields is error , line is {}", line);
-          continue;
-        }
-
-        // Parsing timestamp
-        if (!setTimestamp(matcher, message)) {
-          log.warn("The timestamp is error , line is {}", line);
-          continue;
-        }
-
-        messages.add(message);
-      } catch (Exception e) {
-        log.warn(
-            "The line pattern parsing fails, and the failed line message is {} 
,exception is",
-            line,
-            e);
-      }
-    }
-    return messages;
-  }
-
-  @Override
-  @Deprecated
-  public List<Message> format(ByteBuf payload) {
-    throw new NotImplementedException();
-  }
-
-  private boolean setTags(Matcher matcher, TableMessage message) {
-    List<String> tagKeys = new ArrayList<>();
-    List<Object> tagValues = new ArrayList<>();
-    String tagsGroup = matcher.group(TAGS);
-    if (tagsGroup != null && !tagsGroup.isEmpty()) {
-      String[] tagPairs = tagsGroup.split(COMMA);
-      for (String tagPair : tagPairs) {
-        if (!tagPair.isEmpty()) {
-          String[] keyValue = tagPair.split(EQUAL);
-          if (keyValue.length == 2 && !NULL.equals(keyValue[1])) {
-            tagKeys.add(keyValue[0]);
-            tagValues.add(new Binary[] {new 
Binary(keyValue[1].getBytes(StandardCharsets.UTF_8))});
-          }
-        }
-      }
-    }
-    if (!tagKeys.isEmpty() && !tagValues.isEmpty() && tagKeys.size() == 
tagValues.size()) {
-      message.setTagKeys(tagKeys);
-      message.setTagValues(tagValues);
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  private boolean setAttributes(Matcher matcher, TableMessage message) {
-    List<String> attributeKeys = new ArrayList<>();
-    List<Object> attributeValues = new ArrayList<>();
-    String attributesGroup = matcher.group(ATTRIBUTES);
-    if (attributesGroup != null && !attributesGroup.isEmpty()) {
-      String[] attributePairs = attributesGroup.split(COMMA);
-      for (String attributePair : attributePairs) {
-        if (!attributePair.isEmpty()) {
-          String[] keyValue = attributePair.split(EQUAL);
-          if (keyValue.length == 2 && !NULL.equals(keyValue[1])) {
-            attributeKeys.add(keyValue[0]);
-            attributeValues.add(
-                new Binary[] {new 
Binary(keyValue[1].getBytes(StandardCharsets.UTF_8))});
-          }
-        }
-      }
-    }
-    if (attributeKeys.size() == attributeValues.size()) {
-      message.setAttributeKeys(attributeKeys);
-      message.setAttributeValues(attributeValues);
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  private boolean setFields(Matcher matcher, TableMessage message) {
-    List<String> fields = new ArrayList<>();
-    List<TSDataType> dataTypes = new ArrayList<>();
-    List<Object> values = new ArrayList<>();
-    String fieldsGroup = matcher.group(FIELDS);
-    if (fieldsGroup != null && !fieldsGroup.isEmpty()) {
-      String[] fieldPairs = splitFieldPairs(fieldsGroup);
-      for (String fieldPair : fieldPairs) {
-        if (!fieldPair.isEmpty()) {
-          String[] keyValue = fieldPair.split(EQUAL);
-          if (keyValue.length == 2 && !NULL.equals(keyValue[1])) {
-            fields.add(keyValue[0]);
-            Pair<TSDataType, Object> typeAndValue = analyticValue(keyValue[1]);
-            values.add(typeAndValue.getRight());
-            dataTypes.add(typeAndValue.getLeft());
-          }
-        }
-      }
-    }
-    if (!fields.isEmpty() && !values.isEmpty() && fields.size() == 
values.size()) {
-      message.setFields(fields);
-      message.setDataTypes(dataTypes);
-      message.setValues(values);
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  private String[] splitFieldPairs(String fieldsGroup) {
-
-    if (fieldsGroup == null || fieldsGroup.isEmpty()) return new String[0];
-    Matcher m = 
Pattern.compile("\\w+=\"[^\"]*\"|\\w+=[^,]*").matcher(fieldsGroup);
-    Stream.Builder<String> builder = Stream.builder();
-
-    while (m.find()) builder.add(m.group());
-    return builder.build().toArray(String[]::new);
-  }
-
-  private Pair<TSDataType, Object> analyticValue(String value) {
-    if (value.startsWith("\"") && value.endsWith("\"")) {
-      // String
-      return new Pair<>(
-          TSDataType.TEXT,
-          new Binary[] {
-            new Binary(value.substring(1, value.length() - 
1).getBytes(StandardCharsets.UTF_8))
-          });
-    } else if (value.equalsIgnoreCase("t")
-        || value.equalsIgnoreCase("true")
-        || value.equalsIgnoreCase("f")
-        || value.equalsIgnoreCase("false")) {
-      // boolean
-      return new Pair<>(
-          TSDataType.BOOLEAN,
-          new boolean[] {value.equalsIgnoreCase("t") || 
value.equalsIgnoreCase("true")});
-    } else if (value.endsWith("f")) {
-      // float
-      return new Pair<>(
-          TSDataType.FLOAT, new float[] {Float.parseFloat(value.substring(0, 
value.length() - 1))});
-    } else if (value.endsWith("i32")) {
-      // int
-      return new Pair<>(
-          TSDataType.INT32, new int[] {Integer.parseInt(value.substring(0, 
value.length() - 3))});
-    } else if (value.endsWith("u") || value.endsWith("i")) {
-      // long
-      return new Pair<>(
-          TSDataType.INT64, new long[] {Long.parseLong(value.substring(0, 
value.length() - 1))});
-    } else {
-      // double
-      return new Pair<>(TSDataType.DOUBLE, new double[] 
{Double.parseDouble(value)});
-    }
-  }
-
-  private boolean setTimestamp(Matcher matcher, TableMessage message) {
-    String timestampGroup = matcher.group(TIMESTAMP);
-    if (timestampGroup != null && !timestampGroup.isEmpty()) {
-      message.setTimestamp(Long.parseLong(timestampGroup));
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public String getName() {
-    return "line";
-  }
-
-  @Override
-  public String getType() {
-    return PayloadFormatter.TABLE_TYPE;
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
deleted file mode 100644
index 563c6147807..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.iotdb.db.protocol.mqtt;
-
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
-import org.apache.iotdb.db.auth.AuthorityChecker;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.protocol.session.IClientSession;
-import org.apache.iotdb.db.protocol.session.MqttClientSession;
-import org.apache.iotdb.db.protocol.session.SessionManager;
-import org.apache.iotdb.db.queryengine.plan.Coordinator;
-import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
-import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
-import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
-import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
-import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
-import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
-import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
-import 
org.apache.iotdb.db.queryengine.plan.relational.security.TreeAccessCheckContext;
-import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
-import org.apache.iotdb.db.utils.CommonUtils;
-import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
-
-import io.moquette.interception.AbstractInterceptHandler;
-import io.moquette.interception.messages.InterceptConnectMessage;
-import io.moquette.interception.messages.InterceptDisconnectMessage;
-import io.moquette.interception.messages.InterceptPublishMessage;
-import io.netty.buffer.ByteBuf;
-import io.netty.handler.codec.mqtt.MqttQoS;
-import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.utils.BitMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.ZoneId;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/** PublishHandler handle the messages from MQTT clients. */
-public class MPPPublishHandler extends AbstractInterceptHandler {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(MPPPublishHandler.class);
-
-  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
-  private final SessionManager sessionManager = SessionManager.getInstance();
-
-  private final ConcurrentHashMap<String, MqttClientSession> 
clientIdToSessionMap =
-      new ConcurrentHashMap<>();
-  private final PayloadFormatter payloadFormat;
-  private final IPartitionFetcher partitionFetcher;
-  private final ISchemaFetcher schemaFetcher;
-  private final boolean useTableInsert;
-
-  public MPPPublishHandler(IoTDBConfig config) {
-    this.payloadFormat = 
PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
-    partitionFetcher = ClusterPartitionFetcher.getInstance();
-    schemaFetcher = ClusterSchemaFetcher.getInstance();
-    useTableInsert = 
PayloadFormatter.TABLE_TYPE.equals(this.payloadFormat.getType());
-  }
-
-  @Override
-  public String getID() {
-    return "iotdb-mqtt-broker-listener";
-  }
-
-  @Override
-  public void onConnect(InterceptConnectMessage msg) {
-    if (msg.getClientID() == null || msg.getClientID().trim().isEmpty()) {
-      LOG.error(
-          "Connection refused: client_id is missing or empty. A valid 
client_id is required to establish a connection.");
-    }
-    if (!clientIdToSessionMap.containsKey(msg.getClientID())) {
-      MqttClientSession session = new MqttClientSession(msg.getClientID());
-      sessionManager.login(
-          session,
-          msg.getUsername(),
-          new String(msg.getPassword()),
-          ZoneId.systemDefault().toString(),
-          TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3,
-          ClientVersion.V_1_0,
-          useTableInsert ? IClientSession.SqlDialect.TABLE : 
IClientSession.SqlDialect.TREE);
-      sessionManager.registerSessionForMqtt(session);
-      clientIdToSessionMap.put(msg.getClientID(), session);
-    }
-  }
-
-  @Override
-  public void onDisconnect(InterceptDisconnectMessage msg) {
-    MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
-    if (null != session) {
-      sessionManager.removeCurrSessionForMqtt(session);
-      sessionManager.closeSession(session, 
Coordinator.getInstance()::cleanupQueryExecution);
-    }
-  }
-
-  @Override
-  public void onPublish(InterceptPublishMessage msg) {
-    try {
-      String clientId = msg.getClientID();
-      if (!clientIdToSessionMap.containsKey(clientId)) {
-        return;
-      }
-      MqttClientSession session = clientIdToSessionMap.get(msg.getClientID());
-      ByteBuf payload = msg.getPayload();
-      String topic = msg.getTopicName();
-
-      if (LOG.isDebugEnabled()) {
-        String username = msg.getUsername();
-        MqttQoS qos = msg.getQos();
-        LOG.debug(
-            "Receive publish message. clientId: {}, username: {}, qos: {}, 
topic: {}, payload: {}",
-            clientId,
-            username,
-            qos,
-            topic,
-            payload);
-      }
-
-      List<Message> messages = payloadFormat.format(topic, payload);
-      if (messages == null) {
-        return;
-      }
-
-      for (Message message : messages) {
-        if (message == null) {
-          continue;
-        }
-        if (useTableInsert) {
-          insertTable((TableMessage) message, session);
-        } else {
-          insertTree((TreeMessage) message, session);
-        }
-      }
-    } catch (Throwable t) {
-      LOG.warn("onPublish execution exception, msg is [{}], error is ", msg, 
t);
-    } finally {
-      // release the payload of the message
-      super.onPublish(msg);
-    }
-  }
-
-  /** Inserting table using tablet */
-  private void insertTable(TableMessage message, MqttClientSession session) {
-    TSStatus tsStatus = null;
-    try {
-      TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp());
-      InsertTabletStatement insertTabletStatement = 
constructInsertTabletStatement(message);
-      session.setDatabaseName(message.getDatabase().toLowerCase());
-      session.setSqlDialect(IClientSession.SqlDialect.TABLE);
-      long queryId = sessionManager.requestQueryId();
-      SqlParser relationSqlParser = new SqlParser();
-      Metadata metadata = LocalExecutionPlanner.getInstance().metadata;
-      ExecutionResult result =
-          Coordinator.getInstance()
-              .executeForTableModel(
-                  insertTabletStatement,
-                  relationSqlParser,
-                  session,
-                  queryId,
-                  sessionManager.getSessionInfo(session),
-                  "",
-                  metadata,
-                  config.getQueryTimeoutThreshold());
-
-      tsStatus = result.status;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("process result: {}", tsStatus);
-      }
-      if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-          || tsStatus.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-        LOG.warn("mqtt line insert error , message = {}", tsStatus.message);
-      }
-    } catch (Exception e) {
-      LOG.warn(
-          "meet error when inserting database {}, table {}, tags {}, 
attributes {}, fields {}, at time {}, because ",
-          message.getDatabase(),
-          message.getTable(),
-          message.getTagKeys(),
-          message.getAttributeKeys(),
-          message.getFields(),
-          message.getTimestamp(),
-          e);
-    }
-  }
-
-  private InsertTabletStatement constructInsertTabletStatement(TableMessage 
message) {
-    InsertTabletStatement insertStatement = new InsertTabletStatement();
-    insertStatement.setDevicePath(new PartialPath(message.getTable(), false));
-    List<String> measurements =
-        Stream.of(message.getFields(), message.getTagKeys(), 
message.getAttributeKeys())
-            .flatMap(List::stream)
-            .collect(Collectors.toList());
-    insertStatement.setMeasurements(measurements.toArray(new String[0]));
-    long[] timestamps = new long[] {message.getTimestamp()};
-    insertStatement.setTimes(timestamps);
-    int columnSize = measurements.size();
-    int rowSize = 1;
-
-    BitMap[] bitMaps = new BitMap[columnSize];
-    Object[] columns =
-        Stream.of(message.getValues(), message.getTagValues(), 
message.getAttributeValues())
-            .flatMap(List::stream)
-            .toArray(Object[]::new);
-    insertStatement.setColumns(columns);
-    insertStatement.setBitMaps(bitMaps);
-    insertStatement.setRowCount(rowSize);
-    insertStatement.setAligned(false);
-    insertStatement.setWriteToTable(true);
-    TSDataType[] dataTypes = new TSDataType[measurements.size()];
-    TsTableColumnCategory[] columnCategories = new 
TsTableColumnCategory[measurements.size()];
-    for (int i = 0; i < message.getFields().size(); i++) {
-      dataTypes[i] = message.getDataTypes().get(i);
-      columnCategories[i] = TsTableColumnCategory.FIELD;
-    }
-    for (int i = message.getFields().size();
-        i < message.getFields().size() + message.getTagKeys().size();
-        i++) {
-      dataTypes[i] = TSDataType.STRING;
-      columnCategories[i] = TsTableColumnCategory.TAG;
-    }
-    for (int i = message.getFields().size() + message.getTagKeys().size();
-        i
-            < message.getFields().size()
-                + message.getTagKeys().size()
-                + message.getAttributeKeys().size();
-        i++) {
-      dataTypes[i] = TSDataType.STRING;
-      columnCategories[i] = TsTableColumnCategory.ATTRIBUTE;
-    }
-    insertStatement.setDataTypes(dataTypes);
-    insertStatement.setColumnCategories(columnCategories);
-
-    return insertStatement;
-  }
-
-  private void insertTree(TreeMessage message, MqttClientSession session) {
-    TSStatus tsStatus = null;
-    try {
-      InsertRowStatement statement = new InsertRowStatement();
-      statement.setDevicePath(
-          
DataNodeDevicePathCache.getInstance().getPartialPath(message.getDevice()));
-      TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp());
-      statement.setTime(message.getTimestamp());
-      statement.setMeasurements(message.getMeasurements().toArray(new 
String[0]));
-      if (message.getDataTypes() == null) {
-        statement.setDataTypes(new 
TSDataType[message.getMeasurements().size()]);
-        statement.setValues(message.getValues().toArray(new Object[0]));
-        statement.setNeedInferType(true);
-      } else {
-        List<TSDataType> dataTypes = message.getDataTypes();
-        List<String> values = message.getValues();
-        Object[] inferredValues = new Object[values.size()];
-        for (int i = 0; i < values.size(); ++i) {
-          inferredValues[i] = CommonUtils.parseValue(dataTypes.get(i), 
values.get(i));
-        }
-        statement.setDataTypes(dataTypes.toArray(new TSDataType[0]));
-        statement.setValues(inferredValues);
-      }
-      statement.setAligned(false);
-
-      tsStatus =
-          AuthorityChecker.checkAuthority(
-              statement,
-              new TreeAccessCheckContext(
-                  session.getUserId(), session.getUsername(), 
session.getClientAddress()));
-      if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        LOG.warn(tsStatus.message);
-      } else {
-        long queryId = sessionManager.requestQueryId();
-        ExecutionResult result =
-            Coordinator.getInstance()
-                .executeForTreeModel(
-                    statement,
-                    queryId,
-                    sessionManager.getSessionInfo(session),
-                    "",
-                    partitionFetcher,
-                    schemaFetcher,
-                    config.getQueryTimeoutThreshold(),
-                    false);
-        tsStatus = result.status;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("process result: {}", tsStatus);
-        }
-        if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-            || tsStatus.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-          LOG.warn("mqtt json insert error , message = {}", tsStatus.message);
-        }
-      }
-    } catch (Exception e) {
-      LOG.warn(
-          "meet error when inserting device {}, measurements {}, at time {}, 
because ",
-          message.getDevice(),
-          message.getMeasurements(),
-          message.getTimestamp(),
-          e);
-    }
-  }
-
-  @Override
-  public void onSessionLoopError(Throwable throwable) {
-    // TODO: Implement something sensible here ...
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
deleted file mode 100644
index ba31d869760..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.iotdb.db.protocol.mqtt;
-
-/** Generic parsing of messages */
-public class Message {
-
-  protected Long timestamp;
-
-  public Long getTimestamp() {
-    return timestamp;
-  }
-
-  public void setTimestamp(Long timestamp) {
-    this.timestamp = timestamp;
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java
deleted file mode 100644
index c0b48539cd7..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.iotdb.db.protocol.mqtt;
-
-import org.apache.iotdb.commons.file.SystemFileFactory;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-
-import com.google.common.base.Preconditions;
-import org.apache.tsfile.external.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.ServiceLoader;
-
-/** PayloadFormatManager loads payload formatter from SPI services. */
-public class PayloadFormatManager {
-
-  private PayloadFormatManager() {}
-
-  private static final Logger logger = 
LoggerFactory.getLogger(PayloadFormatManager.class);
-
-  // The dir saving MQTT payload plugin .jar files
-  private static String mqttDir;
-  // Map: formatterName => PayloadFormatter
-  private static Map<String, PayloadFormatter> mqttPayloadPluginMap = new 
HashMap<>();
-
-  static {
-    init();
-  }
-
-  private static void init() {
-    mqttDir = IoTDBDescriptor.getInstance().getConfig().getMqttDir();
-    logger.info("mqttDir: {}", mqttDir);
-
-    try {
-      makeMqttPluginDir();
-      buildMqttPluginMap();
-    } catch (IOException e) {
-      logger.error("MQTT PayloadFormatManager init() error.", e);
-    }
-  }
-
-  public static PayloadFormatter getPayloadFormat(String name) {
-    PayloadFormatter formatter = mqttPayloadPluginMap.get(name);
-    Preconditions.checkArgument(formatter != null, "Unknown payload format 
named: " + name);
-    return formatter;
-  }
-
-  private static void makeMqttPluginDir() throws IOException {
-    File file = SystemFileFactory.INSTANCE.getFile(mqttDir);
-    if (file.exists() && file.isDirectory()) {
-      return;
-    }
-    FileUtils.forceMkdir(file);
-  }
-
-  private static void buildMqttPluginMap() throws IOException {
-    ServiceLoader<PayloadFormatter> payloadFormatters = 
ServiceLoader.load(PayloadFormatter.class);
-    for (PayloadFormatter formatter : payloadFormatters) {
-      if (formatter == null) {
-        logger.error("PayloadFormatManager(), formatter is null.");
-        continue;
-      }
-
-      String pluginName = formatter.getName();
-      mqttPayloadPluginMap.put(pluginName, formatter);
-      logger.info("PayloadFormatManager(), find MQTT Payload Plugin {}.", 
pluginName);
-    }
-
-    URL[] jarURLs = getPluginJarURLs(mqttDir);
-    logger.debug("MQTT Plugin jarURLs: {}", Arrays.toString(jarURLs));
-
-    for (URL jarUrl : jarURLs) {
-      ClassLoader classLoader = new URLClassLoader(new URL[] {jarUrl});
-
-      // Use SPI to get all plugins' class
-      ServiceLoader<PayloadFormatter> payloadFormatters2 =
-          ServiceLoader.load(PayloadFormatter.class, classLoader);
-
-      for (PayloadFormatter formatter : payloadFormatters2) {
-        if (formatter == null) {
-          logger.error("PayloadFormatManager(), formatter is null.");
-          continue;
-        }
-
-        String pluginName = formatter.getName();
-        if (mqttPayloadPluginMap.containsKey(pluginName)) {
-          continue;
-        }
-        mqttPayloadPluginMap.put(pluginName, formatter);
-        logger.info("PayloadFormatManager(), find MQTT Payload Plugin {}.", 
pluginName);
-      }
-    }
-  }
-
-  /**
-   * get all jar files in the given folder
-   *
-   * @param folderPath
-   * @return all jar files' URL
-   * @throws IOException
-   */
-  private static URL[] getPluginJarURLs(String folderPath) throws IOException {
-    HashSet<File> fileSet =
-        new HashSet<>(
-            org.apache.tsfile.external.commons.io.FileUtils.listFiles(
-                SystemFileFactory.INSTANCE.getFile(folderPath), new String[] 
{"jar"}, true));
-    return 
org.apache.tsfile.external.commons.io.FileUtils.toURLs(fileSet.toArray(new 
File[0]));
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
deleted file mode 100644
index c86648ac161..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.iotdb.db.protocol.mqtt;
-
-import io.netty.buffer.ByteBuf;
-
-import java.util.List;
-
-/**
- * PayloadFormatter format the payload to the messages.
- *
- * <p>This is a SPI interface.
- *
- * @see JSONPayloadFormatter
- */
-public interface PayloadFormatter {
-
-  public static final String TREE_TYPE = "tree";
-  public static final String TABLE_TYPE = "table";
-
-  /**
-   * format a payload to a list of messages
-   *
-   * @param payload
-   * @return
-   */
-  @Deprecated
-  List<Message> format(ByteBuf payload);
-
-  /**
-   * format a payload of a topic to a list of messages
-   *
-   * @param topic
-   * @param payload
-   * @return
-   */
-  default List<Message> format(String topic, ByteBuf payload) {
-    return format(payload);
-  }
-
-  /**
-   * get the formatter name
-   *
-   * @return
-   */
-  String getName();
-
-  String getType();
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java
deleted file mode 100644
index b8aec19da58..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.iotdb.db.protocol.mqtt;
-
-import org.apache.tsfile.enums.TSDataType;
-
-import java.util.List;
-
-/** Message parsing into a table */
-public class TableMessage extends Message {
-
-  private String database;
-
-  private String table;
-
-  private List<String> tagKeys;
-
-  private List<Object> tagValues;
-
-  private List<String> attributeKeys;
-
-  private List<Object> attributeValues;
-
-  private List<String> fields;
-
-  private List<TSDataType> dataTypes;
-
-  private List<Object> values;
-
-  public String getDatabase() {
-    return database;
-  }
-
-  public void setDatabase(String database) {
-    this.database = database;
-  }
-
-  public String getTable() {
-    return table;
-  }
-
-  public void setTable(String table) {
-    this.table = table;
-  }
-
-  public List<String> getTagKeys() {
-    return tagKeys;
-  }
-
-  public void setTagKeys(List<String> tagKeys) {
-    this.tagKeys = tagKeys;
-  }
-
-  public List<Object> getTagValues() {
-    return tagValues;
-  }
-
-  public void setTagValues(List<Object> tagValues) {
-    this.tagValues = tagValues;
-  }
-
-  public List<String> getAttributeKeys() {
-    return attributeKeys;
-  }
-
-  public void setAttributeKeys(List<String> attributeKeys) {
-    this.attributeKeys = attributeKeys;
-  }
-
-  public List<Object> getAttributeValues() {
-    return attributeValues;
-  }
-
-  public void setAttributeValues(List<Object> attributeValues) {
-    this.attributeValues = attributeValues;
-  }
-
-  public List<String> getFields() {
-    return fields;
-  }
-
-  public void setFields(List<String> fields) {
-    this.fields = fields;
-  }
-
-  public List<TSDataType> getDataTypes() {
-    return dataTypes;
-  }
-
-  public void setDataTypes(List<TSDataType> dataTypes) {
-    this.dataTypes = dataTypes;
-  }
-
-  public List<Object> getValues() {
-    return values;
-  }
-
-  public void setValues(List<Object> values) {
-    this.values = values;
-  }
-
-  @Override
-  public String toString() {
-    return "TableMessage{"
-        + "database='"
-        + database
-        + '\''
-        + ", table='"
-        + table
-        + '\''
-        + ", tagKeys="
-        + tagKeys
-        + ", tagValues="
-        + tagValues
-        + ", attributeKeys="
-        + attributeKeys
-        + ", attributeValues="
-        + attributeValues
-        + ", fields="
-        + fields
-        + ", dataTypes="
-        + dataTypes
-        + ", values="
-        + values
-        + ", timestamp="
-        + timestamp
-        + '}';
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java
deleted file mode 100644
index 9416ea3c838..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.iotdb.db.protocol.mqtt;
-
-import org.apache.tsfile.enums.TSDataType;
-
-import java.util.List;
-
-/** Message parsing into a tree */
-public class TreeMessage extends Message {
-  private String device;
-  private List<String> measurements;
-  private List<TSDataType> dataTypes;
-  private List<String> values;
-
-  public String getDevice() {
-    return device;
-  }
-
-  public void setDevice(String device) {
-    this.device = device;
-  }
-
-  public List<String> getMeasurements() {
-    return measurements;
-  }
-
-  public void setMeasurements(List<String> measurements) {
-    this.measurements = measurements;
-  }
-
-  public List<TSDataType> getDataTypes() {
-    return dataTypes;
-  }
-
-  public void setDataTypes(List<TSDataType> dataTypes) {
-    this.dataTypes = dataTypes;
-  }
-
-  public List<String> getValues() {
-    return values;
-  }
-
-  public void setValues(List<String> values) {
-    this.values = values;
-  }
-
-  @Override
-  public String toString() {
-    return "Message{"
-        + "device='"
-        + device
-        + '\''
-        + ", timestamp="
-        + super.timestamp
-        + ", measurements="
-        + measurements
-        + ", values="
-        + values
-        + '}';
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java
index 21d4121ffd3..cc761d050eb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java
@@ -62,7 +62,7 @@ public abstract class IClientSession {
 
   abstract TSConnectionType getConnectionType();
 
-  /** ip:port for thrift-based service and client id for mqtt-based service. */
+  /** ip:port for thrift-based service. */
   abstract String getConnectionId();
 
   public void setClientVersion(ClientVersion clientVersion) {
@@ -144,7 +144,6 @@ public abstract class IClientSession {
    * statementIds that this client opens.<br>
    * For JDBC clients, each Statement instance has a statement id.<br>
    * For an IoTDBSession connection, each connection has a statement id.<br>
-   * mqtt clients have no statement id.
    */
   public abstract Iterable<Long> getStatementIds();
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/MqttClientSession.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/MqttClientSession.java
deleted file mode 100644
index ae9e2cd0361..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/MqttClientSession.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.protocol.session;
-
-import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
-
-import java.util.Collections;
-import java.util.Set;
-
-public class MqttClientSession extends IClientSession {
-
-  private final String clientID;
-
-  public MqttClientSession(String clientID) {
-    this.clientID = clientID;
-  }
-
-  @Override
-  public String getClientAddress() {
-    return clientID;
-  }
-
-  @Override
-  public int getClientPort() {
-    return 0;
-  }
-
-  @Override
-  TSConnectionType getConnectionType() {
-    return TSConnectionType.MQTT_BASED;
-  }
-
-  @Override
-  String getConnectionId() {
-    return clientID;
-  }
-
-  @Override
-  public Set<Long> getStatementIds() {
-    return Collections.emptySet();
-  }
-
-  @Override
-  public void addStatementId(long statementId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Set<Long> removeStatementId(long statementId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void addQueryId(Long statementId, long queryId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void removeQueryId(Long statementId, Long queryId) {
-    throw new UnsupportedOperationException();
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
index d098cf70223..24dad9549c5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
@@ -78,7 +78,6 @@ public class SessionManager implements SessionManagerMBean {
 
   private final ThreadLocal<Long> currSessionIdleTime = new ThreadLocal<>();
 
-  // sessions does not contain MqttSessions..
   private final Map<IClientSession, Object> sessions = new 
ConcurrentHashMap<>();
   // used for sessions.
   private final Object placeHolder = new Object();
@@ -413,12 +412,6 @@ public class SessionManager implements SessionManagerMBean 
{
     currSessionIdleTime.remove();
   }
 
-  public void removeCurrSessionForMqtt(MqttClientSession mqttClientSession) {
-    if (mqttClientSession != null) {
-      sessions.remove(mqttClientSession);
-    }
-  }
-
   /**
    * this method can be only used in client-thread model. Do not use this 
method in message-thread
    * model based service.
@@ -436,14 +429,6 @@ public class SessionManager implements SessionManagerMBean 
{
     return true;
   }
 
-  /**
-   * this method can be only used in mqtt model. Do not use this method in 
client-thread model based
-   * service.
-   */
-  public void registerSessionForMqtt(IClientSession session) {
-    sessions.put(session, placeHolder);
-  }
-
   /** must be called after registerSession()) will mark the session login. */
   public void supplySession(
       IClientSession session,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index cebf7dab9f3..6eb80b4b372 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -1274,9 +1274,6 @@ public class DataNode extends ServerCommandLine 
implements DataNodeMBean {
   }
 
   private void initProtocols() throws StartupException {
-    if (config.isEnableMQTTService()) {
-      registerManager.register(MQTTService.getInstance());
-    }
     if 
(IoTDBRestServiceDescriptor.getInstance().getConfig().isEnableRestService()) {
       registerManager.register(RestService.getInstance());
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java
deleted file mode 100644
index c6cc3fa47ee..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.service;
-
-import org.apache.iotdb.commons.service.IService;
-import org.apache.iotdb.commons.service.ServiceType;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.protocol.mqtt.BrokerAuthenticator;
-import org.apache.iotdb.db.protocol.mqtt.MPPPublishHandler;
-
-import io.moquette.BrokerConstants;
-import io.moquette.broker.Server;
-import io.moquette.broker.config.IConfig;
-import io.moquette.broker.config.MemoryConfig;
-import io.moquette.broker.security.IAuthenticator;
-import io.moquette.interception.InterceptHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-/** The IoTDB MQTT Service. */
-public class MQTTService implements IService {
-  private static final Logger LOG = LoggerFactory.getLogger(MQTTService.class);
-  private final Server server = new Server();
-
-  private MQTTService() {}
-
-  @Override
-  public void start() {
-    startup();
-  }
-
-  @Override
-  public void stop() {
-    shutdown();
-  }
-
-  public void startup() {
-    IoTDBConfig iotDBConfig = IoTDBDescriptor.getInstance().getConfig();
-    IConfig config = createBrokerConfig(iotDBConfig);
-    List<InterceptHandler> handlers = new ArrayList<>(1);
-    handlers.add(new MPPPublishHandler(iotDBConfig));
-    IAuthenticator authenticator = new BrokerAuthenticator();
-
-    try {
-      server.startServer(config, handlers, null, authenticator, null);
-    } catch (IOException e) {
-      throw new RuntimeException("Exception while starting server", e);
-    }
-
-    LOG.info(
-        "Start MQTT service successfully, listening on ip {} port {}",
-        iotDBConfig.getMqttHost(),
-        iotDBConfig.getMqttPort());
-
-    Runtime.getRuntime()
-        .addShutdownHook(
-            new Thread(
-                () -> {
-                  LOG.info("Stopping IoTDB MQTT service...");
-                  shutdown();
-                  LOG.info("IoTDB MQTT service stopped.");
-                }));
-  }
-
-  private IConfig createBrokerConfig(IoTDBConfig iotDBConfig) {
-    Properties properties = new Properties();
-    properties.setProperty(BrokerConstants.HOST_PROPERTY_NAME, 
iotDBConfig.getMqttHost());
-    properties.setProperty(
-        BrokerConstants.PORT_PROPERTY_NAME, 
String.valueOf(iotDBConfig.getMqttPort()));
-    properties.setProperty(
-        BrokerConstants.BROKER_INTERCEPTOR_THREAD_POOL_SIZE,
-        String.valueOf(iotDBConfig.getMqttHandlerPoolSize()));
-    properties.setProperty(
-        BrokerConstants.DATA_PATH_PROPERTY_NAME, 
String.valueOf(iotDBConfig.getMqttDataPath()));
-    
properties.setProperty(BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME, 
"true");
-    properties.setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, 
"false");
-    
properties.setProperty(BrokerConstants.ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME, 
"false");
-    properties.setProperty(
-        BrokerConstants.NETTY_MAX_BYTES_PROPERTY_NAME,
-        String.valueOf(iotDBConfig.getMqttMaxMessageSize()));
-    return new MemoryConfig(properties);
-  }
-
-  public void shutdown() {
-    server.stopServer();
-  }
-
-  @Override
-  public ServiceType getID() {
-    return ServiceType.MQTT_SERVICE;
-  }
-
-  public static MQTTService getInstance() {
-    return MQTTServiceHolder.INSTANCE;
-  }
-
-  private static class MQTTServiceHolder {
-
-    private static final MQTTService INSTANCE = new MQTTService();
-
-    private MQTTServiceHolder() {}
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
 
b/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
index 488d6d02d50..585be9602fc 100644
--- 
a/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
+++ 
b/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
@@ -16,6 +16,3 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
-org.apache.iotdb.db.protocol.mqtt.JSONPayloadFormatter
-org.apache.iotdb.db.protocol.mqtt.LinePayloadFormatter
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticatorTest.java
deleted file mode 100644
index 1f66b517e91..00000000000
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticatorTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.iotdb.db.protocol.mqtt;
-
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-
-public class BrokerAuthenticatorTest {
-
-  @Before
-  public void before() {
-    EnvironmentUtils.envSetUp();
-  }
-
-  @After
-  public void after() throws IOException, StorageEngineException {
-    EnvironmentUtils.cleanEnv();
-  }
-
-  @Test
-  public void checkValid() {
-    // In the previous implementation, the datanode will init a root file,
-    // but now checkuser operation needs to link to confignode.
-    //    BrokerAuthenticator authenticator = new BrokerAuthenticator();
-    //    assertTrue(authenticator.checkValid(null, "root", 
"root".getBytes()));
-    //    assertFalse(authenticator.checkValid(null, "", "foo".getBytes()));
-    //    assertFalse(authenticator.checkValid(null, "root", null));
-    //    assertFalse(authenticator.checkValid(null, "foo", "foo".getBytes()));
-  }
-}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
deleted file mode 100644
index deecf607d81..00000000000
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.iotdb.db.protocol.mqtt;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import org.junit.Test;
-
-import java.nio.charset.StandardCharsets;
-
-import static org.junit.Assert.assertEquals;
-
-public class JSONPayloadFormatterTest {
-
-  @Test
-  public void formatJson() {
-    String payload =
-        " {\n"
-            + "      \"device\":\"root.sg.d1\",\n"
-            + "      \"timestamp\":1586076045524,\n"
-            + "      \"measurements\":[\"s1\",\"s2\"],\n"
-            + "      \"values\":[0.530635,0.530635]\n"
-            + " }";
-
-    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
-    String topic = "";
-
-    JSONPayloadFormatter formatter = new JSONPayloadFormatter();
-    TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(0);
-
-    assertEquals("root.sg.d1", message.getDevice());
-    assertEquals(Long.valueOf(1586076045524L), message.getTimestamp());
-    assertEquals("s1", message.getMeasurements().get(0));
-    assertEquals(0.530635D, Double.parseDouble(message.getValues().get(0)), 0);
-  }
-
-  @Test
-  public void formatBatchJson() {
-    String payload =
-        " {\n"
-            + "      \"device\":\"root.sg.d1\",\n"
-            + "      \"timestamps\":[1586076045524,1586076065526],\n"
-            + "      \"measurements\":[\"s1\",\"s2\"],\n"
-            + "      \"values\":[[0.530635,0.530635], [0.530655,0.530695]]\n"
-            + "  }";
-
-    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
-    String topic = "";
-
-    JSONPayloadFormatter formatter = new JSONPayloadFormatter();
-    TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(1);
-
-    assertEquals("root.sg.d1", message.getDevice());
-    assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
-    assertEquals("s2", message.getMeasurements().get(1));
-    assertEquals(0.530695D, Double.parseDouble(message.getValues().get(1)), 0);
-  }
-
-  @Test
-  public void formatJsonArray() {
-    String payload =
-        " [\n"
-            + "  {\n"
-            + "      \"device\":\"root.sg.d1\",\n"
-            + "      \"timestamp\":1586076045524,\n"
-            + "      \"measurements\":[\"s1\",\"s2\"],\n"
-            + "      \"values\":[0.530635,0.530635]\n"
-            + "  },\n"
-            + "  {\n"
-            + "      \"device\":\"root.sg.d2\",\n"
-            + "      \"timestamp\":1586076065526,\n"
-            + "      \"measurements\":[\"s3\",\"s4\"],\n"
-            + "      \"values\":[0.530655,0.530655]\n"
-            + "  }\n"
-            + "]";
-
-    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
-    String topic = "";
-
-    JSONPayloadFormatter formatter = new JSONPayloadFormatter();
-    TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(1);
-
-    assertEquals("root.sg.d2", message.getDevice());
-    assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
-    assertEquals("s3", message.getMeasurements().get(0));
-    assertEquals(0.530655D, Double.parseDouble(message.getValues().get(0)), 0);
-  }
-
-  @Test
-  public void formatBatchJsonArray() {
-    String payload =
-        "[\n"
-            + "  {\n"
-            + "      \"device\":\"root.sg.d1\",\n"
-            + "      \"timestamps\":[1586076045524,1586076065526],\n"
-            + "      \"measurements\":[\"s1\",\"s2\"],\n"
-            + "      \"values\":[[0.530635,0.530635], [0.530655,0.530695]]\n"
-            + "  },\n"
-            + "  {\n"
-            + "      \"device\":\"root.sg.d2\",\n"
-            + "      \"timestamps\":[1586076045524,1586076065526],\n"
-            + "      \"measurements\":[\"s3\",\"s4\"],\n"
-            + "      \"values\":[[0.530635,0.530635], [0.530655,0.530695]]\n"
-            + "  }"
-            + "]";
-
-    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
-    String topic = "";
-
-    JSONPayloadFormatter formatter = new JSONPayloadFormatter();
-    TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(3);
-
-    assertEquals("root.sg.d2", message.getDevice());
-    assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
-    assertEquals("s4", message.getMeasurements().get(1));
-    assertEquals(0.530695D, Double.parseDouble(message.getValues().get(1)), 0);
-  }
-}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
deleted file mode 100644
index 7bf9bce0702..00000000000
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.iotdb.db.protocol.mqtt;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import org.apache.tsfile.utils.Binary;
-import org.junit.Test;
-
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class LinePayloadFormatterTest {
-
-  @Test
-  public void formatLine() {
-    String payload =
-        "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 
field1=\"value1\",field2=1i,field3=2u,field4=3i32,field5=t,field6=false,field7=4,field8=5f
 1";
-
-    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
-    String topic = "";
-
-    LinePayloadFormatter formatter = new LinePayloadFormatter();
-    TableMessage message = (TableMessage) formatter.format(topic, buf).get(0);
-
-    assertEquals("test1", message.getTable());
-    assertEquals(Long.valueOf(1L), message.getTimestamp());
-    assertEquals("tag1", message.getTagKeys().get(0));
-    assertEquals("attr1", message.getAttributeKeys().get(0));
-    assertEquals(
-        "value1",
-        ((Binary[]) 
message.getValues().get(0))[0].getStringValue(StandardCharsets.UTF_8));
-    assertEquals(1L, ((long[]) message.getValues().get(1))[0], 0);
-    assertEquals(2L, ((long[]) message.getValues().get(2))[0], 0);
-    assertEquals(3L, ((int[]) message.getValues().get(3))[0], 0);
-    assertTrue(((boolean[]) message.getValues().get(4))[0]);
-    assertFalse(((boolean[]) message.getValues().get(5))[0]);
-    assertEquals(4d, ((double[]) message.getValues().get(6))[0], 0);
-    assertEquals(5f, ((float[]) message.getValues().get(7))[0], 0);
-  }
-
-  @Test
-  public void formatBatchLine() {
-    String payload =
-        "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 
field1=\"value1\",field2=1i,field3=1u 1 \n"
-            + "test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4 
field4=\"value4\",field5=10i,field6=10i32 2 ";
-
-    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
-    String topic = "";
-
-    LinePayloadFormatter formatter = new LinePayloadFormatter();
-    TableMessage message = (TableMessage) formatter.format(topic, buf).get(1);
-
-    assertEquals("test2", message.getTable());
-    assertEquals(Long.valueOf(2L), message.getTimestamp());
-    assertEquals("tag3", message.getTagKeys().get(0));
-    assertEquals("attr3", message.getAttributeKeys().get(0));
-    assertEquals(10, ((int[]) message.getValues().get(2))[0], 0);
-  }
-
-  @Test
-  public void formatLineAnnotation() {
-    String payload =
-        "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 
field1=\"value1\",field2=1i,field3=1u 1 \n"
-            + " # test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4 
field4=\"value4\",field5=10i,field6=10i32 2 ";
-
-    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
-    String topic = "";
-
-    LinePayloadFormatter formatter = new LinePayloadFormatter();
-    List<Message> message = formatter.format(topic, buf);
-
-    assertEquals(1, message.size());
-  }
-}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
deleted file mode 100644
index 096f5d0d90d..00000000000
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.iotdb.db.protocol.mqtt;
-
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-
-import org.junit.After;
-import org.junit.Test;
-
-import static org.junit.Assert.assertNotNull;
-
-public class PayloadFormatManagerTest {
-  @After
-  public void tearDown() throws Exception {
-    EnvironmentUtils.cleanAllDir();
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void getPayloadFormat() {
-    PayloadFormatManager.getPayloadFormat("txt");
-  }
-
-  @Test
-  public void getDefaultPayloadFormat() {
-    assertNotNull(PayloadFormatManager.getPayloadFormat("json"));
-  }
-}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index ebd4d3b7ae0..479bf2a907a 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -136,9 +136,6 @@ public class EnvironmentUtils {
     SchemaEngine.getInstance().clear();
     FlushManager.getInstance().stop();
     
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.Running);
-    // We must disable MQTT service as it will cost a lot of time to be 
shutdown, which may slow our
-    // unit tests.
-    IoTDBDescriptor.getInstance().getConfig().setEnableMQTTService(false);
 
     // clean cache
     if (memoryConfig.isMetaDataCacheEnable()) {
@@ -240,8 +237,6 @@ public class EnvironmentUtils {
     cleanDir(config.getExtPipeDir());
     // delete ext
     cleanDir(config.getExtDir());
-    // delete mqtt dir
-    cleanDir(config.getMqttDir());
     // delete wal
     for (String walDir : commonConfig.getWalDirs()) {
       cleanDir(walDir);
diff --git a/iotdb-core/datanode/src/test/resources/datanode1conf/logback.xml 
b/iotdb-core/datanode/src/test/resources/datanode1conf/logback.xml
index 2443eb3313f..2522d9b65f5 100644
--- a/iotdb-core/datanode/src/test/resources/datanode1conf/logback.xml
+++ b/iotdb-core/datanode/src/test/resources/datanode1conf/logback.xml
@@ -230,7 +230,6 @@
         <appender-ref ref="FILEALL"/>
         <appender-ref ref="stdout"/>
     </root>
-    <logger level="OFF" name="io.moquette.broker.metrics.MQTTMessageLogger"/>
     <logger level="info" name="org.apache.iotdb.db.service"/>
     <logger level="info" name="org.apache.iotdb.db.conf"/>
     <logger level="info" name="org.apache.iotdb.db.cost.statistic">
diff --git a/iotdb-core/datanode/src/test/resources/datanode2conf/logback.xml 
b/iotdb-core/datanode/src/test/resources/datanode2conf/logback.xml
index 2443eb3313f..2522d9b65f5 100644
--- a/iotdb-core/datanode/src/test/resources/datanode2conf/logback.xml
+++ b/iotdb-core/datanode/src/test/resources/datanode2conf/logback.xml
@@ -230,7 +230,6 @@
         <appender-ref ref="FILEALL"/>
         <appender-ref ref="stdout"/>
     </root>
-    <logger level="OFF" name="io.moquette.broker.metrics.MQTTMessageLogger"/>
     <logger level="info" name="org.apache.iotdb.db.service"/>
     <logger level="info" name="org.apache.iotdb.db.conf"/>
     <logger level="info" name="org.apache.iotdb.db.cost.statistic">
diff --git a/iotdb-core/datanode/src/test/resources/datanode3conf/logback.xml 
b/iotdb-core/datanode/src/test/resources/datanode3conf/logback.xml
index 2443eb3313f..2522d9b65f5 100644
--- a/iotdb-core/datanode/src/test/resources/datanode3conf/logback.xml
+++ b/iotdb-core/datanode/src/test/resources/datanode3conf/logback.xml
@@ -230,7 +230,6 @@
         <appender-ref ref="FILEALL"/>
         <appender-ref ref="stdout"/>
     </root>
-    <logger level="OFF" name="io.moquette.broker.metrics.MQTTMessageLogger"/>
     <logger level="info" name="org.apache.iotdb.db.service"/>
     <logger level="info" name="org.apache.iotdb.db.conf"/>
     <logger level="info" name="org.apache.iotdb.db.cost.statistic">
diff --git a/iotdb-core/datanode/src/test/resources/logback-test.xml 
b/iotdb-core/datanode/src/test/resources/logback-test.xml
index f6c690d6ff7..96f2de206cf 100644
--- a/iotdb-core/datanode/src/test/resources/logback-test.xml
+++ b/iotdb-core/datanode/src/test/resources/logback-test.xml
@@ -54,9 +54,7 @@
     <logger name="org.apache.iotdb.commons.service.RegisterManager" 
level="INFO"/>
     <logger name="org.apache.iotdb.db.service.DataNode" level="WARN"/>
     <logger name="org.apache.iotdb.db.service.ExternalRPCService" 
level="INFO"/>
-    <logger name="org.apache.iotdb.db.service.MQTTService" level="INFO"/>
     <logger 
name="org.apache.iotdb.db.storageengine.compaction.cross.rewrite.task" 
level="ERROR"/>
-    <logger name="io.moquette.broker.metrics.MQTTMessageLogger" level="ERROR"/>
     <logger name="DETAILED_FAILURE_QUERY_TRACE" level="ERROR"/>
     <logger name="org.apache.iotdb.db.storageengine.dataregion.DataRegion" 
level="INFO"/>
     <root level="ERROR">
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 6c0d71addf3..f3f7f6a5185 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -2043,42 +2043,6 @@ procedure_completed_clean_interval=30
 # Datatype: int
 procedure_completed_evict_ttl=60
 
-####################
-### MQTT Broker Configuration
-####################
-
-# whether to enable the mqtt service.
-# effectiveMode: restart
-# Datatype: boolean
-enable_mqtt_service=false
-
-# the mqtt service binding host.
-# effectiveMode: restart
-# Datatype: String
-mqtt_host=127.0.0.1
-
-# the mqtt service binding port.
-# effectiveMode: restart
-# Datatype: int
-mqtt_port=1883
-
-# the handler pool size for handing the mqtt messages.
-# effectiveMode: restart
-# Datatype: int
-mqtt_handler_pool_size=1
-
-# the mqtt message payload formatter.
-# effectiveMode: restart
-# Options: [json, line]
-# The built-in json only supports tree models, and the line only supports 
table models.
-# Datatype: String
-mqtt_payload_formatter=json
-
-# max length of mqtt message in byte
-# effectiveMode: restart
-# Datatype: int
-mqtt_max_message_size=1048576
-
 ####################
 ### IoTDB-AI Configuration
 ####################
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 4a54d774c94..7c3b95258a1 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -270,19 +270,9 @@ public class IoTDBConstant {
   public static final String TMP_FOLDER_NAME = "tmp";
   public static final String DELETION_FOLDER_NAME = "deletion";
 
-  public static final String MQTT_FOLDER_NAME = "mqtt";
   public static final String WAL_FOLDER_NAME = "wal";
   public static final String EXT_PIPE_FOLDER_NAME = "extPipe";
 
-  // mqtt
-  public static final String ENABLE_MQTT = "enable_mqtt_service";
-  public static final String MQTT_HOST_NAME = "mqtt_host";
-  public static final String MQTT_PORT_NAME = "mqtt_port";
-  public static final String MQTT_HANDLER_POOL_SIZE_NAME = 
"mqtt_handler_pool_size";
-  public static final String MQTT_PAYLOAD_FORMATTER_NAME = 
"mqtt_payload_formatter";
-  public static final String MQTT_DATA_PATH = "mqtt_data_path";
-  public static final String MQTT_MAX_MESSAGE_SIZE = "mqtt_max_message_size";
-
   // thrift
   public static final int LEFT_SIZE_IN_REQUEST = 4 * 1024 * 1024;
   public static final int DEFAULT_FETCH_SIZE = 5000;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 7267c79a665..55fad14e5a1 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -25,7 +25,6 @@ public enum ServiceType {
   METRIC_SERVICE("Metrics ServerService", "MetricService"),
   RPC_SERVICE("RPC ServerService", "RPCService"),
   INFLUX_SERVICE("InfluxDB Protocol Service", "InfluxDB Protocol"),
-  MQTT_SERVICE("MQTTService", "MqttService"),
   AIR_GAP_SERVICE("AirGapService", "AirGapService"),
   MONITOR_SERVICE("Monitor ServerService", "Monitor"),
   STAT_MONITOR_SERVICE("Statistics ServerService", "StatMonitorService"),
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index 53d55ce54ca..5a1e76fe5f5 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -527,7 +527,7 @@ enum TSConnectionType {
 struct TSConnectionInfo {
   1: required string userName
   2: required i64 logInTime
-  3: required string connectionId // ip:port for thrift-based service and 
clientId for mqtt-based service
+  3: required string connectionId // ip:port for thrift-based service
   4: required TSConnectionType type
 }
 
diff --git a/pom.xml b/pom.xml
index 4461cd45f5c..521c84719d5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,7 +77,6 @@
         <enforcer.skip>true</enforcer.skip>
         <felix.version>5.1.9</felix.version>
         <findbugs.jsr305.version>3.0.2</findbugs.jsr305.version>
-        <fusesource-mqtt-client.version>1.16</fusesource-mqtt-client.version>
         <!-- JDK1.8 only support google java format 1.7-->
         <google.java.format.version>1.22.0</google.java.format.version>
         <gson.version>2.13.1</gson.version>
@@ -523,11 +522,6 @@
                     </exclusion>
                 </exclusions>
             </dependency>
-            <dependency>
-                <groupId>org.fusesource.mqtt-client</groupId>
-                <artifactId>mqtt-client</artifactId>
-                <version>${fusesource-mqtt-client.version}</version>
-            </dependency>
             <dependency>
                 <groupId>org.eclipse.collections</groupId>
                 <artifactId>eclipse-collections</artifactId>


Reply via email to