This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch remove_mqtt_server in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1ce6236e1bf22a0a7b81d0a70173845ffba7e4d7 Author: Tian Jiang <[email protected]> AuthorDate: Tue Oct 28 15:37:23 2025 +0800 tmp save --- Code Summary.md | 1 - LICENSE-binary | 1 - dependencies.json | 2 - example/mqtt-customize/README.md | 42 --- example/mqtt-customize/pom.xml | 39 --- .../server/CustomizedJsonPayloadFormatter.java | 74 ----- ....apache.iotdb.db.protocol.mqtt.PayloadFormatter | 20 -- example/mqtt/README.md | 33 -- example/mqtt/pom.xml | 37 --- .../java/org/apache/iotdb/mqtt/MQTTClient.java | 112 ------- example/pom.xml | 2 - integration-test/import-control.xml | 2 - integration-test/pom.xml | 4 - .../iotdb/it/env/cluster/ClusterConstant.java | 3 - .../it/env/cluster/config/MppCommonConfig.java | 12 - .../it/env/cluster/config/MppDataNodeConfig.java | 12 - .../env/cluster/config/MppSharedCommonConfig.java | 14 - .../iotdb/it/env/cluster/env/AbstractEnv.java | 7 - .../it/env/cluster/node/AbstractNodeWrapper.java | 4 - .../iotdb/it/env/cluster/node/DataNodeWrapper.java | 15 +- .../it/env/remote/config/RemoteCommonConfig.java | 10 - .../it/env/remote/config/RemoteDataNodeConfig.java | 10 - .../iotdb/it/env/remote/env/RemoteServerEnv.java | 5 - .../java/org/apache/iotdb/itbase/env/BaseEnv.java | 2 - .../org/apache/iotdb/itbase/env/CommonConfig.java | 4 - .../apache/iotdb/itbase/env/DataNodeConfig.java | 4 - .../relational/it/mqtt/IoTDBMQTTServiceIT.java | 161 ---------- .../src/test/resources/logback-test.xml | 1 - iotdb-core/datanode/pom.xml | 4 - .../assembly/resources/conf/logback-datanode.xml | 1 - .../apache/iotdb/db/conf/DataNodeStartupCheck.java | 1 - .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 101 ------- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 50 ---- .../db/protocol/mqtt/BrokerAuthenticator.java | 42 --- .../db/protocol/mqtt/JSONPayloadFormatter.java | 148 --------- .../db/protocol/mqtt/LinePayloadFormatter.java | 280 ----------------- .../iotdb/db/protocol/mqtt/MPPPublishHandler.java | 331 --------------------- .../org/apache/iotdb/db/protocol/mqtt/Message.java | 33 -- .../db/protocol/mqtt/PayloadFormatManager.java | 134 --------- .../iotdb/db/protocol/mqtt/PayloadFormatter.java | 65 ---- .../iotdb/db/protocol/mqtt/TableMessage.java | 144 --------- .../apache/iotdb/db/protocol/mqtt/TreeMessage.java | 77 ----- .../iotdb/db/protocol/session/IClientSession.java | 3 +- .../db/protocol/session/MqttClientSession.java | 79 ----- .../iotdb/db/protocol/session/SessionManager.java | 15 - .../java/org/apache/iotdb/db/service/DataNode.java | 3 - .../org/apache/iotdb/db/service/MQTTService.java | 125 -------- ....apache.iotdb.db.protocol.mqtt.PayloadFormatter | 3 - .../db/protocol/mqtt/BrokerAuthenticatorTest.java | 51 ---- .../db/protocol/mqtt/JSONPayloadFormatterTest.java | 133 --------- .../db/protocol/mqtt/LinePayloadFormatterTest.java | 94 ------ .../db/protocol/mqtt/PayloadFormatManagerTest.java | 42 --- .../apache/iotdb/db/utils/EnvironmentUtils.java | 5 - .../src/test/resources/datanode1conf/logback.xml | 1 - .../src/test/resources/datanode2conf/logback.xml | 1 - .../src/test/resources/datanode3conf/logback.xml | 1 - .../datanode/src/test/resources/logback-test.xml | 2 - .../conf/iotdb-system.properties.template | 36 --- .../apache/iotdb/commons/conf/IoTDBConstant.java | 10 - .../apache/iotdb/commons/service/ServiceType.java | 1 - .../thrift-datanode/src/main/thrift/client.thrift | 2 +- pom.xml | 6 - 62 files changed, 3 insertions(+), 2654 deletions(-) diff --git a/Code Summary.md b/Code Summary.md index 65fd1ddbc06..56b9b7a5199 100644 --- a/Code Summary.md +++ b/Code Summary.md @@ -49,7 +49,6 @@ Next, it performs following activities and registers the essential services: * **StorageEngine** * **RPCService** * **MetricsService** - * **MQTTService** * **SyncServerManager** * **UpgradeSevice** * **MergeManager** diff --git a/LICENSE-binary b/LICENSE-binary index aaa7a93867f..9f3a1db3f9f 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -247,7 +247,6 @@ com.github.moquette-io.moquette:moquette-broker:0.18 io.netty:netty-buffer:4.1.126.Final io.netty:netty-codec:4.1.126.Final io.netty:netty-codec-http:4.1.126.Final -io.netty:netty-codec-mqtt:4.1.126.Final io.netty:netty-common:4.1.126.Final io.netty:netty-handler:4.1.126.Final io.netty:netty-resolver:4.1.126.Final diff --git a/dependencies.json b/dependencies.json index 40421228542..f6513e8b4ed 100644 --- a/dependencies.json +++ b/dependencies.json @@ -56,7 +56,6 @@ "io.netty:netty-codec-dns", "io.netty:netty-codec-http", "io.netty:netty-codec-http2", - "io.netty:netty-codec-mqtt", "io.netty:netty-codec-socks", "io.netty:netty-common", "io.netty:netty-handler", @@ -134,7 +133,6 @@ "org.fusesource.hawtbuf:hawtbuf", "org.fusesource.hawtdispatch:hawtdispatch", "org.fusesource.hawtdispatch:hawtdispatch-transport", - "org.fusesource.mqtt-client:mqtt-client", "org.glassfish.hk2:hk2-api", "org.glassfish.hk2:hk2-locator", "org.glassfish.hk2:hk2-utils", diff --git a/example/mqtt-customize/README.md b/example/mqtt-customize/README.md deleted file mode 100644 index 9fa224c4dab..00000000000 --- a/example/mqtt-customize/README.md +++ /dev/null @@ -1,42 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -# Customized IoTDB-MQTT-Broker Example - -## Function -``` -The example is to show how to customize your MQTT message format -``` - -## Usage - -* Define your implementation which implements `PayloadFormatter.java` -* modify the file in `src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter`: - clean the file and put your implementation class name into the file -* compile your implementation as a jar file - - -Then, in your server: -* Create ${IOTDB_HOME}/ext/mqtt/ folder, and put the jar into this folder. -* Update configuration to enable MQTT service. (`enable_mqtt_service=true` in iotdb-datanode.properties) -* Set the value of `mqtt_payload_formatter` in `conf/iotdb-datanode.properties` as the value of getName() in your implementation -* Launch the IoTDB server. -* Now IoTDB will use your implementation to parse the MQTT message. - diff --git a/example/mqtt-customize/pom.xml b/example/mqtt-customize/pom.xml deleted file mode 100644 index 9a530f49fd9..00000000000 --- a/example/mqtt-customize/pom.xml +++ /dev/null @@ -1,39 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.iotdb</groupId> - <artifactId>iotdb-examples</artifactId> - <version>2.0.6-SNAPSHOT</version> - </parent> - <artifactId>customize-mqtt-example</artifactId> - <name>IoTDB: Example: Customized MQTT</name> - <dependencies> - <!-- used by the server--> - <dependency> - <groupId>org.apache.iotdb</groupId> - <artifactId>iotdb-server</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> -</project> diff --git a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java deleted file mode 100644 index 8c3a9621739..00000000000 --- a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.mqtt.server; - -import org.apache.iotdb.db.protocol.mqtt.Message; -import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; -import org.apache.iotdb.db.protocol.mqtt.TreeMessage; - -import io.netty.buffer.ByteBuf; -import org.apache.tsfile.external.commons.lang3.NotImplementedException; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -public class CustomizedJsonPayloadFormatter implements PayloadFormatter { - - @Override - public List<Message> format(String topic, ByteBuf payload) { - // Suppose the payload is a json format - if (payload == null) { - return Collections.emptyList(); - } - - // parse data from the json and generate Messages and put them into List<Message> ret - List<Message> ret = new ArrayList<>(); - // this is just an example, so we just generate some Messages directly - for (int i = 0; i < 2; i++) { - long ts = i; - TreeMessage message = new TreeMessage(); - message.setDevice("d" + i); - message.setTimestamp(ts); - message.setMeasurements(Arrays.asList("s1", "s2")); - message.setValues(Arrays.asList("4.0" + i, "5.0" + i)); - ret.add(message); - } - return ret; - } - - @Override - @Deprecated - public List<Message> format(ByteBuf payload) { - throw new NotImplementedException(); - } - - @Override - public String getName() { - // set the value of mqtt_payload_formatter in iotdb-common.properties as the following string: - return "CustomizedJson"; - } - - @Override - public String getType() { - return PayloadFormatter.TREE_TYPE; - } -} diff --git a/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter b/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter deleted file mode 100644 index b0b824ba784..00000000000 --- a/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -org.apache.iotdb.mqtt.server.CustomizedJsonPayloadFormatter \ No newline at end of file diff --git a/example/mqtt/README.md b/example/mqtt/README.md deleted file mode 100644 index 421dcd8b726..00000000000 --- a/example/mqtt/README.md +++ /dev/null @@ -1,33 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -# IoTDB-MQTT-Broker Example - -## Function -``` -The example is to show how to send data to IoTDB from a mqtt client. -``` - -## Usage - -* Update configuration to enable MQTT service. (`enable_mqtt_service=true` in iotdb-datanode.properties) -* Launch the IoTDB server. -* Setup database `CREATE DATABASE root.sg` and create time timeseries `CREATE TIMESERIES root.sg.d1.s1 WITH DATATYPE=DOUBLE, ENCODING=PLAIN`. -* Run `org.apache.iotdb.mqtt.MQTTClient` to run the mqtt client and send events to server. diff --git a/example/mqtt/pom.xml b/example/mqtt/pom.xml deleted file mode 100644 index b3e7e0c238d..00000000000 --- a/example/mqtt/pom.xml +++ /dev/null @@ -1,37 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.iotdb</groupId> - <artifactId>iotdb-examples</artifactId> - <version>2.0.6-SNAPSHOT</version> - </parent> - <artifactId>mqtt-example</artifactId> - <name>IoTDB: Example: MQTT</name> - <dependencies> - <dependency> - <groupId>org.fusesource.mqtt-client</groupId> - <artifactId>mqtt-client</artifactId> - </dependency> - </dependencies> -</project> diff --git a/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java b/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java deleted file mode 100644 index ec15ad23567..00000000000 --- a/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.iotdb.mqtt; - -import org.fusesource.mqtt.client.BlockingConnection; -import org.fusesource.mqtt.client.MQTT; -import org.fusesource.mqtt.client.QoS; - -import java.util.Random; - -public class MQTTClient { - - private static final String DATABASE = "myMqttTest"; - - public static void main(String[] args) throws Exception { - MQTT mqtt = new MQTT(); - mqtt.setHost("127.0.0.1", 1883); - mqtt.setUserName("root"); - mqtt.setPassword("root"); - mqtt.setConnectAttemptsMax(3); - mqtt.setReconnectDelay(10); - - BlockingConnection connection = mqtt.blockingConnection(); - connection.connect(); - // the config mqttPayloadFormatter must be tree-json - // jsonPayloadFormatter(connection); - // the config mqttPayloadFormatter must be table-line - linePayloadFormatter(connection); - connection.disconnect(); - } - - private static void jsonPayloadFormatter(BlockingConnection connection) throws Exception { - Random random = new Random(); - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < 10; i++) { - String payload = - String.format( - "{\n" - + "\"device\":\"root.sg.d1\",\n" - + "\"timestamp\":%d,\n" - + "\"measurements\":[\"s1\"],\n" - + "\"values\":[%f]\n" - + "}", - System.currentTimeMillis(), random.nextDouble()); - sb.append(payload).append(","); - - // publish a json object - Thread.sleep(1); - connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false); - } - // publish a json array - sb.insert(0, "["); - sb.replace(sb.lastIndexOf(","), sb.length(), "]"); - connection.publish("root.sg.d1.s1", sb.toString().getBytes(), QoS.AT_LEAST_ONCE, false); - } - - // The database must be created in advance - private static void linePayloadFormatter(BlockingConnection connection) throws Exception { - // myTable,tag1=t1,tag2=t2 fieldKey1="1,2,3" 1740109006001 - String payload = "myTable,tag1=t1,tag2=t2 fieldKey1=\"1,2,3\" 1740109006001"; - connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); - Thread.sleep(10); - - payload = "myTable,tag1=t1,tag2=t2 fieldKey1=\"1,2,3\" 1740109006002"; - connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); - Thread.sleep(10); - - payload = "myTable,tag1=t1,tag2=t2 fieldKey1=\"1,2,3\" 1740109006003"; - connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); - Thread.sleep(10); - payload = - "test1,tag1=t1,tag2=t2 attr3=a5,attr4=a4 field1=\"fieldValue1\",field2=1i,field3=1u 1"; - connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); - Thread.sleep(10); - - payload = "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 2"; - connection.publish(DATABASE, payload.getBytes(), QoS.AT_LEAST_ONCE, false); - Thread.sleep(10); - - payload = - "test1,tag1=t1,tag2=t2 field7=t,field8=T,field9=true 3 \n " - + "test1,tag1=t1,tag2=t2 field7=f,field8=F,field9=FALSE 4"; - connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); - Thread.sleep(10); - - payload = - "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"fieldValue1\",field2=1i,field3=1u 4 \n " - + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 5"; - connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); - Thread.sleep(10); - - payload = "# It's a remark\n " + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 6"; - connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); - Thread.sleep(10); - } -} diff --git a/example/pom.xml b/example/pom.xml index cce52b920c4..80ba0e423ca 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -31,8 +31,6 @@ <name>IoTDB: Examples</name> <modules> <module>jdbc</module> - <module>mqtt</module> - <module>mqtt-customize</module> <module>pipe-count-point-processor</module> <module>pipe-opc-ua-sink</module> <module>rest-java-example</module> diff --git a/integration-test/import-control.xml b/integration-test/import-control.xml index 08cc61008c6..2f427c3954c 100644 --- a/integration-test/import-control.xml +++ b/integration-test/import-control.xml @@ -39,7 +39,6 @@ <allow class="org.apache.iotdb.db.utils.constant.TestConstant"/> <allow class="org.apache.iotdb.db.utils.MathUtils"/> <allow class="org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode"/> - <allow class="org.fusesource.mqtt.client.QoS"/> <allow class="org.apache.iotdb.commons.path.PartialPath"/> <allow pkg="java.text"/> <allow pkg="org.apache.iotdb.db.it.utils"/> @@ -53,7 +52,6 @@ <allow pkg="org\.apache\.iotdb\.tsfile\.read.*" regex="true"/> <allow pkg="org\.apache\.iotdb\.tsfile\.utils.*" regex="true"/> <allow pkg="org\.apache\.iotdb\.tsfile\.write.*" regex="true"/> - <allow pkg="org\.apache\.iotdb\.db\.engine\.trigger\.sink\.mqtt.*" regex="true"/> </subpackage> <subpackage name="confignode.it"> <allow class="java.nio.ByteBuffer"/> diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 203f51415dc..38690def05b 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -213,10 +213,6 @@ <artifactId>jcip-annotations</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>org.fusesource.mqtt-client</groupId> - <artifactId>mqtt-client</artifactId> - </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java index 82ed7976959..3c5be59d16a 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java @@ -162,9 +162,6 @@ public class ClusterConstant { public static final String SCHEMA_REPLICATION_FACTOR = "schema_replication_factor"; public static final String DATA_REPLICATION_FACTOR = "data_replication_factor"; - public static final String MQTT_HOST = "mqtt_host"; - public static final String MQTT_PORT = "mqtt_port"; - public static final String MQTT_DATA_PATH = "mqtt_data_path"; public static final String UDF_LIB_DIR = "udf_lib_dir"; public static final String TRIGGER_LIB_DIR = "trigger_lib_dir"; public static final String PIPE_LIB_DIR = "pipe_lib_dir"; diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 1df43c16206..ce99f8010e1 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -309,18 +309,6 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig { return this; } - @Override - public CommonConfig setEnableMQTTService(boolean enableMQTTService) { - setProperty("enable_mqtt_service", String.valueOf(enableMQTTService)); - return this; - } - - @Override - public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) { - setProperty("mqtt_payload_formatter", String.valueOf(mqttPayloadFormatter)); - return this; - } - @Override public CommonConfig setSchemaEngineMode(String schemaEngineMode) { setProperty("schema_engine_mode", schemaEngineMode); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java index 357c15c7856..a99d00bd732 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java @@ -96,18 +96,6 @@ public class MppDataNodeConfig extends MppBaseConfig implements DataNodeConfig { return this; } - @Override - public DataNodeConfig setEnableMQTTService(boolean enableMQTTService) { - setProperty("enable_mqtt_service", String.valueOf(enableMQTTService)); - return this; - } - - @Override - public DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter) { - setProperty("mqtt_payload_formatter", String.valueOf(mqttPayloadFormatter)); - return this; - } - @Override public DataNodeConfig setLoadLastCacheStrategy(String strategyName) { setProperty("last_cache_operation_on_load", strategyName); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index 504b1ae60e2..7c7ac1b370c 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -309,20 +309,6 @@ public class MppSharedCommonConfig implements CommonConfig { return this; } - @Override - public CommonConfig setEnableMQTTService(boolean enableMQTTService) { - cnConfig.setEnableMQTTService(enableMQTTService); - dnConfig.setEnableMQTTService(enableMQTTService); - return this; - } - - @Override - public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) { - cnConfig.setMqttPayloadFormatter(mqttPayloadFormatter); - dnConfig.setMqttPayloadFormatter(mqttPayloadFormatter); - return this; - } - @Override public CommonConfig setSchemaEngineMode(String schemaEngineMode) { cnConfig.setSchemaEngineMode(schemaEngineMode); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 8308225a9af..12ec3d48fb5 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -1456,13 +1456,6 @@ public abstract class AbstractEnv implements BaseEnv { throw new IllegalStateException(lastException); } - @Override - public int getMqttPort() { - return dataNodeWrapperList - .get(new Random(System.currentTimeMillis()).nextInt(dataNodeWrapperList.size())) - .getMqttPort(); - } - @Override public String getIP() { return dataNodeWrapperList.get(0).getIp(); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java index 1a349034469..2d866daf66b 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java @@ -88,8 +88,6 @@ import static org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STAND import static org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STANDALONE_MODE_DATA_REGION_REPLICA_NUM; import static org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STANDALONE_MODE_SCHEMA_REGION_CONSENSUS; import static org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STANDALONE_MODE_SCHEMA_REGION_REPLICA_NUM; -import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_HOST; -import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_PORT; import static org.apache.iotdb.it.env.cluster.ClusterConstant.PIPE_CONSENSUS_BATCH_MODE; import static org.apache.iotdb.it.env.cluster.ClusterConstant.PIPE_CONSENSUS_BATCH_MODE_CONFIG_NODE_CONSENSUS; import static org.apache.iotdb.it.env.cluster.ClusterConstant.PIPE_CONSENSUS_BATCH_MODE_DATA_REGION_CONSENSUS; @@ -188,8 +186,6 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper { immutableCommonProperties.setProperty(UDF_LIB_DIR, MppBaseConfig.NULL_VALUE); immutableCommonProperties.setProperty(TRIGGER_LIB_DIR, MppBaseConfig.NULL_VALUE); immutableCommonProperties.setProperty(PIPE_LIB_DIR, MppBaseConfig.NULL_VALUE); - immutableCommonProperties.setProperty(MQTT_HOST, MppBaseConfig.NULL_VALUE); - immutableCommonProperties.setProperty(MQTT_PORT, MppBaseConfig.NULL_VALUE); immutableCommonProperties.setProperty(REST_SERVICE_PORT, MppBaseConfig.NULL_VALUE); immutableCommonProperties.setProperty(INFLUXDB_RPC_PORT, MppBaseConfig.NULL_VALUE); this.jvmConfig = initVMConfig(); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java index f08c085bc6c..24a5d01ae58 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java @@ -58,9 +58,6 @@ import static org.apache.iotdb.it.env.cluster.ClusterConstant.DN_WAL_DIRS; import static org.apache.iotdb.it.env.cluster.ClusterConstant.IOTDB_SYSTEM_PROPERTIES_FILE; import static org.apache.iotdb.it.env.cluster.ClusterConstant.MAIN_CLASS_NAME; import static org.apache.iotdb.it.env.cluster.ClusterConstant.MAX_TSBLOCK_SIZE_IN_BYTES; -import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_DATA_PATH; -import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_HOST; -import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_PORT; import static org.apache.iotdb.it.env.cluster.ClusterConstant.PAGE_SIZE_IN_BYTE; import static org.apache.iotdb.it.env.cluster.ClusterConstant.PIPE_AIR_GAP_RECEIVER_PORT; import static org.apache.iotdb.it.env.cluster.ClusterConstant.REST_SERVICE_PORT; @@ -77,7 +74,6 @@ public class DataNodeWrapper extends AbstractNodeWrapper { private final String internalAddress; private final int dataRegionConsensusPort; private final int schemaRegionConsensusPort; - private final int mqttPort; private final int restServicePort; private final int pipeAirGapReceiverPort; @@ -101,7 +97,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper { this.internalPort = portList[2]; this.dataRegionConsensusPort = portList[3]; this.schemaRegionConsensusPort = portList[4]; - this.mqttPort = portList[5]; + // deprecated: this.mqttPort = portList[5]; this.pipeAirGapReceiverPort = portList[6]; this.restServicePort = portList[10] + 6000; this.defaultNodePropertiesFile = @@ -113,11 +109,6 @@ public class DataNodeWrapper extends AbstractNodeWrapper { reloadMutableFields(); // Initialize immutable properties - // Override mqtt properties of super class - immutableCommonProperties.setProperty(MQTT_HOST, super.getIp()); - immutableCommonProperties.setProperty(MQTT_PORT, String.valueOf(this.mqttPort)); - immutableCommonProperties.setProperty( - MQTT_DATA_PATH, getNodePath() + File.separator + "mqttData"); immutableCommonProperties.setProperty( PIPE_AIR_GAP_RECEIVER_PORT, String.valueOf(this.pipeAirGapReceiverPort)); @@ -292,10 +283,6 @@ public class DataNodeWrapper extends AbstractNodeWrapper { return schemaRegionConsensusPort; } - public int getMqttPort() { - return mqttPort; - } - public int getPipeAirGapReceiverPort() { return pipeAirGapReceiverPort; } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index fdcff20dbc8..c33c4662557 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -216,16 +216,6 @@ public class RemoteCommonConfig implements CommonConfig { return this; } - @Override - public CommonConfig setEnableMQTTService(boolean enableMQTTService) { - return this; - } - - @Override - public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) { - return this; - } - @Override public CommonConfig setSchemaEngineMode(String schemaEngineMode) { return this; diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java index 1af7cb8f613..af7f38cef7a 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java @@ -59,16 +59,6 @@ public class RemoteDataNodeConfig implements DataNodeConfig { return this; } - @Override - public DataNodeConfig setEnableMQTTService(boolean enableMQTTService) { - return this; - } - - @Override - public DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter) { - return this; - } - @Override public DataNodeConfig setLoadLastCacheStrategy(String strategyName) { return this; diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java index 586eff60494..6e47dbc4c09 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java @@ -475,11 +475,6 @@ public class RemoteServerEnv implements BaseEnv { throw new UnsupportedOperationException(); } - @Override - public int getMqttPort() { - throw new UnsupportedOperationException(); - } - @Override public String getIP() { return ip_addr; diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java index 8b32deb3ea8..5744e50a523 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java @@ -330,8 +330,6 @@ public interface BaseEnv { /** Shutdown forcibly all existed DataNodes. */ void shutdownForciblyAllDataNodes(); - int getMqttPort(); - String getIP(); String getPort(); diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index 9767e1c089e..3fd5db3988a 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -100,10 +100,6 @@ public interface CommonConfig { CommonConfig setMaxDegreeOfIndexNode(int maxDegreeOfIndexNode); - CommonConfig setEnableMQTTService(boolean enableMQTTService); - - CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter); - CommonConfig setSchemaEngineMode(String schemaEngineMode); CommonConfig setSelectIntoInsertTabletPlanRowLimit(int selectIntoInsertTabletPlanRowLimit); diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java index 0ae46ffc70f..6259f4e4bdc 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java @@ -38,10 +38,6 @@ public interface DataNodeConfig { DataNodeConfig setCompactionScheduleInterval(long compactionScheduleInterval); - DataNodeConfig setEnableMQTTService(boolean enableMQTTService); - - DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter); - DataNodeConfig setLoadLastCacheStrategy(String strategyName); DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad); diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java deleted file mode 100644 index afc202ae68d..00000000000 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.iotdb.relational.it.mqtt; - -import org.apache.iotdb.isession.ITableSession; -import org.apache.iotdb.isession.SessionDataSet; -import org.apache.iotdb.it.env.EnvFactory; -import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; -import org.apache.iotdb.it.framework.IoTDBTestRunner; -import org.apache.iotdb.itbase.category.TableClusterIT; -import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; -import org.apache.iotdb.itbase.env.BaseEnv; -import org.apache.iotdb.rpc.StatementExecutionException; - -import org.apache.tsfile.read.common.Field; -import org.awaitility.Awaitility; -import org.fusesource.mqtt.client.BlockingConnection; -import org.fusesource.mqtt.client.MQTT; -import org.fusesource.mqtt.client.QoS; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -@RunWith(IoTDBTestRunner.class) -@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) -public class IoTDBMQTTServiceIT { - private BlockingConnection connection; - private static final String IP = System.getProperty("RemoteIp", "127.0.0.1"); - private static final String USER = System.getProperty("RemoteUser", "root"); - private static final String PASSWORD = System.getProperty("RemotePassword", "root"); - private static final String DATABASE = "mqtttest"; - public static final String FORMATTER = "line"; - - @Before - public void setUp() throws Exception { - BaseEnv baseEnv = EnvFactory.getEnv(); - baseEnv.getConfig().getDataNodeConfig().setEnableMQTTService(true); - baseEnv.getConfig().getDataNodeConfig().setMqttPayloadFormatter(FORMATTER); - baseEnv.initClusterEnvironment(); - DataNodeWrapper portConflictDataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(0); - int port = portConflictDataNodeWrapper.getMqttPort(); - MQTT mqtt = new MQTT(); - mqtt.setHost(IP, port); - mqtt.setUserName(USER); - mqtt.setPassword(PASSWORD); - mqtt.setConnectAttemptsMax(3); - mqtt.setReconnectDelay(10); - - connection = mqtt.blockingConnection(); - connection.connect(); - } - - @After - public void tearDown() throws Exception { - try { - if (connection != null) { - connection.disconnect(); - } - } catch (IOException e) { - e.printStackTrace(); - fail(e.getMessage()); - } - EnvFactory.getEnv().cleanClusterEnvironment(); - } - - @Test - public void testNoAttr() throws Exception { - try (final ITableSession session = - EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE)) { - session.executeNonQueryStatement("CREATE DATABASE " + DATABASE); - String payload1 = "test1,tag1=t1,tag2=t2 field1=1,field2=1f,field3=1i32 1"; - Awaitility.await() - .atMost(3, TimeUnit.MINUTES) - .pollInterval(1, TimeUnit.SECONDS) - .until( - () -> { - connection.publish( - DATABASE + "/myTopic", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - try (final SessionDataSet dataSet = - session.executeQueryStatement( - "select tag1,tag2,field1,field2,field3 from test1 where time = 1")) { - assertEquals(5, dataSet.getColumnNames().size()); - List<Field> fields = dataSet.next().getFields(); - assertEquals("t1", fields.get(0).getStringValue()); - assertEquals("t2", fields.get(1).getStringValue()); - assertEquals(1d, fields.get(2).getDoubleV(), 0); - assertEquals(1f, fields.get(3).getFloatV(), 0); - assertEquals(1, fields.get(4).getIntV(), 0); - return true; - } catch (StatementExecutionException e) { - if (e.getMessage() != null && e.getMessage().contains("does not exist")) { - return false; - } else { - throw e; - } - } - }); - } - } - - @Test - public void testWithAttr() throws Exception { - try (final ITableSession session = - EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE)) { - session.executeNonQueryStatement("CREATE DATABASE " + DATABASE); - String payload1 = "test2,tag1=t1,tag2=t2 attr3=a3,attr4=a4 field1=1,field2=1f,field3=1i32 1"; - Awaitility.await() - .atMost(3, TimeUnit.MINUTES) - .pollInterval(1, TimeUnit.SECONDS) - .until( - () -> { - connection.publish( - DATABASE + "/myTopic", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - try (final SessionDataSet dataSet = - session.executeQueryStatement( - "select tag1,tag2,attr3,attr4,field1,field2,field3 from test2 where time = 1")) { - assertEquals(7, dataSet.getColumnNames().size()); - List<Field> fields = dataSet.next().getFields(); - assertEquals("t1", fields.get(0).getStringValue()); - assertEquals("t2", fields.get(1).getStringValue()); - assertEquals("a3", fields.get(2).getStringValue()); - assertEquals("a4", fields.get(3).getStringValue()); - assertEquals(1d, fields.get(4).getDoubleV(), 0); - assertEquals(1f, fields.get(5).getFloatV(), 0); - assertEquals(1, fields.get(6).getIntV(), 0); - return true; - } catch (StatementExecutionException e) { - if (e.getMessage() != null && e.getMessage().contains("does not exist")) { - return false; - } else { - throw e; - } - } - }); - } - } -} diff --git a/integration-test/src/test/resources/logback-test.xml b/integration-test/src/test/resources/logback-test.xml index ba595ff9752..9b4f3af3e4b 100644 --- a/integration-test/src/test/resources/logback-test.xml +++ b/integration-test/src/test/resources/logback-test.xml @@ -50,7 +50,6 @@ <logger name="org.apache.iotdb.commons.service.RegisterManager" level="INFO"/> <logger name="org.apache.iotdb.db.service.DataNode" level="WARN"/> <logger name="org.apache.iotdb.db.service.ExternalRPCService" level="INFO"/> - <logger name="org.apache.iotdb.db.service.MQTTService" level="INFO"/> <logger name="org.apache.iotdb.db.conf.IoTDBDescriptor" level="WARN"/> <logger name="org.apache.tsfile.common.conf.TSFileDescriptor" level="WARN"/> <logger name="DETAILED_FAILURE_QUERY_TRACE" level="ERROR"/> diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml index 626d8265eed..c8ad7d48177 100644 --- a/iotdb-core/datanode/pom.xml +++ b/iotdb-core/datanode/pom.xml @@ -195,10 +195,6 @@ <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-http</artifactId> </dependency> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-codec-mqtt</artifactId> - </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> diff --git a/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml b/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml index 557fd8b160f..299b5e114d9 100644 --- a/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml +++ b/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml @@ -245,7 +245,6 @@ <appender-ref ref="FILEALL"/> <appender-ref ref="stdout"/> </root> - <logger level="OFF" name="io.moquette.broker.metrics.MQTTMessageLogger"/> <logger level="info" name="org.apache.iotdb.db.service"/> <logger level="info" name="org.apache.iotdb.db.conf"/> <logger level="info" name="org.apache.iotdb.db.cost.statistic"> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeStartupCheck.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeStartupCheck.java index 7291469f676..1368e4d3b39 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeStartupCheck.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeStartupCheck.java @@ -45,7 +45,6 @@ public class DataNodeStartupCheck extends StartupChecks { private void checkDataNodePortUnique() throws StartupException { Set<Integer> portSet = new HashSet<>(); portSet.add(config.getInternalPort()); - portSet.add(config.getMqttPort()); portSet.add(config.getRpcPort()); portSet.add(config.getMppDataExchangePort()); portSet.add(config.getDataRegionConsensusPort()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 7db1b0f0225..25d4f0d2a12 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -103,27 +103,6 @@ public class IoTDBConfig { public static final Pattern NODE_PATTERN = Pattern.compile(NODE_MATCHER); - /** Whether to enable the mqtt service. */ - private boolean enableMQTTService = false; - - /** The mqtt service binding host. */ - private String mqttHost = "127.0.0.1"; - - /** The mqtt service binding port. */ - private int mqttPort = 1883; - - /** The handler pool size for handing the mqtt messages. */ - private int mqttHandlerPoolSize = Math.max(1, Runtime.getRuntime().availableProcessors() >> 1); - - /** The mqtt message payload formatter. */ - private String mqttPayloadFormatter = "json"; - - /** The mqtt save data path */ - private String mqttDataPath = "data/"; - - /** Max mqtt message size. Unit: byte */ - private int mqttMaxMessageSize = 1048576; - /** Rpc binding address. */ private String rpcAddress = "0.0.0.0"; @@ -283,10 +262,6 @@ public class IoTDBConfig { private int pipeTaskThreadCount = 5; - /** External lib directory for MQTT, stores user-uploaded JAR files */ - private String mqttDir = - IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.MQTT_FOLDER_NAME; - /** Tiered data directories. It can be settled as dataDirs = {{"data1"}, {"data2", "data3"}}; */ private String[][] tierDataDirs = { {IoTDBConstant.DN_DEFAULT_DATA_DIR + File.separator + IoTDBConstant.DATA_FOLDER_NAME} @@ -965,9 +940,6 @@ public class IoTDBConfig { /** Trigger HTTP forward pool max connection for per route */ private int triggerForwardHTTPPOOLMaxPerRoute = 20; - /** Trigger MQTT forward pool size */ - private int triggerForwardMQTTPoolSize = 4; - /** How many times will we retry to find an instance of stateful trigger */ private int retryNumToFindStatefulTrigger = 3; @@ -1343,7 +1315,6 @@ public class IoTDBConfig { iotConsensusV2ReceiverFileDirs[i] = addDataHomeDir(iotConsensusV2ReceiverFileDirs[i]); } iotConsensusV2DeletionFileDir = addDataHomeDir(iotConsensusV2DeletionFileDir); - mqttDir = addDataHomeDir(mqttDir); extPipeDir = addDataHomeDir(extPipeDir); queryDir = addDataHomeDir(queryDir); sortTmpDir = addDataHomeDir(sortTmpDir); @@ -1672,14 +1643,6 @@ public class IoTDBConfig { this.pipeTemporaryLibDir = pipeDir + File.separator + IoTDBConstant.TMP_FOLDER_NAME; } - public String getMqttDir() { - return mqttDir; - } - - public void setMqttDir(String mqttDir) { - this.mqttDir = mqttDir; - } - public String getMultiDirStrategyClassName() { return multiDirStrategyClassName; } @@ -2488,62 +2451,6 @@ public class IoTDBConfig { this.thriftServerAwaitTimeForStopService = thriftServerAwaitTimeForStopService; } - public boolean isEnableMQTTService() { - return enableMQTTService; - } - - public void setEnableMQTTService(boolean enableMQTTService) { - this.enableMQTTService = enableMQTTService; - } - - public String getMqttHost() { - return mqttHost; - } - - public void setMqttHost(String mqttHost) { - this.mqttHost = mqttHost; - } - - public int getMqttPort() { - return mqttPort; - } - - public void setMqttPort(int mqttPort) { - this.mqttPort = mqttPort; - } - - public int getMqttHandlerPoolSize() { - return mqttHandlerPoolSize; - } - - public void setMqttHandlerPoolSize(int mqttHandlerPoolSize) { - this.mqttHandlerPoolSize = mqttHandlerPoolSize; - } - - public String getMqttPayloadFormatter() { - return mqttPayloadFormatter; - } - - public void setMqttPayloadFormatter(String mqttPayloadFormatter) { - this.mqttPayloadFormatter = mqttPayloadFormatter; - } - - public String getMqttDataPath() { - return mqttDataPath; - } - - public void setMqttDataPath(String mqttDataPath) { - this.mqttDataPath = mqttDataPath; - } - - public int getMqttMaxMessageSize() { - return mqttMaxMessageSize; - } - - public void setMqttMaxMessageSize(int mqttMaxMessageSize) { - this.mqttMaxMessageSize = mqttMaxMessageSize; - } - public int getTagAttributeFlushInterval() { return tagAttributeFlushInterval; } @@ -3225,14 +3132,6 @@ public class IoTDBConfig { this.triggerForwardHTTPPOOLMaxPerRoute = triggerForwardHTTPPOOLMaxPerRoute; } - public int getTriggerForwardMQTTPoolSize() { - return triggerForwardMQTTPoolSize; - } - - public void setTriggerForwardMQTTPoolSize(int triggerForwardMQTTPoolSize) { - this.triggerForwardMQTTPoolSize = triggerForwardMQTTPoolSize; - } - public int getRetryNumToFindStatefulTrigger() { return retryNumToFindStatefulTrigger; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 59b59e130c5..f67894c706f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -876,9 +876,6 @@ public class IoTDBDescriptor { "max_measurement_num_of_internal_request", String.valueOf(conf.getMaxMeasurementNumOfInternalRequest())))); - // mqtt - loadMqttProps(properties); - conf.setIntoOperationBufferSizeInByte( Long.parseLong( properties.getProperty( @@ -1873,48 +1870,6 @@ public class IoTDBDescriptor { } } - // Mqtt related - private void loadMqttProps(TrimProperties properties) { - conf.setMqttDir(properties.getProperty("mqtt_root_dir", conf.getMqttDir())); - - if (properties.getProperty(IoTDBConstant.MQTT_HOST_NAME) != null) { - conf.setMqttHost(properties.getProperty(IoTDBConstant.MQTT_HOST_NAME).trim()); - } else { - LOGGER.info("MQTT host is not configured, will use dn_rpc_address."); - conf.setMqttHost(properties.getProperty(IoTDBConstant.DN_RPC_ADDRESS, conf.getRpcAddress())); - } - - if (properties.getProperty(IoTDBConstant.MQTT_PORT_NAME) != null) { - conf.setMqttPort( - Integer.parseInt(properties.getProperty(IoTDBConstant.MQTT_PORT_NAME).trim())); - } - - if (properties.getProperty(IoTDBConstant.MQTT_HANDLER_POOL_SIZE_NAME) != null) { - conf.setMqttHandlerPoolSize( - Integer.parseInt( - properties.getProperty(IoTDBConstant.MQTT_HANDLER_POOL_SIZE_NAME).trim())); - } - - if (properties.getProperty(IoTDBConstant.MQTT_PAYLOAD_FORMATTER_NAME) != null) { - conf.setMqttPayloadFormatter( - properties.getProperty(IoTDBConstant.MQTT_PAYLOAD_FORMATTER_NAME).trim()); - } - - if (properties.getProperty(IoTDBConstant.MQTT_DATA_PATH) != null) { - conf.setMqttDataPath(properties.getProperty(IoTDBConstant.MQTT_DATA_PATH).trim()); - } - - if (properties.getProperty(IoTDBConstant.ENABLE_MQTT) != null) { - conf.setEnableMQTTService( - Boolean.parseBoolean(properties.getProperty(IoTDBConstant.ENABLE_MQTT).trim())); - } - - if (properties.getProperty(IoTDBConstant.MQTT_MAX_MESSAGE_SIZE) != null) { - conf.setMqttMaxMessageSize( - Integer.parseInt(properties.getProperty(IoTDBConstant.MQTT_MAX_MESSAGE_SIZE).trim())); - } - } - // timed flush memtable private void loadTimedService(TrimProperties properties) throws IOException { conf.setEnableTimedFlushSeqMemtable( @@ -2542,11 +2497,6 @@ public class IoTDBDescriptor { properties.getProperty( "trigger_forward_http_pool_max_per_route", Integer.toString(conf.getTriggerForwardHTTPPOOLMaxPerRoute())))); - conf.setTriggerForwardMQTTPoolSize( - Integer.parseInt( - properties.getProperty( - "trigger_forward_mqtt_pool_size", - Integer.toString(conf.getTriggerForwardMQTTPoolSize())))); } private void loadPipeProps(TrimProperties properties) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticator.java deleted file mode 100644 index a05c8264b82..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticator.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.iotdb.db.protocol.mqtt; - -import org.apache.iotdb.db.auth.AuthorityChecker; -import org.apache.iotdb.rpc.TSStatusCode; - -import io.moquette.broker.security.IAuthenticator; -import org.apache.tsfile.external.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** The MQTT broker authenticator. */ -public class BrokerAuthenticator implements IAuthenticator { - private static final Logger LOG = LoggerFactory.getLogger(BrokerAuthenticator.class); - - @Override - public boolean checkValid(String clientId, String username, byte[] password) { - if (StringUtils.isBlank(username) || password == null) { - return false; - } - - return (AuthorityChecker.checkUser(username, new String(password)).getCode() - == TSStatusCode.SUCCESS_STATUS.getStatusCode()); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java deleted file mode 100644 index cc857b7295c..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.iotdb.db.protocol.mqtt; - -import com.google.common.collect.Lists; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; -import com.google.gson.reflect.TypeToken; -import io.netty.buffer.ByteBuf; -import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.external.commons.lang3.NotImplementedException; - -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; - -/** - * The JSON payload formatter. two json format supported: { "device":"root.sg.d1", - * "timestamp":1586076045524, "measurements":["s1","s2"], "values":[0.530635,0.530635] } - * - * <p>{ "device":"root.sg.d1", "timestamps":[1586076045524,1586076065526], - * "measurements":["s1","s2"], "values":[[0.530635,0.530635], [0.530655,0.530695]] } - */ -public class JSONPayloadFormatter implements PayloadFormatter { - private static final String JSON_KEY_DEVICE = "device"; - private static final String JSON_KEY_TIMESTAMP = "timestamp"; - private static final String JSON_KEY_TIMESTAMPS = "timestamps"; - private static final String JSON_KEY_MEASUREMENTS = "measurements"; - private static final String JSON_KEY_VALUES = "values"; - private static final String JSON_KEY_DATATYPE = "datatypes"; - private static final Gson GSON = new GsonBuilder().create(); - - @Override - public List<Message> format(String topic, ByteBuf payload) { - if (payload == null) { - return new ArrayList<>(); - } - String txt = payload.toString(StandardCharsets.UTF_8); - JsonElement jsonElement = GSON.fromJson(txt, JsonElement.class); - if (jsonElement.isJsonObject()) { - JsonObject jsonObject = jsonElement.getAsJsonObject(); - if (jsonObject.get(JSON_KEY_TIMESTAMP) != null) { - return formatJson(jsonObject); - } - if (jsonObject.get(JSON_KEY_TIMESTAMPS) != null) { - return formatBatchJson(jsonObject); - } - } else if (jsonElement.isJsonArray()) { - JsonArray jsonArray = jsonElement.getAsJsonArray(); - List<Message> messages = new ArrayList<>(); - for (JsonElement element : jsonArray) { - JsonObject jsonObject = element.getAsJsonObject(); - if (jsonObject.get(JSON_KEY_TIMESTAMP) != null) { - messages.addAll(formatJson(jsonObject)); - } - if (jsonObject.get(JSON_KEY_TIMESTAMPS) != null) { - messages.addAll(formatBatchJson(jsonObject)); - } - } - return messages; - } - throw new JsonParseException("payload is invalidate"); - } - - @Override - @Deprecated - public List<Message> format(ByteBuf payload) { - throw new NotImplementedException(); - } - - private List<Message> formatJson(JsonObject jsonObject) { - TreeMessage message = new TreeMessage(); - message.setDevice(jsonObject.get(JSON_KEY_DEVICE).getAsString()); - message.setTimestamp(jsonObject.get(JSON_KEY_TIMESTAMP).getAsLong()); - message.setMeasurements( - GSON.fromJson( - jsonObject.get(JSON_KEY_MEASUREMENTS), new TypeToken<List<String>>() {}.getType())); - message.setValues( - GSON.fromJson(jsonObject.get(JSON_KEY_VALUES), new TypeToken<List<String>>() {}.getType())); - if (jsonObject.has(JSON_KEY_DATATYPE)) { - message.setDataTypes( - GSON.fromJson( - jsonObject.get(JSON_KEY_DATATYPE), new TypeToken<List<TSDataType>>() {}.getType())); - } - return Lists.newArrayList(message); - } - - private List<Message> formatBatchJson(JsonObject jsonObject) { - String device = jsonObject.get(JSON_KEY_DEVICE).getAsString(); - List<String> measurements = - GSON.fromJson( - jsonObject.getAsJsonArray(JSON_KEY_MEASUREMENTS), - new TypeToken<List<String>>() {}.getType()); - List<Long> timestamps = - GSON.fromJson( - jsonObject.get(JSON_KEY_TIMESTAMPS), new TypeToken<List<Long>>() {}.getType()); - List<List<String>> values = - GSON.fromJson( - jsonObject.get(JSON_KEY_VALUES), new TypeToken<List<List<String>>>() {}.getType()); - List<TSDataType> types = - jsonObject.has(JSON_KEY_DATATYPE) - ? GSON.fromJson( - jsonObject.get(JSON_KEY_DATATYPE), new TypeToken<List<TSDataType>>() {}.getType()) - : null; - - List<Message> ret = new ArrayList<>(timestamps.size()); - for (int i = 0; i < timestamps.size(); i++) { - TreeMessage message = new TreeMessage(); - message.setDevice(device); - message.setTimestamp(timestamps.get(i)); - message.setMeasurements(measurements); - message.setValues(values.get(i)); - message.setDataTypes(types); - ret.add(message); - } - return ret; - } - - @Override - public String getName() { - return "json"; - } - - @Override - public String getType() { - return PayloadFormatter.TREE_TYPE; - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java deleted file mode 100644 index 8b596ee2c89..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.iotdb.db.protocol.mqtt; - -import io.netty.buffer.ByteBuf; -import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.external.commons.lang3.NotImplementedException; -import org.apache.tsfile.utils.Binary; -import org.apache.tsfile.utils.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Stream; - -/** - * The Line payload formatter. myTable,tag1=value1,tag2=value2 attr1=value1,attr2=value2 - * fieldKey="fieldValue" 1740109006000 \n myTable,tag1=value1,tag2=value2 fieldKey="fieldValue" - * 1740109006000 - */ -public class LinePayloadFormatter implements PayloadFormatter { - - private static final Logger log = LoggerFactory.getLogger(LinePayloadFormatter.class); - - /* - Regular expression matching line protocol ,the attributes field is not required - */ - private static final String REGEX = - "(?<table>\\w+)(,(?<tags>[^ ]+))?(\\s+(?<attributes>[^ ]+))?\\s+(?<fields>[^ ]+)\\s+(?<timestamp>\\d+)"; - private static final String COMMA = ","; - private static final String WELL = "#"; - private static final String LINE_BREAK = "\n"; - private static final String EQUAL = "="; - private static final String TABLE = "table"; - private static final String TAGS = "tags"; - private static final String ATTRIBUTES = "attributes"; - private static final String FIELDS = "fields"; - private static final String TIMESTAMP = "timestamp"; - private static final String NULL = "null"; - - private final Pattern pattern; - - public LinePayloadFormatter() { - pattern = Pattern.compile(REGEX); - } - - @Override - public List<Message> format(String topic, ByteBuf payload) { - List<Message> messages = new ArrayList<>(); - if (payload == null) { - return messages; - } - - String txt = payload.toString(StandardCharsets.UTF_8); - String[] lines = txt.split(LINE_BREAK); - // '/' previously defined as a database name - String database = !topic.contains("/") ? topic : topic.substring(0, topic.indexOf("/")); - for (String line : lines) { - if (line.trim().startsWith(WELL)) { - continue; - } - TableMessage message = new TableMessage(); - try { - Matcher matcher = pattern.matcher(line.trim()); - if (!matcher.matches()) { - log.warn("Invalid line protocol format ,line is {}", line); - continue; - } - - // Parsing Database Name - message.setDatabase((database)); - - // Parsing Table Names - message.setTable(matcher.group(TABLE)); - - // Parsing Tags - if (!setTags(matcher, message)) { - log.warn("The tags is error , line is {}", line); - continue; - } - - // Parsing Attributes - if (!setAttributes(matcher, message)) { - log.warn("The attributes is error , line is {}", line); - continue; - } - - // Parsing Fields - if (!setFields(matcher, message)) { - log.warn("The fields is error , line is {}", line); - continue; - } - - // Parsing timestamp - if (!setTimestamp(matcher, message)) { - log.warn("The timestamp is error , line is {}", line); - continue; - } - - messages.add(message); - } catch (Exception e) { - log.warn( - "The line pattern parsing fails, and the failed line message is {} ,exception is", - line, - e); - } - } - return messages; - } - - @Override - @Deprecated - public List<Message> format(ByteBuf payload) { - throw new NotImplementedException(); - } - - private boolean setTags(Matcher matcher, TableMessage message) { - List<String> tagKeys = new ArrayList<>(); - List<Object> tagValues = new ArrayList<>(); - String tagsGroup = matcher.group(TAGS); - if (tagsGroup != null && !tagsGroup.isEmpty()) { - String[] tagPairs = tagsGroup.split(COMMA); - for (String tagPair : tagPairs) { - if (!tagPair.isEmpty()) { - String[] keyValue = tagPair.split(EQUAL); - if (keyValue.length == 2 && !NULL.equals(keyValue[1])) { - tagKeys.add(keyValue[0]); - tagValues.add(new Binary[] {new Binary(keyValue[1].getBytes(StandardCharsets.UTF_8))}); - } - } - } - } - if (!tagKeys.isEmpty() && !tagValues.isEmpty() && tagKeys.size() == tagValues.size()) { - message.setTagKeys(tagKeys); - message.setTagValues(tagValues); - return true; - } else { - return false; - } - } - - private boolean setAttributes(Matcher matcher, TableMessage message) { - List<String> attributeKeys = new ArrayList<>(); - List<Object> attributeValues = new ArrayList<>(); - String attributesGroup = matcher.group(ATTRIBUTES); - if (attributesGroup != null && !attributesGroup.isEmpty()) { - String[] attributePairs = attributesGroup.split(COMMA); - for (String attributePair : attributePairs) { - if (!attributePair.isEmpty()) { - String[] keyValue = attributePair.split(EQUAL); - if (keyValue.length == 2 && !NULL.equals(keyValue[1])) { - attributeKeys.add(keyValue[0]); - attributeValues.add( - new Binary[] {new Binary(keyValue[1].getBytes(StandardCharsets.UTF_8))}); - } - } - } - } - if (attributeKeys.size() == attributeValues.size()) { - message.setAttributeKeys(attributeKeys); - message.setAttributeValues(attributeValues); - return true; - } else { - return false; - } - } - - private boolean setFields(Matcher matcher, TableMessage message) { - List<String> fields = new ArrayList<>(); - List<TSDataType> dataTypes = new ArrayList<>(); - List<Object> values = new ArrayList<>(); - String fieldsGroup = matcher.group(FIELDS); - if (fieldsGroup != null && !fieldsGroup.isEmpty()) { - String[] fieldPairs = splitFieldPairs(fieldsGroup); - for (String fieldPair : fieldPairs) { - if (!fieldPair.isEmpty()) { - String[] keyValue = fieldPair.split(EQUAL); - if (keyValue.length == 2 && !NULL.equals(keyValue[1])) { - fields.add(keyValue[0]); - Pair<TSDataType, Object> typeAndValue = analyticValue(keyValue[1]); - values.add(typeAndValue.getRight()); - dataTypes.add(typeAndValue.getLeft()); - } - } - } - } - if (!fields.isEmpty() && !values.isEmpty() && fields.size() == values.size()) { - message.setFields(fields); - message.setDataTypes(dataTypes); - message.setValues(values); - return true; - } else { - return false; - } - } - - private String[] splitFieldPairs(String fieldsGroup) { - - if (fieldsGroup == null || fieldsGroup.isEmpty()) return new String[0]; - Matcher m = Pattern.compile("\\w+=\"[^\"]*\"|\\w+=[^,]*").matcher(fieldsGroup); - Stream.Builder<String> builder = Stream.builder(); - - while (m.find()) builder.add(m.group()); - return builder.build().toArray(String[]::new); - } - - private Pair<TSDataType, Object> analyticValue(String value) { - if (value.startsWith("\"") && value.endsWith("\"")) { - // String - return new Pair<>( - TSDataType.TEXT, - new Binary[] { - new Binary(value.substring(1, value.length() - 1).getBytes(StandardCharsets.UTF_8)) - }); - } else if (value.equalsIgnoreCase("t") - || value.equalsIgnoreCase("true") - || value.equalsIgnoreCase("f") - || value.equalsIgnoreCase("false")) { - // boolean - return new Pair<>( - TSDataType.BOOLEAN, - new boolean[] {value.equalsIgnoreCase("t") || value.equalsIgnoreCase("true")}); - } else if (value.endsWith("f")) { - // float - return new Pair<>( - TSDataType.FLOAT, new float[] {Float.parseFloat(value.substring(0, value.length() - 1))}); - } else if (value.endsWith("i32")) { - // int - return new Pair<>( - TSDataType.INT32, new int[] {Integer.parseInt(value.substring(0, value.length() - 3))}); - } else if (value.endsWith("u") || value.endsWith("i")) { - // long - return new Pair<>( - TSDataType.INT64, new long[] {Long.parseLong(value.substring(0, value.length() - 1))}); - } else { - // double - return new Pair<>(TSDataType.DOUBLE, new double[] {Double.parseDouble(value)}); - } - } - - private boolean setTimestamp(Matcher matcher, TableMessage message) { - String timestampGroup = matcher.group(TIMESTAMP); - if (timestampGroup != null && !timestampGroup.isEmpty()) { - message.setTimestamp(Long.parseLong(timestampGroup)); - return true; - } else { - return false; - } - } - - @Override - public String getName() { - return "line"; - } - - @Override - public String getType() { - return PayloadFormatter.TABLE_TYPE; - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java deleted file mode 100644 index 563c6147807..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java +++ /dev/null @@ -1,331 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.iotdb.db.protocol.mqtt; - -import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion; -import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; -import org.apache.iotdb.db.auth.AuthorityChecker; -import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.protocol.session.IClientSession; -import org.apache.iotdb.db.protocol.session.MqttClientSession; -import org.apache.iotdb.db.protocol.session.SessionManager; -import org.apache.iotdb.db.queryengine.plan.Coordinator; -import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; -import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; -import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; -import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher; -import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher; -import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; -import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; -import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; -import org.apache.iotdb.db.queryengine.plan.relational.security.TreeAccessCheckContext; -import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; -import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; -import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; -import org.apache.iotdb.db.utils.CommonUtils; -import org.apache.iotdb.db.utils.TimestampPrecisionUtils; -import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion; - -import io.moquette.interception.AbstractInterceptHandler; -import io.moquette.interception.messages.InterceptConnectMessage; -import io.moquette.interception.messages.InterceptDisconnectMessage; -import io.moquette.interception.messages.InterceptPublishMessage; -import io.netty.buffer.ByteBuf; -import io.netty.handler.codec.mqtt.MqttQoS; -import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.utils.BitMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.ZoneId; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** PublishHandler handle the messages from MQTT clients. */ -public class MPPPublishHandler extends AbstractInterceptHandler { - - private static final Logger LOG = LoggerFactory.getLogger(MPPPublishHandler.class); - - private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - private final SessionManager sessionManager = SessionManager.getInstance(); - - private final ConcurrentHashMap<String, MqttClientSession> clientIdToSessionMap = - new ConcurrentHashMap<>(); - private final PayloadFormatter payloadFormat; - private final IPartitionFetcher partitionFetcher; - private final ISchemaFetcher schemaFetcher; - private final boolean useTableInsert; - - public MPPPublishHandler(IoTDBConfig config) { - this.payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter()); - partitionFetcher = ClusterPartitionFetcher.getInstance(); - schemaFetcher = ClusterSchemaFetcher.getInstance(); - useTableInsert = PayloadFormatter.TABLE_TYPE.equals(this.payloadFormat.getType()); - } - - @Override - public String getID() { - return "iotdb-mqtt-broker-listener"; - } - - @Override - public void onConnect(InterceptConnectMessage msg) { - if (msg.getClientID() == null || msg.getClientID().trim().isEmpty()) { - LOG.error( - "Connection refused: client_id is missing or empty. A valid client_id is required to establish a connection."); - } - if (!clientIdToSessionMap.containsKey(msg.getClientID())) { - MqttClientSession session = new MqttClientSession(msg.getClientID()); - sessionManager.login( - session, - msg.getUsername(), - new String(msg.getPassword()), - ZoneId.systemDefault().toString(), - TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3, - ClientVersion.V_1_0, - useTableInsert ? IClientSession.SqlDialect.TABLE : IClientSession.SqlDialect.TREE); - sessionManager.registerSessionForMqtt(session); - clientIdToSessionMap.put(msg.getClientID(), session); - } - } - - @Override - public void onDisconnect(InterceptDisconnectMessage msg) { - MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID()); - if (null != session) { - sessionManager.removeCurrSessionForMqtt(session); - sessionManager.closeSession(session, Coordinator.getInstance()::cleanupQueryExecution); - } - } - - @Override - public void onPublish(InterceptPublishMessage msg) { - try { - String clientId = msg.getClientID(); - if (!clientIdToSessionMap.containsKey(clientId)) { - return; - } - MqttClientSession session = clientIdToSessionMap.get(msg.getClientID()); - ByteBuf payload = msg.getPayload(); - String topic = msg.getTopicName(); - - if (LOG.isDebugEnabled()) { - String username = msg.getUsername(); - MqttQoS qos = msg.getQos(); - LOG.debug( - "Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}", - clientId, - username, - qos, - topic, - payload); - } - - List<Message> messages = payloadFormat.format(topic, payload); - if (messages == null) { - return; - } - - for (Message message : messages) { - if (message == null) { - continue; - } - if (useTableInsert) { - insertTable((TableMessage) message, session); - } else { - insertTree((TreeMessage) message, session); - } - } - } catch (Throwable t) { - LOG.warn("onPublish execution exception, msg is [{}], error is ", msg, t); - } finally { - // release the payload of the message - super.onPublish(msg); - } - } - - /** Inserting table using tablet */ - private void insertTable(TableMessage message, MqttClientSession session) { - TSStatus tsStatus = null; - try { - TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp()); - InsertTabletStatement insertTabletStatement = constructInsertTabletStatement(message); - session.setDatabaseName(message.getDatabase().toLowerCase()); - session.setSqlDialect(IClientSession.SqlDialect.TABLE); - long queryId = sessionManager.requestQueryId(); - SqlParser relationSqlParser = new SqlParser(); - Metadata metadata = LocalExecutionPlanner.getInstance().metadata; - ExecutionResult result = - Coordinator.getInstance() - .executeForTableModel( - insertTabletStatement, - relationSqlParser, - session, - queryId, - sessionManager.getSessionInfo(session), - "", - metadata, - config.getQueryTimeoutThreshold()); - - tsStatus = result.status; - if (LOG.isDebugEnabled()) { - LOG.debug("process result: {}", tsStatus); - } - if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - || tsStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { - LOG.warn("mqtt line insert error , message = {}", tsStatus.message); - } - } catch (Exception e) { - LOG.warn( - "meet error when inserting database {}, table {}, tags {}, attributes {}, fields {}, at time {}, because ", - message.getDatabase(), - message.getTable(), - message.getTagKeys(), - message.getAttributeKeys(), - message.getFields(), - message.getTimestamp(), - e); - } - } - - private InsertTabletStatement constructInsertTabletStatement(TableMessage message) { - InsertTabletStatement insertStatement = new InsertTabletStatement(); - insertStatement.setDevicePath(new PartialPath(message.getTable(), false)); - List<String> measurements = - Stream.of(message.getFields(), message.getTagKeys(), message.getAttributeKeys()) - .flatMap(List::stream) - .collect(Collectors.toList()); - insertStatement.setMeasurements(measurements.toArray(new String[0])); - long[] timestamps = new long[] {message.getTimestamp()}; - insertStatement.setTimes(timestamps); - int columnSize = measurements.size(); - int rowSize = 1; - - BitMap[] bitMaps = new BitMap[columnSize]; - Object[] columns = - Stream.of(message.getValues(), message.getTagValues(), message.getAttributeValues()) - .flatMap(List::stream) - .toArray(Object[]::new); - insertStatement.setColumns(columns); - insertStatement.setBitMaps(bitMaps); - insertStatement.setRowCount(rowSize); - insertStatement.setAligned(false); - insertStatement.setWriteToTable(true); - TSDataType[] dataTypes = new TSDataType[measurements.size()]; - TsTableColumnCategory[] columnCategories = new TsTableColumnCategory[measurements.size()]; - for (int i = 0; i < message.getFields().size(); i++) { - dataTypes[i] = message.getDataTypes().get(i); - columnCategories[i] = TsTableColumnCategory.FIELD; - } - for (int i = message.getFields().size(); - i < message.getFields().size() + message.getTagKeys().size(); - i++) { - dataTypes[i] = TSDataType.STRING; - columnCategories[i] = TsTableColumnCategory.TAG; - } - for (int i = message.getFields().size() + message.getTagKeys().size(); - i - < message.getFields().size() - + message.getTagKeys().size() - + message.getAttributeKeys().size(); - i++) { - dataTypes[i] = TSDataType.STRING; - columnCategories[i] = TsTableColumnCategory.ATTRIBUTE; - } - insertStatement.setDataTypes(dataTypes); - insertStatement.setColumnCategories(columnCategories); - - return insertStatement; - } - - private void insertTree(TreeMessage message, MqttClientSession session) { - TSStatus tsStatus = null; - try { - InsertRowStatement statement = new InsertRowStatement(); - statement.setDevicePath( - DataNodeDevicePathCache.getInstance().getPartialPath(message.getDevice())); - TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp()); - statement.setTime(message.getTimestamp()); - statement.setMeasurements(message.getMeasurements().toArray(new String[0])); - if (message.getDataTypes() == null) { - statement.setDataTypes(new TSDataType[message.getMeasurements().size()]); - statement.setValues(message.getValues().toArray(new Object[0])); - statement.setNeedInferType(true); - } else { - List<TSDataType> dataTypes = message.getDataTypes(); - List<String> values = message.getValues(); - Object[] inferredValues = new Object[values.size()]; - for (int i = 0; i < values.size(); ++i) { - inferredValues[i] = CommonUtils.parseValue(dataTypes.get(i), values.get(i)); - } - statement.setDataTypes(dataTypes.toArray(new TSDataType[0])); - statement.setValues(inferredValues); - } - statement.setAligned(false); - - tsStatus = - AuthorityChecker.checkAuthority( - statement, - new TreeAccessCheckContext( - session.getUserId(), session.getUsername(), session.getClientAddress())); - if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOG.warn(tsStatus.message); - } else { - long queryId = sessionManager.requestQueryId(); - ExecutionResult result = - Coordinator.getInstance() - .executeForTreeModel( - statement, - queryId, - sessionManager.getSessionInfo(session), - "", - partitionFetcher, - schemaFetcher, - config.getQueryTimeoutThreshold(), - false); - tsStatus = result.status; - if (LOG.isDebugEnabled()) { - LOG.debug("process result: {}", tsStatus); - } - if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - || tsStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { - LOG.warn("mqtt json insert error , message = {}", tsStatus.message); - } - } - } catch (Exception e) { - LOG.warn( - "meet error when inserting device {}, measurements {}, at time {}, because ", - message.getDevice(), - message.getMeasurements(), - message.getTimestamp(), - e); - } - } - - @Override - public void onSessionLoopError(Throwable throwable) { - // TODO: Implement something sensible here ... - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java deleted file mode 100644 index ba31d869760..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.iotdb.db.protocol.mqtt; - -/** Generic parsing of messages */ -public class Message { - - protected Long timestamp; - - public Long getTimestamp() { - return timestamp; - } - - public void setTimestamp(Long timestamp) { - this.timestamp = timestamp; - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java deleted file mode 100644 index c0b48539cd7..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.iotdb.db.protocol.mqtt; - -import org.apache.iotdb.commons.file.SystemFileFactory; -import org.apache.iotdb.db.conf.IoTDBDescriptor; - -import com.google.common.base.Preconditions; -import org.apache.tsfile.external.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.ServiceLoader; - -/** PayloadFormatManager loads payload formatter from SPI services. */ -public class PayloadFormatManager { - - private PayloadFormatManager() {} - - private static final Logger logger = LoggerFactory.getLogger(PayloadFormatManager.class); - - // The dir saving MQTT payload plugin .jar files - private static String mqttDir; - // Map: formatterName => PayloadFormatter - private static Map<String, PayloadFormatter> mqttPayloadPluginMap = new HashMap<>(); - - static { - init(); - } - - private static void init() { - mqttDir = IoTDBDescriptor.getInstance().getConfig().getMqttDir(); - logger.info("mqttDir: {}", mqttDir); - - try { - makeMqttPluginDir(); - buildMqttPluginMap(); - } catch (IOException e) { - logger.error("MQTT PayloadFormatManager init() error.", e); - } - } - - public static PayloadFormatter getPayloadFormat(String name) { - PayloadFormatter formatter = mqttPayloadPluginMap.get(name); - Preconditions.checkArgument(formatter != null, "Unknown payload format named: " + name); - return formatter; - } - - private static void makeMqttPluginDir() throws IOException { - File file = SystemFileFactory.INSTANCE.getFile(mqttDir); - if (file.exists() && file.isDirectory()) { - return; - } - FileUtils.forceMkdir(file); - } - - private static void buildMqttPluginMap() throws IOException { - ServiceLoader<PayloadFormatter> payloadFormatters = ServiceLoader.load(PayloadFormatter.class); - for (PayloadFormatter formatter : payloadFormatters) { - if (formatter == null) { - logger.error("PayloadFormatManager(), formatter is null."); - continue; - } - - String pluginName = formatter.getName(); - mqttPayloadPluginMap.put(pluginName, formatter); - logger.info("PayloadFormatManager(), find MQTT Payload Plugin {}.", pluginName); - } - - URL[] jarURLs = getPluginJarURLs(mqttDir); - logger.debug("MQTT Plugin jarURLs: {}", Arrays.toString(jarURLs)); - - for (URL jarUrl : jarURLs) { - ClassLoader classLoader = new URLClassLoader(new URL[] {jarUrl}); - - // Use SPI to get all plugins' class - ServiceLoader<PayloadFormatter> payloadFormatters2 = - ServiceLoader.load(PayloadFormatter.class, classLoader); - - for (PayloadFormatter formatter : payloadFormatters2) { - if (formatter == null) { - logger.error("PayloadFormatManager(), formatter is null."); - continue; - } - - String pluginName = formatter.getName(); - if (mqttPayloadPluginMap.containsKey(pluginName)) { - continue; - } - mqttPayloadPluginMap.put(pluginName, formatter); - logger.info("PayloadFormatManager(), find MQTT Payload Plugin {}.", pluginName); - } - } - } - - /** - * get all jar files in the given folder - * - * @param folderPath - * @return all jar files' URL - * @throws IOException - */ - private static URL[] getPluginJarURLs(String folderPath) throws IOException { - HashSet<File> fileSet = - new HashSet<>( - org.apache.tsfile.external.commons.io.FileUtils.listFiles( - SystemFileFactory.INSTANCE.getFile(folderPath), new String[] {"jar"}, true)); - return org.apache.tsfile.external.commons.io.FileUtils.toURLs(fileSet.toArray(new File[0])); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java deleted file mode 100644 index c86648ac161..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.iotdb.db.protocol.mqtt; - -import io.netty.buffer.ByteBuf; - -import java.util.List; - -/** - * PayloadFormatter format the payload to the messages. - * - * <p>This is a SPI interface. - * - * @see JSONPayloadFormatter - */ -public interface PayloadFormatter { - - public static final String TREE_TYPE = "tree"; - public static final String TABLE_TYPE = "table"; - - /** - * format a payload to a list of messages - * - * @param payload - * @return - */ - @Deprecated - List<Message> format(ByteBuf payload); - - /** - * format a payload of a topic to a list of messages - * - * @param topic - * @param payload - * @return - */ - default List<Message> format(String topic, ByteBuf payload) { - return format(payload); - } - - /** - * get the formatter name - * - * @return - */ - String getName(); - - String getType(); -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java deleted file mode 100644 index b8aec19da58..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.iotdb.db.protocol.mqtt; - -import org.apache.tsfile.enums.TSDataType; - -import java.util.List; - -/** Message parsing into a table */ -public class TableMessage extends Message { - - private String database; - - private String table; - - private List<String> tagKeys; - - private List<Object> tagValues; - - private List<String> attributeKeys; - - private List<Object> attributeValues; - - private List<String> fields; - - private List<TSDataType> dataTypes; - - private List<Object> values; - - public String getDatabase() { - return database; - } - - public void setDatabase(String database) { - this.database = database; - } - - public String getTable() { - return table; - } - - public void setTable(String table) { - this.table = table; - } - - public List<String> getTagKeys() { - return tagKeys; - } - - public void setTagKeys(List<String> tagKeys) { - this.tagKeys = tagKeys; - } - - public List<Object> getTagValues() { - return tagValues; - } - - public void setTagValues(List<Object> tagValues) { - this.tagValues = tagValues; - } - - public List<String> getAttributeKeys() { - return attributeKeys; - } - - public void setAttributeKeys(List<String> attributeKeys) { - this.attributeKeys = attributeKeys; - } - - public List<Object> getAttributeValues() { - return attributeValues; - } - - public void setAttributeValues(List<Object> attributeValues) { - this.attributeValues = attributeValues; - } - - public List<String> getFields() { - return fields; - } - - public void setFields(List<String> fields) { - this.fields = fields; - } - - public List<TSDataType> getDataTypes() { - return dataTypes; - } - - public void setDataTypes(List<TSDataType> dataTypes) { - this.dataTypes = dataTypes; - } - - public List<Object> getValues() { - return values; - } - - public void setValues(List<Object> values) { - this.values = values; - } - - @Override - public String toString() { - return "TableMessage{" - + "database='" - + database - + '\'' - + ", table='" - + table - + '\'' - + ", tagKeys=" - + tagKeys - + ", tagValues=" - + tagValues - + ", attributeKeys=" - + attributeKeys - + ", attributeValues=" - + attributeValues - + ", fields=" - + fields - + ", dataTypes=" - + dataTypes - + ", values=" - + values - + ", timestamp=" - + timestamp - + '}'; - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java deleted file mode 100644 index 9416ea3c838..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.iotdb.db.protocol.mqtt; - -import org.apache.tsfile.enums.TSDataType; - -import java.util.List; - -/** Message parsing into a tree */ -public class TreeMessage extends Message { - private String device; - private List<String> measurements; - private List<TSDataType> dataTypes; - private List<String> values; - - public String getDevice() { - return device; - } - - public void setDevice(String device) { - this.device = device; - } - - public List<String> getMeasurements() { - return measurements; - } - - public void setMeasurements(List<String> measurements) { - this.measurements = measurements; - } - - public List<TSDataType> getDataTypes() { - return dataTypes; - } - - public void setDataTypes(List<TSDataType> dataTypes) { - this.dataTypes = dataTypes; - } - - public List<String> getValues() { - return values; - } - - public void setValues(List<String> values) { - this.values = values; - } - - @Override - public String toString() { - return "Message{" - + "device='" - + device - + '\'' - + ", timestamp=" - + super.timestamp - + ", measurements=" - + measurements - + ", values=" - + values - + '}'; - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java index 21d4121ffd3..cc761d050eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java @@ -62,7 +62,7 @@ public abstract class IClientSession { abstract TSConnectionType getConnectionType(); - /** ip:port for thrift-based service and client id for mqtt-based service. */ + /** ip:port for thrift-based service. */ abstract String getConnectionId(); public void setClientVersion(ClientVersion clientVersion) { @@ -144,7 +144,6 @@ public abstract class IClientSession { * statementIds that this client opens.<br> * For JDBC clients, each Statement instance has a statement id.<br> * For an IoTDBSession connection, each connection has a statement id.<br> - * mqtt clients have no statement id. */ public abstract Iterable<Long> getStatementIds(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/MqttClientSession.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/MqttClientSession.java deleted file mode 100644 index ae9e2cd0361..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/MqttClientSession.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.protocol.session; - -import org.apache.iotdb.service.rpc.thrift.TSConnectionType; - -import java.util.Collections; -import java.util.Set; - -public class MqttClientSession extends IClientSession { - - private final String clientID; - - public MqttClientSession(String clientID) { - this.clientID = clientID; - } - - @Override - public String getClientAddress() { - return clientID; - } - - @Override - public int getClientPort() { - return 0; - } - - @Override - TSConnectionType getConnectionType() { - return TSConnectionType.MQTT_BASED; - } - - @Override - String getConnectionId() { - return clientID; - } - - @Override - public Set<Long> getStatementIds() { - return Collections.emptySet(); - } - - @Override - public void addStatementId(long statementId) { - throw new UnsupportedOperationException(); - } - - @Override - public Set<Long> removeStatementId(long statementId) { - throw new UnsupportedOperationException(); - } - - @Override - public void addQueryId(Long statementId, long queryId) { - throw new UnsupportedOperationException(); - } - - @Override - public void removeQueryId(Long statementId, Long queryId) { - throw new UnsupportedOperationException(); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java index d098cf70223..24dad9549c5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java @@ -78,7 +78,6 @@ public class SessionManager implements SessionManagerMBean { private final ThreadLocal<Long> currSessionIdleTime = new ThreadLocal<>(); - // sessions does not contain MqttSessions.. private final Map<IClientSession, Object> sessions = new ConcurrentHashMap<>(); // used for sessions. private final Object placeHolder = new Object(); @@ -413,12 +412,6 @@ public class SessionManager implements SessionManagerMBean { currSessionIdleTime.remove(); } - public void removeCurrSessionForMqtt(MqttClientSession mqttClientSession) { - if (mqttClientSession != null) { - sessions.remove(mqttClientSession); - } - } - /** * this method can be only used in client-thread model. Do not use this method in message-thread * model based service. @@ -436,14 +429,6 @@ public class SessionManager implements SessionManagerMBean { return true; } - /** - * this method can be only used in mqtt model. Do not use this method in client-thread model based - * service. - */ - public void registerSessionForMqtt(IClientSession session) { - sessions.put(session, placeHolder); - } - /** must be called after registerSession()) will mark the session login. */ public void supplySession( IClientSession session, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index cebf7dab9f3..6eb80b4b372 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -1274,9 +1274,6 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { } private void initProtocols() throws StartupException { - if (config.isEnableMQTTService()) { - registerManager.register(MQTTService.getInstance()); - } if (IoTDBRestServiceDescriptor.getInstance().getConfig().isEnableRestService()) { registerManager.register(RestService.getInstance()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java deleted file mode 100644 index c6cc3fa47ee..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.service; - -import org.apache.iotdb.commons.service.IService; -import org.apache.iotdb.commons.service.ServiceType; -import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.protocol.mqtt.BrokerAuthenticator; -import org.apache.iotdb.db.protocol.mqtt.MPPPublishHandler; - -import io.moquette.BrokerConstants; -import io.moquette.broker.Server; -import io.moquette.broker.config.IConfig; -import io.moquette.broker.config.MemoryConfig; -import io.moquette.broker.security.IAuthenticator; -import io.moquette.interception.InterceptHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - -/** The IoTDB MQTT Service. */ -public class MQTTService implements IService { - private static final Logger LOG = LoggerFactory.getLogger(MQTTService.class); - private final Server server = new Server(); - - private MQTTService() {} - - @Override - public void start() { - startup(); - } - - @Override - public void stop() { - shutdown(); - } - - public void startup() { - IoTDBConfig iotDBConfig = IoTDBDescriptor.getInstance().getConfig(); - IConfig config = createBrokerConfig(iotDBConfig); - List<InterceptHandler> handlers = new ArrayList<>(1); - handlers.add(new MPPPublishHandler(iotDBConfig)); - IAuthenticator authenticator = new BrokerAuthenticator(); - - try { - server.startServer(config, handlers, null, authenticator, null); - } catch (IOException e) { - throw new RuntimeException("Exception while starting server", e); - } - - LOG.info( - "Start MQTT service successfully, listening on ip {} port {}", - iotDBConfig.getMqttHost(), - iotDBConfig.getMqttPort()); - - Runtime.getRuntime() - .addShutdownHook( - new Thread( - () -> { - LOG.info("Stopping IoTDB MQTT service..."); - shutdown(); - LOG.info("IoTDB MQTT service stopped."); - })); - } - - private IConfig createBrokerConfig(IoTDBConfig iotDBConfig) { - Properties properties = new Properties(); - properties.setProperty(BrokerConstants.HOST_PROPERTY_NAME, iotDBConfig.getMqttHost()); - properties.setProperty( - BrokerConstants.PORT_PROPERTY_NAME, String.valueOf(iotDBConfig.getMqttPort())); - properties.setProperty( - BrokerConstants.BROKER_INTERCEPTOR_THREAD_POOL_SIZE, - String.valueOf(iotDBConfig.getMqttHandlerPoolSize())); - properties.setProperty( - BrokerConstants.DATA_PATH_PROPERTY_NAME, String.valueOf(iotDBConfig.getMqttDataPath())); - properties.setProperty(BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME, "true"); - properties.setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, "false"); - properties.setProperty(BrokerConstants.ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME, "false"); - properties.setProperty( - BrokerConstants.NETTY_MAX_BYTES_PROPERTY_NAME, - String.valueOf(iotDBConfig.getMqttMaxMessageSize())); - return new MemoryConfig(properties); - } - - public void shutdown() { - server.stopServer(); - } - - @Override - public ServiceType getID() { - return ServiceType.MQTT_SERVICE; - } - - public static MQTTService getInstance() { - return MQTTServiceHolder.INSTANCE; - } - - private static class MQTTServiceHolder { - - private static final MQTTService INSTANCE = new MQTTService(); - - private MQTTServiceHolder() {} - } -} diff --git a/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter b/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter index 488d6d02d50..585be9602fc 100644 --- a/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter +++ b/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter @@ -16,6 +16,3 @@ # specific language governing permissions and limitations # under the License. # - -org.apache.iotdb.db.protocol.mqtt.JSONPayloadFormatter -org.apache.iotdb.db.protocol.mqtt.LinePayloadFormatter diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticatorTest.java deleted file mode 100644 index 1f66b517e91..00000000000 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticatorTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.iotdb.db.protocol.mqtt; - -import org.apache.iotdb.db.exception.StorageEngineException; -import org.apache.iotdb.db.utils.EnvironmentUtils; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; - -public class BrokerAuthenticatorTest { - - @Before - public void before() { - EnvironmentUtils.envSetUp(); - } - - @After - public void after() throws IOException, StorageEngineException { - EnvironmentUtils.cleanEnv(); - } - - @Test - public void checkValid() { - // In the previous implementation, the datanode will init a root file, - // but now checkuser operation needs to link to confignode. - // BrokerAuthenticator authenticator = new BrokerAuthenticator(); - // assertTrue(authenticator.checkValid(null, "root", "root".getBytes())); - // assertFalse(authenticator.checkValid(null, "", "foo".getBytes())); - // assertFalse(authenticator.checkValid(null, "root", null)); - // assertFalse(authenticator.checkValid(null, "foo", "foo".getBytes())); - } -} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java deleted file mode 100644 index deecf607d81..00000000000 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.iotdb.db.protocol.mqtt; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import org.junit.Test; - -import java.nio.charset.StandardCharsets; - -import static org.junit.Assert.assertEquals; - -public class JSONPayloadFormatterTest { - - @Test - public void formatJson() { - String payload = - " {\n" - + " \"device\":\"root.sg.d1\",\n" - + " \"timestamp\":1586076045524,\n" - + " \"measurements\":[\"s1\",\"s2\"],\n" - + " \"values\":[0.530635,0.530635]\n" - + " }"; - - ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); - String topic = ""; - - JSONPayloadFormatter formatter = new JSONPayloadFormatter(); - TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(0); - - assertEquals("root.sg.d1", message.getDevice()); - assertEquals(Long.valueOf(1586076045524L), message.getTimestamp()); - assertEquals("s1", message.getMeasurements().get(0)); - assertEquals(0.530635D, Double.parseDouble(message.getValues().get(0)), 0); - } - - @Test - public void formatBatchJson() { - String payload = - " {\n" - + " \"device\":\"root.sg.d1\",\n" - + " \"timestamps\":[1586076045524,1586076065526],\n" - + " \"measurements\":[\"s1\",\"s2\"],\n" - + " \"values\":[[0.530635,0.530635], [0.530655,0.530695]]\n" - + " }"; - - ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); - String topic = ""; - - JSONPayloadFormatter formatter = new JSONPayloadFormatter(); - TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(1); - - assertEquals("root.sg.d1", message.getDevice()); - assertEquals(Long.valueOf(1586076065526L), message.getTimestamp()); - assertEquals("s2", message.getMeasurements().get(1)); - assertEquals(0.530695D, Double.parseDouble(message.getValues().get(1)), 0); - } - - @Test - public void formatJsonArray() { - String payload = - " [\n" - + " {\n" - + " \"device\":\"root.sg.d1\",\n" - + " \"timestamp\":1586076045524,\n" - + " \"measurements\":[\"s1\",\"s2\"],\n" - + " \"values\":[0.530635,0.530635]\n" - + " },\n" - + " {\n" - + " \"device\":\"root.sg.d2\",\n" - + " \"timestamp\":1586076065526,\n" - + " \"measurements\":[\"s3\",\"s4\"],\n" - + " \"values\":[0.530655,0.530655]\n" - + " }\n" - + "]"; - - ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); - String topic = ""; - - JSONPayloadFormatter formatter = new JSONPayloadFormatter(); - TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(1); - - assertEquals("root.sg.d2", message.getDevice()); - assertEquals(Long.valueOf(1586076065526L), message.getTimestamp()); - assertEquals("s3", message.getMeasurements().get(0)); - assertEquals(0.530655D, Double.parseDouble(message.getValues().get(0)), 0); - } - - @Test - public void formatBatchJsonArray() { - String payload = - "[\n" - + " {\n" - + " \"device\":\"root.sg.d1\",\n" - + " \"timestamps\":[1586076045524,1586076065526],\n" - + " \"measurements\":[\"s1\",\"s2\"],\n" - + " \"values\":[[0.530635,0.530635], [0.530655,0.530695]]\n" - + " },\n" - + " {\n" - + " \"device\":\"root.sg.d2\",\n" - + " \"timestamps\":[1586076045524,1586076065526],\n" - + " \"measurements\":[\"s3\",\"s4\"],\n" - + " \"values\":[[0.530635,0.530635], [0.530655,0.530695]]\n" - + " }" - + "]"; - - ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); - String topic = ""; - - JSONPayloadFormatter formatter = new JSONPayloadFormatter(); - TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(3); - - assertEquals("root.sg.d2", message.getDevice()); - assertEquals(Long.valueOf(1586076065526L), message.getTimestamp()); - assertEquals("s4", message.getMeasurements().get(1)); - assertEquals(0.530695D, Double.parseDouble(message.getValues().get(1)), 0); - } -} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java deleted file mode 100644 index 7bf9bce0702..00000000000 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.iotdb.db.protocol.mqtt; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import org.apache.tsfile.utils.Binary; -import org.junit.Test; - -import java.nio.charset.StandardCharsets; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -public class LinePayloadFormatterTest { - - @Test - public void formatLine() { - String payload = - "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"value1\",field2=1i,field3=2u,field4=3i32,field5=t,field6=false,field7=4,field8=5f 1"; - - ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); - String topic = ""; - - LinePayloadFormatter formatter = new LinePayloadFormatter(); - TableMessage message = (TableMessage) formatter.format(topic, buf).get(0); - - assertEquals("test1", message.getTable()); - assertEquals(Long.valueOf(1L), message.getTimestamp()); - assertEquals("tag1", message.getTagKeys().get(0)); - assertEquals("attr1", message.getAttributeKeys().get(0)); - assertEquals( - "value1", - ((Binary[]) message.getValues().get(0))[0].getStringValue(StandardCharsets.UTF_8)); - assertEquals(1L, ((long[]) message.getValues().get(1))[0], 0); - assertEquals(2L, ((long[]) message.getValues().get(2))[0], 0); - assertEquals(3L, ((int[]) message.getValues().get(3))[0], 0); - assertTrue(((boolean[]) message.getValues().get(4))[0]); - assertFalse(((boolean[]) message.getValues().get(5))[0]); - assertEquals(4d, ((double[]) message.getValues().get(6))[0], 0); - assertEquals(5f, ((float[]) message.getValues().get(7))[0], 0); - } - - @Test - public void formatBatchLine() { - String payload = - "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"value1\",field2=1i,field3=1u 1 \n" - + "test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4 field4=\"value4\",field5=10i,field6=10i32 2 "; - - ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); - String topic = ""; - - LinePayloadFormatter formatter = new LinePayloadFormatter(); - TableMessage message = (TableMessage) formatter.format(topic, buf).get(1); - - assertEquals("test2", message.getTable()); - assertEquals(Long.valueOf(2L), message.getTimestamp()); - assertEquals("tag3", message.getTagKeys().get(0)); - assertEquals("attr3", message.getAttributeKeys().get(0)); - assertEquals(10, ((int[]) message.getValues().get(2))[0], 0); - } - - @Test - public void formatLineAnnotation() { - String payload = - "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"value1\",field2=1i,field3=1u 1 \n" - + " # test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4 field4=\"value4\",field5=10i,field6=10i32 2 "; - - ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); - String topic = ""; - - LinePayloadFormatter formatter = new LinePayloadFormatter(); - List<Message> message = formatter.format(topic, buf); - - assertEquals(1, message.size()); - } -} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java deleted file mode 100644 index 096f5d0d90d..00000000000 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.iotdb.db.protocol.mqtt; - -import org.apache.iotdb.db.utils.EnvironmentUtils; - -import org.junit.After; -import org.junit.Test; - -import static org.junit.Assert.assertNotNull; - -public class PayloadFormatManagerTest { - @After - public void tearDown() throws Exception { - EnvironmentUtils.cleanAllDir(); - } - - @Test(expected = IllegalArgumentException.class) - public void getPayloadFormat() { - PayloadFormatManager.getPayloadFormat("txt"); - } - - @Test - public void getDefaultPayloadFormat() { - assertNotNull(PayloadFormatManager.getPayloadFormat("json")); - } -} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java index ebd4d3b7ae0..479bf2a907a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java @@ -136,9 +136,6 @@ public class EnvironmentUtils { SchemaEngine.getInstance().clear(); FlushManager.getInstance().stop(); CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.Running); - // We must disable MQTT service as it will cost a lot of time to be shutdown, which may slow our - // unit tests. - IoTDBDescriptor.getInstance().getConfig().setEnableMQTTService(false); // clean cache if (memoryConfig.isMetaDataCacheEnable()) { @@ -240,8 +237,6 @@ public class EnvironmentUtils { cleanDir(config.getExtPipeDir()); // delete ext cleanDir(config.getExtDir()); - // delete mqtt dir - cleanDir(config.getMqttDir()); // delete wal for (String walDir : commonConfig.getWalDirs()) { cleanDir(walDir); diff --git a/iotdb-core/datanode/src/test/resources/datanode1conf/logback.xml b/iotdb-core/datanode/src/test/resources/datanode1conf/logback.xml index 2443eb3313f..2522d9b65f5 100644 --- a/iotdb-core/datanode/src/test/resources/datanode1conf/logback.xml +++ b/iotdb-core/datanode/src/test/resources/datanode1conf/logback.xml @@ -230,7 +230,6 @@ <appender-ref ref="FILEALL"/> <appender-ref ref="stdout"/> </root> - <logger level="OFF" name="io.moquette.broker.metrics.MQTTMessageLogger"/> <logger level="info" name="org.apache.iotdb.db.service"/> <logger level="info" name="org.apache.iotdb.db.conf"/> <logger level="info" name="org.apache.iotdb.db.cost.statistic"> diff --git a/iotdb-core/datanode/src/test/resources/datanode2conf/logback.xml b/iotdb-core/datanode/src/test/resources/datanode2conf/logback.xml index 2443eb3313f..2522d9b65f5 100644 --- a/iotdb-core/datanode/src/test/resources/datanode2conf/logback.xml +++ b/iotdb-core/datanode/src/test/resources/datanode2conf/logback.xml @@ -230,7 +230,6 @@ <appender-ref ref="FILEALL"/> <appender-ref ref="stdout"/> </root> - <logger level="OFF" name="io.moquette.broker.metrics.MQTTMessageLogger"/> <logger level="info" name="org.apache.iotdb.db.service"/> <logger level="info" name="org.apache.iotdb.db.conf"/> <logger level="info" name="org.apache.iotdb.db.cost.statistic"> diff --git a/iotdb-core/datanode/src/test/resources/datanode3conf/logback.xml b/iotdb-core/datanode/src/test/resources/datanode3conf/logback.xml index 2443eb3313f..2522d9b65f5 100644 --- a/iotdb-core/datanode/src/test/resources/datanode3conf/logback.xml +++ b/iotdb-core/datanode/src/test/resources/datanode3conf/logback.xml @@ -230,7 +230,6 @@ <appender-ref ref="FILEALL"/> <appender-ref ref="stdout"/> </root> - <logger level="OFF" name="io.moquette.broker.metrics.MQTTMessageLogger"/> <logger level="info" name="org.apache.iotdb.db.service"/> <logger level="info" name="org.apache.iotdb.db.conf"/> <logger level="info" name="org.apache.iotdb.db.cost.statistic"> diff --git a/iotdb-core/datanode/src/test/resources/logback-test.xml b/iotdb-core/datanode/src/test/resources/logback-test.xml index f6c690d6ff7..96f2de206cf 100644 --- a/iotdb-core/datanode/src/test/resources/logback-test.xml +++ b/iotdb-core/datanode/src/test/resources/logback-test.xml @@ -54,9 +54,7 @@ <logger name="org.apache.iotdb.commons.service.RegisterManager" level="INFO"/> <logger name="org.apache.iotdb.db.service.DataNode" level="WARN"/> <logger name="org.apache.iotdb.db.service.ExternalRPCService" level="INFO"/> - <logger name="org.apache.iotdb.db.service.MQTTService" level="INFO"/> <logger name="org.apache.iotdb.db.storageengine.compaction.cross.rewrite.task" level="ERROR"/> - <logger name="io.moquette.broker.metrics.MQTTMessageLogger" level="ERROR"/> <logger name="DETAILED_FAILURE_QUERY_TRACE" level="ERROR"/> <logger name="org.apache.iotdb.db.storageengine.dataregion.DataRegion" level="INFO"/> <root level="ERROR"> diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 6c0d71addf3..f3f7f6a5185 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -2043,42 +2043,6 @@ procedure_completed_clean_interval=30 # Datatype: int procedure_completed_evict_ttl=60 -#################### -### MQTT Broker Configuration -#################### - -# whether to enable the mqtt service. -# effectiveMode: restart -# Datatype: boolean -enable_mqtt_service=false - -# the mqtt service binding host. -# effectiveMode: restart -# Datatype: String -mqtt_host=127.0.0.1 - -# the mqtt service binding port. -# effectiveMode: restart -# Datatype: int -mqtt_port=1883 - -# the handler pool size for handing the mqtt messages. -# effectiveMode: restart -# Datatype: int -mqtt_handler_pool_size=1 - -# the mqtt message payload formatter. -# effectiveMode: restart -# Options: [json, line] -# The built-in json only supports tree models, and the line only supports table models. -# Datatype: String -mqtt_payload_formatter=json - -# max length of mqtt message in byte -# effectiveMode: restart -# Datatype: int -mqtt_max_message_size=1048576 - #################### ### IoTDB-AI Configuration #################### diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java index 4a54d774c94..7c3b95258a1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java @@ -270,19 +270,9 @@ public class IoTDBConstant { public static final String TMP_FOLDER_NAME = "tmp"; public static final String DELETION_FOLDER_NAME = "deletion"; - public static final String MQTT_FOLDER_NAME = "mqtt"; public static final String WAL_FOLDER_NAME = "wal"; public static final String EXT_PIPE_FOLDER_NAME = "extPipe"; - // mqtt - public static final String ENABLE_MQTT = "enable_mqtt_service"; - public static final String MQTT_HOST_NAME = "mqtt_host"; - public static final String MQTT_PORT_NAME = "mqtt_port"; - public static final String MQTT_HANDLER_POOL_SIZE_NAME = "mqtt_handler_pool_size"; - public static final String MQTT_PAYLOAD_FORMATTER_NAME = "mqtt_payload_formatter"; - public static final String MQTT_DATA_PATH = "mqtt_data_path"; - public static final String MQTT_MAX_MESSAGE_SIZE = "mqtt_max_message_size"; - // thrift public static final int LEFT_SIZE_IN_REQUEST = 4 * 1024 * 1024; public static final int DEFAULT_FETCH_SIZE = 5000; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java index 7267c79a665..55fad14e5a1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java @@ -25,7 +25,6 @@ public enum ServiceType { METRIC_SERVICE("Metrics ServerService", "MetricService"), RPC_SERVICE("RPC ServerService", "RPCService"), INFLUX_SERVICE("InfluxDB Protocol Service", "InfluxDB Protocol"), - MQTT_SERVICE("MQTTService", "MqttService"), AIR_GAP_SERVICE("AirGapService", "AirGapService"), MONITOR_SERVICE("Monitor ServerService", "Monitor"), STAT_MONITOR_SERVICE("Statistics ServerService", "StatMonitorService"), diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift index 53d55ce54ca..5a1e76fe5f5 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift @@ -527,7 +527,7 @@ enum TSConnectionType { struct TSConnectionInfo { 1: required string userName 2: required i64 logInTime - 3: required string connectionId // ip:port for thrift-based service and clientId for mqtt-based service + 3: required string connectionId // ip:port for thrift-based service 4: required TSConnectionType type } diff --git a/pom.xml b/pom.xml index 4461cd45f5c..521c84719d5 100644 --- a/pom.xml +++ b/pom.xml @@ -77,7 +77,6 @@ <enforcer.skip>true</enforcer.skip> <felix.version>5.1.9</felix.version> <findbugs.jsr305.version>3.0.2</findbugs.jsr305.version> - <fusesource-mqtt-client.version>1.16</fusesource-mqtt-client.version> <!-- JDK1.8 only support google java format 1.7--> <google.java.format.version>1.22.0</google.java.format.version> <gson.version>2.13.1</gson.version> @@ -523,11 +522,6 @@ </exclusion> </exclusions> </dependency> - <dependency> - <groupId>org.fusesource.mqtt-client</groupId> - <artifactId>mqtt-client</artifactId> - <version>${fusesource-mqtt-client.version}</version> - </dependency> <dependency> <groupId>org.eclipse.collections</groupId> <artifactId>eclipse-collections</artifactId>
