This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch xianyi
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xianyi by this push:
     new 9ccbfeb  dataGenerator (#4468)
9ccbfeb is described below

commit 9ccbfebc7deb3b0bd13482635239e584b7251f2e
Author: Mrquan <[email protected]>
AuthorDate: Thu Nov 25 22:08:34 2021 +0800

    dataGenerator (#4468)
    
    * ClusterSession and dataGenerator
    
    * ClusterSession and dataGenerator
    
    Co-authored-by: 权思屹 <[email protected]>
---
 generator/pom.xml                                  | 111 ++++++++++++++++++++
 .../java/org/apache/iotdb/generator/Generator.java |  47 +++++++++
 .../apache/iotdb/generator/GeneratorEntrance.java  |  53 ++++++++++
 pom.xml                                            |   1 +
 .../org/apache/iotdb/session/ClusterSession.java   | 112 +++++++++++++++++++++
 .../java/org/apache/iotdb/session/Session.java     |  11 +-
 .../apache/iotdb/session/SessionConnection.java    |   2 +-
 7 files changed, 335 insertions(+), 2 deletions(-)

diff --git a/generator/pom.xml b/generator/pom.xml
new file mode 100644
index 0000000..8916a19
--- /dev/null
+++ b/generator/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <groupId>org.apache.iotdb</groupId>
+        <artifactId>iotdb-parent</artifactId>
+        <version>0.13.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>generator</artifactId>
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+    <dependencies>
+        <!-- The version of thrift is overridden because using 0.13.0 in the 
cluster module
+        will cause unclear bugs -->
+        <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+            <version>0.14.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>service-rpc</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.thrift</groupId>
+                    <artifactId>libthrift</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.thrift</groupId>
+                    <artifactId>libthrift</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-thrift-cluster</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.thrift</groupId>
+                    <artifactId>libthrift</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-session</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-jdbc</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <version>4.0.2</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- for mocked test-->
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito2</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-pool2</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/generator/src/main/java/org/apache/iotdb/generator/Generator.java 
b/generator/src/main/java/org/apache/iotdb/generator/Generator.java
new file mode 100644
index 0000000..d247978
--- /dev/null
+++ b/generator/src/main/java/org/apache/iotdb/generator/Generator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.generator;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Generator {
+
+  public static Tablet generateTablet(
+      String deviceId, String measurementId, long startTime, int pointNum) {
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new UnaryMeasurementSchema(measurementId, 
TSDataType.INT32));
+    Tablet tablet = new Tablet(deviceId, schemaList, pointNum);
+
+    long timestamp = startTime;
+    for (long row = 0; row < pointNum; row++) {
+      int rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, timestamp);
+      tablet.addValue(measurementId, rowIndex, 0);
+      timestamp += 1;
+    }
+    return tablet;
+  }
+}
diff --git 
a/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java 
b/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java
new file mode 100644
index 0000000..d97ad7f
--- /dev/null
+++ b/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java
@@ -0,0 +1,53 @@
+package org.apache.iotdb.generator;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.ClusterSession;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GeneratorEntrance {
+  private static final Logger logger = 
LoggerFactory.getLogger(GeneratorEntrance.class);
+
+  public static void main(String[] args)
+      throws IoTDBConnectionException, StatementExecutionException, 
InterruptedException {
+    args = new String[6];
+    args[1] = "127.0.0.1:6667";
+    args[2] = "root.sg1.d1.s1";
+    args[3] = "3";
+    args[4] = "1000";
+    String[] addressElements = args[1].split(":");
+    String seriesPath = args[2];
+    int timeInterval = Integer.parseInt(args[3]) * 1000;
+    int batchNum = Integer.parseInt(args[4]);
+    String[] pathElements = seriesPath.split("\\.");
+    String measurementId = pathElements[pathElements.length - 1];
+    String deviceId = seriesPath.substring(0, seriesPath.length() - 
measurementId.length() - 1);
+
+    ClusterSession clusterSession =
+        new ClusterSession(addressElements[0], 
Integer.parseInt(addressElements[1]));
+
+    long timestampForInsert = 0;
+    while (true) {
+      long startTime = System.currentTimeMillis();
+      Tablet tablet =
+          Generator.generateTablet(
+              deviceId, pathElements[pathElements.length - 1], 
timestampForInsert, batchNum);
+      clusterSession.insertTablet(tablet);
+      logger.info("Insert {} data points to {}", batchNum, seriesPath);
+      String query =  String.format("select count(%s) from %s", measurementId, 
deviceId);
+      SessionDataSet sessionDataSet =
+          clusterSession.queryTablet(
+                  query, deviceId);
+      logger.info("Execute query {} with result : 
{}",query,sessionDataSet.next().getFields().get(0));
+      timestampForInsert += batchNum;
+      long endTime = System.currentTimeMillis();
+      if (timeInterval - (endTime - startTime)>0) {
+        Thread.sleep(timeInterval - (endTime - startTime));
+      }
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 6286eef..5093ce1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,6 +91,7 @@
         <module>openapi</module>
         <module>server</module>
         <module>example</module>
+        <module>generator</module>
         <module>grafana</module>
         <module>spark-tsfile</module>
         <module>hadoop</module>
diff --git a/session/src/main/java/org/apache/iotdb/session/ClusterSession.java 
b/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
new file mode 100644
index 0000000..d7617ca
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/ClusterSession.java
@@ -0,0 +1,112 @@
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import java.util.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class ClusterSession {
+  Session[] sessions;
+  ArrayBlockingQueue<Tablet>[] queues;
+  List<EndPoint> nodeList;
+
+  public ClusterSession(String host, int rpcPort) throws 
IoTDBConnectionException {
+    Session session = new Session(host, rpcPort);
+    session.open();
+    nodeList = new ArrayList<>();
+    nodeList.addAll(session.getNodeList());
+    sessions = new Session[nodeList.size()];
+    queues = new ArrayBlockingQueue[nodeList.size()];
+    for (int i = 0; i < nodeList.size(); i++) {
+      sessions[i] = new Session(nodeList.get(i).ip, nodeList.get(i).port);
+      sessions[i].open();
+      queues[i] = new ArrayBlockingQueue<Tablet>(1000);
+      new Thread(new RunnableTask(i)).start();
+    }
+  }
+
+  public void insertTablet(Tablet tablet)
+      throws StatementExecutionException, IoTDBConnectionException {
+    int hashVal = tablet.prefixPath.hashCode();
+    int index = hashVal % nodeList.size();
+    for (int i = 0; i < 2; i++) {
+      int j = (index + i) % nodeList.size();
+      synchronized (queues[j]) {
+        if (!queues[j].isEmpty()) {
+          queues[j].add(tablet);
+          queues[j].notifyAll();
+          continue;
+        }
+      }
+      try {
+        sessions[j].insertTablet(tablet);
+      } catch (Exception e) {
+        synchronized (queues[j]) {
+          queues[j].add(tablet);
+          queues[j].notifyAll();
+        }
+      }
+    }
+  }
+
+  public SessionDataSet queryTablet(String sql, String deviceId) {
+    int hashVal = deviceId.hashCode();
+    int index = hashVal % nodeList.size();
+    SessionDataSet sessionDataSet = null;
+    try {
+      sessionDataSet = sessions[index].executeQueryStatement(sql);
+    } catch (Exception e) {
+      try {
+        sessionDataSet = sessions[(index + 1) % 
nodeList.size()].executeQueryStatement(sql);
+      } catch (Exception ex) {
+        // never happen, once the node restart, it won't be killed anymore.
+        e.printStackTrace();
+      }
+    }
+    return sessionDataSet;
+  }
+
+  public Session reconnect(int index) throws IoTDBConnectionException {
+    sessions[index] = new Session(nodeList.get(index).ip, 
nodeList.get(index).port);
+    sessions[index].open();
+    return sessions[index];
+  }
+
+  class RunnableTask implements Runnable {
+    int index;
+
+    public RunnableTask(int index) {
+      this.index = index;
+    }
+
+    @Override
+    public void run() {
+        Tablet tablet;
+        while (true) {
+          Tablet t;
+          synchronized (queues[index]) {
+            if (queues[index].isEmpty()) {
+              try {
+                queues[index].wait(1000);
+              } catch (InterruptedException e) {
+                e.printStackTrace();
+              }
+            } else {
+              try {
+                Session session = reconnect(index);
+                t = queues[index].poll();
+                session.insertTablet(t);
+              } catch (StatementExecutionException | IoTDBConnectionException 
e) {
+              }
+            }
+          }
+
+        }
+
+    }
+  }
+}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java 
b/session/src/main/java/org/apache/iotdb/session/Session.java
index fca13f4..2d854d5 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -129,6 +129,7 @@ public class Session {
   protected volatile Map<EndPoint, SessionConnection> 
endPointToSessionConnection;
 
   protected boolean enableQueryRedirection = false;
+  protected List<EndPoint> nodeList;
 
   public Session(String host, int rpcPort) {
     this(
@@ -758,7 +759,7 @@ public class Session {
     }
   }
 
-  private SessionConnection getSessionConnection(String deviceId) {
+  protected SessionConnection getSessionConnection(String deviceId) {
     EndPoint endPoint;
     if (enableCacheLeader
         && !deviceIdToEndpoint.isEmpty()
@@ -2211,6 +2212,14 @@ public class Session {
     defaultSessionConnection.unsetSchemaTemplate(request);
   }
 
+  public List<EndPoint> getNodeList() {
+    return nodeList;
+  }
+
+  public void setNodeList(List<EndPoint> nodeList) {
+    this.nodeList = nodeList;
+  }
+
   private TSSetSchemaTemplateReq getTSSetSchemaTemplateReq(String 
templateName, String prefixPath) {
     TSSetSchemaTemplateReq request = new TSSetSchemaTemplateReq();
     request.setTemplateName(templateName);
diff --git 
a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java 
b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index de6350e..6ef035e 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -129,7 +129,7 @@ public class SessionConnection {
 
     try {
       TSOpenSessionResp openResp = client.openSession(openReq);
-
+      session.setNodeList(openResp.nodeList);
       RpcUtils.verifySuccess(openResp.getStatus());
 
       if (Session.protocolVersion.getValue() != 
openResp.getServerProtocolVersion().getValue()) {

Reply via email to