This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch test-new-record-rpc-format
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e10c8554aecc3554d87d657557bac851da1cb4b9
Author: Liu Xuxin <[email protected]>
AuthorDate: Tue Jan 23 10:26:19 2024 +0800

    support new rpc format insertion
---
 iotdb-client/session/pom.xml                       |  5 ++++
 .../java/org/apache/iotdb/session/Session.java     | 25 ++++++++++++++++-
 .../apache/iotdb/session/SessionConnection.java    | 10 +++++++
 .../protocol/thrift/impl/ClientRPCServiceImpl.java | 31 ++++++++++++++++++++++
 .../plan/parser/StatementGenerator.java            | 23 ++++++++++++++++
 .../thrift-datanode/src/main/thrift/client.thrift  |  7 +++++
 6 files changed, 100 insertions(+), 1 deletion(-)

diff --git a/iotdb-client/session/pom.xml b/iotdb-client/session/pom.xml
index dcaa3515914..1c77ca417a7 100644
--- a/iotdb-client/session/pom.xml
+++ b/iotdb-client/session/pom.xml
@@ -72,6 +72,11 @@
             <groupId>org.apache.thrift</groupId>
             <artifactId>libthrift</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.mine.rpcutils</groupId>
+            <artifactId>RPC-Utils</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 4d0ad33efd7..66c137fc8f8 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -45,6 +45,7 @@ import 
org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReqV2ColumnFormat;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
@@ -72,6 +73,8 @@ import 
org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.apache.thrift.TException;
+import org.mine.rpc.InsertRecordsReq;
+import org.mine.rpc.InsertRecordsSerializeInColumnUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,6 +126,8 @@ public class Session implements ISession {
   protected boolean useSSL;
   protected String trustStore;
   protected String trustStorePwd;
+  public static boolean useNewFormat = true;
+
   /**
    * Timeout of query can be set by users. A negative number means using the 
default configuration
    * of server. And value 0 will disable the function of query timeout.
@@ -1904,7 +1909,7 @@ public class Session implements ISession {
     if (enableRedirection) {
       insertRecordsWithLeaderCache(
           deviceIds, times, measurementsList, typesList, valuesList, false);
-    } else {
+    } else if (!useNewFormat) {
       TSInsertRecordsReq request;
       try {
         request =
@@ -1918,6 +1923,24 @@ public class Session implements ISession {
         defaultSessionConnection.insertRecords(request);
       } catch (RedirectException ignored) {
       }
+    } else {
+      // insert using new column rpc format
+      InsertRecordsReq req =
+          new InsertRecordsReq(deviceIds, measurementsList, typesList, 
valuesList, times);
+      ByteBuffer buffer = null;
+      try {
+        buffer = InsertRecordsSerializeInColumnUtils.encode(req);
+      } catch (IOException e) {
+        logger.error("Meets Exception when serializing buffer", e);
+        return;
+      }
+      TSInsertRecordsReqV2ColumnFormat request = new 
TSInsertRecordsReqV2ColumnFormat();
+      request.setBuffer(buffer);
+      try {
+        defaultSessionConnection.insertRecords(request);
+      } catch (Exception e) {
+        logger.error("Meets exception when insert new records", e);
+      }
     }
   }
 
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 68e221807a6..57286acd01c 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -48,6 +48,7 @@ import 
org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReqV2ColumnFormat;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
@@ -877,6 +878,15 @@ public class SessionConnection {
     return client.insertRecords(request);
   }
 
+  protected void insertRecords(TSInsertRecordsReqV2ColumnFormat request) 
throws TException {
+    request.setSessionId(sessionId);
+    try {
+      client.insertRecordsV2ColumnFormat(request);
+    } catch (TException e) {
+      throw e;
+    }
+  }
+
   protected void insertRecords(TSInsertStringRecordsReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 27bda2168f8..1129bc436fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -142,6 +142,7 @@ import 
org.apache.iotdb.service.rpc.thrift.TSGroupByQueryIntervalReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReqV2ColumnFormat;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
@@ -181,6 +182,8 @@ import io.airlift.units.Duration;
 import io.jsonwebtoken.lang.Strings;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.thrift.TException;
+import org.mine.rpc.InsertRecordsReq;
+import org.mine.rpc.InsertRecordsSerializeInColumnUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1643,6 +1646,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
       // Step 1:  transfer from TSInsertRecordsReq to Statement
       InsertRowsStatement statement = StatementGenerator.createStatement(req);
+      req = null;
       // return success when this statement is empty because server doesn't 
need to execute it
       if (statement.isEmpty()) {
         return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
@@ -1694,6 +1698,33 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     }
   }
 
+  @Override
+  public TSStatus insertRecordsV2ColumnFormat(TSInsertRecordsReqV2ColumnFormat 
req) {
+    byte[] buffer = req.getBuffer();
+    if (buffer == null) {
+      return RpcUtils.getStatus(TSStatusCode.ILLEGAL_PARAMETER, "Buffer is 
null");
+    }
+    try {
+      InsertRecordsReq originalReq =
+          InsertRecordsSerializeInColumnUtils.decode(ByteBuffer.wrap(buffer));
+      InsertRowsStatement statement = 
StatementGenerator.createStatement(originalReq);
+      //      originalReq = null;
+      long queryId = SESSION_MANAGER.requestQueryId();
+      ExecutionResult result =
+          COORDINATOR.execute(
+              statement,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
+              "",
+              partitionFetcher,
+              schemaFetcher);
+      return result.status;
+    } catch (Exception e) {
+      LOGGER.error("Meet error while inserting will new request", e);
+      return RpcUtils.getStatus(TSStatusCode.ILLEGAL_PARAMETER, 
e.getMessage());
+    }
+  }
+
   @Override
   public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
     long t1 = System.nanoTime();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
index 3a7ac2c0cbe..830228246d4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
@@ -100,6 +100,7 @@ import org.antlr.v4.runtime.CharStreams;
 import org.antlr.v4.runtime.CommonTokenStream;
 import org.antlr.v4.runtime.atn.PredictionMode;
 import org.antlr.v4.runtime.tree.ParseTree;
+import org.mine.rpc.InsertRecordsReq;
 
 import java.nio.ByteBuffer;
 import java.time.ZoneId;
@@ -408,6 +409,28 @@ public class StatementGenerator {
     return insertStatement;
   }
 
+  public static InsertRowsStatement createStatement(InsertRecordsReq req)
+      throws IllegalPathException {
+    final long startTime = System.nanoTime();
+    // construct insert statement
+    InsertRowsStatement insertStatement = new InsertRowsStatement();
+    List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
+    List<String> devicePath = req.getPrefixPath();
+    for (int i = 0, size = devicePath.size(); i < size; ++i) {
+      InsertRowStatement statement = new InsertRowStatement();
+      
statement.setDevicePath(DEVICE_PATH_CACHE.getPartialPath(devicePath.get(i)));
+      statement.setMeasurements(req.getMeasurements().get(i).toArray(new 
String[0]));
+      
TimestampPrecisionUtils.checkTimestampPrecision(req.getTimestamp().get(i));
+      statement.setTime(req.getTimestamp().get(i));
+      statement.setValues(req.getValues().get(i).toArray(new Object[0]));
+      statement.setDataTypes(req.getTypes().get(i).toArray(new TSDataType[0]));
+      insertRowStatementList.add(statement);
+    }
+    insertStatement.setInsertRowStatementList(insertRowStatementList);
+    PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() - 
startTime);
+    return insertStatement;
+  }
+
   public static InsertRowsStatement createStatement(TSInsertStringRecordsReq 
req)
       throws IllegalPathException {
     final long startTime = System.nanoTime();
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index 2b2f5585726..6dbb05a5654 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -255,6 +255,11 @@ struct TSInsertRecordsReq {
   6: optional bool isAligned
 }
 
+struct TSInsertRecordsReqV2ColumnFormat {
+  1: required i64 sessionId
+  2: required binary buffer
+}
+
 struct TSInsertRecordsOfOneDeviceReq {
     1: required i64 sessionId
     2: required string prefixPath
@@ -583,6 +588,8 @@ service IClientRPCService {
 
   common.TSStatus insertRecords(1:TSInsertRecordsReq req);
 
+  common.TSStatus 
insertRecordsV2ColumnFormat(1:TSInsertRecordsReqV2ColumnFormat req);
+
   common.TSStatus insertRecordsOfOneDevice(1:TSInsertRecordsOfOneDeviceReq 
req);
 
   common.TSStatus 
insertStringRecordsOfOneDevice(1:TSInsertStringRecordsOfOneDeviceReq req);

Reply via email to