This is an automated email from the ASF dual-hosted git repository. weihao pushed a commit to branch splitMqtt in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d2f1c123dccaa744a107f825218d7b5471db5edb Author: Weihao Li <[email protected]> AuthorDate: Fri Jan 16 11:17:18 2026 +0800 draft Signed-off-by: Weihao Li <[email protected]> --- distribution/src/assembly/all.xml | 1 + distribution/src/assembly/datanode.xml | 1 + example/mqtt-customize/pom.xml | 2 +- ...Formatter => org.apache.iotdb.PayloadFormatter} | 0 example/pom.xml | 2 +- external-service-impl/mqtt/pom.xml | 115 +++++++++++++++++++++ .../apache/iotdb}/mqtt/BrokerAuthenticator.java | 2 +- .../apache/iotdb}/mqtt/JSONPayloadFormatter.java | 2 +- .../apache/iotdb}/mqtt/LinePayloadFormatter.java | 2 +- .../org/apache/iotdb}/mqtt/MPPPublishHandler.java | 2 +- .../java/org/apache/iotdb/mqtt}/MQTTService.java | 28 +---- .../main/java/org/apache/iotdb}/mqtt/Message.java | 2 +- .../apache/iotdb}/mqtt/PayloadFormatManager.java | 2 +- .../org/apache/iotdb}/mqtt/PayloadFormatter.java | 2 +- .../java/org/apache/iotdb}/mqtt/TableMessage.java | 2 +- .../java/org/apache/iotdb}/mqtt/TreeMessage.java | 2 +- .../org.apache.iotdb.mqtt.PayloadFormatter | 4 +- .../iotdb}/mqtt/BrokerAuthenticatorTest.java | 18 +--- .../iotdb}/mqtt/JSONPayloadFormatterTest.java | 3 +- .../iotdb}/mqtt/LinePayloadFormatterTest.java | 3 +- .../iotdb}/mqtt/PayloadFormatManagerTest.java | 8 +- .../pom.xml | 27 ++++- iotdb-core/datanode/pom.xml | 4 - .../java/org/apache/iotdb/db/service/DataNode.java | 16 ++- .../externalservice/BuiltinExternalServices.java | 8 +- .../ExternalServiceManagementService.java | 54 ++++++++-- .../apache/iotdb/commons/service/ServiceType.java | 1 - pom.xml | 1 + 28 files changed, 219 insertions(+), 95 deletions(-) diff --git a/distribution/src/assembly/all.xml b/distribution/src/assembly/all.xml index b77f71a32dc..e28fc04a9ee 100644 --- a/distribution/src/assembly/all.xml +++ b/distribution/src/assembly/all.xml @@ -32,6 +32,7 @@ <include>*:iotdb-server:zip:*</include> <include>*:iotdb-cli:zip:*</include> <include>*:iotdb-confignode:zip:*</include> + <include>*:external-service-impl:zip:*</include> </includes> <outputDirectory>${file.separator}</outputDirectory> <outputFileNameMapping>${artifact.artifactId}.${artifact.extension}</outputFileNameMapping> diff --git a/distribution/src/assembly/datanode.xml b/distribution/src/assembly/datanode.xml index 016059a903a..5393568e4cf 100644 --- a/distribution/src/assembly/datanode.xml +++ b/distribution/src/assembly/datanode.xml @@ -31,6 +31,7 @@ <includes> <include>*:iotdb-server:zip:*</include> <include>*:iotdb-cli:zip:*</include> + <include>*:external-service-impl</include> </includes> <outputDirectory>${file.separator}</outputDirectory> <outputFileNameMapping>${artifact.artifactId}.${artifact.extension}</outputFileNameMapping> diff --git a/example/mqtt-customize/pom.xml b/example/mqtt-customize/pom.xml index 2624571f4e8..c6857480264 100644 --- a/example/mqtt-customize/pom.xml +++ b/example/mqtt-customize/pom.xml @@ -32,7 +32,7 @@ <!-- used by the server--> <dependency> <groupId>org.apache.iotdb</groupId> - <artifactId>iotdb-server</artifactId> + <artifactId>iotdb-mqtt</artifactId> <version>${project.version}</version> </dependency> </dependencies> diff --git a/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter b/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.PayloadFormatter similarity index 100% rename from example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter rename to example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.PayloadFormatter diff --git a/example/pom.xml b/example/pom.xml index 9af648e0d26..5b6bac69a0e 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -32,7 +32,7 @@ <modules> <module>jdbc</module> <module>mqtt</module> - <module>mqtt-customize</module> + <!-- //TODO --> <module>pipe-count-point-processor</module> <module>pipe-opc-ua-sink</module> <module>rest-java-example</module> diff --git a/external-service-impl/mqtt/pom.xml b/external-service-impl/mqtt/pom.xml new file mode 100644 index 00000000000..3e08d1e7ff0 --- /dev/null +++ b/external-service-impl/mqtt/pom.xml @@ -0,0 +1,115 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.iotdb</groupId> + <artifactId>external-service-impl</artifactId> + <version>2.0.7-SNAPSHOT</version> + </parent> + <artifactId>mqtt</artifactId> + <properties> + <maven.compiler.source>11</maven.compiler.source> + <maven.compiler.target>11</maven.compiler.target> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + <dependencies> + <dependency> + <groupId>com.github.moquette-io.moquette</groupId> + <artifactId>moquette-broker</artifactId> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-buffer</artifactId> + </dependency> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>iotdb-thrift-commons</artifactId> + <version>2.0.7-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.tsfile</groupId> + <artifactId>tsfile</artifactId> + <version>${tsfile.version}</version> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>iotdb-thrift</artifactId> + <version>2.0.7-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>node-commons</artifactId> + <version>2.0.7-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>service-rpc</artifactId> + <version>2.0.7-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec-mqtt</artifactId> + </dependency> + </dependencies> + <profiles> + <profile> + <id>get-jar-with-dependencies</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <!-- bind to the packaging phase --> + <goals> + <goal>single</goal> + </goals> + <!-- this is used for inheritance merges --> + <phase>package</phase> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> +</project> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticator.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/BrokerAuthenticator.java similarity index 97% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticator.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/BrokerAuthenticator.java index a05c8264b82..14647a8582f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticator.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/BrokerAuthenticator.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; +package org.apache.iotdb.mqtt; import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.rpc.TSStatusCode; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/JSONPayloadFormatter.java similarity index 99% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/JSONPayloadFormatter.java index cc857b7295c..7a348c850b4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/JSONPayloadFormatter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; +package org.apache.iotdb.mqtt; import com.google.common.collect.Lists; import com.google.gson.Gson; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/LinePayloadFormatter.java similarity index 99% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/LinePayloadFormatter.java index 8b596ee2c89..f80c3eb0b5e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/LinePayloadFormatter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; +package org.apache.iotdb.mqtt; import io.netty.buffer.ByteBuf; import org.apache.tsfile.enums.TSDataType; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MPPPublishHandler.java similarity index 99% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MPPPublishHandler.java index a0c0e1a709b..37182505c58 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MPPPublishHandler.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; +package org.apache.iotdb.mqtt; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTService.java similarity index 84% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTService.java index c6cc3fa47ee..3b2be5ac3f4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTService.java @@ -16,14 +16,12 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.service; -import org.apache.iotdb.commons.service.IService; -import org.apache.iotdb.commons.service.ServiceType; +package org.apache.iotdb.mqtt; + import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.protocol.mqtt.BrokerAuthenticator; -import org.apache.iotdb.db.protocol.mqtt.MPPPublishHandler; +import org.apache.iotdb.externalservice.api.IExternalService; import io.moquette.BrokerConstants; import io.moquette.broker.Server; @@ -40,12 +38,10 @@ import java.util.List; import java.util.Properties; /** The IoTDB MQTT Service. */ -public class MQTTService implements IService { +public class MQTTService implements IExternalService { private static final Logger LOG = LoggerFactory.getLogger(MQTTService.class); private final Server server = new Server(); - private MQTTService() {} - @Override public void start() { startup(); @@ -106,20 +102,4 @@ public class MQTTService implements IService { public void shutdown() { server.stopServer(); } - - @Override - public ServiceType getID() { - return ServiceType.MQTT_SERVICE; - } - - public static MQTTService getInstance() { - return MQTTServiceHolder.INSTANCE; - } - - private static class MQTTServiceHolder { - - private static final MQTTService INSTANCE = new MQTTService(); - - private MQTTServiceHolder() {} - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/Message.java similarity index 96% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/Message.java index ba31d869760..9a431175453 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/Message.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; +package org.apache.iotdb.mqtt; /** Generic parsing of messages */ public class Message { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatManager.java similarity index 99% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatManager.java index c0b48539cd7..7bb051a8526 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatManager.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; +package org.apache.iotdb.mqtt; import org.apache.iotdb.commons.file.SystemFileFactory; import org.apache.iotdb.db.conf.IoTDBDescriptor; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatter.java similarity index 97% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatter.java index c86648ac161..672c512d54a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; +package org.apache.iotdb.mqtt; import io.netty.buffer.ByteBuf; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TableMessage.java similarity index 98% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TableMessage.java index b8aec19da58..5c8fcc5d708 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TableMessage.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; +package org.apache.iotdb.mqtt; import org.apache.tsfile.enums.TSDataType; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TreeMessage.java similarity index 97% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TreeMessage.java index 9416ea3c838..07ff8ed0b3d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java +++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TreeMessage.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; +package org.apache.iotdb.mqtt; import org.apache.tsfile.enums.TSDataType; diff --git a/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter b/external-service-impl/mqtt/src/main/resources/META-INF.services/org.apache.iotdb.mqtt.PayloadFormatter similarity index 87% rename from iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter rename to external-service-impl/mqtt/src/main/resources/META-INF.services/org.apache.iotdb.mqtt.PayloadFormatter index 488d6d02d50..f42aa83ff86 100644 --- a/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter +++ b/external-service-impl/mqtt/src/main/resources/META-INF.services/org.apache.iotdb.mqtt.PayloadFormatter @@ -17,5 +17,5 @@ # under the License. # -org.apache.iotdb.db.protocol.mqtt.JSONPayloadFormatter -org.apache.iotdb.db.protocol.mqtt.LinePayloadFormatter +org.apache.iotdb.mqtt.JSONPayloadFormatter +org.apache.iotdb.mqtt.LinePayloadFormatter diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticatorTest.java b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/BrokerAuthenticatorTest.java similarity index 77% rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticatorTest.java rename to external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/BrokerAuthenticatorTest.java index 1f66b517e91..6884a644fed 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticatorTest.java +++ b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/BrokerAuthenticatorTest.java @@ -15,29 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; -import org.apache.iotdb.db.exception.StorageEngineException; -import org.apache.iotdb.db.utils.EnvironmentUtils; +package org.apache.iotdb.mqtt; -import org.junit.After; -import org.junit.Before; import org.junit.Test; -import java.io.IOException; - public class BrokerAuthenticatorTest { - @Before - public void before() { - EnvironmentUtils.envSetUp(); - } - - @After - public void after() throws IOException, StorageEngineException { - EnvironmentUtils.cleanEnv(); - } - @Test public void checkValid() { // In the previous implementation, the datanode will init a root file, diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/JSONPayloadFormatterTest.java similarity index 99% rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java rename to external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/JSONPayloadFormatterTest.java index deecf607d81..eab4b42243d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java +++ b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/JSONPayloadFormatterTest.java @@ -15,7 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; + +package org.apache.iotdb.mqtt; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/LinePayloadFormatterTest.java similarity index 98% rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java rename to external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/LinePayloadFormatterTest.java index 7bf9bce0702..16ed807c9ae 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java +++ b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/LinePayloadFormatterTest.java @@ -15,7 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; + +package org.apache.iotdb.mqtt; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/PayloadFormatManagerTest.java similarity index 84% rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java rename to external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/PayloadFormatManagerTest.java index 096f5d0d90d..afbe88cf047 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java +++ b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/PayloadFormatManagerTest.java @@ -15,20 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.iotdb.db.protocol.mqtt; -import org.apache.iotdb.db.utils.EnvironmentUtils; +package org.apache.iotdb.mqtt; -import org.junit.After; import org.junit.Test; import static org.junit.Assert.assertNotNull; public class PayloadFormatManagerTest { - @After - public void tearDown() throws Exception { - EnvironmentUtils.cleanAllDir(); - } @Test(expected = IllegalArgumentException.class) public void getPayloadFormat() { diff --git a/example/mqtt-customize/pom.xml b/external-service-impl/pom.xml similarity index 63% copy from example/mqtt-customize/pom.xml copy to external-service-impl/pom.xml index 2624571f4e8..2eeaec2c014 100644 --- a/example/mqtt-customize/pom.xml +++ b/external-service-impl/pom.xml @@ -23,17 +23,34 @@ <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.iotdb</groupId> - <artifactId>iotdb-examples</artifactId> + <artifactId>iotdb-parent</artifactId> <version>2.0.7-SNAPSHOT</version> </parent> - <artifactId>customize-mqtt-example</artifactId> - <name>IoTDB: Example: Customized MQTT</name> + <artifactId>external-service-impl</artifactId> + <packaging>pom</packaging> + <modules> + <module>mqtt</module> + </modules> + <properties> + <maven.compiler.source>11</maven.compiler.source> + <maven.compiler.target>11</maven.compiler.target> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> <dependencies> - <!-- used by the server--> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>external-service-api</artifactId> + <version>2.0.7-SNAPSHOT</version> + </dependency> <dependency> <groupId>org.apache.iotdb</groupId> <artifactId>iotdb-server</artifactId> - <version>${project.version}</version> + <version>2.0.7-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> </dependency> </dependencies> </project> diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml index 72c4acba586..af2c714b0ea 100644 --- a/iotdb-core/datanode/pom.xml +++ b/iotdb-core/datanode/pom.xml @@ -267,10 +267,6 @@ <groupId>org.glassfish.jersey.containers</groupId> <artifactId>jersey-container-servlet-core</artifactId> </dependency> - <dependency> - <groupId>com.github.moquette-io.moquette</groupId> - <artifactId>moquette-broker</artifactId> - </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index 27a5ce3b7b7..d0eef468d4f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -1190,14 +1190,14 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { private void prepareExternalServiceResources() throws StartupException { long startTime = System.currentTimeMillis(); - if (resourcesInformationHolder.getExternalServiceEntryList() == null - || resourcesInformationHolder.getExternalServiceEntryList().isEmpty()) { - return; - } try { - ExternalServiceManagementService.getInstance() - .restoreUserDefinedServices(resourcesInformationHolder.getExternalServiceEntryList()); + if (resourcesInformationHolder.getExternalServiceEntryList() != null + && !resourcesInformationHolder.getExternalServiceEntryList().isEmpty()) { + ExternalServiceManagementService.getInstance() + .restoreUserDefinedServices(resourcesInformationHolder.getExternalServiceEntryList()); + } + ExternalServiceManagementService.getInstance().restoreRunningServiceInstance(); } catch (Exception e) { throw new StartupException(e); @@ -1290,6 +1290,7 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { public void stop() { stopTriggerRelatedServices(); registerManager.deregisterAll(); + ExternalServiceManagementService.getInstance().stopRunningServices(); JMXService.deregisterMBean(mbeanName); MetricService.getInstance().stop(); if (schemaRegionConsensusStarted) { @@ -1310,9 +1311,6 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { } private void initProtocols() throws StartupException { - if (config.isEnableMQTTService()) { - registerManager.register(MQTTService.getInstance()); - } if (IoTDBRestServiceDescriptor.getInstance().getConfig().isEnableRestService()) { registerManager.register(RestService.getInstance()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java index 7bed3ffe471..dfe90cb3ca1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java @@ -22,14 +22,10 @@ package org.apache.iotdb.db.service.externalservice; import java.util.function.Supplier; public enum BuiltinExternalServices { - MQTT( - "MQTT", - "org.apache.iotdb.externalservice.Mqtt", - // IoTDBDescriptor.getInstance().getConfig()::isEnableMQTTService - () -> false), + MQTT("MQTT", "org.apache.iotdb.mqtt.MQTTService", () -> true), REST( "REST", - "org.apache.iotdb.externalservice.Rest", + "org.apache.iotdb.mqtt.RestService", // IoTDBRestServiceDescriptor.getInstance().getConfig()::isEnableRestService () -> false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementService.java index 71f0df341a6..234d81d82c0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementService.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementService.java @@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.externalservice.ServiceInfo; +import org.apache.iotdb.commons.file.SystemFileFactory; import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; @@ -65,10 +66,25 @@ public class ExternalServiceManagementService { private static final Logger LOGGER = LoggerFactory.getLogger(ExternalServiceManagementService.class); + public static final String INSTANCE_NULL_ERROR_MSG = + "External Service instance is null when state is RUNNING!"; + private ExternalServiceManagementService(String libRoot) { this.serviceInfos = new HashMap<>(); restoreBuiltInServices(); this.libRoot = libRoot; + makDir(libRoot); + } + + private static void makDir(String dir) { + try { + SystemFileFactory.INSTANCE.makeDirIfNecessary(dir); + } catch (IOException e) { + LOGGER.error("Failed to make external service dir", e); + throw new ExternalServiceManagementException( + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage(e.getMessage())); + } } public Iterator<TExternalServiceEntry> showService(int dataNodeId) @@ -170,7 +186,8 @@ public class ExternalServiceManagementService { private IExternalService createExternalServiceInstance(String serviceName, String className) { // close ClassLoader automatically to release the file handle - try (ExternalServiceClassLoader classLoader = new ExternalServiceClassLoader(libRoot); ) { + try { + ExternalServiceClassLoader classLoader = new ExternalServiceClassLoader(libRoot); return (IExternalService) Class.forName(className, true, classLoader).getDeclaredConstructor().newInstance(); } catch (IOException @@ -235,7 +252,7 @@ public class ExternalServiceManagementService { private void stopService(ServiceInfo serviceInfo) { checkState( serviceInfo.getServiceInstance() != null, - "External Service instance is null when state is RUNNING!", + INSTANCE_NULL_ERROR_MSG, serviceInfo.getServiceName()); serviceInfo.getServiceInstance().stop(); } @@ -311,11 +328,34 @@ public class ExternalServiceManagementService { serviceInfo -> { // start services with RUNNING state if (serviceInfo.getState() == RUNNING) { - IExternalService serviceInstance = - createExternalServiceInstance( - serviceInfo.getServiceName(), serviceInfo.getClassName()); - serviceInfo.setServiceInstance(serviceInstance); - serviceInstance.start(); + + try { + IExternalService serviceInstance = + createExternalServiceInstance( + serviceInfo.getServiceName(), serviceInfo.getClassName()); + serviceInfo.setServiceInstance(serviceInstance); + } finally { + // set STOPPED to avoid the case: service is RUNNING, but its instance is null + if (serviceInfo.getServiceInstance() == null) { + serviceInfo.setState(STOPPED); + } + } + + serviceInfo.getServiceInstance().start(); + } + }); + } + + public void stopRunningServices() { + serviceInfos + .values() + .forEach( + serviceInfo -> { + // start services with RUNNING state + if (serviceInfo.getState() == RUNNING) { + IExternalService serviceInstance = serviceInfo.getServiceInstance(); + checkState(serviceInstance != null, INSTANCE_NULL_ERROR_MSG); + serviceInstance.stop(); } }); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java index 7267c79a665..55fad14e5a1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java @@ -25,7 +25,6 @@ public enum ServiceType { METRIC_SERVICE("Metrics ServerService", "MetricService"), RPC_SERVICE("RPC ServerService", "RPCService"), INFLUX_SERVICE("InfluxDB Protocol Service", "InfluxDB Protocol"), - MQTT_SERVICE("MQTTService", "MqttService"), AIR_GAP_SERVICE("AirGapService", "AirGapService"), MONITOR_SERVICE("Monitor ServerService", "Monitor"), STAT_MONITOR_SERVICE("Statistics ServerService", "StatMonitorService"), diff --git a/pom.xml b/pom.xml index d6261e70af1..4a80bacb188 100644 --- a/pom.xml +++ b/pom.xml @@ -47,6 +47,7 @@ <module>distribution</module> <module>example</module> <module>library-udf</module> + <module>external-service-impl</module> </modules> <properties> <!-- This was the last version to support Java 8 -->
