This is an automated email from the ASF dual-hosted git repository.

weihao pushed a commit to branch splitMqtt
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d2f1c123dccaa744a107f825218d7b5471db5edb
Author: Weihao Li <[email protected]>
AuthorDate: Fri Jan 16 11:17:18 2026 +0800

    draft
    
    Signed-off-by: Weihao Li <[email protected]>
---
 distribution/src/assembly/all.xml                  |   1 +
 distribution/src/assembly/datanode.xml             |   1 +
 example/mqtt-customize/pom.xml                     |   2 +-
 ...Formatter => org.apache.iotdb.PayloadFormatter} |   0
 example/pom.xml                                    |   2 +-
 external-service-impl/mqtt/pom.xml                 | 115 +++++++++++++++++++++
 .../apache/iotdb}/mqtt/BrokerAuthenticator.java    |   2 +-
 .../apache/iotdb}/mqtt/JSONPayloadFormatter.java   |   2 +-
 .../apache/iotdb}/mqtt/LinePayloadFormatter.java   |   2 +-
 .../org/apache/iotdb}/mqtt/MPPPublishHandler.java  |   2 +-
 .../java/org/apache/iotdb/mqtt}/MQTTService.java   |  28 +----
 .../main/java/org/apache/iotdb}/mqtt/Message.java  |   2 +-
 .../apache/iotdb}/mqtt/PayloadFormatManager.java   |   2 +-
 .../org/apache/iotdb}/mqtt/PayloadFormatter.java   |   2 +-
 .../java/org/apache/iotdb}/mqtt/TableMessage.java  |   2 +-
 .../java/org/apache/iotdb}/mqtt/TreeMessage.java   |   2 +-
 .../org.apache.iotdb.mqtt.PayloadFormatter         |   4 +-
 .../iotdb}/mqtt/BrokerAuthenticatorTest.java       |  18 +---
 .../iotdb}/mqtt/JSONPayloadFormatterTest.java      |   3 +-
 .../iotdb}/mqtt/LinePayloadFormatterTest.java      |   3 +-
 .../iotdb}/mqtt/PayloadFormatManagerTest.java      |   8 +-
 .../pom.xml                                        |  27 ++++-
 iotdb-core/datanode/pom.xml                        |   4 -
 .../java/org/apache/iotdb/db/service/DataNode.java |  16 ++-
 .../externalservice/BuiltinExternalServices.java   |   8 +-
 .../ExternalServiceManagementService.java          |  54 ++++++++--
 .../apache/iotdb/commons/service/ServiceType.java  |   1 -
 pom.xml                                            |   1 +
 28 files changed, 219 insertions(+), 95 deletions(-)

diff --git a/distribution/src/assembly/all.xml 
b/distribution/src/assembly/all.xml
index b77f71a32dc..e28fc04a9ee 100644
--- a/distribution/src/assembly/all.xml
+++ b/distribution/src/assembly/all.xml
@@ -32,6 +32,7 @@
                 <include>*:iotdb-server:zip:*</include>
                 <include>*:iotdb-cli:zip:*</include>
                 <include>*:iotdb-confignode:zip:*</include>
+                <include>*:external-service-impl:zip:*</include>
             </includes>
             <outputDirectory>${file.separator}</outputDirectory>
             
<outputFileNameMapping>${artifact.artifactId}.${artifact.extension}</outputFileNameMapping>
diff --git a/distribution/src/assembly/datanode.xml 
b/distribution/src/assembly/datanode.xml
index 016059a903a..5393568e4cf 100644
--- a/distribution/src/assembly/datanode.xml
+++ b/distribution/src/assembly/datanode.xml
@@ -31,6 +31,7 @@
             <includes>
                 <include>*:iotdb-server:zip:*</include>
                 <include>*:iotdb-cli:zip:*</include>
+                <include>*:external-service-impl</include>
             </includes>
             <outputDirectory>${file.separator}</outputDirectory>
             
<outputFileNameMapping>${artifact.artifactId}.${artifact.extension}</outputFileNameMapping>
diff --git a/example/mqtt-customize/pom.xml b/example/mqtt-customize/pom.xml
index 2624571f4e8..c6857480264 100644
--- a/example/mqtt-customize/pom.xml
+++ b/example/mqtt-customize/pom.xml
@@ -32,7 +32,7 @@
         <!-- used by the server-->
         <dependency>
             <groupId>org.apache.iotdb</groupId>
-            <artifactId>iotdb-server</artifactId>
+            <artifactId>iotdb-mqtt</artifactId>
             <version>${project.version}</version>
         </dependency>
     </dependencies>
diff --git 
a/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
 
b/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.PayloadFormatter
similarity index 100%
rename from 
example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
rename to 
example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.PayloadFormatter
diff --git a/example/pom.xml b/example/pom.xml
index 9af648e0d26..5b6bac69a0e 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -32,7 +32,7 @@
     <modules>
         <module>jdbc</module>
         <module>mqtt</module>
-        <module>mqtt-customize</module>
+        <!-- //TODO -->
         <module>pipe-count-point-processor</module>
         <module>pipe-opc-ua-sink</module>
         <module>rest-java-example</module>
diff --git a/external-service-impl/mqtt/pom.xml 
b/external-service-impl/mqtt/pom.xml
new file mode 100644
index 00000000000..3e08d1e7ff0
--- /dev/null
+++ b/external-service-impl/mqtt/pom.xml
@@ -0,0 +1,115 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.iotdb</groupId>
+        <artifactId>external-service-impl</artifactId>
+        <version>2.0.7-SNAPSHOT</version>
+    </parent>
+    <artifactId>mqtt</artifactId>
+    <properties>
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>com.github.moquette-io.moquette</groupId>
+            <artifactId>moquette-broker</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-buffer</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-thrift-commons</artifactId>
+            <version>2.0.7-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tsfile</groupId>
+            <artifactId>tsfile</artifactId>
+            <version>${tsfile.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-thrift</artifactId>
+            <version>2.0.7-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>node-commons</artifactId>
+            <version>2.0.7-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>service-rpc</artifactId>
+            <version>2.0.7-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec-mqtt</artifactId>
+        </dependency>
+    </dependencies>
+    <profiles>
+        <profile>
+            <id>get-jar-with-dependencies</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <configuration>
+                            <descriptorRefs>
+                                <descriptorRef>jar-with-dependencies</descriptorRef>
+                            </descriptorRefs>
+                        </configuration>
+                        <executions>
+                            <execution>
+                                <id>make-assembly</id>
+                                <!-- bind to the packaging phase -->
+                                <goals>
+                                    <goal>single</goal>
+                                </goals>
+                                <!-- this is used for inheritance merges -->
+                                <phase>package</phase>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticator.java
 
b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/BrokerAuthenticator.java
similarity index 97%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticator.java
rename to 
external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/BrokerAuthenticator.java
index a05c8264b82..14647a8582f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticator.java
+++ 
b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/BrokerAuthenticator.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.iotdb.db.protocol.mqtt;
+package org.apache.iotdb.mqtt;
 
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.rpc.TSStatusCode;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
 
b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/JSONPayloadFormatter.java
similarity index 99%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
rename to 
external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/JSONPayloadFormatter.java
index cc857b7295c..7a348c850b4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
+++ 
b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/JSONPayloadFormatter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.iotdb.db.protocol.mqtt;
+package org.apache.iotdb.mqtt;
 
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
 
b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/LinePayloadFormatter.java
similarity index 99%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
rename to 
external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/LinePayloadFormatter.java
index 8b596ee2c89..f80c3eb0b5e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
+++ 
b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/LinePayloadFormatter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.iotdb.db.protocol.mqtt;
+package org.apache.iotdb.mqtt;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.tsfile.enums.TSDataType;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
 
b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MPPPublishHandler.java
similarity index 99%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
rename to 
external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MPPPublishHandler.java
index a0c0e1a709b..37182505c58 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ 
b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MPPPublishHandler.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.iotdb.db.protocol.mqtt;
+package org.apache.iotdb.mqtt;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java
 
b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTService.java
similarity index 84%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java
rename to 
external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTService.java
index c6cc3fa47ee..3b2be5ac3f4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java
+++ 
b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTService.java
@@ -16,14 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.service;
 
-import org.apache.iotdb.commons.service.IService;
-import org.apache.iotdb.commons.service.ServiceType;
+package org.apache.iotdb.mqtt;
+
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.protocol.mqtt.BrokerAuthenticator;
-import org.apache.iotdb.db.protocol.mqtt.MPPPublishHandler;
+import org.apache.iotdb.externalservice.api.IExternalService;
 
 import io.moquette.BrokerConstants;
 import io.moquette.broker.Server;
@@ -40,12 +38,10 @@ import java.util.List;
 import java.util.Properties;
 
 /** The IoTDB MQTT Service. */
-public class MQTTService implements IService {
+public class MQTTService implements IExternalService {
   private static final Logger LOG = LoggerFactory.getLogger(MQTTService.class);
   private final Server server = new Server();
 
-  private MQTTService() {}
-
   @Override
   public void start() {
     startup();
@@ -106,20 +102,4 @@ public class MQTTService implements IService {
   public void shutdown() {
     server.stopServer();
   }
-
-  @Override
-  public ServiceType getID() {
-    return ServiceType.MQTT_SERVICE;
-  }
-
-  public static MQTTService getInstance() {
-    return MQTTServiceHolder.INSTANCE;
-  }
-
-  private static class MQTTServiceHolder {
-
-    private static final MQTTService INSTANCE = new MQTTService();
-
-    private MQTTServiceHolder() {}
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
 b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/Message.java
similarity index 96%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
rename to 
external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/Message.java
index ba31d869760..9a431175453 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
+++ 
b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/Message.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.iotdb.db.protocol.mqtt;
+package org.apache.iotdb.mqtt;
 
 /** Generic parsing of messages */
 public class Message {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java
 
b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatManager.java
similarity index 99%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java
rename to 
external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatManager.java
index c0b48539cd7..7bb051a8526 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java
+++ 
b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatManager.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.iotdb.db.protocol.mqtt;
+package org.apache.iotdb.mqtt;
 
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
 
b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatter.java
similarity index 97%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
rename to 
external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatter.java
index c86648ac161..672c512d54a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
+++ 
b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.iotdb.db.protocol.mqtt;
+package org.apache.iotdb.mqtt;
 
 import io.netty.buffer.ByteBuf;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java
 
b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TableMessage.java
similarity index 98%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java
rename to 
external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TableMessage.java
index b8aec19da58..5c8fcc5d708 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java
+++ 
b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TableMessage.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.iotdb.db.protocol.mqtt;
+package org.apache.iotdb.mqtt;
 
 import org.apache.tsfile.enums.TSDataType;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java
 
b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TreeMessage.java
similarity index 97%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java
rename to 
external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TreeMessage.java
index 9416ea3c838..07ff8ed0b3d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java
+++ 
b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TreeMessage.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.iotdb.db.protocol.mqtt;
+package org.apache.iotdb.mqtt;
 
 import org.apache.tsfile.enums.TSDataType;
 
diff --git 
a/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
 
b/external-service-impl/mqtt/src/main/resources/META-INF.services/org.apache.iotdb.mqtt.PayloadFormatter
similarity index 87%
rename from 
iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
rename to 
external-service-impl/mqtt/src/main/resources/META-INF.services/org.apache.iotdb.mqtt.PayloadFormatter
index 488d6d02d50..f42aa83ff86 100644
--- 
a/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
+++ 
b/external-service-impl/mqtt/src/main/resources/META-INF.services/org.apache.iotdb.mqtt.PayloadFormatter
@@ -17,5 +17,5 @@
 # under the License.
 #
 
-org.apache.iotdb.db.protocol.mqtt.JSONPayloadFormatter
-org.apache.iotdb.db.protocol.mqtt.LinePayloadFormatter
+org.apache.iotdb.mqtt.JSONPayloadFormatter
+org.apache.iotdb.mqtt.LinePayloadFormatter
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticatorTest.java
 
b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/BrokerAuthenticatorTest.java
similarity index 77%
rename from 
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticatorTest.java
rename to 
external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/BrokerAuthenticatorTest.java
index 1f66b517e91..6884a644fed 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticatorTest.java
+++ 
b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/BrokerAuthenticatorTest.java
@@ -15,29 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.iotdb.db.protocol.mqtt;
 
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
+package org.apache.iotdb.mqtt;
 
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
-
 public class BrokerAuthenticatorTest {
 
-  @Before
-  public void before() {
-    EnvironmentUtils.envSetUp();
-  }
-
-  @After
-  public void after() throws IOException, StorageEngineException {
-    EnvironmentUtils.cleanEnv();
-  }
-
   @Test
   public void checkValid() {
     // In the previous implementation, the datanode will init a root file,
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
 
b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/JSONPayloadFormatterTest.java
similarity index 99%
rename from 
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
rename to 
external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/JSONPayloadFormatterTest.java
index deecf607d81..eab4b42243d 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
+++ 
b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/JSONPayloadFormatterTest.java
@@ -15,7 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.iotdb.db.protocol.mqtt;
+
+package org.apache.iotdb.mqtt;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
 
b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/LinePayloadFormatterTest.java
similarity index 98%
rename from 
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
rename to 
external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/LinePayloadFormatterTest.java
index 7bf9bce0702..16ed807c9ae 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
+++ 
b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/LinePayloadFormatterTest.java
@@ -15,7 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.iotdb.db.protocol.mqtt;
+
+package org.apache.iotdb.mqtt;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
 
b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/PayloadFormatManagerTest.java
similarity index 84%
rename from 
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
rename to 
external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/PayloadFormatManagerTest.java
index 096f5d0d90d..afbe88cf047 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
+++ 
b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/PayloadFormatManagerTest.java
@@ -15,20 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.iotdb.db.protocol.mqtt;
 
-import org.apache.iotdb.db.utils.EnvironmentUtils;
+package org.apache.iotdb.mqtt;
 
-import org.junit.After;
 import org.junit.Test;
 
 import static org.junit.Assert.assertNotNull;
 
 public class PayloadFormatManagerTest {
-  @After
-  public void tearDown() throws Exception {
-    EnvironmentUtils.cleanAllDir();
-  }
 
   @Test(expected = IllegalArgumentException.class)
   public void getPayloadFormat() {
diff --git a/example/mqtt-customize/pom.xml b/external-service-impl/pom.xml
similarity index 63%
copy from example/mqtt-customize/pom.xml
copy to external-service-impl/pom.xml
index 2624571f4e8..2eeaec2c014 100644
--- a/example/mqtt-customize/pom.xml
+++ b/external-service-impl/pom.xml
@@ -23,17 +23,34 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.iotdb</groupId>
-        <artifactId>iotdb-examples</artifactId>
+        <artifactId>iotdb-parent</artifactId>
         <version>2.0.7-SNAPSHOT</version>
     </parent>
-    <artifactId>customize-mqtt-example</artifactId>
-    <name>IoTDB: Example: Customized MQTT</name>
+    <artifactId>external-service-impl</artifactId>
+    <packaging>pom</packaging>
+    <modules>
+        <module>mqtt</module>
+    </modules>
+    <properties>
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
     <dependencies>
-        <!-- used by the server-->
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>external-service-api</artifactId>
+            <version>2.0.7-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
             <artifactId>iotdb-server</artifactId>
-            <version>${project.version}</version>
+            <version>2.0.7-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 </project>
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index 72c4acba586..af2c714b0ea 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -267,10 +267,6 @@
             <groupId>org.glassfish.jersey.containers</groupId>
             <artifactId>jersey-container-servlet-core</artifactId>
         </dependency>
-        <dependency>
-            <groupId>com.github.moquette-io.moquette</groupId>
-            <artifactId>moquette-broker</artifactId>
-        </dependency>
         <dependency>
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 27a5ce3b7b7..d0eef468d4f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -1190,14 +1190,14 @@ public class DataNode extends ServerCommandLine 
implements DataNodeMBean {
 
   private void prepareExternalServiceResources() throws StartupException {
     long startTime = System.currentTimeMillis();
-    if (resourcesInformationHolder.getExternalServiceEntryList() == null
-        || resourcesInformationHolder.getExternalServiceEntryList().isEmpty()) 
{
-      return;
-    }
 
     try {
-      ExternalServiceManagementService.getInstance()
-          
.restoreUserDefinedServices(resourcesInformationHolder.getExternalServiceEntryList());
+      if (resourcesInformationHolder.getExternalServiceEntryList() != null
+          && 
!resourcesInformationHolder.getExternalServiceEntryList().isEmpty()) {
+        ExternalServiceManagementService.getInstance()
+            
.restoreUserDefinedServices(resourcesInformationHolder.getExternalServiceEntryList());
+      }
+
       
ExternalServiceManagementService.getInstance().restoreRunningServiceInstance();
     } catch (Exception e) {
       throw new StartupException(e);
@@ -1290,6 +1290,7 @@ public class DataNode extends ServerCommandLine 
implements DataNodeMBean {
   public void stop() {
     stopTriggerRelatedServices();
     registerManager.deregisterAll();
+    ExternalServiceManagementService.getInstance().stopRunningServices();
     JMXService.deregisterMBean(mbeanName);
     MetricService.getInstance().stop();
     if (schemaRegionConsensusStarted) {
@@ -1310,9 +1311,6 @@ public class DataNode extends ServerCommandLine 
implements DataNodeMBean {
   }
 
   private void initProtocols() throws StartupException {
-    if (config.isEnableMQTTService()) {
-      registerManager.register(MQTTService.getInstance());
-    }
     if 
(IoTDBRestServiceDescriptor.getInstance().getConfig().isEnableRestService()) {
       registerManager.register(RestService.getInstance());
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java
index 7bed3ffe471..dfe90cb3ca1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java
@@ -22,14 +22,10 @@ package org.apache.iotdb.db.service.externalservice;
 import java.util.function.Supplier;
 
 public enum BuiltinExternalServices {
-  MQTT(
-      "MQTT",
-      "org.apache.iotdb.externalservice.Mqtt",
-      // IoTDBDescriptor.getInstance().getConfig()::isEnableMQTTService
-      () -> false),
+  MQTT("MQTT", "org.apache.iotdb.mqtt.MQTTService", () -> true),
   REST(
       "REST",
-      "org.apache.iotdb.externalservice.Rest",
+      "org.apache.iotdb.mqtt.RestService",
       // 
IoTDBRestServiceDescriptor.getInstance().getConfig()::isEnableRestService
       () -> false);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementService.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementService.java
index 71f0df341a6..234d81d82c0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementService.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementService.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.externalservice.ServiceInfo;
+import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
@@ -65,10 +66,25 @@ public class ExternalServiceManagementService {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(ExternalServiceManagementService.class);
 
+  public static final String INSTANCE_NULL_ERROR_MSG =
+      "External Service instance is null when state is RUNNING!";
+
   private ExternalServiceManagementService(String libRoot) {
     this.serviceInfos = new HashMap<>();
     restoreBuiltInServices();
     this.libRoot = libRoot;
+    makDir(libRoot);
+  }
+
+  private static void makDir(String dir) {
+    try {
+      SystemFileFactory.INSTANCE.makeDirIfNecessary(dir);
+    } catch (IOException e) {
+      LOGGER.error("Failed to make external service dir", e);
+      throw new ExternalServiceManagementException(
+          new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+              .setMessage(e.getMessage()));
+    }
   }
 
   public Iterator<TExternalServiceEntry> showService(int dataNodeId)
@@ -170,7 +186,8 @@ public class ExternalServiceManagementService {
 
   private IExternalService createExternalServiceInstance(String serviceName, 
String className) {
     // close ClassLoader automatically to release the file handle
-    try (ExternalServiceClassLoader classLoader = new 
ExternalServiceClassLoader(libRoot); ) {
+    try {
+      ExternalServiceClassLoader classLoader = new 
ExternalServiceClassLoader(libRoot);
       return (IExternalService)
           Class.forName(className, true, 
classLoader).getDeclaredConstructor().newInstance();
     } catch (IOException
@@ -235,7 +252,7 @@ public class ExternalServiceManagementService {
   private void stopService(ServiceInfo serviceInfo) {
     checkState(
         serviceInfo.getServiceInstance() != null,
-        "External Service instance is null when state is RUNNING!",
+        INSTANCE_NULL_ERROR_MSG,
         serviceInfo.getServiceName());
     serviceInfo.getServiceInstance().stop();
   }
@@ -311,11 +328,34 @@ public class ExternalServiceManagementService {
             serviceInfo -> {
               // start services with RUNNING state
               if (serviceInfo.getState() == RUNNING) {
-                IExternalService serviceInstance =
-                    createExternalServiceInstance(
-                        serviceInfo.getServiceName(), 
serviceInfo.getClassName());
-                serviceInfo.setServiceInstance(serviceInstance);
-                serviceInstance.start();
+
+                try {
+                  IExternalService serviceInstance =
+                      createExternalServiceInstance(
+                          serviceInfo.getServiceName(), 
serviceInfo.getClassName());
+                  serviceInfo.setServiceInstance(serviceInstance);
+                } finally {
+                  // set STOPPED to avoid the case: service is RUNNING, but 
its instance is null
+                  if (serviceInfo.getServiceInstance() == null) {
+                    serviceInfo.setState(STOPPED);
+                  }
+                }
+
+                serviceInfo.getServiceInstance().start();
+              }
+            });
+  }
+
+  public void stopRunningServices() {
+    serviceInfos
+        .values()
+        .forEach(
+            serviceInfo -> {
+              // stop services with RUNNING state
+              if (serviceInfo.getState() == RUNNING) {
+                IExternalService serviceInstance = 
serviceInfo.getServiceInstance();
+                checkState(serviceInstance != null, INSTANCE_NULL_ERROR_MSG);
+                serviceInstance.stop();
               }
             });
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 7267c79a665..55fad14e5a1 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -25,7 +25,6 @@ public enum ServiceType {
   METRIC_SERVICE("Metrics ServerService", "MetricService"),
   RPC_SERVICE("RPC ServerService", "RPCService"),
   INFLUX_SERVICE("InfluxDB Protocol Service", "InfluxDB Protocol"),
-  MQTT_SERVICE("MQTTService", "MqttService"),
   AIR_GAP_SERVICE("AirGapService", "AirGapService"),
   MONITOR_SERVICE("Monitor ServerService", "Monitor"),
   STAT_MONITOR_SERVICE("Statistics ServerService", "StatMonitorService"),
diff --git a/pom.xml b/pom.xml
index d6261e70af1..4a80bacb188 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,6 +47,7 @@
         <module>distribution</module>
         <module>example</module>
         <module>library-udf</module>
+        <module>external-service-impl</module>
     </modules>
     <properties>
         <!-- This was the last version to support Java 8 -->


Reply via email to