This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch xianyi
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xianyi by this push:
new 9ccbfeb dataGenerator (#4468)
9ccbfeb is described below
commit 9ccbfebc7deb3b0bd13482635239e584b7251f2e
Author: Mrquan <[email protected]>
AuthorDate: Thu Nov 25 22:08:34 2021 +0800
dataGenerator (#4468)
* ClusterSession and dataGenerator
* ClusterSession and dataGenerator
Co-authored-by: 权思屹 <[email protected]>
---
generator/pom.xml | 111 ++++++++++++++++++++
.../java/org/apache/iotdb/generator/Generator.java | 47 +++++++++
.../apache/iotdb/generator/GeneratorEntrance.java | 53 ++++++++++
pom.xml | 1 +
.../org/apache/iotdb/session/ClusterSession.java | 112 +++++++++++++++++++++
.../java/org/apache/iotdb/session/Session.java | 11 +-
.../apache/iotdb/session/SessionConnection.java | 2 +-
7 files changed, 335 insertions(+), 2 deletions(-)
diff --git a/generator/pom.xml b/generator/pom.xml
new file mode 100644
index 0000000..8916a19
--- /dev/null
+++ b/generator/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-parent</artifactId>
+ <version>0.13.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>generator</artifactId>
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+ <dependencies>
+ <!-- The thrift version is overridden here because using 0.13.0 in the
+ cluster module causes hard-to-diagnose bugs. -->
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.14.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>service-rpc</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-server</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-thrift-cluster</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-server</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-session</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-jdbc</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>4.0.2</version>
+ <scope>test</scope>
+ </dependency>
+ <!-- for mocked test-->
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito2</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-pool2</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/generator/src/main/java/org/apache/iotdb/generator/Generator.java
b/generator/src/main/java/org/apache/iotdb/generator/Generator.java
new file mode 100644
index 0000000..d247978
--- /dev/null
+++ b/generator/src/main/java/org/apache/iotdb/generator/Generator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.generator;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Builds synthetic {@link Tablet}s that hold a single INT32 measurement. */
+public class Generator {
+
+  /**
+   * Creates a tablet for one device with one INT32 measurement and fills it with {@code pointNum}
+   * rows. Row {@code i} gets the timestamp {@code startTime + i} and the constant value {@code 0}.
+   *
+   * @param deviceId device path handed to the {@link Tablet} constructor
+   * @param measurementId name of the single measurement column
+   * @param startTime timestamp of the first generated row
+   * @param pointNum number of rows to generate; also used as the tablet's maximum row number
+   * @return the populated tablet
+   */
+  public static Tablet generateTablet(
+      String deviceId, String measurementId, long startTime, int pointNum) {
+    List<IMeasurementSchema> schemas = new ArrayList<>();
+    schemas.add(new UnaryMeasurementSchema(measurementId, TSDataType.INT32));
+    Tablet result = new Tablet(deviceId, schemas, pointNum);
+
+    for (int offset = 0; offset < pointNum; offset++) {
+      // rowSize doubles as the write cursor: claim the next row, then fill it in.
+      int rowIndex = result.rowSize++;
+      result.addTimestamp(rowIndex, startTime + offset);
+      result.addValue(measurementId, rowIndex, 0);
+    }
+    return result;
+  }
+}
diff --git
a/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java
b/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java
new file mode 100644
index 0000000..d97ad7f
--- /dev/null
+++ b/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java
@@ -0,0 +1,53 @@
+package org.apache.iotdb.generator;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.ClusterSession;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GeneratorEntrance {
+ private static final Logger logger =
LoggerFactory.getLogger(GeneratorEntrance.class);
+
+ public static void main(String[] args)
+ throws IoTDBConnectionException, StatementExecutionException,
InterruptedException {
+ args = new String[6];
+ args[1] = "127.0.0.1:6667";
+ args[2] = "root.sg1.d1.s1";
+ args[3] = "3";
+ args[4] = "1000";
+ String[] addressElements = args[1].split(":");
+ String seriesPath = args[2];
+ int timeInterval = Integer.parseInt(args[3]) * 1000;
+ int batchNum = Integer.parseInt(args[4]);
+ String[] pathElements = seriesPath.split("\\.");
+ String measurementId = pathElements[pathElements.length - 1];
+ String deviceId = seriesPath.substring(0, seriesPath.length() -
measurementId.length() - 1);
+
+ ClusterSession clusterSession =
+ new ClusterSession(addressElements[0],
Integer.parseInt(addressElements[1]));
+
+ long timestampForInsert = 0;
+ while (true) {
+ long startTime = System.currentTimeMillis();
+ Tablet tablet =
+ Generator.generateTablet(
+ deviceId, pathElements[pathElements.length - 1],
timestampForInsert, batchNum);
+ clusterSession.insertTablet(tablet);
+ logger.info("Insert {} data points to {}", batchNum, seriesPath);
+ String query = String.format("select count(%s) from %s", measurementId,
deviceId);
+ SessionDataSet sessionDataSet =
+ clusterSession.queryTablet(
+ query, deviceId);
+ logger.info("Execute query {} with result :
{}",query,sessionDataSet.next().getFields().get(0));
+ timestampForInsert += batchNum;
+ long endTime = System.currentTimeMillis();
+ if (timeInterval - (endTime - startTime)>0) {
+ Thread.sleep(timeInterval - (endTime - startTime));
+ }
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 6286eef..5093ce1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,6 +91,7 @@
<module>openapi</module>
<module>server</module>
<module>example</module>
+ <module>generator</module>
<module>grafana</module>
<module>spark-tsfile</module>
<module>hadoop</module>
diff --git a/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
b/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
new file mode 100644
index 0000000..d7617ca
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
@@ -0,0 +1,112 @@
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import java.util.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Client-side wrapper that keeps one {@link Session} per cluster node and writes every tablet to
+ * consecutive nodes selected by the hash of the tablet's device path.
+ *
+ * <p>Each node has a bounded retry queue and a background worker thread. A tablet whose direct
+ * insert fails, or that arrives while older tablets are still queued for the node, is appended to
+ * that queue; the worker reconnects to the node and replays the queue in order.
+ *
+ * <p>The monitor of each queue guards the queue and the matching slot of {@link #sessions}.
+ */
+public class ClusterSession {
+  /** Maximum number of tablets that may wait for one node. */
+  private static final int RETRY_QUEUE_CAPACITY = 1000;
+  /** How long a worker sleeps when it has nothing to do or its node is still unreachable. */
+  private static final long WORKER_WAIT_MS = 1000;
+  /** Number of consecutive nodes every tablet is written to. */
+  private static final int REPLICA_NUM = 2;
+
+  Session[] sessions;
+  ArrayBlockingQueue<Tablet>[] queues;
+  List<EndPoint> nodeList;
+
+  /**
+   * Connects to the given node, reads the node list stored on the session during open, and opens
+   * one session plus one retry worker per listed node.
+   *
+   * @param host host of the node used to discover the cluster
+   * @param rpcPort RPC port of that node
+   * @throws IoTDBConnectionException if a session cannot be opened or no node list was reported
+   */
+  @SuppressWarnings("unchecked") // generic array creation; only Tablet queues are stored in it
+  public ClusterSession(String host, int rpcPort) throws IoTDBConnectionException {
+    nodeList = new ArrayList<>();
+    Session bootstrap = new Session(host, rpcPort);
+    bootstrap.open();
+    try {
+      List<EndPoint> reported = bootstrap.getNodeList();
+      if (reported != null) {
+        nodeList.addAll(reported);
+      }
+    } finally {
+      // The bootstrap session is only needed for discovery; do not leave it open.
+      bootstrap.close();
+    }
+    if (nodeList.isEmpty()) {
+      // An empty list would make every later "% nodeList.size()" divide by zero.
+      throw new IoTDBConnectionException(
+          String.format("Node %s:%d did not report any cluster node", host, rpcPort));
+    }
+
+    int nodeCount = nodeList.size();
+    sessions = new Session[nodeCount];
+    queues = new ArrayBlockingQueue[nodeCount];
+    for (int i = 0; i < nodeCount; i++) {
+      sessions[i] = new Session(nodeList.get(i).ip, nodeList.get(i).port);
+      sessions[i].open();
+      queues[i] = new ArrayBlockingQueue<>(RETRY_QUEUE_CAPACITY);
+    }
+    // Start the workers only after every slot is populated.
+    for (int i = 0; i < nodeCount; i++) {
+      new Thread(new RunnableTask(i), "ClusterSession-retry-" + i).start();
+    }
+  }
+
+  /**
+   * Writes the tablet to the node selected by the hash of {@code tablet.prefixPath} and to the
+   * node after it. If a node already has queued tablets, or the direct insert fails, the tablet is
+   * queued for that node's retry worker instead.
+   *
+   * @param tablet the tablet to write
+   * @throws IoTDBConnectionException if a node's retry queue is full and the tablet cannot be
+   *     accepted
+   * @throws StatementExecutionException declared for interface compatibility; insert failures are
+   *     queued for retry rather than propagated
+   */
+  public void insertTablet(Tablet tablet)
+      throws StatementExecutionException, IoTDBConnectionException {
+    int nodeCount = nodeList.size();
+    // floorMod keeps the index non-negative even when hashCode() is negative.
+    int first = Math.floorMod(tablet.prefixPath.hashCode(), nodeCount);
+    // With a single node, avoid writing the same tablet to it twice.
+    int replicas = Math.min(REPLICA_NUM, nodeCount);
+    for (int i = 0; i < replicas; i++) {
+      int j = (first + i) % nodeCount;
+      Session target;
+      synchronized (queues[j]) {
+        if (!queues[j].isEmpty()) {
+          // Older tablets are still pending for this node: keep the order by queueing behind them.
+          enqueueForRetry(j, tablet);
+          continue;
+        }
+        // Read the slot under the monitor so a session installed by the worker is visible here.
+        target = sessions[j];
+      }
+      try {
+        target.insertTablet(tablet);
+      } catch (Exception e) {
+        synchronized (queues[j]) {
+          enqueueForRetry(j, tablet);
+        }
+      }
+    }
+  }
+
+  /**
+   * Appends the tablet to the retry queue of node {@code j} and wakes its worker. The caller must
+   * hold the monitor of {@code queues[j]}.
+   *
+   * @throws IoTDBConnectionException if the queue has no free slot
+   */
+  private void enqueueForRetry(int j, Tablet tablet) throws IoTDBConnectionException {
+    if (!queues[j].offer(tablet)) {
+      throw new IoTDBConnectionException(
+          String.format(
+              "Retry queue of node %s:%d is full (%d tablets pending)",
+              nodeList.get(j).ip, nodeList.get(j).port, RETRY_QUEUE_CAPACITY));
+    }
+    queues[j].notifyAll();
+  }
+
+  /**
+   * Executes the query on the node selected by the hash of {@code deviceId}; if that fails, tries
+   * the next node once.
+   *
+   * @param sql the query statement
+   * @param deviceId device path used to choose the node
+   * @return the data set, or {@code null} if both attempts failed
+   */
+  public SessionDataSet queryTablet(String sql, String deviceId) {
+    int nodeCount = nodeList.size();
+    int primary = Math.floorMod(deviceId.hashCode(), nodeCount);
+    try {
+      return sessions[primary].executeQueryStatement(sql);
+    } catch (Exception primaryFailure) {
+      try {
+        return sessions[(primary + 1) % nodeCount].executeQueryStatement(sql);
+      } catch (Exception fallbackFailure) {
+        // The original author expected this path never to be taken (a restarted node is assumed
+        // not to be killed again). Report both failures instead of only the first one.
+        fallbackFailure.addSuppressed(primaryFailure);
+        fallbackFailure.printStackTrace();
+        return null;
+      }
+    }
+  }
+
+  /**
+   * Replaces the session of the given node with a freshly opened one. The previous session is
+   * closed first so that repeated reconnects do not pile up connections.
+   *
+   * @param index position of the node in {@link #nodeList}
+   * @return the newly opened session
+   * @throws IoTDBConnectionException if the new session cannot be opened; the slot then keeps the
+   *     old, closed session
+   */
+  public Session reconnect(int index) throws IoTDBConnectionException {
+    Session stale = sessions[index];
+    if (stale != null) {
+      try {
+        stale.close();
+      } catch (IoTDBConnectionException ignored) {
+        // The old connection is presumably already broken; nothing more can be done with it.
+      }
+    }
+    Session fresh = new Session(nodeList.get(index).ip, nodeList.get(index).port);
+    fresh.open();
+    sessions[index] = fresh;
+    return fresh;
+  }
+
+  /** Background worker that replays the retry queue of one node. */
+  class RunnableTask implements Runnable {
+    int index;
+
+    public RunnableTask(int index) {
+      this.index = index;
+    }
+
+    /**
+     * Loops until the thread is interrupted: sleeps while the queue is empty, otherwise retries
+     * the oldest tablet. The queue's monitor is held during the retry so that producers cannot
+     * overtake queued tablets.
+     */
+    @Override
+    public void run() {
+      ArrayBlockingQueue<Tablet> queue = queues[index];
+      while (!Thread.currentThread().isInterrupted()) {
+        synchronized (queue) {
+          try {
+            if (queue.isEmpty()) {
+              queue.wait(WORKER_WAIT_MS);
+            } else if (!retryHead(queue)) {
+              // Node still unreachable: wait() releases the monitor so producers can keep
+              // queueing, and avoids reconnecting in a tight loop.
+              queue.wait(WORKER_WAIT_MS);
+            }
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return;
+          }
+        }
+      }
+    }
+
+    /**
+     * Reconnects and inserts the oldest queued tablet. The tablet is removed only once the attempt
+     * is finished, so a connection failure cannot lose it.
+     *
+     * @return {@code false} if the node could not be reached and the tablet stays queued
+     */
+    private boolean retryHead(ArrayBlockingQueue<Tablet> queue) {
+      Tablet head = queue.peek();
+      try {
+        reconnect(index).insertTablet(head);
+      } catch (IoTDBConnectionException e) {
+        System.err.println(
+            "Retry for node "
+                + nodeList.get(index).ip
+                + ":"
+                + nodeList.get(index).port
+                + " failed, tablet stays queued: "
+                + e.getMessage());
+        return false;
+      } catch (StatementExecutionException e) {
+        // The server rejected the tablet itself; retrying cannot help, so report it and drop it
+        // rather than blocking every tablet queued behind it.
+        e.printStackTrace();
+      }
+      queue.poll();
+      return true;
+    }
+  }
+}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java
b/session/src/main/java/org/apache/iotdb/session/Session.java
index fca13f4..2d854d5 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -129,6 +129,7 @@ public class Session {
protected volatile Map<EndPoint, SessionConnection>
endPointToSessionConnection;
protected boolean enableQueryRedirection = false;
+ protected List<EndPoint> nodeList;
public Session(String host, int rpcPort) {
this(
@@ -758,7 +759,7 @@ public class Session {
}
}
- private SessionConnection getSessionConnection(String deviceId) {
+ protected SessionConnection getSessionConnection(String deviceId) {
EndPoint endPoint;
if (enableCacheLeader
&& !deviceIdToEndpoint.isEmpty()
@@ -2211,6 +2212,14 @@ public class Session {
defaultSessionConnection.unsetSchemaTemplate(request);
}
+ /**
+  * Returns the node list most recently stored on this session through {@code setNodeList}.
+  *
+  * @return the stored list; NOTE(review): the field has no initializer at its declaration, so
+  *     this is presumably {@code null} until a list has been set - confirm against the
+  *     constructors
+  */
+ public List<EndPoint> getNodeList() {
+   return this.nodeList;
+ }
+
+ /**
+  * Stores the given node list on this session. The reference is kept as is; no copy is made.
+  *
+  * @param nodeList the list of node endpoints to remember
+  */
+ public void setNodeList(final List<EndPoint> nodeList) {
+   this.nodeList = nodeList;
+ }
+
private TSSetSchemaTemplateReq getTSSetSchemaTemplateReq(String
templateName, String prefixPath) {
TSSetSchemaTemplateReq request = new TSSetSchemaTemplateReq();
request.setTemplateName(templateName);
diff --git
a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index de6350e..6ef035e 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -129,7 +129,7 @@ public class SessionConnection {
try {
TSOpenSessionResp openResp = client.openSession(openReq);
-
+ session.setNodeList(openResp.nodeList);
RpcUtils.verifySuccess(openResp.getStatus());
if (Session.protocolVersion.getValue() !=
openResp.getServerProtocolVersion().getValue()) {