This is an automated email from the ASF dual-hosted git repository.
critas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb-extras.git
The following commit(s) were added to refs/heads/master by this push:
new ccc0aa8 Bump tsfile version to 2.1.1 (#94)
ccc0aa8 is described below
commit ccc0aa8dc21ec20ba4902859108a8cc3434e4411
Author: CritasWang <[email protected]>
AuthorDate: Tue Sep 23 17:29:13 2025 +0800
Bump tsfile version to 2.1.1 (#94)
* Bump tsfile version to 2.1.1
* fix
* fix build
* fix build
* fix build
* fix build
* fix build
* fix build
---
.github/workflows/compile-check.yml | 4 +-
.../java/org/apache/iotdb/flink/IoTDBSink.java | 11 +-
.../iotdb/flink/options/IoTDBSinkOptions.java | 8 +-
.../util/TSFileConfigUtilCompletenessTest.java | 8 +
connectors/spark-iotdb-connector/pom.xml | 15 ++
.../apache/iotdb/spark/table/db/IoTDBUtils.scala | 2 +-
.../spark/table/db/write/IoTDBDataWriter.scala | 2 +-
examples/iotdb-spring-boot-start/pom.xml | 3 +-
.../iotdb/kafka/relational/RelationalConstant.java | 60 +++--
.../iotdb/kafka/relational/RelationalConsumer.java | 218 +++++++++--------
.../kafka/relational/RelationalConsumerThread.java | 104 ++++----
.../iotdb/kafka/relational/RelationalProducer.java | 64 ++---
examples/mybatis-generator/pom.xml | 2 +-
examples/mybatisplus-generator/README.md | 2 +-
examples/mybatisplus-generator/pom.xml | 7 +-
.../iotdb/rabbitmq/RabbitMQChannelUtils.java | 9 +-
.../apache/iotdb/rabbitmq/RabbitMQConsumer.java | 7 +-
.../apache/iotdb/rabbitmq/RabbitMQProducer.java | 13 +-
.../rabbitmq/relational/RelationalConstant.java | 70 +++---
.../relational/RelationalRabbitMQConsumer.java | 253 ++++++++++----------
.../relational/RelationalRabbitMQProducer.java | 60 +++--
.../rocketmq/relational/RelationalConstant.java | 60 +++--
.../relational/RelationalRocketMQConsumer.java | 264 +++++++++++----------
.../relational/RelationalRocketMQProducer.java | 80 ++++---
.../iotdb/rocketmq/relational/RelationalUtils.java | 26 +-
iotdb-collector/collector-core/pom.xml | 1 +
iotdb-spring-boot-starter/pom.xml | 2 +-
pom.xml | 2 +-
28 files changed, 733 insertions(+), 624 deletions(-)
diff --git a/.github/workflows/compile-check.yml
b/.github/workflows/compile-check.yml
index 99b81e8..f043a77 100644
--- a/.github/workflows/compile-check.yml
+++ b/.github/workflows/compile-check.yml
@@ -48,7 +48,7 @@ jobs:
shell: bash
run: |
if [ "${{ matrix.java }}" -ge 17 ]; then
- mvn clean verify -P with-springboot -ntp
+ mvn clean verify -P with-springboot -P with-all-connectors -P
with-examples -ntp
else
- mvn clean verify -ntp
+ mvn clean verify -P with-all-connectors -P with-examples -ntp
fi
diff --git
a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
index 74b3ad0..f1403bc 100644
---
a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
+++
b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
@@ -78,14 +78,13 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
void initSession() {
if (options.getNodeUrls() != null) {
- pool = new SessionPool(
- options.getNodeUrls(),
- options.getUser(),
- options.getPassword(),
- sessionPoolSize);
+ pool =
+ new SessionPool(
+ options.getNodeUrls(), options.getUser(), options.getPassword(),
sessionPoolSize);
return;
}
- pool = new SessionPool(
+ pool =
+ new SessionPool(
options.getHost(),
options.getPort(),
options.getUser(),
diff --git
a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java
b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java
index 347eee9..538e82d 100644
---
a/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java
+++
b/connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/options/IoTDBSinkOptions.java
@@ -43,10 +43,10 @@ public class IoTDBSinkOptions extends IoTDBOptions {
}
public IoTDBSinkOptions(
- List<String> nodeUrls,
- String user,
- String password,
- List<TimeseriesOption> timeseriesOptionList) {
+ List<String> nodeUrls,
+ String user,
+ String password,
+ List<TimeseriesOption> timeseriesOptionList) {
super(nodeUrls, user, password);
this.timeseriesOptionList = timeseriesOptionList;
}
diff --git
a/connectors/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TSFileConfigUtilCompletenessTest.java
b/connectors/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TSFileConfigUtilCompletenessTest.java
index b0bedc0..0050e0e 100644
---
a/connectors/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TSFileConfigUtilCompletenessTest.java
+++
b/connectors/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TSFileConfigUtilCompletenessTest.java
@@ -92,6 +92,14 @@ public class TSFileConfigUtilCompletenessTest {
"setBooleanEncoding",
"setInt32Encoding",
"setTextEncoding",
+ "setLz4UseJni",
+ "setTextCompression",
+ "setBooleanCompression",
+ "setInt64Compression",
+ "setDoubleCompression",
+ "setInt32Compression",
+ "setFloatCompression",
+ "setEncryptKeyFromToken"
};
Set<String> addedSetters = new HashSet<>();
Collections.addAll(addedSetters, setters);
diff --git a/connectors/spark-iotdb-connector/pom.xml
b/connectors/spark-iotdb-connector/pom.xml
index 6ced32d..f59ee4f 100644
--- a/connectors/spark-iotdb-connector/pom.xml
+++ b/connectors/spark-iotdb-connector/pom.xml
@@ -53,6 +53,11 @@
<artifactId>iotdb-session</artifactId>
<version>${iotdb.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.tsfile</groupId>
+ <artifactId>tsfile</artifactId>
+ <version>${tsfile.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.tsfile</groupId>
<artifactId>common</artifactId>
@@ -90,6 +95,16 @@
-->
<pluginManagement>
<plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <usedDependencies>
+ <!-- For some reason the plugin complains if this
artifact is included -->
+
<usedDependency>org.apache.tsfile:common</usedDependency>
+ </usedDependencies>
+ </configuration>
+ </plugin>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
diff --git
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBUtils.scala
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBUtils.scala
index c640caa..d5c4d05 100644
---
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBUtils.scala
+++
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/IoTDBUtils.scala
@@ -28,7 +28,7 @@ import org.apache.spark.unsafe.types.UTF8String
import org.apache.tsfile.enums.TSDataType
import org.apache.tsfile.read.common.RowRecord
import org.apache.tsfile.utils.{Binary, DateUtils}
-import org.apache.tsfile.write.record.Tablet.ColumnCategory
+import org.apache.tsfile.enums.ColumnCategory
import java.util
diff --git
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBDataWriter.scala
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBDataWriter.scala
index c2feb29..97a69a7 100644
---
a/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBDataWriter.scala
+++
b/connectors/spark-iotdb-table-connector/spark-iotdb-table-common/src/main/scala/org/apache/iotdb/spark/table/db/write/IoTDBDataWriter.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.connector.write.{DataWriter,
WriterCommitMessage}
import org.apache.spark.sql.types.{NullType, StructType}
import org.apache.tsfile.enums.TSDataType
import org.apache.tsfile.write.record.Tablet
-import org.apache.tsfile.write.record.Tablet.ColumnCategory
+import org.apache.tsfile.enums.ColumnCategory
class IoTDBDataWriter(options: IoTDBOptions, writeSchema: StructType,
tableSchema: StructType) extends DataWriter[InternalRow] with Logging {
diff --git a/examples/iotdb-spring-boot-start/pom.xml
b/examples/iotdb-spring-boot-start/pom.xml
index 27bd7c0..4fe181c 100644
--- a/examples/iotdb-spring-boot-start/pom.xml
+++ b/examples/iotdb-spring-boot-start/pom.xml
@@ -38,6 +38,7 @@
<version>3.5.1</version>
<google.java.format.version>1.22.0</google.java.format.version>
<spotless.version>2.43.0</spotless.version>
+ <iotdb.version>2.0.5</iotdb.version>
</properties>
<dependencies>
<dependency>
@@ -57,7 +58,7 @@
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>isession</artifactId>
- <version>2.0.4-SNAPSHOT</version>
+ <version>${iotdb.version}</version>
</dependency>
</dependencies>
<build>
diff --git
a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConstant.java
b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConstant.java
index aebdef3..e26f15d 100644
---
a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConstant.java
+++
b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConstant.java
@@ -21,29 +21,39 @@ package org.apache.iotdb.kafka.relational;
public class RelationalConstant {
- public static final String KAFKA_SERVICE_URL = "172.20.31.71:9094";
- public static final String TOPIC = "Kafka-Relational-Test";
- public static final String[] IOTDB_URLS = {
- "127.0.0.1:6667"
- };
- public static final String IOTDB_USERNAME = "root";
- public static final String IOTDB_PASSWORD = "root";
- public static final int SESSION_SIZE = 3;
- public static final int CONSUMER_THREAD_NUM = 5;
- public static final String[] DATABASES = {"kafka_db1", "kafka_db2"};
- public static final String[][] TABLES = {
- // database, tableName, columnNames, columnTypes, columnCategories
- {"kafka_db1", "tb1", "time,region,model_id,temperature,status",
"TIMESTAMP,STRING,STRING,FLOAT,BOOLEAN", "TIME,TAG,ATTRIBUTE,FIELD,FIELD"},
- {"kafka_db2", "tb2", "time,plant_id,humidity,status",
"TIMESTAMP,STRING,FLOAT,BOOLEAN", "TIME,TAG,FIELD,FIELD"}
- };
- public static final String[] ALL_DATA = {
- //
database;tableName;columnName[,columnName]*;value[,value]*[,value[:value]*]*
-
"kafka_db1;tb1;time,temperature,status;17,3.26,true;18,3.27,false;19,3.28,true",
-
"kafka_db1;tb1;time,region,model_id,temperature;20,'rgn1','id1',3.31",
- "kafka_db2;tb2;time,plant_id,humidity,status;50,'id1',68.7,true",
- "kafka_db2;tb2;time,plant_id,humidity,status;51,'id2',68.5,false",
- "kafka_db2;tb2;time,plant_id,humidity,status;52,'id3',68.3,true",
- "kafka_db2;tb2;time,plant_id,humidity,status;53,'id4',68.8,true",
- "kafka_db2;tb2;time,plant_id,humidity,status;54,'id5',68.9,true"
- };
+ public static final String KAFKA_SERVICE_URL = "172.20.31.71:9094";
+ public static final String TOPIC = "Kafka-Relational-Test";
+ public static final String[] IOTDB_URLS = {"127.0.0.1:6667"};
+ public static final String IOTDB_USERNAME = "root";
+ public static final String IOTDB_PASSWORD = "root";
+ public static final int SESSION_SIZE = 3;
+ public static final int CONSUMER_THREAD_NUM = 5;
+ public static final String[] DATABASES = {"kafka_db1", "kafka_db2"};
+ public static final String[][] TABLES = {
+ // database, tableName, columnNames, columnTypes, columnCategories
+ {
+ "kafka_db1",
+ "tb1",
+ "time,region,model_id,temperature,status",
+ "TIMESTAMP,STRING,STRING,FLOAT,BOOLEAN",
+ "TIME,TAG,ATTRIBUTE,FIELD,FIELD"
+ },
+ {
+ "kafka_db2",
+ "tb2",
+ "time,plant_id,humidity,status",
+ "TIMESTAMP,STRING,FLOAT,BOOLEAN",
+ "TIME,TAG,FIELD,FIELD"
+ }
+ };
+ public static final String[] ALL_DATA = {
+ //
database;tableName;columnName[,columnName]*;value[,value]*[,value[:value]*]*
+
"kafka_db1;tb1;time,temperature,status;17,3.26,true;18,3.27,false;19,3.28,true",
+ "kafka_db1;tb1;time,region,model_id,temperature;20,'rgn1','id1',3.31",
+ "kafka_db2;tb2;time,plant_id,humidity,status;50,'id1',68.7,true",
+ "kafka_db2;tb2;time,plant_id,humidity,status;51,'id2',68.5,false",
+ "kafka_db2;tb2;time,plant_id,humidity,status;52,'id3',68.3,true",
+ "kafka_db2;tb2;time,plant_id,humidity,status;53,'id4',68.8,true",
+ "kafka_db2;tb2;time,plant_id,humidity,status;54,'id5',68.9,true"
+ };
}
diff --git
a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumer.java
b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumer.java
index 3e9c37c..00eac08 100644
---
a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumer.java
+++
b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumer.java
@@ -24,126 +24,136 @@ import org.apache.iotdb.isession.pool.ITableSessionPool;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.TableSessionPoolBuilder;
+
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class RelationalConsumer {
- private static final Logger LOGGER =
LoggerFactory.getLogger(RelationalConsumer.class);
- private static ITableSessionPool tableSessionPool;
- private List<KafkaConsumer<String, String>> consumerList;
-
- private RelationalConsumer(List<KafkaConsumer<String, String>>
consumerList) {
- this.consumerList = consumerList;
- initSessionPool();
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RelationalConsumer.class);
+ private static ITableSessionPool tableSessionPool;
+ private List<KafkaConsumer<String, String>> consumerList;
+
+ private RelationalConsumer(List<KafkaConsumer<String, String>> consumerList)
{
+ this.consumerList = consumerList;
+ initSessionPool();
+ }
+
+ private static void initSessionPool() {
+ tableSessionPool =
+ new TableSessionPoolBuilder()
+ .nodeUrls(Arrays.asList(RelationalConstant.IOTDB_URLS))
+ .user(RelationalConstant.IOTDB_USERNAME)
+ .password(RelationalConstant.IOTDB_PASSWORD)
+ .maxSize(RelationalConstant.SESSION_SIZE)
+ .build();
+ }
+
+ public static void main(String[] args) {
+ List<KafkaConsumer<String, String>> consumerList = new ArrayList<>();
+ for (int i = 0; i < RelationalConstant.CONSUMER_THREAD_NUM; i++) {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
RelationalConstant.KAFKA_SERVICE_URL);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, RelationalConstant.TOPIC);
+
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+ consumerList.add(consumer);
+ consumer.subscribe(Collections.singleton(RelationalConstant.TOPIC));
}
-
- private static void initSessionPool() {
- tableSessionPool =
- new TableSessionPoolBuilder()
- .nodeUrls(Arrays.asList(RelationalConstant.IOTDB_URLS))
- .user(RelationalConstant.IOTDB_USERNAME)
- .password(RelationalConstant.IOTDB_PASSWORD)
- .maxSize(RelationalConstant.SESSION_SIZE)
- .build();
- }
-
- public static void main(String[] args) {
- List<KafkaConsumer<String, String>> consumerList = new ArrayList<>();
- for (int i = 0; i < RelationalConstant.CONSUMER_THREAD_NUM; i++) {
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
RelationalConstant.KAFKA_SERVICE_URL);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.put(ConsumerConfig.GROUP_ID_CONFIG,
RelationalConstant.TOPIC);
-
- KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(props);
- consumerList.add(consumer);
-
consumer.subscribe(Collections.singleton(RelationalConstant.TOPIC));
- }
- RelationalConsumer consumer = new RelationalConsumer(consumerList);
- initIoTDB();
- consumer.consumeInParallel();
+ RelationalConsumer consumer = new RelationalConsumer(consumerList);
+ initIoTDB();
+ consumer.consumeInParallel();
+ }
+
+ private static void initIoTDB() {
+ for (String db : RelationalConstant.DATABASES) {
+ boolean res = createDatabase(db);
+ if (!res) {
+ throw new RuntimeException("Create database failed");
+ }
}
-
- private static void initIoTDB() {
- for (String db : RelationalConstant.DATABASES) {
- boolean res = createDatabase(db);
- if (!res) {
- throw new RuntimeException("Create database failed");
- }
- }
- for (String[] tableInfo : RelationalConstant.TABLES) {
- boolean res = createTable(tableInfo);
- if (!res) {
- throw new RuntimeException("Create table failed");
- }
- }
+ for (String[] tableInfo : RelationalConstant.TABLES) {
+ boolean res = createTable(tableInfo);
+ if (!res) {
+ throw new RuntimeException("Create table failed");
+ }
}
-
- private static boolean createDatabase(String dbName) {
- try (ITableSession session = tableSessionPool.getSession()) {
- try {
- session.executeNonQueryStatement(String.format("CREATE
DATABASE %s", dbName));
- } catch (IoTDBConnectionException | StatementExecutionException e)
{
- LOGGER.error("Create Database Error: ", e);
- return false;
- }
- } catch (IoTDBConnectionException e) {
- LOGGER.error("Get Table Session Error: ", e);
- return false;
- }
- return true;
+ }
+
+ private static boolean createDatabase(String dbName) {
+ try (ITableSession session = tableSessionPool.getSession()) {
+ try {
+ session.executeNonQueryStatement(String.format("CREATE DATABASE %s",
dbName));
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ LOGGER.error("Create Database Error: ", e);
+ return false;
+ }
+ } catch (IoTDBConnectionException e) {
+ LOGGER.error("Get Table Session Error: ", e);
+ return false;
}
-
- private static boolean createTable(String[] tableInfo) {
- try (ITableSession session = tableSessionPool.getSession()) {
- String sql = getCreateTableSQL(tableInfo);
- try {
- session.executeNonQueryStatement(sql);
- } catch (IoTDBConnectionException | StatementExecutionException e)
{
- LOGGER.error("Create Table Error: ", e);
- return false;
- }
- } catch (IoTDBConnectionException e) {
- LOGGER.error("Get Table Session Error: ", e);
- return false;
- }
- return true;
+ return true;
+ }
+
+ private static boolean createTable(String[] tableInfo) {
+ try (ITableSession session = tableSessionPool.getSession()) {
+ String sql = getCreateTableSQL(tableInfo);
+ try {
+ session.executeNonQueryStatement(sql);
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ LOGGER.error("Create Table Error: ", e);
+ return false;
+ }
+ } catch (IoTDBConnectionException e) {
+ LOGGER.error("Get Table Session Error: ", e);
+ return false;
}
-
- private static String getCreateTableSQL(String[] tableInfo) {
- StringBuilder sql = new StringBuilder();
- sql.append("CREATE TABLE
\"").append(tableInfo[0]).append("\".\"").append(tableInfo[1]).append("\" (");
-
- String[] columnNames = tableInfo[2].split(",");
- String[] columnTypes = tableInfo[3].split(",");
- String[] columnCategories = tableInfo[4].split(",");
- int columnSize = columnNames.length;
-
- for (int i = 0; i < columnSize; i++) {
- sql.append(columnNames[i]).append(" ");
- sql.append(columnTypes[i]).append(" ");
- sql.append(columnCategories[i]).append(",");
- }
- sql.deleteCharAt(sql.length() - 1);
- sql.append(")");
- return sql.toString();
+ return true;
+ }
+
+ private static String getCreateTableSQL(String[] tableInfo) {
+ StringBuilder sql = new StringBuilder();
+ sql.append("CREATE TABLE \"")
+ .append(tableInfo[0])
+ .append("\".\"")
+ .append(tableInfo[1])
+ .append("\" (");
+
+ String[] columnNames = tableInfo[2].split(",");
+ String[] columnTypes = tableInfo[3].split(",");
+ String[] columnCategories = tableInfo[4].split(",");
+ int columnSize = columnNames.length;
+
+ for (int i = 0; i < columnSize; i++) {
+ sql.append(columnNames[i]).append(" ");
+ sql.append(columnTypes[i]).append(" ");
+ sql.append(columnCategories[i]).append(",");
}
-
- private void consumeInParallel() {
- ExecutorService executor =
Executors.newFixedThreadPool(RelationalConstant.CONSUMER_THREAD_NUM);
- for (int i = 0; i < consumerList.size(); i++) {
- RelationalConsumerThread consumerThread = new
RelationalConsumerThread(consumerList.get(i), tableSessionPool);
- executor.submit(consumerThread);
- }
+ sql.deleteCharAt(sql.length() - 1);
+ sql.append(")");
+ return sql.toString();
+ }
+
+ private void consumeInParallel() {
+ ExecutorService executor =
Executors.newFixedThreadPool(RelationalConstant.CONSUMER_THREAD_NUM);
+ for (int i = 0; i < consumerList.size(); i++) {
+ RelationalConsumerThread consumerThread =
+ new RelationalConsumerThread(consumerList.get(i), tableSessionPool);
+ executor.submit(consumerThread);
}
+ }
}
diff --git
a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumerThread.java
b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumerThread.java
index 073de7d..3d18eb9 100644
---
a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumerThread.java
+++
b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalConsumerThread.java
@@ -21,12 +21,13 @@ package org.apache.iotdb.kafka.relational;
import org.apache.iotdb.isession.ITableSession;
import org.apache.iotdb.isession.pool.ITableSessionPool;
+import org.apache.iotdb.kafka.ConsumerThread;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
+
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.example.ConsumerThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,65 +37,66 @@ import java.util.List;
public class RelationalConsumerThread implements Runnable {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ConsumerThread.class);
- private KafkaConsumer<String, String> consumer;
- private ITableSessionPool tableSessionPool;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ConsumerThread.class);
+ private KafkaConsumer<String, String> consumer;
+ private ITableSessionPool tableSessionPool;
- public RelationalConsumerThread(KafkaConsumer<String, String> consumer,
ITableSessionPool tableSessionPool) {
- this.consumer = consumer;
- this.tableSessionPool = tableSessionPool;
- }
+ public RelationalConsumerThread(
+ KafkaConsumer<String, String> consumer, ITableSessionPool
tableSessionPool) {
+ this.consumer = consumer;
+ this.tableSessionPool = tableSessionPool;
+ }
- @Override
- public void run() {
- try {
- do {
- ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
- LOGGER.info("Received records: {}", records.count());
- List<String> dataList = new ArrayList<>(records.count());
- for (ConsumerRecord<String, String> consumerRecord : records) {
- dataList.add(consumerRecord.value());
- }
- insertDataList(dataList);
- } while (true);
- } catch (Exception e) {
- LOGGER.error(e.getMessage());
+ @Override
+ public void run() {
+ try {
+ do {
+ ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
+ LOGGER.info("Received records: {}", records.count());
+ List<String> dataList = new ArrayList<>(records.count());
+ for (ConsumerRecord<String, String> consumerRecord : records) {
+ dataList.add(consumerRecord.value());
}
+ insertDataList(dataList);
+ } while (true);
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage());
}
+ }
- private void insertDataList(List<String> dataList) {
- for (String s : dataList) {
- String sql = getInsertValueSQL(s);
+ private void insertDataList(List<String> dataList) {
+ for (String s : dataList) {
+ String sql = getInsertValueSQL(s);
- try (ITableSession session = tableSessionPool.getSession()) {
- try {
- session.executeNonQueryStatement(sql);
- } catch (IoTDBConnectionException |
StatementExecutionException e) {
- LOGGER.error("Insert Values Into Table Error: ", e);
- }
- } catch (IoTDBConnectionException e) {
- LOGGER.error("Get Table Session Error: ", e);
- }
+ try (ITableSession session = tableSessionPool.getSession()) {
+ try {
+ session.executeNonQueryStatement(sql);
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ LOGGER.error("Insert Values Into Table Error: ", e);
}
+ } catch (IoTDBConnectionException e) {
+ LOGGER.error("Get Table Session Error: ", e);
+ }
}
+ }
- private String getInsertValueSQL(String s) {
- StringBuilder sql = new StringBuilder();
- String[] curDataInfo = s.split(";");
- int valueSetSize = curDataInfo.length - 3;
- String database = curDataInfo[0];
- String tableName = curDataInfo[1];
- String columnNames = curDataInfo[2];
- sql.append("INSERT INTO
\"").append(database).append("\".\"").append(tableName).append("\"(");
- sql.append(columnNames).append(") VALUES ");
+ private String getInsertValueSQL(String s) {
+ StringBuilder sql = new StringBuilder();
+ String[] curDataInfo = s.split(";");
+ int valueSetSize = curDataInfo.length - 3;
+ String database = curDataInfo[0];
+ String tableName = curDataInfo[1];
+ String columnNames = curDataInfo[2];
+ sql.append("INSERT INTO
\"").append(database).append("\".\"").append(tableName).append("\"(");
+ sql.append(columnNames).append(") VALUES ");
- for (int j = 0; j < valueSetSize; j++) {
- String columnValues = curDataInfo[3 + j];
- sql.append("(");
- sql.append(columnValues);
- sql.append("),");
- }
- sql.deleteCharAt(sql.length() - 1);
- return sql.toString();
+ for (int j = 0; j < valueSetSize; j++) {
+ String columnValues = curDataInfo[3 + j];
+ sql.append("(");
+ sql.append(columnValues);
+ sql.append("),");
}
+ sql.deleteCharAt(sql.length() - 1);
+ return sql.toString();
+ }
}
diff --git
a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalProducer.java
b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalProducer.java
index 9bfc85d..ea5be51 100644
---
a/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalProducer.java
+++
b/examples/kafka/src/main/java/org/apache/iotdb/kafka/relational/RelationalProducer.java
@@ -31,36 +31,46 @@ import java.util.Properties;
public class RelationalProducer {
- private final KafkaProducer<String, String> kafkaProducer;
- private static final Logger LOGGER =
LoggerFactory.getLogger(RelationalProducer.class);
+ private final KafkaProducer<String, String> kafkaProducer;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RelationalProducer.class);
- public RelationalProducer() {
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
RelationalConstant.KAFKA_SERVICE_URL);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
- kafkaProducer = new KafkaProducer<>(props);
- }
+ public RelationalProducer() {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
RelationalConstant.KAFKA_SERVICE_URL);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
+ kafkaProducer = new KafkaProducer<>(props);
+ }
- public static void main(String[] args) {
- RelationalProducer relationalProducer = new RelationalProducer();
- relationalProducer.produce();
- relationalProducer.close();
- }
+ public static void main(String[] args) {
+ RelationalProducer relationalProducer = new RelationalProducer();
+ relationalProducer.produce();
+ relationalProducer.close();
+ }
- private void produce() {
- for (int i = 0; i < RelationalConstant.ALL_DATA.length; i++) {
- String key = Integer.toString(i);
- try {
- RecordMetadata metadata = kafkaProducer.send(new
ProducerRecord<>(RelationalConstant.TOPIC, key,
RelationalConstant.ALL_DATA[i])).get();
- LOGGER.info("Sent record(key={} value={}) meta(partition={},
offset={})\n", key, RelationalConstant.ALL_DATA[i], metadata.partition(),
metadata.offset());
- } catch (Exception e) {
- LOGGER.error(e.getMessage());
- }
- }
+ private void produce() {
+ for (int i = 0; i < RelationalConstant.ALL_DATA.length; i++) {
+ String key = Integer.toString(i);
+ try {
+ RecordMetadata metadata =
+ kafkaProducer
+ .send(
+ new ProducerRecord<>(
+ RelationalConstant.TOPIC, key,
RelationalConstant.ALL_DATA[i]))
+ .get();
+ LOGGER.info(
+ "Sent record(key={} value={}) meta(partition={}, offset={})\n",
+ key,
+ RelationalConstant.ALL_DATA[i],
+ metadata.partition(),
+ metadata.offset());
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage());
+ }
}
+ }
- private void close() {
- kafkaProducer.close();
- }
+ private void close() {
+ kafkaProducer.close();
+ }
}
diff --git a/examples/mybatis-generator/pom.xml
b/examples/mybatis-generator/pom.xml
index 67b2515..0a69db5 100644
--- a/examples/mybatis-generator/pom.xml
+++ b/examples/mybatis-generator/pom.xml
@@ -45,7 +45,7 @@
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-jdbc</artifactId>
- <version>2.0.4-SNAPSHOT</version>
+ <version>${iotdb.version}</version>
</dependency>
<dependency>
<groupId>io.swagger.core.v3</groupId>
diff --git a/examples/mybatisplus-generator/README.md
b/examples/mybatisplus-generator/README.md
index 8b01b07..4314992 100644
--- a/examples/mybatisplus-generator/README.md
+++ b/examples/mybatisplus-generator/README.md
@@ -67,7 +67,7 @@ CREATE TABLE mix (
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-boot.version>3.4.5</spring-boot.version>
<spring.version>6.2.6</spring.version>
- <iotdb-jdbc.version>2.0.4-SNAPSHOT</iotdb-jdbc.version>
+ <iotdb-jdbc.version>2.0.5</iotdb-jdbc.version>
<io-springfox.version>3.0.0</io-springfox.version>
</properties>
diff --git a/examples/mybatisplus-generator/pom.xml
b/examples/mybatisplus-generator/pom.xml
index 79fa227..cf2b14c 100644
--- a/examples/mybatisplus-generator/pom.xml
+++ b/examples/mybatisplus-generator/pom.xml
@@ -38,7 +38,7 @@
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-boot.version>3.5.1</spring-boot.version>
- <iotdb-jdbc.version>2.0.4-SNAPSHOT</iotdb-jdbc.version>
+ <iotdb-jdbc.version>2.0.5</iotdb-jdbc.version>
<io-springfox.version>3.0.0</io-springfox.version>
</properties>
<dependencies>
@@ -126,6 +126,11 @@
<configurationFile>src/main/resources/generatorConfig.xml</configurationFile>
</configuration>
</plugin>
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ <version>2.43.0</version>
+ </plugin>
</plugins>
</build>
</project>
diff --git
a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQChannelUtils.java
b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQChannelUtils.java
index bc4c8b9..c7dad49 100644
---
a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQChannelUtils.java
+++
b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQChannelUtils.java
@@ -19,9 +19,10 @@
package org.apache.iotdb.rabbitmq;
+import org.apache.iotdb.rabbitmq.relational.RelationalConstant;
+
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
-import org.apache.iotdb.rabbitmq.relational.RelationalConstant;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@@ -30,8 +31,7 @@ public class RabbitMQChannelUtils {
private RabbitMQChannelUtils() {}
- public static Connection getConnection()
- throws IOException, TimeoutException {
+ public static Connection getConnection() throws IOException,
TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.SERVER_HOST);
connectionFactory.setPort(Constant.SERVER_PORT);
@@ -43,8 +43,7 @@ public class RabbitMQChannelUtils {
return connectionFactory.newConnection(Constant.CONNECTION_NAME);
}
- public static Connection getRelationalConnection()
- throws IOException, TimeoutException {
+ public static Connection getRelationalConnection() throws IOException,
TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RelationalConstant.SERVER_HOST);
connectionFactory.setPort(RelationalConstant.SERVER_PORT);
diff --git
a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQConsumer.java
b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQConsumer.java
index 273fa8a..455fe66 100644
---
a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQConsumer.java
+++
b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQConsumer.java
@@ -59,7 +59,8 @@ public class RabbitMQConsumer {
createTimeseries(session, timeseries);
}
RabbitMQConsumer consumer = new RabbitMQConsumer();
- Channel channel =
RabbitMQChannelUtils.getChannelInstance(Constant.CONNECTION_NAME);
+ Channel channel = RabbitMQChannelUtils.getConnection().createChannel();
+ ;
AMQP.Queue.DeclareOk declareOk =
channel.queueDeclare(
Constant.RABBITMQ_CONSUMER_QUEUE, true, false, false, new
HashMap<>());
@@ -100,9 +101,9 @@ public class RabbitMQConsumer {
private void insert(Session session, String data)
throws IoTDBConnectionException, StatementExecutionException {
try {
- session.open();
+ session.open();
} catch (Exception e) {
- LOGGER.error(e.getMessage());
+ LOGGER.error(e.getMessage());
}
String[] dataArray = data.split(",");
String device = dataArray[0];
diff --git
a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQProducer.java
b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQProducer.java
index 41b66cc..cb7e0b9 100644
---
a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQProducer.java
+++
b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/RabbitMQProducer.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,13 +48,13 @@ public class RabbitMQProducer {
channel.basicPublish(
Constant.TOPIC, key, false, basicProperties,
Constant.ALL_DATA[i].getBytes());
try {
- if (channel.waitForConfirms()) {
- LOGGER.info(" [x] Sent : {}", Constant.ALL_DATA[i]);
- } else {
- LOGGER.error(" [x] Timed out waiting for confirmation");
- }
+ if (channel.waitForConfirms()) {
+ LOGGER.info(" [x] Sent : {}", Constant.ALL_DATA[i]);
+ } else {
+ LOGGER.error(" [x] Timed out waiting for confirmation");
+ }
} catch (InterruptedException e) {
- LOGGER.error(" [x] Interrupted while waiting for confirmation");
+ LOGGER.error(" [x] Interrupted while waiting for confirmation");
}
}
}
diff --git
a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalConstant.java
b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalConstant.java
index e3ce8f4..7abcfc2 100644
---
a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalConstant.java
+++
b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalConstant.java
@@ -21,34 +21,44 @@ package org.apache.iotdb.rabbitmq.relational;
public class RelationalConstant {
- public static final String SERVER_HOST = "localhost";
- public static final int SERVER_PORT = 5672;
- public static final String RABBITMQ_VHOST = "/";
- public static final String RABBITMQ_USERNAME = "guest";
- public static final String RABBITMQ_PASSWORD = "guest";
- public static final String CONNECTION_NAME =
"RabbitMQ-Relational-Connection";
- public static final String RABBITMQ_CONSUMER_QUEUE =
"IoTDB_Relational_Topic_Queue";
- public static final String RABBITMQ_CONSUMER_TAG =
"IoTDB_Relational_CONSUMER_TAG";
- public static final String TOPIC = "RabbitMQ-Relational-Test";
- public static final String[] IOTDB_URLS = {
- "127.0.0.1:6667"
- };
- public static final String IOTDB_USERNAME = "root";
- public static final String IOTDB_PASSWORD = "root";
- public static final String[] DATABASES = {"rabbitmq_db1", "rabbitmq_db2"};
- public static final String[][] TABLES = {
- // database, tableName, columnNames, columnTypes, columnCategories
- {"rabbitmq_db1", "tb1", "time,region,model_id,temperature,status",
"TIMESTAMP,STRING,STRING,FLOAT,BOOLEAN", "TIME,TAG,ATTRIBUTE,FIELD,FIELD"},
- {"rabbitmq_db2", "tb2", "time,plant_id,humidity,status",
"TIMESTAMP,STRING,FLOAT,BOOLEAN", "TIME,TAG,FIELD,FIELD"}
- };
- public static final String[] ALL_DATA = {
- //
database;tableName;columnName[,columnName]*;value[,value]*[,value[:value]*]*
-
"rabbitmq_db1;tb1;time,temperature,status;17,3.26,true;18,3.27,false;19,3.28,true",
-
"rabbitmq_db1;tb1;time,region,model_id,temperature;20,'rgn1','id1',3.31",
- "rabbitmq_db2;tb2;time,plant_id,humidity,status;50,'id1',68.7,true",
- "rabbitmq_db2;tb2;time,plant_id,humidity,status;51,'id2',68.5,false",
- "rabbitmq_db2;tb2;time,plant_id,humidity,status;52,'id3',68.3,true",
- "rabbitmq_db2;tb2;time,plant_id,humidity,status;53,'id4',68.8,true",
- "rabbitmq_db2;tb2;time,plant_id,humidity,status;54,'id5',68.9,true"
- };
+ public static final String SERVER_HOST = "localhost";
+ public static final int SERVER_PORT = 5672;
+ public static final String RABBITMQ_VHOST = "/";
+ public static final String RABBITMQ_USERNAME = "guest";
+ public static final String RABBITMQ_PASSWORD = "guest";
+ public static final String CONNECTION_NAME =
"RabbitMQ-Relational-Connection";
+ public static final String RABBITMQ_CONSUMER_QUEUE =
"IoTDB_Relational_Topic_Queue";
+ public static final String RABBITMQ_CONSUMER_TAG =
"IoTDB_Relational_CONSUMER_TAG";
+ public static final String TOPIC = "RabbitMQ-Relational-Test";
+ public static final String[] IOTDB_URLS = {"127.0.0.1:6667"};
+ public static final String IOTDB_USERNAME = "root";
+ public static final String IOTDB_PASSWORD = "root";
+ public static final String[] DATABASES = {"rabbitmq_db1", "rabbitmq_db2"};
+ public static final String[][] TABLES = {
+ // database, tableName, columnNames, columnTypes, columnCategories
+ {
+ "rabbitmq_db1",
+ "tb1",
+ "time,region,model_id,temperature,status",
+ "TIMESTAMP,STRING,STRING,FLOAT,BOOLEAN",
+ "TIME,TAG,ATTRIBUTE,FIELD,FIELD"
+ },
+ {
+ "rabbitmq_db2",
+ "tb2",
+ "time,plant_id,humidity,status",
+ "TIMESTAMP,STRING,FLOAT,BOOLEAN",
+ "TIME,TAG,FIELD,FIELD"
+ }
+ };
+ public static final String[] ALL_DATA = {
+ //
database;tableName;columnName[,columnName]*;value[,value]*[,value[:value]*]*
+
"rabbitmq_db1;tb1;time,temperature,status;17,3.26,true;18,3.27,false;19,3.28,true",
+ "rabbitmq_db1;tb1;time,region,model_id,temperature;20,'rgn1','id1',3.31",
+ "rabbitmq_db2;tb2;time,plant_id,humidity,status;50,'id1',68.7,true",
+ "rabbitmq_db2;tb2;time,plant_id,humidity,status;51,'id2',68.5,false",
+ "rabbitmq_db2;tb2;time,plant_id,humidity,status;52,'id3',68.3,true",
+ "rabbitmq_db2;tb2;time,plant_id,humidity,status;53,'id4',68.8,true",
+ "rabbitmq_db2;tb2;time,plant_id,humidity,status;54,'id5',68.9,true"
+ };
}
diff --git
a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQConsumer.java
b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQConsumer.java
index 5ae0705..24fbc31 100644
---
a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQConsumer.java
+++
b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQConsumer.java
@@ -19,12 +19,18 @@
package org.apache.iotdb.rabbitmq.relational;
-import com.rabbitmq.client.*;
import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.rabbitmq.RabbitMQChannelUtils;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.TableSessionBuilder;
-import org.example.RabbitMQChannelUtils;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,132 +41,133 @@ import java.util.concurrent.TimeoutException;
public class RelationalRabbitMQConsumer {
- private static final Logger LOGGER =
LoggerFactory.getLogger(RelationalRabbitMQConsumer.class);
- private static ITableSession tableSession;
-
- public RelationalRabbitMQConsumer() throws IoTDBConnectionException {
- initIoTDB();
- }
-
- public static void main(String[] args) throws IOException,
TimeoutException, IoTDBConnectionException {
- RelationalRabbitMQConsumer consumer = new RelationalRabbitMQConsumer();
- Connection connection = RabbitMQChannelUtils.getRelationalConnection();
- Channel channel = connection.createChannel();
- AMQP.Queue.DeclareOk declareOk =
- channel.queueDeclare(
- RelationalConstant.RABBITMQ_CONSUMER_QUEUE, true, false,
false, new HashMap<>());
- channel.exchangeDeclare(RelationalConstant.TOPIC,
BuiltinExchangeType.TOPIC);
- channel.queueBind(declareOk.getQueue(), RelationalConstant.TOPIC,
"IoTDB.#", new HashMap<>());
- DefaultConsumer defaultConsumer =
- new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(
- String consumerTag,
- Envelope envelope,
- AMQP.BasicProperties properties,
- byte[] body) {
- String param =
- consumerTag + ", " + envelope.toString() + ", " +
properties.toString();
- LOGGER.info(param);
- try {
- consumer.insert(new String(body));
- } catch (Exception e) {
- LOGGER.error(e.getMessage());
- }
- }
- };
- channel.basicConsume(
- declareOk.getQueue(), true,
RelationalConstant.RABBITMQ_CONSUMER_TAG, defaultConsumer);
- }
-
- private void initIoTDB() throws IoTDBConnectionException {
- tableSession =
- new TableSessionBuilder()
- .nodeUrls(Arrays.asList(RelationalConstant.IOTDB_URLS))
- .username(RelationalConstant.IOTDB_USERNAME)
- .password(RelationalConstant.IOTDB_PASSWORD)
- .build();
- for (String db : RelationalConstant.DATABASES) {
- boolean res = createDatabase(db);
- if (!res) {
- throw new RuntimeException("Create database failed");
- }
- }
- for (String[] tableInfo : RelationalConstant.TABLES) {
- boolean res = createTable(tableInfo);
- if (!res) {
- throw new RuntimeException("Create table failed");
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RelationalRabbitMQConsumer.class);
+ private static ITableSession tableSession;
+
+ public RelationalRabbitMQConsumer() throws IoTDBConnectionException {
+ initIoTDB();
+ }
+
+ public static void main(String[] args)
+ throws IOException, TimeoutException, IoTDBConnectionException {
+ RelationalRabbitMQConsumer consumer = new RelationalRabbitMQConsumer();
+ Connection connection = RabbitMQChannelUtils.getRelationalConnection();
+ Channel channel = connection.createChannel();
+ AMQP.Queue.DeclareOk declareOk =
+ channel.queueDeclare(
+ RelationalConstant.RABBITMQ_CONSUMER_QUEUE, true, false, false,
new HashMap<>());
+ channel.exchangeDeclare(RelationalConstant.TOPIC,
BuiltinExchangeType.TOPIC);
+ channel.queueBind(declareOk.getQueue(), RelationalConstant.TOPIC,
"IoTDB.#", new HashMap<>());
+ DefaultConsumer defaultConsumer =
+ new DefaultConsumer(channel) {
+ @Override
+ public void handleDelivery(
+ String consumerTag, Envelope envelope, AMQP.BasicProperties
properties, byte[] body) {
+ String param = consumerTag + ", " + envelope.toString() + ", " +
properties.toString();
+ LOGGER.info(param);
+ try {
+ consumer.insert(new String(body));
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage());
}
- }
+ }
+ };
+ channel.basicConsume(
+ declareOk.getQueue(), true, RelationalConstant.RABBITMQ_CONSUMER_TAG,
defaultConsumer);
+ }
+
+ private void initIoTDB() throws IoTDBConnectionException {
+ tableSession =
+ new TableSessionBuilder()
+ .nodeUrls(Arrays.asList(RelationalConstant.IOTDB_URLS))
+ .username(RelationalConstant.IOTDB_USERNAME)
+ .password(RelationalConstant.IOTDB_PASSWORD)
+ .build();
+ for (String db : RelationalConstant.DATABASES) {
+ boolean res = createDatabase(db);
+ if (!res) {
+ throw new RuntimeException("Create database failed");
+ }
}
-
- private boolean createDatabase(String dbName) {
- try {
- tableSession.executeNonQueryStatement(String.format("CREATE
DATABASE %s", dbName));
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- LOGGER.error("Create Database Error: ", e);
- return false;
- }
- return true;
+ for (String[] tableInfo : RelationalConstant.TABLES) {
+ boolean res = createTable(tableInfo);
+ if (!res) {
+ throw new RuntimeException("Create table failed");
+ }
}
-
- private boolean createTable(String[] tableInfo) {
- String sql = getCreateTableSQL(tableInfo);
- try {
- tableSession.executeNonQueryStatement(sql);
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- LOGGER.error("Create Table Error: ", e);
- return false;
- }
- return true;
+ }
+
+ private boolean createDatabase(String dbName) {
+ try {
+ tableSession.executeNonQueryStatement(String.format("CREATE DATABASE
%s", dbName));
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ LOGGER.error("Create Database Error: ", e);
+ return false;
}
-
- private static String getCreateTableSQL(String[] tableInfo) {
- StringBuilder sql = new StringBuilder();
- sql.append("CREATE TABLE
\"").append(tableInfo[0]).append("\".\"").append(tableInfo[1]).append("\" (");
-
- String[] columnNames = tableInfo[2].split(",");
- String[] columnTypes = tableInfo[3].split(",");
- String[] columnCategories = tableInfo[4].split(",");
- int columnSize = columnNames.length;
-
- for (int i = 0; i < columnSize; i++) {
- sql.append(columnNames[i]).append(" ");
- sql.append(columnTypes[i]).append(" ");
- sql.append(columnCategories[i]).append(",");
- }
- sql.deleteCharAt(sql.length() - 1);
- sql.append(")");
- return sql.toString();
+ return true;
+ }
+
+ private boolean createTable(String[] tableInfo) {
+ String sql = getCreateTableSQL(tableInfo);
+ try {
+ tableSession.executeNonQueryStatement(sql);
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ LOGGER.error("Create Table Error: ", e);
+ return false;
}
-
- private void insert(String data) {
- String sql = getInsertValueSQL(data);
- try {
- tableSession.executeNonQueryStatement(sql);
- LOGGER.info("Insert Success: {}", sql);
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- LOGGER.error("Insert Error: ", e);
- }
+ return true;
+ }
+
+ private static String getCreateTableSQL(String[] tableInfo) {
+ StringBuilder sql = new StringBuilder();
+ sql.append("CREATE TABLE \"")
+ .append(tableInfo[0])
+ .append("\".\"")
+ .append(tableInfo[1])
+ .append("\" (");
+
+ String[] columnNames = tableInfo[2].split(",");
+ String[] columnTypes = tableInfo[3].split(",");
+ String[] columnCategories = tableInfo[4].split(",");
+ int columnSize = columnNames.length;
+
+ for (int i = 0; i < columnSize; i++) {
+ sql.append(columnNames[i]).append(" ");
+ sql.append(columnTypes[i]).append(" ");
+ sql.append(columnCategories[i]).append(",");
}
-
- private String getInsertValueSQL(String s) {
- StringBuilder sql = new StringBuilder();
- String[] curDataInfo = s.split(";");
- int valueSetSize = curDataInfo.length - 3;
- String database = curDataInfo[0];
- String tableName = curDataInfo[1];
- String columnNames = curDataInfo[2];
- sql.append("INSERT INTO
\"").append(database).append("\".\"").append(tableName).append("\"(");
- sql.append(columnNames).append(") VALUES ");
-
- for (int j = 0; j < valueSetSize; j++) {
- String columnValues = curDataInfo[3 + j];
- sql.append("(");
- sql.append(columnValues);
- sql.append("),");
- }
- sql.deleteCharAt(sql.length() - 1);
- return sql.toString();
+ sql.deleteCharAt(sql.length() - 1);
+ sql.append(")");
+ return sql.toString();
+ }
+
+ private void insert(String data) {
+ String sql = getInsertValueSQL(data);
+ try {
+ tableSession.executeNonQueryStatement(sql);
+ LOGGER.info("Insert Success: {}", sql);
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ LOGGER.error("Insert Error: ", e);
+ }
+ }
+
+ private String getInsertValueSQL(String s) {
+ StringBuilder sql = new StringBuilder();
+ String[] curDataInfo = s.split(";");
+ int valueSetSize = curDataInfo.length - 3;
+ String database = curDataInfo[0];
+ String tableName = curDataInfo[1];
+ String columnNames = curDataInfo[2];
+ sql.append("INSERT INTO
\"").append(database).append("\".\"").append(tableName).append("\"(");
+ sql.append(columnNames).append(") VALUES ");
+
+ for (int j = 0; j < valueSetSize; j++) {
+ String columnValues = curDataInfo[3 + j];
+ sql.append("(");
+ sql.append(columnValues);
+ sql.append("),");
}
+ sql.deleteCharAt(sql.length() - 1);
+ return sql.toString();
+ }
}
diff --git
a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQProducer.java
b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQProducer.java
index 95f0581..d42cbbc 100644
---
a/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQProducer.java
+++
b/examples/rabbitmq/src/main/java/org/apache/iotdb/rabbitmq/relational/RelationalRabbitMQProducer.java
@@ -19,8 +19,12 @@
package org.apache.iotdb.rabbitmq.relational;
-import com.rabbitmq.client.*;
-import org.example.RabbitMQChannelUtils;
+import org.apache.iotdb.rabbitmq.RabbitMQChannelUtils;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,30 +32,36 @@ import java.util.Objects;
public class RelationalRabbitMQProducer {
- private static final Logger LOGGER =
LoggerFactory.getLogger(RelationalRabbitMQProducer.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RelationalRabbitMQProducer.class);
- public static void main(String[] args) {
- try (Connection connection =
RabbitMQChannelUtils.getRelationalConnection()) {
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(RelationalConstant.TOPIC,
BuiltinExchangeType.TOPIC);
- channel.confirmSelect();
- AMQP.BasicProperties basicProperties = new
AMQP.BasicProperties().builder().deliveryMode(2).contentType("UTF-8").build();
- for (int i = 0; i < RelationalConstant.ALL_DATA.length; i++) {
- String key = String.format("%s.%s", "IoTDB",
Objects.toString(i));
- channel.queueDeclare(key, true, false, false, null);
- channel.basicPublish(RelationalConstant.TOPIC, key, false,
basicProperties, RelationalConstant.ALL_DATA[i].getBytes());
- try {
- if (channel.waitForConfirms()) {
- LOGGER.info(" [x] Sent : {}",
RelationalConstant.ALL_DATA[i]);
- } else {
- LOGGER.error(" [x] Timed out waiting for
confirmation");
- }
- } catch (InterruptedException e) {
- LOGGER.error(" [x] Interrupted while waiting for
confirmation");
- }
- }
- } catch (Exception e) {
- LOGGER.error(e.getMessage());
+ public static void main(String[] args) {
+ try (Connection connection =
RabbitMQChannelUtils.getRelationalConnection()) {
+ Channel channel = connection.createChannel();
+ channel.exchangeDeclare(RelationalConstant.TOPIC,
BuiltinExchangeType.TOPIC);
+ channel.confirmSelect();
+ AMQP.BasicProperties basicProperties =
+ new
AMQP.BasicProperties().builder().deliveryMode(2).contentType("UTF-8").build();
+ for (int i = 0; i < RelationalConstant.ALL_DATA.length; i++) {
+ String key = String.format("%s.%s", "IoTDB", Objects.toString(i));
+ channel.queueDeclare(key, true, false, false, null);
+ channel.basicPublish(
+ RelationalConstant.TOPIC,
+ key,
+ false,
+ basicProperties,
+ RelationalConstant.ALL_DATA[i].getBytes());
+ try {
+ if (channel.waitForConfirms()) {
+ LOGGER.info(" [x] Sent : {}", RelationalConstant.ALL_DATA[i]);
+ } else {
+ LOGGER.error(" [x] Timed out waiting for confirmation");
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error(" [x] Interrupted while waiting for confirmation");
}
+ }
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage());
}
+ }
}
diff --git
a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalConstant.java
b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalConstant.java
index e492fba..de6867f 100644
---
a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalConstant.java
+++
b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalConstant.java
@@ -21,29 +21,39 @@ package org.apache.iotdb.rocketmq.relational;
public class RelationalConstant {
- public static final String SERVER_ADDRESS = "localhost:9876";
- public static final String PRODUCER_GROUP = "IoTDBRelationalConsumer";
- public static final String CONSUMER_GROUP = "IoTDBRelationalProducer";
- public static final String TOPIC = "RocketMQ-Relational-Test";
- public static final String[] IOTDB_URLS = {
- "127.0.0.1:6667"
- };
- public static final String IOTDB_USERNAME = "root";
- public static final String IOTDB_PASSWORD = "root";
- public static final String[] DATABASES = {"rocketmq_db1", "rocketmq_db2"};
- public static final String[][] TABLES = {
- // database, tableName, columnNames, columnTypes, columnCategories
- {"rocketmq_db1", "tb1", "time,region,model_id,temperature,status",
"TIMESTAMP,STRING,STRING,FLOAT,BOOLEAN", "TIME,TAG,ATTRIBUTE,FIELD,FIELD"},
- {"rocketmq_db2", "tb2", "time,plant_id,humidity,status",
"TIMESTAMP,STRING,FLOAT,BOOLEAN", "TIME,TAG,FIELD,FIELD"}
- };
- public static final String[] ALL_DATA = {
- //
database;tableName;columnName[,columnName]*;value[,value]*[,value[:value]*]*
-
"rocketmq_db1;tb1;time,temperature,status;17,3.26,true;18,3.27,false;19,3.28,true",
-
"rocketmq_db1;tb1;time,region,model_id,temperature;20,'rgn1','id1',3.31",
- "rocketmq_db2;tb2;time,plant_id,humidity,status;50,'id1',68.7,true",
- "rocketmq_db2;tb2;time,plant_id,humidity,status;51,'id2',68.5,false",
- "rocketmq_db2;tb2;time,plant_id,humidity,status;52,'id3',68.3,true",
- "rocketmq_db2;tb2;time,plant_id,humidity,status;53,'id4',68.8,true",
- "rocketmq_db2;tb2;time,plant_id,humidity,status;54,'id5',68.9,true"
- };
+ public static final String SERVER_ADDRESS = "localhost:9876";
+ public static final String PRODUCER_GROUP = "IoTDBRelationalConsumer";
+ public static final String CONSUMER_GROUP = "IoTDBRelationalProducer";
+ public static final String TOPIC = "RocketMQ-Relational-Test";
+ public static final String[] IOTDB_URLS = {"127.0.0.1:6667"};
+ public static final String IOTDB_USERNAME = "root";
+ public static final String IOTDB_PASSWORD = "root";
+ public static final String[] DATABASES = {"rocketmq_db1", "rocketmq_db2"};
+ public static final String[][] TABLES = {
+ // database, tableName, columnNames, columnTypes, columnCategories
+ {
+ "rocketmq_db1",
+ "tb1",
+ "time,region,model_id,temperature,status",
+ "TIMESTAMP,STRING,STRING,FLOAT,BOOLEAN",
+ "TIME,TAG,ATTRIBUTE,FIELD,FIELD"
+ },
+ {
+ "rocketmq_db2",
+ "tb2",
+ "time,plant_id,humidity,status",
+ "TIMESTAMP,STRING,FLOAT,BOOLEAN",
+ "TIME,TAG,FIELD,FIELD"
+ }
+ };
+ public static final String[] ALL_DATA = {
+ //
database;tableName;columnName[,columnName]*;value[,value]*[,value[:value]*]*
+
"rocketmq_db1;tb1;time,temperature,status;17,3.26,true;18,3.27,false;19,3.28,true",
+ "rocketmq_db1;tb1;time,region,model_id,temperature;20,'rgn1','id1',3.31",
+ "rocketmq_db2;tb2;time,plant_id,humidity,status;50,'id1',68.7,true",
+ "rocketmq_db2;tb2;time,plant_id,humidity,status;51,'id2',68.5,false",
+ "rocketmq_db2;tb2;time,plant_id,humidity,status;52,'id3',68.3,true",
+ "rocketmq_db2;tb2;time,plant_id,humidity,status;53,'id4',68.8,true",
+ "rocketmq_db2;tb2;time,plant_id,humidity,status;54,'id5',68.9,true"
+ };
}
diff --git
a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQConsumer.java
b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQConsumer.java
index acbae72..b359581 100644
---
a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQConsumer.java
+++
b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQConsumer.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.isession.ITableSession;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.TableSessionBuilder;
+
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
@@ -36,142 +37,145 @@ import java.util.Arrays;
public class RelationalRocketMQConsumer {
- private static final Logger LOGGER =
LoggerFactory.getLogger(RelationalRocketMQConsumer.class);
- private ITableSession tableSession;
- private DefaultMQPushConsumer consumer;
- private String producerGroup;
- private String serverAddresses;
-
- public RelationalRocketMQConsumer(
- String producerGroup,
- String serverAddresses) throws IoTDBConnectionException {
- this.producerGroup = producerGroup;
- this.serverAddresses = serverAddresses;
- this.consumer = new DefaultMQPushConsumer(producerGroup);
- this.consumer.setNamesrvAddr(serverAddresses);
- initIoTDB();
- }
-
- private void initIoTDB() throws IoTDBConnectionException {
- tableSession =
- new TableSessionBuilder()
- .nodeUrls(Arrays.asList(RelationalConstant.IOTDB_URLS))
- .username(RelationalConstant.IOTDB_USERNAME)
- .password(RelationalConstant.IOTDB_PASSWORD)
- .build();
- for (String db : RelationalConstant.DATABASES) {
- boolean res = createDatabase(db);
- if (!res) {
- throw new RuntimeException("Create database failed");
- }
- }
- for (String[] tableInfo : RelationalConstant.TABLES) {
- boolean res = createTable(tableInfo);
- if (!res) {
- throw new RuntimeException("Create table failed");
- }
- }
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RelationalRocketMQConsumer.class);
+ private ITableSession tableSession;
+ private DefaultMQPushConsumer consumer;
+ private String producerGroup;
+ private String serverAddresses;
+
+ public RelationalRocketMQConsumer(String producerGroup, String
serverAddresses)
+ throws IoTDBConnectionException {
+ this.producerGroup = producerGroup;
+ this.serverAddresses = serverAddresses;
+ this.consumer = new DefaultMQPushConsumer(producerGroup);
+ this.consumer.setNamesrvAddr(serverAddresses);
+ initIoTDB();
+ }
+
+ private void initIoTDB() throws IoTDBConnectionException {
+ tableSession =
+ new TableSessionBuilder()
+ .nodeUrls(Arrays.asList(RelationalConstant.IOTDB_URLS))
+ .username(RelationalConstant.IOTDB_USERNAME)
+ .password(RelationalConstant.IOTDB_PASSWORD)
+ .build();
+ for (String db : RelationalConstant.DATABASES) {
+ boolean res = createDatabase(db);
+ if (!res) {
+ throw new RuntimeException("Create database failed");
+ }
}
-
- private boolean createDatabase(String dbName) {
- try {
- tableSession.executeNonQueryStatement(String.format("CREATE
DATABASE %s", dbName));
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- LOGGER.error("Create Database Error: ", e);
- return false;
- }
- return true;
- }
-
- private boolean createTable(String[] tableInfo) {
- String sql = getCreateTableSQL(tableInfo);
- try {
- tableSession.executeNonQueryStatement(sql);
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- LOGGER.error("Create Table Error: ", e);
- return false;
- }
- return true;
+ for (String[] tableInfo : RelationalConstant.TABLES) {
+ boolean res = createTable(tableInfo);
+ if (!res) {
+ throw new RuntimeException("Create table failed");
+ }
}
-
- private static String getCreateTableSQL(String[] tableInfo) {
- StringBuilder sql = new StringBuilder();
- sql.append("CREATE TABLE
\"").append(tableInfo[0]).append("\".\"").append(tableInfo[1]).append("\" (");
-
- String[] columnNames = tableInfo[2].split(",");
- String[] columnTypes = tableInfo[3].split(",");
- String[] columnCategories = tableInfo[4].split(",");
- int columnSize = columnNames.length;
-
- for (int i = 0; i < columnSize; i++) {
- sql.append(columnNames[i]).append(" ");
- sql.append(columnTypes[i]).append(" ");
- sql.append(columnCategories[i]).append(",");
- }
- sql.deleteCharAt(sql.length() - 1);
- sql.append(")");
- return sql.toString();
+ }
+
+ private boolean createDatabase(String dbName) {
+ try {
+ tableSession.executeNonQueryStatement(String.format("CREATE DATABASE
%s", dbName));
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ LOGGER.error("Create Database Error: ", e);
+ return false;
}
-
- public static void main(String[] args)
- throws MQClientException, IoTDBConnectionException {
- RelationalRocketMQConsumer consumer =
- new RelationalRocketMQConsumer(RelationalConstant.CONSUMER_GROUP,
RelationalConstant.SERVER_ADDRESS);
- consumer.prepareConsume();
- consumer.start();
+ return true;
+ }
+
+ private boolean createTable(String[] tableInfo) {
+ String sql = getCreateTableSQL(tableInfo);
+ try {
+ tableSession.executeNonQueryStatement(sql);
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ LOGGER.error("Create Table Error: ", e);
+ return false;
}
-
- public void prepareConsume() throws MQClientException {
- consumer.subscribe(RelationalConstant.TOPIC, "*");
-
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- consumer.registerMessageListener(
- (MessageListenerOrderly)
- (messages, context) -> {
- for (MessageExt msg : messages) {
- LOGGER.info(
- String.format(
- "%s Receive New Messages: %s %n",
- Thread.currentThread().getName(), new
String(msg.getBody())));
- try {
- insert(new String(msg.getBody()));
- } catch (Exception e) {
- LOGGER.error(e.getMessage());
- }
- }
- return ConsumeOrderlyStatus.SUCCESS;
- });
+ return true;
+ }
+
+ private static String getCreateTableSQL(String[] tableInfo) {
+ StringBuilder sql = new StringBuilder();
+ sql.append("CREATE TABLE \"")
+ .append(tableInfo[0])
+ .append("\".\"")
+ .append(tableInfo[1])
+ .append("\" (");
+
+ String[] columnNames = tableInfo[2].split(",");
+ String[] columnTypes = tableInfo[3].split(",");
+ String[] columnCategories = tableInfo[4].split(",");
+ int columnSize = columnNames.length;
+
+ for (int i = 0; i < columnSize; i++) {
+ sql.append(columnNames[i]).append(" ");
+ sql.append(columnTypes[i]).append(" ");
+ sql.append(columnCategories[i]).append(",");
}
-
- private void insert(String data) {
- String sql = getInsertValueSQL(data);
- try {
- tableSession.executeNonQueryStatement(sql);
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- LOGGER.error("Insert Values Into Table Error: ", e);
- }
+ sql.deleteCharAt(sql.length() - 1);
+ sql.append(")");
+ return sql.toString();
+ }
+
+ public static void main(String[] args) throws MQClientException,
IoTDBConnectionException {
+ RelationalRocketMQConsumer consumer =
+ new RelationalRocketMQConsumer(
+ RelationalConstant.CONSUMER_GROUP,
RelationalConstant.SERVER_ADDRESS);
+ consumer.prepareConsume();
+ consumer.start();
+ }
+
+ public void prepareConsume() throws MQClientException {
+ consumer.subscribe(RelationalConstant.TOPIC, "*");
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ consumer.registerMessageListener(
+ (MessageListenerOrderly)
+ (messages, context) -> {
+ for (MessageExt msg : messages) {
+ LOGGER.info(
+ String.format(
+ "%s Receive New Messages: %s %n",
+ Thread.currentThread().getName(), new
String(msg.getBody())));
+ try {
+ insert(new String(msg.getBody()));
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage());
+ }
+ }
+ return ConsumeOrderlyStatus.SUCCESS;
+ });
+ }
+
+ private void insert(String data) {
+ String sql = getInsertValueSQL(data);
+ try {
+ tableSession.executeNonQueryStatement(sql);
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ LOGGER.error("Insert Values Into Table Error: ", e);
}
-
- private String getInsertValueSQL(String s) {
- StringBuilder sql = new StringBuilder();
- String[] curDataInfo = s.split(";");
- int valueSetSize = curDataInfo.length - 3;
- String database = curDataInfo[0];
- String tableName = curDataInfo[1];
- String columnNames = curDataInfo[2];
- sql.append("INSERT INTO
\"").append(database).append("\".\"").append(tableName).append("\"(");
- sql.append(columnNames).append(") VALUES ");
-
- for (int j = 0; j < valueSetSize; j++) {
- String columnValues = curDataInfo[3 + j];
- sql.append("(");
- sql.append(columnValues);
- sql.append("),");
- }
- sql.deleteCharAt(sql.length() - 1);
- return sql.toString();
+ }
+
+ private String getInsertValueSQL(String s) {
+ StringBuilder sql = new StringBuilder();
+ String[] curDataInfo = s.split(";");
+ int valueSetSize = curDataInfo.length - 3;
+ String database = curDataInfo[0];
+ String tableName = curDataInfo[1];
+ String columnNames = curDataInfo[2];
+ sql.append("INSERT INTO
\"").append(database).append("\".\"").append(tableName).append("\"(");
+ sql.append(columnNames).append(") VALUES ");
+
+ for (int j = 0; j < valueSetSize; j++) {
+ String columnValues = curDataInfo[3 + j];
+ sql.append("(");
+ sql.append(columnValues);
+ sql.append("),");
}
+ sql.deleteCharAt(sql.length() - 1);
+ return sql.toString();
+ }
- public void start() throws MQClientException {
- consumer.start();
- }
+ public void start() throws MQClientException {
+ consumer.start();
+ }
}
diff --git
a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQProducer.java
b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQProducer.java
index f96b5bd..d2205db 100644
---
a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQProducer.java
+++
b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalRocketMQProducer.java
@@ -32,47 +32,53 @@ import java.nio.charset.StandardCharsets;
public class RelationalRocketMQProducer {
- private static final Logger LOGGER =
LoggerFactory.getLogger(RelationalRocketMQProducer.class);
- private DefaultMQProducer producer;
- private String producerGroup;
- private String serverAddresses;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RelationalRocketMQProducer.class);
+ private DefaultMQProducer producer;
+ private String producerGroup;
+ private String serverAddresses;
- public RelationalRocketMQProducer(String producerGroup, String
serverAddresses) {
- this.producerGroup = producerGroup;
- this.serverAddresses = serverAddresses;
- producer = new DefaultMQProducer(producerGroup);
- producer.setNamesrvAddr(serverAddresses);
- }
+ public RelationalRocketMQProducer(String producerGroup, String
serverAddresses) {
+ this.producerGroup = producerGroup;
+ this.serverAddresses = serverAddresses;
+ producer = new DefaultMQProducer(producerGroup);
+ producer.setNamesrvAddr(serverAddresses);
+ }
- public static void main(String[] args) throws MQClientException,
MQBrokerException, RemotingException, InterruptedException {
- RelationalRocketMQProducer producer = new
RelationalRocketMQProducer(RelationalConstant.PRODUCER_GROUP,
RelationalConstant.SERVER_ADDRESS);
- producer.start();
- producer.sendMessage();
- producer.shutdown();
- }
+ public static void main(String[] args)
+ throws MQClientException, MQBrokerException, RemotingException,
InterruptedException {
+ RelationalRocketMQProducer producer =
+ new RelationalRocketMQProducer(
+ RelationalConstant.PRODUCER_GROUP,
RelationalConstant.SERVER_ADDRESS);
+ producer.start();
+ producer.sendMessage();
+ producer.shutdown();
+ }
- public void start() throws MQClientException {
- producer.start();
- }
+ public void start() throws MQClientException {
+ producer.start();
+ }
- public void sendMessage() throws MQBrokerException, RemotingException,
InterruptedException, MQClientException {
- for (String data : RelationalConstant.ALL_DATA) {
- Message msg = new Message(RelationalConstant.TOPIC, null, null,
(data).getBytes(StandardCharsets.UTF_8));
- SendResult sendResult =
- producer.send(
- msg,
- (mqs, msg1, arg) -> {
- Integer id = (Integer) arg;
- int index = id % mqs.size();
- return mqs.get(index);
- },
-
RelationalUtils.convertStringToInteger(RelationalUtils.getDatabaseNTable(data)));
- String result = sendResult.toString();
- LOGGER.info(result);
- }
+ public void sendMessage()
+ throws MQBrokerException, RemotingException, InterruptedException,
MQClientException {
+ for (String data : RelationalConstant.ALL_DATA) {
+ Message msg =
+ new Message(
+ RelationalConstant.TOPIC, null, null,
(data).getBytes(StandardCharsets.UTF_8));
+ SendResult sendResult =
+ producer.send(
+ msg,
+ (mqs, msg1, arg) -> {
+ Integer id = (Integer) arg;
+ int index = id % mqs.size();
+ return mqs.get(index);
+ },
+
RelationalUtils.convertStringToInteger(RelationalUtils.getDatabaseNTable(data)));
+ String result = sendResult.toString();
+ LOGGER.info(result);
}
+ }
- public void shutdown() {
- producer.shutdown();
- }
+ public void shutdown() {
+ producer.shutdown();
+ }
}
diff --git
a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalUtils.java
b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalUtils.java
index 2bbd3c0..ad19827 100644
---
a/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalUtils.java
+++
b/examples/rocketmq/src/main/java/org/apache/iotdb/rocketmq/relational/RelationalUtils.java
@@ -21,20 +21,20 @@ package org.apache.iotdb.rocketmq.relational;
public class RelationalUtils {
- private RelationalUtils() {
- throw new IllegalStateException("Utility class");
- }
+ private RelationalUtils() {
+ throw new IllegalStateException("Utility class");
+ }
- public static int convertStringToInteger(String fullTable) {
- int sum = 0;
- for (char c : fullTable.toCharArray()) {
- sum += c;
- }
- return sum;
+ public static int convertStringToInteger(String fullTable) {
+ int sum = 0;
+ for (char c : fullTable.toCharArray()) {
+ sum += c;
}
+ return sum;
+ }
- public static String getDatabaseNTable(String data) {
- String[] info = data.split(";");
- return info[0] + "." + info[1];
- }
+ public static String getDatabaseNTable(String data) {
+ String[] info = data.split(";");
+ return info[0] + "." + info[1];
+ }
}
diff --git a/iotdb-collector/collector-core/pom.xml
b/iotdb-collector/collector-core/pom.xml
index 6182611..205fbec 100644
--- a/iotdb-collector/collector-core/pom.xml
+++ b/iotdb-collector/collector-core/pom.xml
@@ -185,6 +185,7 @@
<usedDependency>org.eclipse.jetty:jetty-util</usedDependency>
<usedDependency>org.glassfish.jersey.inject:jersey-hk2</usedDependency>
<usedDependency>org.xerial:sqlite-jdbc</usedDependency>
+
<usedDependency>org.apache.tsfile:common</usedDependency>
</usedDependencies>
</configuration>
</plugin>
diff --git a/iotdb-spring-boot-starter/pom.xml
b/iotdb-spring-boot-starter/pom.xml
index 64a5292..83a75de 100644
--- a/iotdb-spring-boot-starter/pom.xml
+++ b/iotdb-spring-boot-starter/pom.xml
@@ -36,7 +36,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-boot.version>3.5.1</spring-boot.version>
<spring.version>6.2.7</spring.version>
- <iotdb-version>2.0.4-SNAPSHOT</iotdb-version>
+ <iotdb-version>2.0.5</iotdb-version>
</properties>
<dependencyManagement>
<dependencies>
diff --git a/pom.xml b/pom.xml
index 9ce4d43..ca88f39 100644
--- a/pom.xml
+++ b/pom.xml
@@ -183,7 +183,7 @@
<thrift.version>0.14.1</thrift.version>
<!-- This was the last version to support Java 8 -->
<tomcat.version>9.0.86</tomcat.version>
- <tsfile.version>2.0.2</tsfile.version>
+ <tsfile.version>2.1.1</tsfile.version>
<xz.version>1.9</xz.version>
<zeppelin.version>0.11.1</zeppelin.version>
<zstd-jni.version>1.5.5-5</zstd-jni.version>