This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch xianyi in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7b816c0b5903812c6debbb21f2e48b150a6a290b Merge: 4fd32d6 9ccbfeb Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Nov 25 22:11:09 2021 +0800 fix conflicts .../java/org/apache/iotdb/cli/AbstractCli.java | 69 +++++++++++++ cli/src/main/java/org/apache/iotdb/cli/Cli.java | 1 + generator/pom.xml | 111 +++++++++++++++++++++ .../java/org/apache/iotdb/generator/Generator.java | 47 +++++++++ .../apache/iotdb/generator/GeneratorEntrance.java | 52 ++++++++++ .../org/apache/iotdb/jdbc/IoTDBConnection.java | 11 ++ pom.xml | 1 + .../resources/conf/iotdb-engine.properties | 3 + .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++ .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 2 + .../org/apache/iotdb/db/service/TSServiceImpl.java | 13 ++- .../org/apache/iotdb/session/ClusterSession.java | 109 ++++++++++++++++++++ .../java/org/apache/iotdb/session/Session.java | 11 +- .../apache/iotdb/session/SessionConnection.java | 2 +- thrift/src/main/thrift/rpc.thrift | 2 + 15 files changed, 441 insertions(+), 3 deletions(-) diff --cc generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java index 0000000,d97ad7f..b26006e mode 000000,100644..100644 --- a/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java +++ b/generator/src/main/java/org/apache/iotdb/generator/GeneratorEntrance.java @@@ -1,0 -1,53 +1,52 @@@ + package org.apache.iotdb.generator; + + import org.apache.iotdb.rpc.IoTDBConnectionException; + import org.apache.iotdb.rpc.StatementExecutionException; + import org.apache.iotdb.session.ClusterSession; + import org.apache.iotdb.session.SessionDataSet; + import org.apache.iotdb.tsfile.write.record.Tablet; + + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + public class GeneratorEntrance { + private static final Logger logger = LoggerFactory.getLogger(GeneratorEntrance.class); + + public static void main(String[] args) + throws IoTDBConnectionException, StatementExecutionException, InterruptedException { + args = new String[6]; + args[1] = "127.0.0.1:6667"; + args[2] = "root.sg1.d1.s1"; + args[3] = "3"; + args[4] = "1000"; + String[] addressElements = args[1].split(":"); + String seriesPath = args[2]; + int timeInterval = Integer.parseInt(args[3]) * 1000; + int batchNum = Integer.parseInt(args[4]); + String[] pathElements = seriesPath.split("\\."); + String measurementId = pathElements[pathElements.length - 1]; + String deviceId = seriesPath.substring(0, seriesPath.length() - measurementId.length() - 1); + + ClusterSession clusterSession = + new ClusterSession(addressElements[0], Integer.parseInt(addressElements[1])); + + long timestampForInsert = 0; + while (true) { + long startTime = System.currentTimeMillis(); + Tablet tablet = + Generator.generateTablet( + deviceId, pathElements[pathElements.length - 1], timestampForInsert, batchNum); + clusterSession.insertTablet(tablet); + logger.info("Insert {} data points to {}", batchNum, seriesPath); - String query = String.format("select count(%s) from %s", measurementId, deviceId); - SessionDataSet sessionDataSet = - clusterSession.queryTablet( - query, deviceId); - logger.info("Execute query {} with result : {}",query,sessionDataSet.next().getFields().get(0)); ++ String query = String.format("select count(%s) from %s", measurementId, deviceId); ++ SessionDataSet sessionDataSet = clusterSession.queryTablet(query, deviceId); ++ logger.info( ++ "Execute query {} with result : {}", query, sessionDataSet.next().getFields().get(0)); + timestampForInsert += batchNum; + long endTime = System.currentTimeMillis(); - if (timeInterval - (endTime - startTime)>0) { ++ if (timeInterval - (endTime - startTime) > 0) { + Thread.sleep(timeInterval - (endTime - startTime)); + } + } + } + } diff --cc session/src/main/java/org/apache/iotdb/session/ClusterSession.java index 0000000,d7617ca..a6e0fad mode 000000,100644..100644 --- a/session/src/main/java/org/apache/iotdb/session/ClusterSession.java +++ b/session/src/main/java/org/apache/iotdb/session/ClusterSession.java @@@ -1,0 -1,112 +1,109 @@@ + package org.apache.iotdb.session; + + import org.apache.iotdb.rpc.IoTDBConnectionException; + import org.apache.iotdb.rpc.StatementExecutionException; + import org.apache.iotdb.service.rpc.thrift.EndPoint; + import org.apache.iotdb.tsfile.write.record.Tablet; + + import java.util.*; + import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.TimeUnit; + + public class ClusterSession { + Session[] sessions; + ArrayBlockingQueue<Tablet>[] queues; + List<EndPoint> nodeList; + + public ClusterSession(String host, int rpcPort) throws IoTDBConnectionException { + Session session = new Session(host, rpcPort); + session.open(); + nodeList = new ArrayList<>(); + nodeList.addAll(session.getNodeList()); + sessions = new Session[nodeList.size()]; + queues = new ArrayBlockingQueue[nodeList.size()]; + for (int i = 0; i < nodeList.size(); i++) { + sessions[i] = new Session(nodeList.get(i).ip, nodeList.get(i).port); + sessions[i].open(); + queues[i] = new ArrayBlockingQueue<Tablet>(1000); + new Thread(new RunnableTask(i)).start(); + } + } + + public void insertTablet(Tablet tablet) + throws StatementExecutionException, IoTDBConnectionException { + int hashVal = tablet.prefixPath.hashCode(); + int index = hashVal % nodeList.size(); + for (int i = 0; i < 2; i++) { + int j = (index + i) % nodeList.size(); + synchronized (queues[j]) { + if (!queues[j].isEmpty()) { + queues[j].add(tablet); + queues[j].notifyAll(); + continue; + } + } + try { + sessions[j].insertTablet(tablet); + } catch (Exception e) { + synchronized (queues[j]) { + queues[j].add(tablet); + queues[j].notifyAll(); + } + } + } + } + + public SessionDataSet queryTablet(String sql, String deviceId) { + int hashVal = deviceId.hashCode(); + int index = hashVal % nodeList.size(); + SessionDataSet sessionDataSet = null; + try { + sessionDataSet = sessions[index].executeQueryStatement(sql); + } catch (Exception e) { + try { + sessionDataSet = sessions[(index + 1) % nodeList.size()].executeQueryStatement(sql); + } catch (Exception ex) { + // never happen, once the node restart, it won't be killed anymore. + e.printStackTrace(); + } + } + return sessionDataSet; + } + + public Session reconnect(int index) throws IoTDBConnectionException { + sessions[index] = new Session(nodeList.get(index).ip, nodeList.get(index).port); + sessions[index].open(); + return sessions[index]; + } + + class RunnableTask implements Runnable { + int index; + + public RunnableTask(int index) { + this.index = index; + } + + @Override + public void run() { - Tablet tablet; - while (true) { - Tablet t; - synchronized (queues[index]) { - if (queues[index].isEmpty()) { - try { - queues[index].wait(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } else { - try { - Session session = reconnect(index); - t = queues[index].poll(); - session.insertTablet(t); - } catch (StatementExecutionException | IoTDBConnectionException e) { - } ++ Tablet tablet; ++ while (true) { ++ Tablet t; ++ synchronized (queues[index]) { ++ if (queues[index].isEmpty()) { ++ try { ++ queues[index].wait(1000); ++ } catch (InterruptedException e) { ++ e.printStackTrace(); ++ } ++ } else { ++ try { ++ Session session = reconnect(index); ++ t = queues[index].poll(); ++ session.insertTablet(t); ++ } catch (StatementExecutionException | IoTDBConnectionException e) { + } + } - + } - ++ } + } + } + }
