This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch add_mqtt_server in repository https://gitbox.apache.org/repos/asf/iotdb-extras.git
commit f9e66a13c165c141abd53cb864c44c2d7848acc8 Author: Tian Jiang <[email protected]> AuthorDate: Thu Oct 30 14:59:32 2025 +0800 tmp save --- examples/mqtt-customize/README.md | 42 +++ examples/mqtt-customize/pom.xml | 39 +++ .../server/CustomizedJsonPayloadFormatter.java | 74 +++++ ....apache.iotdb.db.protocol.mqtt.PayloadFormatter | 20 ++ examples/mqtt/README.md | 33 +++ examples/mqtt/pom.xml | 37 +++ .../java/org/apache/iotdb/mqtt/MQTTClient.java | 112 +++++++ examples/pom.xml | 6 + mqtt-service/pom.xml | 36 +++ .../java/org/apache/iotdb/protocol/mqtt/App.java | 13 + .../iotdb/protocol/mqtt/BrokerAuthenticator.java | 54 ++++ .../iotdb/protocol/mqtt/MPPPublishHandler.java | 326 +++++++++++++++++++++ .../apache/iotdb/protocol/mqtt/MQTTService.java | 124 ++++++++ .../org/apache/iotdb/protocol/mqtt/MqttConfig.java | 237 +++++++++++++++ .../apache/iotdb/protocol/mqtt/MqttConstant.java | 33 +++ .../protocol/mqtt/msg/JSONPayloadFormatter.java | 150 ++++++++++ .../protocol/mqtt/msg/LinePayloadFormatter.java | 282 ++++++++++++++++++ .../apache/iotdb/protocol/mqtt/msg/Message.java | 33 +++ .../protocol/mqtt/msg/PayloadFormatManager.java | 134 +++++++++ .../iotdb/protocol/mqtt/msg/PayloadFormatter.java | 66 +++++ .../iotdb/protocol/mqtt/msg/TableMessage.java | 144 +++++++++ .../iotdb/protocol/mqtt/msg/TreeMessage.java | 77 +++++ mqtt-service/src/main/resources/mqtt.properties | 35 +++ .../src/test/java/org/apache/iotdb/AppTest.java | 28 ++ .../org/apache/iotdb/BrokerAuthenticatorTest.java | 49 ++++ .../org/apache/iotdb/JSONPayloadFormatterTest.java | 134 +++++++++ .../org/apache/iotdb/LinePayloadFormatterTest.java | 96 ++++++ .../org/apache/iotdb/PayloadFormatManagerTest.java | 42 +++ pom.xml | 8 +- 29 files changed, 2463 insertions(+), 1 deletion(-) diff --git a/examples/mqtt-customize/README.md b/examples/mqtt-customize/README.md new file mode 100644 index 0000000..9e53c96 --- /dev/null +++ b/examples/mqtt-customize/README.md @@ -0,0 +1,42 @@ +<!-- + + Licensed to the 
Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +# Customized IoTDB-MQTT-Broker Example + +## Function +``` +The example is to show how to customize your MQTT message format +``` + +## Usage + +* Define your implementation which implements `PayloadFormatter.java` +* Modify the file in `src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter`: + clean the file and put your implementation class name into the file +* Compile your implementation as a jar file + + +Then, in your mqtt-service: +* Create ${IOTDB_HOME}/ext/mqtt/ folder, and put the jar into this folder. +* Start an IoTDB MqttService (in the mqtt-service module). +* Set the value of program parameter `-mqtt_payload_formatter` to the value returned by `getName()` in your implementation +* Launch the IoTDB server. +* Now IoTDB will use your implementation to parse the MQTT message. + diff --git a/examples/mqtt-customize/pom.xml b/examples/mqtt-customize/pom.xml new file mode 100644 index 0000000..9a530f4 --- /dev/null +++ b/examples/mqtt-customize/pom.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.iotdb</groupId> + <artifactId>iotdb-examples</artifactId> + <version>2.0.6-SNAPSHOT</version> + </parent> + <artifactId>customize-mqtt-example</artifactId> + <name>IoTDB: Example: Customized MQTT</name> + <dependencies> + <!-- used by the server--> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>iotdb-server</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> diff --git a/examples/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java b/examples/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java new file mode 100644 index 0000000..8c3a962 --- /dev/null +++ b/examples/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.mqtt.server; + +import org.apache.iotdb.db.protocol.mqtt.Message; +import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; +import org.apache.iotdb.db.protocol.mqtt.TreeMessage; + +import io.netty.buffer.ByteBuf; +import org.apache.tsfile.external.commons.lang3.NotImplementedException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class CustomizedJsonPayloadFormatter implements PayloadFormatter { + + @Override + public List<Message> format(String topic, ByteBuf payload) { + // Suppose the payload is a json format + if (payload == null) { + return Collections.emptyList(); + } + + // parse data from the json and generate Messages and put them into List<Message> ret + List<Message> ret = new ArrayList<>(); + // this is just an example, so we just generate some Messages directly + for (int i = 0; i < 2; i++) { + long ts = i; + TreeMessage message = new TreeMessage(); + message.setDevice("d" + i); + message.setTimestamp(ts); + message.setMeasurements(Arrays.asList("s1", "s2")); + message.setValues(Arrays.asList("4.0" + i, "5.0" + i)); + ret.add(message); + } + return ret; + } + + @Override + @Deprecated + public List<Message> format(ByteBuf payload) { + throw new 
NotImplementedException(); + } + + @Override + public String getName() { + // set the value of mqtt_payload_formatter in iotdb-common.properties as the following string: + return "CustomizedJson"; + } + + @Override + public String getType() { + return PayloadFormatter.TREE_TYPE; + } +} diff --git a/examples/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter b/examples/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter new file mode 100644 index 0000000..b0b824b --- /dev/null +++ b/examples/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +org.apache.iotdb.mqtt.server.CustomizedJsonPayloadFormatter \ No newline at end of file diff --git a/examples/mqtt/README.md b/examples/mqtt/README.md new file mode 100644 index 0000000..b32adf8 --- /dev/null +++ b/examples/mqtt/README.md @@ -0,0 +1,33 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +# IoTDB-MQTT-Broker Example + +## Function +``` +The example is to show how to send data to IoTDB from an MQTT client. +``` + +## Usage + +* Start an IoTDB MqttService (in the mqtt-service module). +* Launch the IoTDB server. +* Set up the database with `CREATE DATABASE root.sg` and create the timeseries with `CREATE TIMESERIES root.sg.d1.s1 WITH DATATYPE=DOUBLE, ENCODING=PLAIN`. +* Run `org.apache.iotdb.mqtt.MQTTClient` to run the MQTT client and send events to the server. diff --git a/examples/mqtt/pom.xml b/examples/mqtt/pom.xml new file mode 100644 index 0000000..4e8aa19 --- /dev/null +++ b/examples/mqtt/pom.xml @@ -0,0 +1,37 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.iotdb</groupId> + <artifactId>examples</artifactId> + <version>2.0.6-SNAPSHOT</version> + </parent> + <artifactId>mqtt-example</artifactId> + <name>IoTDB Extras: Example: MQTT</name> + <dependencies> + <dependency> + <groupId>org.fusesource.mqtt-client</groupId> + <artifactId>mqtt-client</artifactId> + </dependency> + </dependencies> +</project> diff --git a/examples/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java b/examples/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java new file mode 100644 index 0000000..ec15ad2 --- /dev/null +++ b/examples/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iotdb.mqtt; + +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.QoS; + +import java.util.Random; + +public class MQTTClient { + + private static final String DATABASE = "myMqttTest"; + + public static void main(String[] args) throws Exception { + MQTT mqtt = new MQTT(); + mqtt.setHost("127.0.0.1", 1883); + mqtt.setUserName("root"); + mqtt.setPassword("root"); + mqtt.setConnectAttemptsMax(3); + mqtt.setReconnectDelay(10); + + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + // the config mqttPayloadFormatter must be tree-json + // jsonPayloadFormatter(connection); + // the config mqttPayloadFormatter must be table-line + linePayloadFormatter(connection); + connection.disconnect(); + } + + private static void jsonPayloadFormatter(BlockingConnection connection) throws Exception { + Random random = new Random(); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 10; i++) { + String payload = + String.format( + "{\n" + + "\"device\":\"root.sg.d1\",\n" + + "\"timestamp\":%d,\n" + + "\"measurements\":[\"s1\"],\n" + + "\"values\":[%f]\n" + + "}", + System.currentTimeMillis(), random.nextDouble()); + sb.append(payload).append(","); + + // publish a json object + Thread.sleep(1); + connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + } + // publish a json array + sb.insert(0, "["); + sb.replace(sb.lastIndexOf(","), sb.length(), "]"); + connection.publish("root.sg.d1.s1", sb.toString().getBytes(), QoS.AT_LEAST_ONCE, false); + } + + // The database must be created in advance + private static void linePayloadFormatter(BlockingConnection connection) throws Exception { + // myTable,tag1=t1,tag2=t2 fieldKey1="1,2,3" 1740109006001 + String payload = "myTable,tag1=t1,tag2=t2 fieldKey1=\"1,2,3\" 1740109006001"; + connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, 
false); + Thread.sleep(10); + + payload = "myTable,tag1=t1,tag2=t2 fieldKey1=\"1,2,3\" 1740109006002"; + connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + Thread.sleep(10); + + payload = "myTable,tag1=t1,tag2=t2 fieldKey1=\"1,2,3\" 1740109006003"; + connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + Thread.sleep(10); + payload = + "test1,tag1=t1,tag2=t2 attr3=a5,attr4=a4 field1=\"fieldValue1\",field2=1i,field3=1u 1"; + connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + Thread.sleep(10); + + payload = "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 2"; + connection.publish(DATABASE, payload.getBytes(), QoS.AT_LEAST_ONCE, false); + Thread.sleep(10); + + payload = + "test1,tag1=t1,tag2=t2 field7=t,field8=T,field9=true 3 \n " + + "test1,tag1=t1,tag2=t2 field7=f,field8=F,field9=FALSE 4"; + connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + Thread.sleep(10); + + payload = + "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"fieldValue1\",field2=1i,field3=1u 4 \n " + + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 5"; + connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + Thread.sleep(10); + + payload = "# It's a remark\n " + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 6"; + connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + Thread.sleep(10); + } +} diff --git a/examples/pom.xml b/examples/pom.xml index 13a72c5..ee75005 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -73,6 +73,12 @@ <module>spark-table</module> </modules> </profile> + <profile> + <id>with-mqtt</id> + <modules> + <module>mqtt-example</module> + </modules> + </profile> </profiles> <build> <pluginManagement> diff --git a/mqtt-service/pom.xml b/mqtt-service/pom.xml new file mode 100644 index 0000000..4c0ff84 --- /dev/null +++ 
b/mqtt-service/pom.xml @@ -0,0 +1,36 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.iotdb</groupId> + <artifactId>iotdb-extras-parent</artifactId> + <version>2.0.4-SNAPSHOT</version> + </parent> + + <artifactId>mqtt-service</artifactId> + <packaging>jar</packaging> + + <name>untitled</name> + <url>http://maven.apache.org</url> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>iotdb-server</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tsfile</groupId> + <artifactId>common</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/App.java b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/App.java new file mode 100644 index 0000000..e755bae --- /dev/null +++ b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/App.java @@ -0,0 +1,13 @@ +package org.apache.iotdb.protocol.mqtt; + +/** + * Hello world! + * + */ +public class App +{ + public static void main( String[] args ) + { + System.out.println( "Hello World!" ); + } +} diff --git a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/BrokerAuthenticator.java b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/BrokerAuthenticator.java new file mode 100644 index 0000000..2a5c8d3 --- /dev/null +++ b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/BrokerAuthenticator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.protocol.mqtt; + +import io.moquette.broker.security.IAuthenticator; +import org.apache.iotdb.isession.ISession; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.session.Session; +import org.apache.tsfile.external.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** The MQTT broker authenticator. 
*/ +public class BrokerAuthenticator implements IAuthenticator { + private static final Logger LOG = LoggerFactory.getLogger(BrokerAuthenticator.class); + + private final MqttConfig config; + + public BrokerAuthenticator(MqttConfig config) { + this.config = config; + } + + @Override + public boolean checkValid(String clientId, String username, byte[] password) { + if (StringUtils.isBlank(username) || password == null) { + return false; + } + + // validate by logging in IoTDB + try (ISession ignored = new Session(config.getIotdbHost(), config.getIotdbPort(), + username, new String(password))) { + return true; + } catch (IoTDBConnectionException e) { + LOG.warn("Failed to connect to IoTDB server.", e); + return false; + } + } +} diff --git a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MPPPublishHandler.java b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MPPPublishHandler.java new file mode 100644 index 0000000..e3c7aa0 --- /dev/null +++ b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MPPPublishHandler.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iotdb.protocol.mqtt; + +import io.moquette.interception.AbstractInterceptHandler; +import io.moquette.interception.messages.InterceptConnectMessage; +import io.moquette.interception.messages.InterceptDisconnectMessage; +import io.moquette.interception.messages.InterceptPublishMessage; +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.mqtt.MqttQoS; +import java.time.ZoneId; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; +import org.apache.iotdb.isession.ISession; +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.protocol.mqtt.msg.Message; +import org.apache.iotdb.protocol.mqtt.msg.PayloadFormatManager; +import org.apache.iotdb.protocol.mqtt.msg.PayloadFormatter; +import org.apache.iotdb.protocol.mqtt.msg.TableMessage; +import org.apache.iotdb.protocol.mqtt.msg.TreeMessage; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.MqttClientSession; +import org.apache.iotdb.db.utils.CommonUtils; +import org.apache.iotdb.db.utils.TimestampPrecisionUtils; +import org.apache.iotdb.session.TableSessionBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** PublishHandler handle the messages from MQTT clients. 
*/ +public class MPPPublishHandler extends AbstractInterceptHandler { + + private static final Logger LOG = LoggerFactory.getLogger(MPPPublishHandler.class); + + private final MqttConfig config; + private final ConcurrentHashMap<String, ISession> clientIdToSessionMap = + new ConcurrentHashMap<>(); + private final ConcurrentHashMap<String, ITableSession> clientIdToTableSessionMap = + new ConcurrentHashMap<>(); + private final PayloadFormatter payloadFormat; + private final boolean useTableInsert; + + public MPPPublishHandler(MqttConfig config) { + this.config = config; + this.payloadFormat = config.getPayloadFormatManager().getPayloadFormat(config.getMqttPayloadFormatter()); + useTableInsert = PayloadFormatter.TABLE_TYPE.equals(this.payloadFormat.getType()); + } + + @Override + public String getID() { + return "iotdb-mqtt-broker-listener"; + } + + @Override + public void onConnect(InterceptConnectMessage msg) { + if (msg.getClientID() == null || msg.getClientID().trim().isEmpty()) { + LOG.error( + "Connection refused: client_id is missing or empty. A valid client_id is required to establish a connection."); + return; + } + + if (useTableInsert) { + clientIdToTableSessionMap.computeIfAbsent(msg.getClientID(), (cId) -> { + TableSessionBuilder sessionBuilder = new TableSessionBuilder(); + sessionBuilder.nodeUrls(Collections.singletonList(config.getIotdbHost() + ":" + config.getIotdbPort())) + .username(msg.getUsername()).password(new String(msg.getPassword())); + return sessionBuilder.build(); + }) + } + if (!clientIdToSessionMap.containsKey(msg.getClientID())) { + ISession session = new MqttClientSession(msg.getClientID()); + sessionManager.login( + session, + msg.getUsername(), + new String(msg.getPassword()), + ZoneId.systemDefault().toString(), + TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3, + ClientVersion.V_1_0, + useTableInsert ? 
IClientSession.SqlDialect.TABLE : IClientSession.SqlDialect.TREE); + sessionManager.registerSessionForMqtt(session); + clientIdToSessionMap.put(msg.getClientID(), session); + } + } + + @Override + public void onDisconnect(InterceptDisconnectMessage msg) { + MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID()); + if (null != session) { + sessionManager.removeCurrSessionForMqtt(session); + sessionManager.closeSession(session, Coordinator.getInstance()::cleanupQueryExecution); + } + } + + @Override + public void onPublish(InterceptPublishMessage msg) { + try { + String clientId = msg.getClientID(); + if (!clientIdToSessionMap.containsKey(clientId)) { + return; + } + MqttClientSession session = clientIdToSessionMap.get(msg.getClientID()); + ByteBuf payload = msg.getPayload(); + String topic = msg.getTopicName(); + + if (LOG.isDebugEnabled()) { + String username = msg.getUsername(); + MqttQoS qos = msg.getQos(); + LOG.debug( + "Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}", + clientId, + username, + qos, + topic, + payload); + } + + List<org.apache.iotdb.protocol.mqtt.Message> messages = payloadFormat.format(topic, payload); + if (messages == null) { + return; + } + + for (Message message : messages) { + if (message == null) { + continue; + } + if (useTableInsert) { + insertTable((org.apache.iotdb.protocol.mqtt.TableMessage) message, session); + } else { + insertTree((org.apache.iotdb.protocol.mqtt.TreeMessage) message, session); + } + } + } catch (Throwable t) { + LOG.warn("onPublish execution exception, msg is [{}], error is ", msg, t); + } finally { + // release the payload of the message + super.onPublish(msg); + } + } + + /** Inserting table using tablet */ + private void insertTable(TableMessage message, MqttClientSession session) { + TSStatus tsStatus = null; + try { + TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp()); + InsertTabletStatement insertTabletStatement = 
constructInsertTabletStatement(message); + session.setDatabaseName(message.getDatabase().toLowerCase()); + session.setSqlDialect(IClientSession.SqlDialect.TABLE); + long queryId = sessionManager.requestQueryId(); + SqlParser relationSqlParser = new SqlParser(); + Metadata metadata = LocalExecutionPlanner.getInstance().metadata; + ExecutionResult result = + Coordinator.getInstance() + .executeForTableModel( + insertTabletStatement, + relationSqlParser, + session, + queryId, + sessionManager.getSessionInfo(session), + "", + metadata, + config.getQueryTimeoutThreshold()); + + tsStatus = result.status; + if (LOG.isDebugEnabled()) { + LOG.debug("process result: {}", tsStatus); + } + if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() + || tsStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + LOG.warn("mqtt line insert error , message = {}", tsStatus.message); + } + } catch (Exception e) { + LOG.warn( + "meet error when inserting database {}, table {}, tags {}, attributes {}, fields {}, at time {}, because ", + message.getDatabase(), + message.getTable(), + message.getTagKeys(), + message.getAttributeKeys(), + message.getFields(), + message.getTimestamp(), + e); + } + } + + private InsertTabletStatement constructInsertTabletStatement(TableMessage message) { + InsertTabletStatement insertStatement = new InsertTabletStatement(); + insertStatement.setDevicePath(new PartialPath(message.getTable(), false)); + List<String> measurements = + Stream.of(message.getFields(), message.getTagKeys(), message.getAttributeKeys()) + .flatMap(List::stream) + .collect(Collectors.toList()); + insertStatement.setMeasurements(measurements.toArray(new String[0])); + long[] timestamps = new long[] {message.getTimestamp()}; + insertStatement.setTimes(timestamps); + int columnSize = measurements.size(); + int rowSize = 1; + + BitMap[] bitMaps = new BitMap[columnSize]; + Object[] columns = + Stream.of(message.getValues(), message.getTagValues(), 
message.getAttributeValues()) + .flatMap(List::stream) + .toArray(Object[]::new); + insertStatement.setColumns(columns); + insertStatement.setBitMaps(bitMaps); + insertStatement.setRowCount(rowSize); + insertStatement.setAligned(false); + insertStatement.setWriteToTable(true); + TSDataType[] dataTypes = new TSDataType[measurements.size()]; + TsTableColumnCategory[] columnCategories = new TsTableColumnCategory[measurements.size()]; + for (int i = 0; i < message.getFields().size(); i++) { + dataTypes[i] = message.getDataTypes().get(i); + columnCategories[i] = TsTableColumnCategory.FIELD; + } + for (int i = message.getFields().size(); + i < message.getFields().size() + message.getTagKeys().size(); + i++) { + dataTypes[i] = TSDataType.STRING; + columnCategories[i] = TsTableColumnCategory.TAG; + } + for (int i = message.getFields().size() + message.getTagKeys().size(); + i + < message.getFields().size() + + message.getTagKeys().size() + + message.getAttributeKeys().size(); + i++) { + dataTypes[i] = TSDataType.STRING; + columnCategories[i] = TsTableColumnCategory.ATTRIBUTE; + } + insertStatement.setDataTypes(dataTypes); + insertStatement.setColumnCategories(columnCategories); + + return insertStatement; + } + + private void insertTree(TreeMessage message, MqttClientSession session) { + TSStatus tsStatus = null; + try { + InsertRowStatement statement = new InsertRowStatement(); + statement.setDevicePath( + DataNodeDevicePathCache.getInstance().getPartialPath(message.getDevice())); + TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp()); + statement.setTime(message.getTimestamp()); + statement.setMeasurements(message.getMeasurements().toArray(new String[0])); + if (message.getDataTypes() == null) { + statement.setDataTypes(new TSDataType[message.getMeasurements().size()]); + statement.setValues(message.getValues().toArray(new Object[0])); + statement.setNeedInferType(true); + } else { + List<TSDataType> dataTypes = message.getDataTypes(); + List<String> 
values = message.getValues(); + Object[] inferredValues = new Object[values.size()]; + for (int i = 0; i < values.size(); ++i) { + inferredValues[i] = CommonUtils.parseValue(dataTypes.get(i), values.get(i)); + } + statement.setDataTypes(dataTypes.toArray(new TSDataType[0])); + statement.setValues(inferredValues); + } + statement.setAligned(false); + + tsStatus = + AuthorityChecker.checkAuthority( + statement, + new TreeAccessCheckContext( + session.getUserId(), session.getUsername(), session.getClientAddress())); + if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOG.warn(tsStatus.message); + } else { + long queryId = sessionManager.requestQueryId(); + ExecutionResult result = + Coordinator.getInstance() + .executeForTreeModel( + statement, + queryId, + sessionManager.getSessionInfo(session), + "", + partitionFetcher, + schemaFetcher, + config.getQueryTimeoutThreshold(), + false); + tsStatus = result.status; + if (LOG.isDebugEnabled()) { + LOG.debug("process result: {}", tsStatus); + } + if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() + || tsStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + LOG.warn("mqtt json insert error , message = {}", tsStatus.message); + } + } + } catch (Exception e) { + LOG.warn( + "meet error when inserting device {}, measurements {}, at time {}, because ", + message.getDevice(), + message.getMeasurements(), + message.getTimestamp(), + e); + } + } + + @Override + public void onSessionLoopError(Throwable throwable) { + // TODO: Implement something sensible here ... 
+ } +} diff --git a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MQTTService.java b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MQTTService.java new file mode 100644 index 0000000..3ec4d47 --- /dev/null +++ b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MQTTService.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.protocol.mqtt; + +import io.moquette.BrokerConstants; +import io.moquette.broker.Server; +import io.moquette.broker.config.IConfig; +import io.moquette.broker.config.MemoryConfig; +import io.moquette.broker.security.IAuthenticator; +import io.moquette.interception.InterceptHandler; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import org.apache.iotdb.commons.service.IService; +import org.apache.iotdb.commons.service.ServiceType; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.protocol.mqtt.BrokerAuthenticator; +import org.apache.iotdb.db.protocol.mqtt.MPPPublishHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** The IoTDB MQTT Service. 
*/ +public class MQTTService implements IService { + private static final Logger LOG = LoggerFactory.getLogger(MQTTService.class); + private final Server server = new Server(); + + private MQTTService() {} + + @Override + public void start() { + startup(); + } + + @Override + public void stop() { + shutdown(); + } + + public void startup() { + IoTDBConfig iotDBConfig = IoTDBDescriptor.getInstance().getConfig(); + IConfig config = createBrokerConfig(iotDBConfig); + List<InterceptHandler> handlers = new ArrayList<>(1); + handlers.add(new MPPPublishHandler(iotDBConfig)); + IAuthenticator authenticator = new BrokerAuthenticator(); + + try { + server.startServer(config, handlers, null, authenticator, null); + } catch (IOException e) { + throw new RuntimeException("Exception while starting server", e); + } + + LOG.info( + "Start MQTT service successfully, listening on ip {} port {}", + iotDBConfig.getMqttHost(), + iotDBConfig.getMqttPort()); + + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> { + LOG.info("Stopping IoTDB MQTT service..."); + shutdown(); + LOG.info("IoTDB MQTT service stopped."); + })); + } + + private IConfig createBrokerConfig(IoTDBConfig iotDBConfig) { + Properties properties = new Properties(); + properties.setProperty(BrokerConstants.HOST_PROPERTY_NAME, iotDBConfig.getMqttHost()); + properties.setProperty( + BrokerConstants.PORT_PROPERTY_NAME, String.valueOf(iotDBConfig.getMqttPort())); + properties.setProperty( + BrokerConstants.BROKER_INTERCEPTOR_THREAD_POOL_SIZE, + String.valueOf(iotDBConfig.getMqttHandlerPoolSize())); + properties.setProperty( + BrokerConstants.DATA_PATH_PROPERTY_NAME, String.valueOf(iotDBConfig.getMqttDataPath())); + properties.setProperty(BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME, "true"); + properties.setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, "false"); + properties.setProperty(BrokerConstants.ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME, "false"); + properties.setProperty( + 
BrokerConstants.NETTY_MAX_BYTES_PROPERTY_NAME, + String.valueOf(iotDBConfig.getMqttMaxMessageSize())); + return new MemoryConfig(properties); + } + + public void shutdown() { + server.stopServer(); + } + + @Override + public ServiceType getID() { + return ServiceType.MQTT_SERVICE; + } + + public static MQTTService getInstance() { + return MQTTServiceHolder.INSTANCE; + } + + private static class MQTTServiceHolder { + + private static final MQTTService INSTANCE = new MQTTService(); + + private MQTTServiceHolder() {} + } +} diff --git a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MqttConfig.java b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MqttConfig.java new file mode 100644 index 0000000..ff542a3 --- /dev/null +++ b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MqttConfig.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.protocol.mqtt; + +import static org.apache.iotdb.protocol.mqtt.MqttConstant.MQTT_FOLDER_NAME; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Properties; +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.conf.TrimProperties; +import org.apache.iotdb.protocol.mqtt.msg.PayloadFormatManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MqttConfig { + + private static final Logger LOGGER = LoggerFactory.getLogger(MqttConfig.class); + private static final String DEFAULT_CONFIG_FILE_PATH = "mqtt.properties"; + + /** The mqtt service binding host. */ + private String mqttHost = "127.0.0.1"; + + /** The mqtt service binding port. */ + private int mqttPort = 1883; + + /** The target IoTDB binding host. */ + private String iotdbHost = "127.0.0.1"; + + /** The target IoTDB binding port. */ + private int iotdbPort = 6667; + + /** The handler pool size for handing the mqtt messages. */ + private int mqttHandlerPoolSize = Math.max(1, Runtime.getRuntime().availableProcessors() >> 1); + + /** The mqtt message payload formatter. */ + private String mqttPayloadFormatter = "json"; + + /** The mqtt save data path */ + private String mqttDataPath = "data/"; + + /** Max mqtt message size. 
Unit: byte */ + private int mqttMaxMessageSize = 1048576; + + /** Trigger MQTT forward pool size */ + private int triggerForwardMQTTPoolSize = 4; + + /** External lib directory for MQTT, stores user-uploaded JAR files */ + private String mqttDir = + IoTDBConstant.EXT_FOLDER_NAME + File.separator + MQTT_FOLDER_NAME; + + private PayloadFormatManager payloadFormatManager; + + private void init() { + this.payloadFormatManager = new PayloadFormatManager(this); + } + + public PayloadFormatManager getPayloadFormatManager() { + return payloadFormatManager; + } + + public String getMqttHost() { + return mqttHost; + } + + public void setMqttHost(String mqttHost) { + this.mqttHost = mqttHost; + } + + public int getMqttPort() { + return mqttPort; + } + + public void setMqttPort(int mqttPort) { + this.mqttPort = mqttPort; + } + + public int getMqttHandlerPoolSize() { + return mqttHandlerPoolSize; + } + + public void setMqttHandlerPoolSize(int mqttHandlerPoolSize) { + this.mqttHandlerPoolSize = mqttHandlerPoolSize; + } + + public String getMqttPayloadFormatter() { + return mqttPayloadFormatter; + } + + public void setMqttPayloadFormatter(String mqttPayloadFormatter) { + this.mqttPayloadFormatter = mqttPayloadFormatter; + } + + public String getMqttDataPath() { + return mqttDataPath; + } + + public void setMqttDataPath(String mqttDataPath) { + this.mqttDataPath = mqttDataPath; + } + + public int getMqttMaxMessageSize() { + return mqttMaxMessageSize; + } + + public void setMqttMaxMessageSize(int mqttMaxMessageSize) { + this.mqttMaxMessageSize = mqttMaxMessageSize; + } + + public int getTriggerForwardMQTTPoolSize() { + return triggerForwardMQTTPoolSize; + } + + public void setTriggerForwardMQTTPoolSize(int triggerForwardMQTTPoolSize) { + this.triggerForwardMQTTPoolSize = triggerForwardMQTTPoolSize; + } + + public String getMqttDir() { + return mqttDir; + } + + public void setMqttDir(String mqttDir) { + this.mqttDir = mqttDir; + } + + public String getIotdbHost() { + return 
iotdbHost; + } + + public void setIotdbHost(String iotdbHost) { + this.iotdbHost = iotdbHost; + } + + public int getIotdbPort() { + return iotdbPort; + } + + public void setIotdbPort(int iotdbPort) { + this.iotdbPort = iotdbPort; + } + + public static MqttConfig fromFile(String configFilePath) { + TrimProperties commonProperties = new TrimProperties(); + if (configFilePath == null) { + LOGGER.warn("configFilePath not set, use the default config file."); + configFilePath = DEFAULT_CONFIG_FILE_PATH; + } + + try (InputStream inputStream = Files.newInputStream(Paths.get(configFilePath))) { + LOGGER.info("Start to read config file {}", configFilePath); + Properties properties = new Properties(); + properties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); + commonProperties.putAll(properties); + return fromProperties(commonProperties); + } catch (FileNotFoundException e) { + LOGGER.error("Fail to find config file {}, reject DataNode startup.", configFilePath, e); + System.exit(-1); + } catch (IOException e) { + LOGGER.error("Cannot load config file, reject DataNode startup.", e); + System.exit(-1); + } catch (Exception e) { + LOGGER.error("Incorrect format in config file, reject DataNode startup.", e); + System.exit(-1); + } + return null; + } + + public static MqttConfig fromProperties(TrimProperties properties) { + MqttConfig conf = new MqttConfig(); + conf.setMqttDir(properties.getProperty("mqtt_root_dir", conf.getMqttDir())); + + if (properties.getProperty(MqttConstant.MQTT_HOST_NAME) != null) { + conf.setMqttHost(properties.getProperty(MqttConstant.MQTT_HOST_NAME)); + } + + if (properties.getProperty(MqttConstant.MQTT_PORT_NAME) != null) { + conf.setMqttPort( + Integer.parseInt(properties.getProperty(MqttConstant.MQTT_PORT_NAME))); + } + + if (properties.getProperty(MqttConstant.MQTT_HANDLER_POOL_SIZE_NAME) != null) { + conf.setMqttHandlerPoolSize( + Integer.parseInt( + properties.getProperty(MqttConstant.MQTT_HANDLER_POOL_SIZE_NAME))); + } + + if 
(properties.getProperty(MqttConstant.MQTT_PAYLOAD_FORMATTER_NAME) != null) { + conf.setMqttPayloadFormatter( + properties.getProperty(MqttConstant.MQTT_PAYLOAD_FORMATTER_NAME)); + } + + if (properties.getProperty(MqttConstant.MQTT_DATA_PATH) != null) { + conf.setMqttDataPath(properties.getProperty(MqttConstant.MQTT_DATA_PATH)); + } + + if (properties.getProperty(MqttConstant.MQTT_MAX_MESSAGE_SIZE) != null) { + conf.setMqttMaxMessageSize( + Integer.parseInt(properties.getProperty(MqttConstant.MQTT_MAX_MESSAGE_SIZE))); + } + + if (properties.getProperty(MqttConstant.IOTDB_HOST_NAME) != null) { + conf.setIotdbHost( + properties.getProperty(MqttConstant.IOTDB_HOST_NAME)); + } + + if (properties.getProperty(MqttConstant.IOTDB_CLIENT_RPC_PORT_NAME) != null) { + conf.setIotdbPort( + Integer.parseInt(properties.getProperty(MqttConstant.IOTDB_CLIENT_RPC_PORT_NAME))); + } + + conf.init(); + return conf; + } +} diff --git a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MqttConstant.java b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MqttConstant.java new file mode 100644 index 0000000..a5d176f --- /dev/null +++ b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/MqttConstant.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.protocol.mqtt; + +/** Property keys read from the MQTT service configuration, plus the MQTT extension folder name. */ +public final class MqttConstant { + // mqtt + public static final String MQTT_HOST_NAME = "mqtt_host"; + public static final String MQTT_PORT_NAME = "mqtt_port"; + public static final String IOTDB_HOST_NAME = "iotdb_host"; + public static final String IOTDB_CLIENT_RPC_PORT_NAME = "iotdb_client_rpc_port"; + public static final String MQTT_HANDLER_POOL_SIZE_NAME = "mqtt_handler_pool_size"; + public static final String MQTT_PAYLOAD_FORMATTER_NAME = "mqtt_payload_formatter"; + public static final String MQTT_DATA_PATH = "mqtt_data_path"; + public static final String MQTT_MAX_MESSAGE_SIZE = "mqtt_max_message_size"; + public static final String MQTT_FOLDER_NAME = "mqtt"; + + private MqttConstant() { + // constants holder, not meant to be instantiated + } +} diff --git a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/JSONPayloadFormatter.java b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/JSONPayloadFormatter.java new file mode 100644 index 0000000..059add6 --- /dev/null +++ b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/JSONPayloadFormatter.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.protocol.mqtt.msg; + +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.reflect.TypeToken; +import io.netty.buffer.ByteBuf; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.db.protocol.mqtt.Message; +import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; +import org.apache.iotdb.db.protocol.mqtt.TreeMessage; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.external.commons.lang3.NotImplementedException; + +/** + * The JSON payload formatter. two json format supported: { "device":"root.sg.d1", + * "timestamp":1586076045524, "measurements":["s1","s2"], "values":[0.530635,0.530635] } + * + * <p>{ "device":"root.sg.d1", "timestamps":[1586076045524,1586076065526], + * "measurements":["s1","s2"], "values":[[0.530635,0.530635], [0.530655,0.530695]] } + */ +public class JSONPayloadFormatter implements org.apache.iotdb.db.protocol.mqtt.PayloadFormatter { + private static final String JSON_KEY_DEVICE = "device"; + private static final String JSON_KEY_TIMESTAMP = "timestamp"; + private static final String JSON_KEY_TIMESTAMPS = "timestamps"; + private static final String JSON_KEY_MEASUREMENTS = "measurements"; + private static final String JSON_KEY_VALUES = "values"; + private static final String JSON_KEY_DATATYPE = "datatypes"; + private static final Gson GSON = new GsonBuilder().create(); + + @Override + public List<Message> format(String topic, ByteBuf payload) { + if (payload == null) { + return new ArrayList<>(); + } + String txt = payload.toString(StandardCharsets.UTF_8); + JsonElement jsonElement = 
GSON.fromJson(txt, JsonElement.class); + if (jsonElement == null) { + // Gson returns null for an empty payload; report it like any other unusable payload + // instead of failing with a NullPointerException on the type checks below. + throw new JsonParseException("payload is invalidate"); + } + if (jsonElement.isJsonObject()) { + JsonObject jsonObject = jsonElement.getAsJsonObject(); + if (jsonObject.get(JSON_KEY_TIMESTAMP) != null) { + return formatJson(jsonObject); + } + if (jsonObject.get(JSON_KEY_TIMESTAMPS) != null) { + return formatBatchJson(jsonObject); + } + } else if (jsonElement.isJsonArray()) { + JsonArray jsonArray = jsonElement.getAsJsonArray(); + List<Message> messages = new ArrayList<>(); + for (JsonElement element : jsonArray) { + JsonObject jsonObject = element.getAsJsonObject(); + if (jsonObject.get(JSON_KEY_TIMESTAMP) != null) { + messages.addAll(formatJson(jsonObject)); + } + if (jsonObject.get(JSON_KEY_TIMESTAMPS) != null) { + messages.addAll(formatBatchJson(jsonObject)); + } + } + return messages; + } + throw new JsonParseException("payload is invalidate"); + } + + @Override + @Deprecated + public List<Message> format(ByteBuf payload) { + throw new NotImplementedException(); + } + + private List<Message> formatJson(JsonObject jsonObject) { + org.apache.iotdb.db.protocol.mqtt.TreeMessage message = new org.apache.iotdb.db.protocol.mqtt.TreeMessage(); + message.setDevice(jsonObject.get(JSON_KEY_DEVICE).getAsString()); + message.setTimestamp(jsonObject.get(JSON_KEY_TIMESTAMP).getAsLong()); + message.setMeasurements( + GSON.fromJson( + jsonObject.get(JSON_KEY_MEASUREMENTS), new TypeToken<List<String>>() {}.getType())); + message.setValues( + GSON.fromJson(jsonObject.get(JSON_KEY_VALUES), new TypeToken<List<String>>() {}.getType())); + if (jsonObject.has(JSON_KEY_DATATYPE)) { + message.setDataTypes( + GSON.fromJson( + jsonObject.get(JSON_KEY_DATATYPE), new TypeToken<List<TSDataType>>() {}.getType())); + } + return Lists.newArrayList(message); + } + + private List<Message> formatBatchJson(JsonObject jsonObject) { + String device = jsonObject.get(JSON_KEY_DEVICE).getAsString(); + List<String> measurements = + GSON.fromJson( + jsonObject.getAsJsonArray(JSON_KEY_MEASUREMENTS), + new
TypeToken<List<String>>() {}.getType()); + List<Long> timestamps = + GSON.fromJson( + jsonObject.get(JSON_KEY_TIMESTAMPS), new TypeToken<List<Long>>() {}.getType()); + List<List<String>> values = + GSON.fromJson( + jsonObject.get(JSON_KEY_VALUES), new TypeToken<List<List<String>>>() {}.getType()); + List<TSDataType> types = + jsonObject.has(JSON_KEY_DATATYPE) + ? GSON.fromJson( + jsonObject.get(JSON_KEY_DATATYPE), new TypeToken<List<TSDataType>>() {}.getType()) + : null; + + List<Message> ret = new ArrayList<>(timestamps.size()); + for (int i = 0; i < timestamps.size(); i++) { + org.apache.iotdb.db.protocol.mqtt.TreeMessage message = new TreeMessage(); + message.setDevice(device); + message.setTimestamp(timestamps.get(i)); + message.setMeasurements(measurements); + message.setValues(values.get(i)); + message.setDataTypes(types); + ret.add(message); + } + return ret; + } + + @Override + public String getName() { + return "json"; + } + + @Override + public String getType() { + return PayloadFormatter.TREE_TYPE; + } +} diff --git a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/LinePayloadFormatter.java b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/LinePayloadFormatter.java new file mode 100644 index 0000000..5fd57f4 --- /dev/null +++ b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/LinePayloadFormatter.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.protocol.mqtt.msg; + +import io.netty.buffer.ByteBuf; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Stream; +import org.apache.iotdb.db.protocol.mqtt.Message; +import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; +import org.apache.iotdb.db.protocol.mqtt.TableMessage; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.external.commons.lang3.NotImplementedException; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Line payload formatter. 
myTable,tag1=value1,tag2=value2 attr1=value1,attr2=value2 + * fieldKey="fieldValue" 1740109006000 \n myTable,tag1=value1,tag2=value2 fieldKey="fieldValue" + * 1740109006000 + */ +public class LinePayloadFormatter implements org.apache.iotdb.db.protocol.mqtt.PayloadFormatter { + + private static final Logger log = LoggerFactory.getLogger(LinePayloadFormatter.class); + + /* + Regular expression matching line protocol ,the attributes field is not required + */ + private static final String REGEX = + "(?<table>\\w+)(,(?<tags>[^ ]+))?(\\s+(?<attributes>[^ ]+))?\\s+(?<fields>[^ ]+)\\s+(?<timestamp>\\d+)"; + private static final String COMMA = ","; + private static final String WELL = "#"; + private static final String LINE_BREAK = "\n"; + private static final String EQUAL = "="; + private static final String TABLE = "table"; + private static final String TAGS = "tags"; + private static final String ATTRIBUTES = "attributes"; + private static final String FIELDS = "fields"; + private static final String TIMESTAMP = "timestamp"; + private static final String NULL = "null"; + + private final Pattern pattern; + + public LinePayloadFormatter() { + pattern = Pattern.compile(REGEX); + } + + @Override + public List<org.apache.iotdb.db.protocol.mqtt.Message> format(String topic, ByteBuf payload) { + List<org.apache.iotdb.db.protocol.mqtt.Message> messages = new ArrayList<>(); + if (payload == null) { + return messages; + } + + String txt = payload.toString(StandardCharsets.UTF_8); + String[] lines = txt.split(LINE_BREAK); + // '/' previously defined as a database name + String database = !topic.contains("/") ? 
topic : topic.substring(0, topic.indexOf("/")); + for (String line : lines) { + if (line.trim().startsWith(WELL)) { + continue; + } + org.apache.iotdb.db.protocol.mqtt.TableMessage message = new org.apache.iotdb.db.protocol.mqtt.TableMessage(); + try { + Matcher matcher = pattern.matcher(line.trim()); + if (!matcher.matches()) { + log.warn("Invalid line protocol format ,line is {}", line); + continue; + } + + // Parsing Database Name + message.setDatabase((database)); + + // Parsing Table Names + message.setTable(matcher.group(TABLE)); + + // Parsing Tags + if (!setTags(matcher, message)) { + log.warn("The tags is error , line is {}", line); + continue; + } + + // Parsing Attributes + if (!setAttributes(matcher, message)) { + log.warn("The attributes is error , line is {}", line); + continue; + } + + // Parsing Fields + if (!setFields(matcher, message)) { + log.warn("The fields is error , line is {}", line); + continue; + } + + // Parsing timestamp + if (!setTimestamp(matcher, message)) { + log.warn("The timestamp is error , line is {}", line); + continue; + } + + messages.add(message); + } catch (Exception e) { + log.warn( + "The line pattern parsing fails, and the failed line message is {} ,exception is", + line, + e); + } + } + return messages; + } + + @Override + @Deprecated + public List<Message> format(ByteBuf payload) { + throw new NotImplementedException(); + } + + private boolean setTags(Matcher matcher, org.apache.iotdb.db.protocol.mqtt.TableMessage message) { + List<String> tagKeys = new ArrayList<>(); + List<Object> tagValues = new ArrayList<>(); + String tagsGroup = matcher.group(TAGS); + if (tagsGroup != null && !tagsGroup.isEmpty()) { + String[] tagPairs = tagsGroup.split(COMMA); + for (String tagPair : tagPairs) { + if (!tagPair.isEmpty()) { + String[] keyValue = tagPair.split(EQUAL); + if (keyValue.length == 2 && !NULL.equals(keyValue[1])) { + tagKeys.add(keyValue[0]); + tagValues.add(new Binary[] {new 
Binary(keyValue[1].getBytes(StandardCharsets.UTF_8))}); + } + } + } + } + if (!tagKeys.isEmpty() && !tagValues.isEmpty() && tagKeys.size() == tagValues.size()) { + message.setTagKeys(tagKeys); + message.setTagValues(tagValues); + return true; + } else { + return false; + } + } + + private boolean setAttributes(Matcher matcher, org.apache.iotdb.db.protocol.mqtt.TableMessage message) { + List<String> attributeKeys = new ArrayList<>(); + List<Object> attributeValues = new ArrayList<>(); + String attributesGroup = matcher.group(ATTRIBUTES); + if (attributesGroup != null && !attributesGroup.isEmpty()) { + String[] attributePairs = attributesGroup.split(COMMA); + for (String attributePair : attributePairs) { + if (!attributePair.isEmpty()) { + String[] keyValue = attributePair.split(EQUAL); + if (keyValue.length == 2 && !NULL.equals(keyValue[1])) { + attributeKeys.add(keyValue[0]); + attributeValues.add( + new Binary[] {new Binary(keyValue[1].getBytes(StandardCharsets.UTF_8))}); + } + } + } + } + if (attributeKeys.size() == attributeValues.size()) { + message.setAttributeKeys(attributeKeys); + message.setAttributeValues(attributeValues); + return true; + } else { + return false; + } + } + + /** + * Parses the fields group of a matched line into field names, data types and values and stores + * them on the message. + * + * @param matcher matcher that already matched the whole line + * @param message message receiving the parsed fields + * @return {@code true} when at least one usable field was parsed, {@code false} otherwise + */ + private boolean setFields(Matcher matcher, org.apache.iotdb.db.protocol.mqtt.TableMessage message) { + List<String> fields = new ArrayList<>(); + List<TSDataType> dataTypes = new ArrayList<>(); + List<Object> values = new ArrayList<>(); + String fieldsGroup = matcher.group(FIELDS); + if (fieldsGroup != null && !fieldsGroup.isEmpty()) { + String[] fieldPairs = splitFieldPairs(fieldsGroup); + for (String fieldPair : fieldPairs) { + if (!fieldPair.isEmpty()) { + // Split on the first '=' only, so that a quoted string value may itself contain '='. + String[] keyValue = fieldPair.split(EQUAL, 2); + String fieldValue = keyValue.length == 2 ? keyValue[1] : ""; + boolean quoted = + fieldValue.length() >= 2 && fieldValue.startsWith("\"") && fieldValue.endsWith("\""); + // Empty values, "null" and unquoted values containing '=' are skipped. + if (!fieldValue.isEmpty() + && !NULL.equals(fieldValue) + && (quoted || !fieldValue.contains(EQUAL))) { + fields.add(keyValue[0]); + Pair<TSDataType, Object> typeAndValue = analyticValue(fieldValue); + values.add(typeAndValue.getRight()); + dataTypes.add(typeAndValue.getLeft()); + } + } + } + } + if (!fields.isEmpty() &&
!values.isEmpty() && fields.size() == values.size()) { + message.setFields(fields); + message.setDataTypes(dataTypes); + message.setValues(values); + return true; + } else { + return false; + } + } + + /** + * Matches one {@code key=value} field pair: the first alternative takes a double-quoted value + * (which may contain commas), the second takes everything up to the next comma. + */ + private static final Pattern FIELD_PAIR_PATTERN = + Pattern.compile("\\w+=\"[^\"]*\"|\\w+=[^,]*"); + + /** + * Splits the fields group of a line into its {@code key=value} pairs. + * + * @param fieldsGroup the raw fields group; may be {@code null} or empty + * @return the pairs in order of appearance, or an empty array when there is no input + */ + private String[] splitFieldPairs(String fieldsGroup) { + if (fieldsGroup == null || fieldsGroup.isEmpty()) { + return new String[0]; + } + // The pattern is compiled once for the class; only the matcher is created per call. + Matcher m = FIELD_PAIR_PATTERN.matcher(fieldsGroup); + Stream.Builder<String> builder = Stream.builder(); + while (m.find()) { + builder.add(m.group()); + } + return builder.build().toArray(String[]::new); + } + + private Pair<TSDataType, Object> analyticValue(String value) { + if (value.startsWith("\"") && value.endsWith("\"")) { + // String + return new Pair<>( + TSDataType.TEXT, + new Binary[] { + new Binary(value.substring(1, value.length() - 1).getBytes(StandardCharsets.UTF_8)) + }); + } else if (value.equalsIgnoreCase("t") + || value.equalsIgnoreCase("true") + || value.equalsIgnoreCase("f") + || value.equalsIgnoreCase("false")) { + // boolean + return new Pair<>( + TSDataType.BOOLEAN, + new boolean[] {value.equalsIgnoreCase("t") || value.equalsIgnoreCase("true")}); + } else if (value.endsWith("f")) { + // float + return new Pair<>( + TSDataType.FLOAT, new float[] {Float.parseFloat(value.substring(0, value.length() - 1))}); + } else if (value.endsWith("i32")) { + // int + return new Pair<>( + TSDataType.INT32, new int[] {Integer.parseInt(value.substring(0, value.length() - 3))}); + } else if (value.endsWith("u") || value.endsWith("i")) { + // long + return new Pair<>( + TSDataType.INT64, new long[] {Long.parseLong(value.substring(0, value.length() - 1))}); + } else { + // double + return new Pair<>(TSDataType.DOUBLE, new double[] {Double.parseDouble(value)}); + } + } + + private boolean setTimestamp(Matcher matcher, TableMessage message) { + String timestampGroup = matcher.group(TIMESTAMP); + if (timestampGroup != null && !timestampGroup.isEmpty()) { + message.setTimestamp(Long.parseLong(timestampGroup)); +
return true; + } else { + return false; + } + } + + @Override + public String getName() { + return "line"; + } + + @Override + public String getType() { + return PayloadFormatter.TABLE_TYPE; + } +} diff --git a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/Message.java b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/Message.java new file mode 100644 index 0000000..26f0618 --- /dev/null +++ b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/Message.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Generic base class for parsed messages.
 *
 * <p>It holds only the timestamp; subclasses add the rest of the message content.
 */
public class Message {

  /** Timestamp of the message; {@code null} until {@link #setTimestamp(Long)} stores a value. */
  protected Long timestamp;

  /**
   * Stores the timestamp exactly as given.
   *
   * @param timestamp the new timestamp; {@code null} is accepted and stored
   */
  public void setTimestamp(Long timestamp) {
    this.timestamp = timestamp;
  }

  /**
   * Returns the stored timestamp.
   *
   * @return the timestamp, or {@code null} if none has been set
   */
  public Long getTimestamp() {
    return this.timestamp;
  }
}
+ */ + +package org.apache.iotdb.protocol.mqtt.msg; + +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.ServiceLoader; +import org.apache.iotdb.commons.file.SystemFileFactory; +import org.apache.iotdb.protocol.mqtt.MqttConfig; +import org.apache.tsfile.external.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** PayloadFormatManager loads payload formatter from SPI services. */ +public class PayloadFormatManager { + + + private static final Logger logger = LoggerFactory.getLogger(PayloadFormatManager.class); + + // The dir saving MQTT payload plugin .jar files + private static String mqttDir; + // Map: formatterName => PayloadFormatter + private final Map<String, PayloadFormatter> mqttPayloadPluginMap = new HashMap<>(); + + private final MqttConfig config; + public PayloadFormatManager(MqttConfig config) { + this.config = config; + init(); + } + + private void init() { + mqttDir = config.getMqttDir(); + logger.info("mqttDir: {}", mqttDir); + + try { + makeMqttPluginDir(); + buildMqttPluginMap(); + } catch (IOException e) { + logger.error("MQTT PayloadFormatManager init() error.", e); + } + } + + public PayloadFormatter getPayloadFormat(String name) { + PayloadFormatter formatter = mqttPayloadPluginMap.get(name); + Preconditions.checkArgument(formatter != null, "Unknown payload format named: " + name); + return formatter; + } + + private void makeMqttPluginDir() throws IOException { + File file = SystemFileFactory.INSTANCE.getFile(mqttDir); + if (file.exists() && file.isDirectory()) { + return; + } + FileUtils.forceMkdir(file); + } + + private void buildMqttPluginMap() throws IOException { + ServiceLoader<PayloadFormatter> payloadFormatters = ServiceLoader.load( + PayloadFormatter.class); + for (PayloadFormatter 
formatter : payloadFormatters) { + if (formatter == null) { + logger.error("PayloadFormatManager(), formatter is null."); + continue; + } + + String pluginName = formatter.getName(); + mqttPayloadPluginMap.put(pluginName, formatter); + logger.info("PayloadFormatManager(), find MQTT Payload Plugin {}.", pluginName); + } + + URL[] jarURLs = getPluginJarURLs(mqttDir); + logger.debug("MQTT Plugin jarURLs: {}", Arrays.toString(jarURLs)); + + for (URL jarUrl : jarURLs) { + ClassLoader classLoader = new URLClassLoader(new URL[] {jarUrl}); + + // Use SPI to get all plugins' class + ServiceLoader<PayloadFormatter> payloadFormatters2 = + ServiceLoader.load(PayloadFormatter.class, classLoader); + + for (PayloadFormatter formatter : payloadFormatters2) { + if (formatter == null) { + logger.error("PayloadFormatManager(), formatter is null."); + continue; + } + + String pluginName = formatter.getName(); + if (mqttPayloadPluginMap.containsKey(pluginName)) { + continue; + } + mqttPayloadPluginMap.put(pluginName, formatter); + logger.info("PayloadFormatManager(), find MQTT Payload Plugin {}.", pluginName); + } + } + } + + /** + * get all jar files in the given folder + * + * @param folderPath + * @return all jar files' URL + * @throws IOException + */ + private static URL[] getPluginJarURLs(String folderPath) throws IOException { + HashSet<File> fileSet = + new HashSet<>( + org.apache.tsfile.external.commons.io.FileUtils.listFiles( + SystemFileFactory.INSTANCE.getFile(folderPath), new String[] {"jar"}, true)); + return org.apache.tsfile.external.commons.io.FileUtils.toURLs(fileSet.toArray(new File[0])); + } +} diff --git a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/PayloadFormatter.java b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/PayloadFormatter.java new file mode 100644 index 0000000..881beb0 --- /dev/null +++ b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/PayloadFormatter.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.protocol.mqtt.msg; + +import io.netty.buffer.ByteBuf; +import java.util.List; +import org.apache.iotdb.db.protocol.mqtt.JSONPayloadFormatter; +import org.apache.iotdb.db.protocol.mqtt.Message; + +/** + * PayloadFormatter format the payload to the messages. + * + * <p>This is a SPI interface. 
+ * + * @see JSONPayloadFormatter + */ +public interface PayloadFormatter { + + public static final String TREE_TYPE = "tree"; + public static final String TABLE_TYPE = "table"; + + /** + * format a payload to a list of messages + * + * @param payload + * @return + */ + @Deprecated + List<Message> format(ByteBuf payload); + + /** + * format a payload of a topic to a list of messages + * + * @param topic + * @param payload + * @return + */ + default List<Message> format(String topic, ByteBuf payload) { + return format(payload); + } + + /** + * get the formatter name + * + * @return + */ + String getName(); + + String getType(); +} diff --git a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/TableMessage.java b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/TableMessage.java new file mode 100644 index 0000000..90f2e32 --- /dev/null +++ b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/TableMessage.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.iotdb.protocol.mqtt.msg; + +import java.util.List; +import org.apache.iotdb.db.protocol.mqtt.Message; +import org.apache.tsfile.enums.TSDataType; + +/** Message parsing into a table */ +public class TableMessage extends Message { + + private String database; + + private String table; + + private List<String> tagKeys; + + private List<Object> tagValues; + + private List<String> attributeKeys; + + private List<Object> attributeValues; + + private List<String> fields; + + private List<TSDataType> dataTypes; + + private List<Object> values; + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } + + public List<String> getTagKeys() { + return tagKeys; + } + + public void setTagKeys(List<String> tagKeys) { + this.tagKeys = tagKeys; + } + + public List<Object> getTagValues() { + return tagValues; + } + + public void setTagValues(List<Object> tagValues) { + this.tagValues = tagValues; + } + + public List<String> getAttributeKeys() { + return attributeKeys; + } + + public void setAttributeKeys(List<String> attributeKeys) { + this.attributeKeys = attributeKeys; + } + + public List<Object> getAttributeValues() { + return attributeValues; + } + + public void setAttributeValues(List<Object> attributeValues) { + this.attributeValues = attributeValues; + } + + public List<String> getFields() { + return fields; + } + + public void setFields(List<String> fields) { + this.fields = fields; + } + + public List<TSDataType> getDataTypes() { + return dataTypes; + } + + public void setDataTypes(List<TSDataType> dataTypes) { + this.dataTypes = dataTypes; + } + + public List<Object> getValues() { + return values; + } + + public void setValues(List<Object> values) { + this.values = values; + } + + @Override + public String toString() { + return "TableMessage{" + + 
"database='" + + database + + '\'' + + ", table='" + + table + + '\'' + + ", tagKeys=" + + tagKeys + + ", tagValues=" + + tagValues + + ", attributeKeys=" + + attributeKeys + + ", attributeValues=" + + attributeValues + + ", fields=" + + fields + + ", dataTypes=" + + dataTypes + + ", values=" + + values + + ", timestamp=" + + timestamp + + '}'; + } +} diff --git a/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/TreeMessage.java b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/TreeMessage.java new file mode 100644 index 0000000..ee514eb --- /dev/null +++ b/mqtt-service/src/main/java/org/apache/iotdb/protocol/mqtt/msg/TreeMessage.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.iotdb.protocol.mqtt.msg; + +import java.util.List; +import org.apache.iotdb.db.protocol.mqtt.Message; +import org.apache.tsfile.enums.TSDataType; + +/** Message parsing into a tree */ +public class TreeMessage extends Message { + private String device; + private List<String> measurements; + private List<TSDataType> dataTypes; + private List<String> values; + + public String getDevice() { + return device; + } + + public void setDevice(String device) { + this.device = device; + } + + public List<String> getMeasurements() { + return measurements; + } + + public void setMeasurements(List<String> measurements) { + this.measurements = measurements; + } + + public List<TSDataType> getDataTypes() { + return dataTypes; + } + + public void setDataTypes(List<TSDataType> dataTypes) { + this.dataTypes = dataTypes; + } + + public List<String> getValues() { + return values; + } + + public void setValues(List<String> values) { + this.values = values; + } + + @Override + public String toString() { + return "Message{" + + "device='" + + device + + '\'' + + ", timestamp=" + + super.timestamp + + ", measurements=" + + measurements + + ", values=" + + values + + '}'; + } +} diff --git a/mqtt-service/src/main/resources/mqtt.properties b/mqtt-service/src/main/resources/mqtt.properties new file mode 100644 index 0000000..a27a026 --- /dev/null +++ b/mqtt-service/src/main/resources/mqtt.properties @@ -0,0 +1,35 @@ +#################### +### MQTT Broker Configuration +#################### + +# whether to enable the mqtt service. +# effectiveMode: restart +# Datatype: boolean +enable_mqtt_service=false + +# the mqtt service binding host. +# effectiveMode: restart +# Datatype: String +mqtt_host=127.0.0.1 + +# the mqtt service binding port. +# effectiveMode: restart +# Datatype: int +mqtt_port=1883 + +# the handler pool size for handing the mqtt messages. +# effectiveMode: restart +# Datatype: int +mqtt_handler_pool_size=1 + +# the mqtt message payload formatter. 
/**
 * Placeholder unit test for the simple App; it declares no test methods.
 */
public class AppTest {}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iotdb; + +import java.io.IOException; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class BrokerAuthenticatorTest { + + @Before + public void before() { + EnvironmentUtils.envSetUp(); + } + + @After + public void after() throws IOException, StorageEngineException { + EnvironmentUtils.cleanEnv(); + } + + @Test + public void checkValid() { + // In the previous implementation, the datanode will init a root file, + // but now checkuser operation needs to link to confignode. 
+ // BrokerAuthenticator authenticator = new BrokerAuthenticator(); + // assertTrue(authenticator.checkValid(null, "root", "root".getBytes())); + // assertFalse(authenticator.checkValid(null, "", "foo".getBytes())); + // assertFalse(authenticator.checkValid(null, "root", null)); + // assertFalse(authenticator.checkValid(null, "foo", "foo".getBytes())); + } +} diff --git a/mqtt-service/src/test/java/org/apache/iotdb/JSONPayloadFormatterTest.java b/mqtt-service/src/test/java/org/apache/iotdb/JSONPayloadFormatterTest.java new file mode 100644 index 0000000..324852c --- /dev/null +++ b/mqtt-service/src/test/java/org/apache/iotdb/JSONPayloadFormatterTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.iotdb; + +import static org.junit.Assert.assertEquals; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.nio.charset.StandardCharsets; +import org.apache.iotdb.db.protocol.mqtt.JSONPayloadFormatter; +import org.apache.iotdb.db.protocol.mqtt.TreeMessage; +import org.junit.Test; + +public class JSONPayloadFormatterTest { + + @Test + public void formatJson() { + String payload = + " {\n" + + " \"device\":\"root.sg.d1\",\n" + + " \"timestamp\":1586076045524,\n" + + " \"measurements\":[\"s1\",\"s2\"],\n" + + " \"values\":[0.530635,0.530635]\n" + + " }"; + + ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); + String topic = ""; + + JSONPayloadFormatter formatter = new JSONPayloadFormatter(); + TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(0); + + assertEquals("root.sg.d1", message.getDevice()); + assertEquals(Long.valueOf(1586076045524L), message.getTimestamp()); + assertEquals("s1", message.getMeasurements().get(0)); + assertEquals(0.530635D, Double.parseDouble(message.getValues().get(0)), 0); + } + + @Test + public void formatBatchJson() { + String payload = + " {\n" + + " \"device\":\"root.sg.d1\",\n" + + " \"timestamps\":[1586076045524,1586076065526],\n" + + " \"measurements\":[\"s1\",\"s2\"],\n" + + " \"values\":[[0.530635,0.530635], [0.530655,0.530695]]\n" + + " }"; + + ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); + String topic = ""; + + JSONPayloadFormatter formatter = new JSONPayloadFormatter(); + TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(1); + + assertEquals("root.sg.d1", message.getDevice()); + assertEquals(Long.valueOf(1586076065526L), message.getTimestamp()); + assertEquals("s2", message.getMeasurements().get(1)); + assertEquals(0.530695D, Double.parseDouble(message.getValues().get(1)), 0); + } + + @Test + public void formatJsonArray() { + String payload = + " [\n" + + " {\n" + + " 
\"device\":\"root.sg.d1\",\n" + + " \"timestamp\":1586076045524,\n" + + " \"measurements\":[\"s1\",\"s2\"],\n" + + " \"values\":[0.530635,0.530635]\n" + + " },\n" + + " {\n" + + " \"device\":\"root.sg.d2\",\n" + + " \"timestamp\":1586076065526,\n" + + " \"measurements\":[\"s3\",\"s4\"],\n" + + " \"values\":[0.530655,0.530655]\n" + + " }\n" + + "]"; + + ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); + String topic = ""; + + JSONPayloadFormatter formatter = new JSONPayloadFormatter(); + TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(1); + + assertEquals("root.sg.d2", message.getDevice()); + assertEquals(Long.valueOf(1586076065526L), message.getTimestamp()); + assertEquals("s3", message.getMeasurements().get(0)); + assertEquals(0.530655D, Double.parseDouble(message.getValues().get(0)), 0); + } + + @Test + public void formatBatchJsonArray() { + String payload = + "[\n" + + " {\n" + + " \"device\":\"root.sg.d1\",\n" + + " \"timestamps\":[1586076045524,1586076065526],\n" + + " \"measurements\":[\"s1\",\"s2\"],\n" + + " \"values\":[[0.530635,0.530635], [0.530655,0.530695]]\n" + + " },\n" + + " {\n" + + " \"device\":\"root.sg.d2\",\n" + + " \"timestamps\":[1586076045524,1586076065526],\n" + + " \"measurements\":[\"s3\",\"s4\"],\n" + + " \"values\":[[0.530635,0.530635], [0.530655,0.530695]]\n" + + " }" + + "]"; + + ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); + String topic = ""; + + JSONPayloadFormatter formatter = new JSONPayloadFormatter(); + TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(3); + + assertEquals("root.sg.d2", message.getDevice()); + assertEquals(Long.valueOf(1586076065526L), message.getTimestamp()); + assertEquals("s4", message.getMeasurements().get(1)); + assertEquals(0.530695D, Double.parseDouble(message.getValues().get(1)), 0); + } +} diff --git a/mqtt-service/src/test/java/org/apache/iotdb/LinePayloadFormatterTest.java 
b/mqtt-service/src/test/java/org/apache/iotdb/LinePayloadFormatterTest.java new file mode 100644 index 0000000..96e055b --- /dev/null +++ b/mqtt-service/src/test/java/org/apache/iotdb/LinePayloadFormatterTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.iotdb; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.nio.charset.StandardCharsets; +import java.util.List; +import org.apache.iotdb.db.protocol.mqtt.LinePayloadFormatter; +import org.apache.iotdb.db.protocol.mqtt.Message; +import org.apache.iotdb.db.protocol.mqtt.TableMessage; +import org.apache.tsfile.utils.Binary; +import org.junit.Test; + +public class LinePayloadFormatterTest { + + @Test + public void formatLine() { + String payload = + "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"value1\",field2=1i,field3=2u,field4=3i32,field5=t,field6=false,field7=4,field8=5f 1"; + + ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); + String topic = ""; + + LinePayloadFormatter formatter = new LinePayloadFormatter(); + TableMessage message = (TableMessage) formatter.format(topic, buf).get(0); + + assertEquals("test1", message.getTable()); + assertEquals(Long.valueOf(1L), message.getTimestamp()); + assertEquals("tag1", message.getTagKeys().get(0)); + assertEquals("attr1", message.getAttributeKeys().get(0)); + assertEquals( + "value1", + ((Binary[]) message.getValues().get(0))[0].getStringValue(StandardCharsets.UTF_8)); + assertEquals(1L, ((long[]) message.getValues().get(1))[0], 0); + assertEquals(2L, ((long[]) message.getValues().get(2))[0], 0); + assertEquals(3L, ((int[]) message.getValues().get(3))[0], 0); + assertTrue(((boolean[]) message.getValues().get(4))[0]); + assertFalse(((boolean[]) message.getValues().get(5))[0]); + assertEquals(4d, ((double[]) message.getValues().get(6))[0], 0); + assertEquals(5f, ((float[]) message.getValues().get(7))[0], 0); + } + + @Test + public void formatBatchLine() { + String payload = + "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"value1\",field2=1i,field3=1u 1 \n" + + "test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4 
field4=\"value4\",field5=10i,field6=10i32 2 "; + + ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); + String topic = ""; + + LinePayloadFormatter formatter = new LinePayloadFormatter(); + TableMessage message = (TableMessage) formatter.format(topic, buf).get(1); + + assertEquals("test2", message.getTable()); + assertEquals(Long.valueOf(2L), message.getTimestamp()); + assertEquals("tag3", message.getTagKeys().get(0)); + assertEquals("attr3", message.getAttributeKeys().get(0)); + assertEquals(10, ((int[]) message.getValues().get(2))[0], 0); + } + + @Test + public void formatLineAnnotation() { + String payload = + "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"value1\",field2=1i,field3=1u 1 \n" + + " # test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4 field4=\"value4\",field5=10i,field6=10i32 2 "; + + ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); + String topic = ""; + + LinePayloadFormatter formatter = new LinePayloadFormatter(); + List<Message> message = formatter.format(topic, buf); + + assertEquals(1, message.size()); + } +} diff --git a/mqtt-service/src/test/java/org/apache/iotdb/PayloadFormatManagerTest.java b/mqtt-service/src/test/java/org/apache/iotdb/PayloadFormatManagerTest.java new file mode 100644 index 0000000..17d2ed3 --- /dev/null +++ b/mqtt-service/src/test/java/org/apache/iotdb/PayloadFormatManagerTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iotdb; + +import static org.junit.Assert.assertNotNull; + +import org.apache.iotdb.db.protocol.mqtt.PayloadFormatManager; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.junit.After; +import org.junit.Test; + +public class PayloadFormatManagerTest { + @After + public void tearDown() throws Exception { + EnvironmentUtils.cleanAllDir(); + } + + @Test(expected = IllegalArgumentException.class) + public void getPayloadFormat() { + PayloadFormatManager.getPayloadFormat("txt"); + } + + @Test + public void getDefaultPayloadFormat() { + assertNotNull(PayloadFormatManager.getPayloadFormat("json")); + } +} diff --git a/pom.xml b/pom.xml index ca88f39..f5cb3dd 100644 --- a/pom.xml +++ b/pom.xml @@ -37,6 +37,7 @@ <module>distributions</module> <module>iotdb-collector</module> <module>mybatis-generator</module> + <module>mqtt-service</module> </modules> <properties> <!-- Explicitly set a variable used by all dependencies to the IoTDB dependencies, as the @@ -183,7 +184,7 @@ <thrift.version>0.14.1</thrift.version> <!-- This was the last version to support Java 8 --> <tomcat.version>9.0.86</tomcat.version> - <tsfile.version>2.1.1</tsfile.version> + <tsfile.version>2.2.0-251027-SNAPSHOT</tsfile.version> <xz.version>1.9</xz.version> <zeppelin.version>0.11.1</zeppelin.version> <zstd-jni.version>1.5.5-5</zstd-jni.version> @@ -979,6 +980,11 @@ <artifactId>iotdb-session</artifactId> <version>${iotdb.version}</version> </dependency> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>iotdb-server</artifactId> + 
<version>${iotdb.version}</version> + </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId>
