This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch fast_write_test_0423
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/fast_write_test_0423 by this push:
     new ab0f0a0b22 add interface of session (#9677)
ab0f0a0b22 is described below

commit ab0f0a0b22dde67d8a23adc751ce993d9a11f1f4
Author: Zhijia Cao <[email protected]>
AuthorDate: Sun Apr 23 19:51:24 2023 +0800

    add interface of session (#9677)
---
 .../java/org/apache/iotdb/FastInsertExample.java   | 93 ++++++++++++++++++++++
 .../db/mpp/plan/parser/StatementGenerator.java     |  3 -
 .../planner/plan/node/write/FastInsertRowNode.java | 12 +--
 .../plan/node/write/FastInsertRowsNode.java        |  4 +-
 .../statement/crud/FastInsertRowStatement.java     |  4 +-
 .../service/thrift/impl/ClientRPCServiceImpl.java  | 20 ++---
 .../java/org/apache/iotdb/session/Session.java     | 81 ++++++++++++++++++-
 .../apache/iotdb/session/SessionConnection.java    | 21 +++++
 .../apache/iotdb/session/util/SessionUtils.java    | 38 ++++++++-
 9 files changed, 244 insertions(+), 32 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java b/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
new file mode 100644
index 0000000000..5e2a3a9efc
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+import org.apache.iotdb.isession.util.Version;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@SuppressWarnings("squid:S106")
+public class FastInsertExample {
+
+  private static Session session;
+  private static Session sessionEnableRedirect;
+  private static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1";
+  private static final String ROOT_SG1_D1_S2 = "root.sg1.d1.s2";
+  private static final String ROOT_SG1_D1_S3 = "root.sg1.d1.s3";
+  private static final String ROOT_SG1_D1_S4 = "root.sg1.d1.s4";
+  private static final String ROOT_SG1_D1_S5 = "root.sg1.d1.s5";
+  private static final String ROOT_SG1_D1 = "root.sg1.d1";
+  private static final String LOCAL_HOST = "127.0.0.1";
+
+  public static void main(String[] args)
+      throws IoTDBConnectionException, StatementExecutionException {
+    session =
+        new Session.Builder()
+            .host(LOCAL_HOST)
+            .port(6667)
+            .username("root")
+            .password("root")
+            .version(Version.V_1_0)
+            .build();
+    session.open(false);
+
+    fastInsertRecords();
+    session.close();
+  }
+
+  private static void fastInsertRecords()
+      throws IoTDBConnectionException, StatementExecutionException {
+    String deviceId = ROOT_SG1_D1;
+    List<String> deviceIds = new ArrayList<>();
+    List<List<Object>> valuesList = new ArrayList<>();
+    List<Long> timestamps = new ArrayList<>();
+    List<List<TSDataType>> typesList = new ArrayList<>();
+
+    for (long time = 0; time < 500; time++) {
+      List<Object> values = new ArrayList<>();
+      List<TSDataType> types = new ArrayList<>();
+      values.add(1L);
+      values.add(2L);
+      values.add(3L);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+
+      deviceIds.add(deviceId);
+      valuesList.add(values);
+      typesList.add(types);
+      timestamps.add(time);
+      if (time != 0 && time % 100 == 0) {
+        session.fastInsertRecords(deviceIds, timestamps, typesList, 
valuesList);
+        deviceIds.clear();
+        valuesList.clear();
+        typesList.clear();
+        timestamps.clear();
+      }
+    }
+
+    session.fastInsertRecords(deviceIds, timestamps, typesList, valuesList);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index a8982d373c..a5d9e25c96 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -394,9 +394,6 @@ public class StatementGenerator {
       statement.setTime(req.getTimestamps().get(i));
       statement.setValues(req.valuesList.get(i));
       // skip empty statement
-      if (statement.isEmpty()) {
-        continue;
-      }
       insertRowStatementList.add(statement);
     }
     insertStatement.setInsertRowStatementList(insertRowStatementList);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
index a04f833bac..0667f84028 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
@@ -19,26 +19,20 @@
 
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 
-import java.nio.ByteBuffer;
-import java.util.List;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.nio.ByteBuffer;
 
 public class FastInsertRowNode extends InsertRowNode {
 
   private ByteBuffer rawValues;
 
-
   public FastInsertRowNode(PlanNodeId id) {
     super(id);
   }
 
-  public FastInsertRowNode(
-      PlanNodeId id,
-      PartialPath devicePath,
-      long time,
-      ByteBuffer values) {
+  public FastInsertRowNode(PlanNodeId id, PartialPath devicePath, long time, 
ByteBuffer values) {
     super(id, devicePath, true, null, null, time, null, false);
     this.rawValues = values;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
index 9cbc5ba349..e47fd9434b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
@@ -29,7 +29,9 @@ public class FastInsertRowsNode extends InsertRowsNode {
   }
 
   public FastInsertRowsNode(
-      PlanNodeId id, List<Integer> insertRowNodeIndexList, List<InsertRowNode> 
fastInsertRowNodeList) {
+      PlanNodeId id,
+      List<Integer> insertRowNodeIndexList,
+      List<InsertRowNode> fastInsertRowNodeList) {
     super(id, insertRowNodeIndexList, fastInsertRowNodeList);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowStatement.java
index b2a93fe502..0ece663002 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowStatement.java
@@ -17,12 +17,11 @@
  * under the License.
  */
 
-
 package org.apache.iotdb.db.mpp.plan.statement.crud;
 
 import java.nio.ByteBuffer;
 
-public class FastInsertRowStatement extends InsertRowStatement{
+public class FastInsertRowStatement extends InsertRowStatement {
 
   private ByteBuffer rawValues;
 
@@ -33,5 +32,4 @@ public class FastInsertRowStatement extends InsertRowStatement{
   public ByteBuffer getRawValues() {
     return rawValues;
   }
-
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index a18ccaf280..f515f729b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -1216,8 +1216,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       }
 
       // check whether measurement is legal according to syntax convention
-//      req.setMeasurementsList(
-//          
PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList()));
+      //      req.setMeasurementsList(
+      //          
PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList()));
 
       // Step 1:  transfer from TSInsertRecordsReq to Statement
       InsertRowsStatement statement = StatementGenerator.createStatement(req);
@@ -1237,14 +1237,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
       // permission check
       // CANNOT checkAuthority
-//      TSStatus status = AuthorityChecker.checkAuthority(statement, 
clientSession);
-//      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-//        return status;
-//      }
-//
-//      quota =
-//          DataNodeThrottleQuotaManager.getInstance()
-//              .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), 
statement);
+      //      TSStatus status = AuthorityChecker.checkAuthority(statement, 
clientSession);
+      //      if (status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      //        return status;
+      //      }
+      //
+      //      quota =
+      //          DataNodeThrottleQuotaManager.getInstance()
+      //              
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement);
 
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index c73a9c7c55..e39b23629f 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
 import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSFastInsertRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -1746,14 +1747,36 @@ public class Session implements ISession {
     request.addToValuesList(values);
   }
 
-  // TODO: (FASTWRITE) (曹志杰) 实现该接口
   @Override
   public void fastInsertRecords(
       List<String> deviceIds,
       List<Long> times,
       List<List<TSDataType>> typesList,
       List<List<Object>> valuesList)
-      throws IoTDBConnectionException, StatementExecutionException {}
+      throws IoTDBConnectionException, StatementExecutionException {
+    int len = deviceIds.size();
+    if (len != times.size() || len != valuesList.size()) {
+      throw new IllegalArgumentException("deviceIds, times and valuesList's 
size should be equal");
+    }
+    if (enableRedirection) {
+      fastInsertRecordsWithLeaderCache(deviceIds, times, typesList, 
valuesList);
+    } else {
+      TSFastInsertRecordsReq request;
+      try {
+        request = genTSFastInsertRecordsReq(deviceIds, times, typesList, 
valuesList);
+      } catch (NoValidValueException e) {
+        logger.warn(
+            "All values are null and this submission is ignored,deviceIds are 
[{}],times are [{}]",
+            deviceIds.toString(),
+            times.toString());
+        return;
+      }
+      try {
+        defaultSessionConnection.fastInsertRecords(request);
+      } catch (RedirectException ignored) {
+      }
+    }
+  }
 
   /**
    * Insert multiple rows, which can reduce the overhead of network. This 
method is just like jdbc
@@ -2335,6 +2358,31 @@ public class Session implements ISession {
     insertByGroup(recordsGroup, SessionConnection::insertRecords);
   }
 
+  private void fastInsertRecordsWithLeaderCache(
+      List<String> deviceIds,
+      List<Long> times,
+      List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    Map<SessionConnection, TSFastInsertRecordsReq> recordsGroup = new 
HashMap<>();
+    for (int i = 0; i < deviceIds.size(); i++) {
+      final SessionConnection connection = 
getSessionConnection(deviceIds.get(i));
+      TSFastInsertRecordsReq request =
+          recordsGroup.getOrDefault(connection, new TSFastInsertRecordsReq());
+      try {
+        updateTSFastInsertRecordsReq(
+            request, deviceIds.get(i), times.get(i), typesList.get(i), 
valuesList.get(i));
+        recordsGroup.putIfAbsent(connection, request);
+      } catch (NoValidValueException e) {
+        logger.warn(
+            "All values are null and this submission is ignored,deviceId is 
[{}],time is [{}]",
+            deviceIds.get(i),
+            times.get(i));
+      }
+    }
+    insertByGroup(recordsGroup, SessionConnection::fastInsertRecords);
+  }
+
   private TSInsertRecordsReq filterAndGenTSInsertRecordsReq(
       List<String> deviceIds,
       List<Long> times,
@@ -2373,6 +2421,20 @@ public class Session implements ISession {
     return request;
   }
 
+  private TSFastInsertRecordsReq genTSFastInsertRecordsReq(
+      List<String> deviceIds,
+      List<Long> times,
+      List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException {
+    TSFastInsertRecordsReq request = new TSFastInsertRecordsReq();
+    request.setPrefixPaths(deviceIds);
+    request.setTimestamps(times);
+    List<ByteBuffer> buffersList = 
objectValuesListToByteBufferList(valuesList, typesList);
+    request.setValuesList(buffersList);
+    return request;
+  }
+
   private void filterAndUpdateTSInsertRecordsReq(
       TSInsertRecordsReq request,
       String deviceId,
@@ -2404,12 +2466,25 @@ public class Session implements ISession {
       throws IoTDBConnectionException {
     request.addToPrefixPaths(deviceId);
     request.addToTimestamps(time);
-    // TODO: (FASTWRITE) 不需要再添加 measurement 的信息
     request.addToMeasurementsList(measurements);
     ByteBuffer buffer = SessionUtils.getValueBuffer(types, values);
     request.addToValuesList(buffer);
   }
 
+  private void updateTSFastInsertRecordsReq(
+      TSFastInsertRecordsReq request,
+      String deviceId,
+      Long time,
+      List<TSDataType> types,
+      List<Object> values)
+      throws IoTDBConnectionException {
+    request.addToPrefixPaths(deviceId);
+    request.addToTimestamps(time);
+    ByteBuffer buffer =
+        ByteBuffer.allocate(SessionUtils.calculateLengthForFastInsert(types, 
values));
+    request.addToValuesList(buffer);
+  }
+
   /**
    * insert the data of a device. For each timestamp, the number of 
measurements is the same.
    *
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index d5d90c4e23..58f2c40213 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
 import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSFastInsertRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -639,6 +640,26 @@ public class SessionConnection {
     }
   }
 
+  protected void fastInsertRecords(TSFastInsertRecordsReq request)
+      throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
+    request.setSessionId(sessionId);
+    try {
+      RpcUtils.verifySuccessWithRedirectionForMultiDevices(
+          client.fastInsertRecords(request), request.getPrefixPaths());
+    } catch (TException e) {
+      if (reconnect()) {
+        try {
+          request.setSessionId(sessionId);
+          RpcUtils.verifySuccess(client.fastInsertRecords(request));
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException(logForReconnectionFailure());
+      }
+    }
+  }
+
   protected void insertRecords(TSInsertStringRecordsReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
     request.setSessionId(sessionId);
diff --git a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
index 57ea8e8f0b..5f55031b4f 100644
--- a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
+++ b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
@@ -79,8 +79,6 @@ public class SessionUtils {
   public static ByteBuffer getValueBuffer(List<TSDataType> types, List<Object> 
values)
       throws IoTDBConnectionException {
     ByteBuffer buffer = 
ByteBuffer.allocate(SessionUtils.calculateLength(types, values));
-    // TODO: (FASTWRITE) putValues 时可以少写一个字节的 Type
-    SessionUtils.putValues(types, values, buffer);
     return buffer;
   }
 
@@ -121,6 +119,41 @@ public class SessionUtils {
     return res;
   }
 
+  public static int calculateLengthForFastInsert(List<TSDataType> types, 
List<Object> values)
+      throws IoTDBConnectionException {
+    int res = 0;
+    for (int i = 0; i < types.size(); i++) {
+      // types
+      switch (types.get(i)) {
+        case BOOLEAN:
+          res += 1;
+          break;
+        case INT32:
+          res += Integer.BYTES;
+          break;
+        case INT64:
+          res += Long.BYTES;
+          break;
+        case FLOAT:
+          res += Float.BYTES;
+          break;
+        case DOUBLE:
+          res += Double.BYTES;
+          break;
+        case TEXT:
+          res += Integer.BYTES;
+          if (values.get(i) instanceof Binary) {
+            res += ((Binary) values.get(i)).getValues().length;
+          } else {
+            res += ((String) 
values.get(i)).getBytes(TSFileConfig.STRING_CHARSET).length;
+          }
+          break;
+        default:
+          throw new IoTDBConnectionException(MSG_UNSUPPORTED_DATA_TYPE + 
types.get(i));
+      }
+    }
+    return res;
+  }
   /**
    * put value in buffer
    *
@@ -136,7 +169,6 @@ public class SessionUtils {
         ReadWriteIOUtils.write(TYPE_NULL, buffer);
         continue;
       }
-      ReadWriteIOUtils.write(types.get(i), buffer);
       switch (types.get(i)) {
         case BOOLEAN:
           ReadWriteIOUtils.write((Boolean) values.get(i), buffer);

Reply via email to