This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch new-rpc-format-impl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d3713d9255002902fe8a2bea7d67be69b65d0531
Author: Liu Xuxin <[email protected]>
AuthorDate: Thu Mar 14 11:27:06 2024 +0800

    temp
---
 .../rpc/TCompressedElasticFramedTransport.java     |  10 ++
 .../apache/iotdb/session/SessionConnection.java    |   1 +
 .../apache/iotdb/session/util/SessionRPCUtils.java | 131 +++++++++++++++------
 .../protocol/thrift/impl/ClientRPCServiceImpl.java |  12 +-
 .../thrift-datanode/src/main/thrift/client.thrift  |   1 +
 5 files changed, 118 insertions(+), 37 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
index a3b4f38064a..8e6d8eb1c38 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
@@ -99,4 +99,14 @@ public abstract class TCompressedElasticFramedTransport 
extends TElasticFramedTr
 
   protected abstract void uncompress(byte[] input, int inOff, int size, byte[] 
output, int outOff)
       throws IOException;
+
+  public int getCompressedSize() throws IOException {
+    int length = writeBuffer.getPos();
+    RpcStat.writeBytes.addAndGet(length);
+    int maxCompressedLength = maxCompressedLength(length);
+    writeCompressBuffer.resizeIfNecessary(maxCompressedLength);
+    int compressedLength =
+        compress(writeBuffer.getBuffer(), 0, length, 
writeCompressBuffer.getBuffer(), 0);
+    return compressedLength;
+  }
 }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index f9fa976d1e7..11aeb5db0c3 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -914,6 +914,7 @@ public class SessionConnection {
   }
 
   private TSStatus insertRecordsV2Internal(TSInsertRecordsReqV2 request) 
throws TException {
+    request.setSessionId(sessionId);
     return client.insertRecordsV2(request);
   }
 
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
index c3eed8befa9..9a45e946b94 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
@@ -19,6 +19,9 @@
 package org.apache.iotdb.session.util;
 
 import org.apache.iotdb.isession.SessionConfig;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.TSnappyElasticFramedTransport;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
 import org.apache.iotdb.session.req.InsertRecordsRequest;
 import org.apache.iotdb.tsfile.compress.ICompressor;
 import org.apache.iotdb.tsfile.compress.IUnCompressor;
@@ -33,7 +36,13 @@ import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+
 import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -553,23 +562,79 @@ public class SessionRPCUtils {
   }
 
   public static InsertRecordsRequest deserializeInsertRecordsReq(
-      ByteBuffer schemaBuffer, ByteBuffer valueBuffer, ByteBuffer 
auxiliaryBuffer) {
-    return null;
+      ByteBuffer schemaBuffer, ByteBuffer valueBuffer, ByteBuffer 
auxiliaryBuffer)
+      throws IOException {
+    List<String> deviceIds = new ArrayList<>();
+    List<Long> timestamps = new ArrayList<>();
+    List<List<String>> measurementIds = new ArrayList<>();
+    List<List<TSDataType>> dataTypes = new ArrayList<>();
+    List<List<Object>> values = new ArrayList<>();
+    deserializeSchema(schemaBuffer, deviceIds, measurementIds);
+    deserializeValueBuffer(valueBuffer, timestamps, dataTypes, values);
+    return new InsertRecordsRequest(deviceIds, measurementIds, timestamps, 
dataTypes, values);
   }
 
   public static void main(String[] args) throws Exception {
     testValue();
   }
 
-  private static void testValue() throws IOException, ClassNotFoundException {
+  private static void testValue()
+      throws IOException, ClassNotFoundException, IoTDBConnectionException, 
TException {
     List<Long> timestamps = new ArrayList<>();
     List<List<TSDataType>> measurements = new ArrayList<>();
     List<List<Object>> values = new ArrayList<>();
     List<TSDataType> types = new ArrayList<>();
     List<Object> value = new ArrayList<>();
+    int totalSize = 0;
+    for (int i = 0; i < 10000; ++i) {
+      timestamps.add(System.currentTimeMillis());
+      totalSize += genOneRow(value, types);
+      totalSize += 8;
+      measurements.add(types);
+      values.add(value);
+      value = new ArrayList<>();
+      types = new ArrayList<>();
+    }
+    long startTime = System.currentTimeMillis();
+    ByteBuffer buffer = serializeValue(timestamps, measurements, values);
+    PublicBAOS baos = new PublicBAOS();
+    ObjectOutputStream oos = new ObjectOutputStream(baos);
+    oos.writeObject(timestamps);
+    oos.writeObject(measurements);
+    oos.writeObject(values);
+    oos.flush();
+    System.out.println("Serialize time cost is " + (System.currentTimeMillis() 
- startTime));
+    System.out.println("Original Size is " + totalSize);
+    System.out.println("Serialized Size is " + buffer.limit());
+    System.out.println("Compression rate is " + (double) totalSize / 
buffer.limit());
+    System.out.println("Java serialize size is " + baos.size());
+    TSInsertRecordsReq req = new TSInsertRecordsReq();
+    req.setPrefixPaths(new ArrayList<>());
+    req.setMeasurementsList(new ArrayList<>());
+    req.setTimestamps(timestamps);
+    List<ByteBuffer> buffers = new ArrayList<>();
+    for (int i = 0; i < values.size(); ++i) {
+      buffers.add(SessionUtils.getValueBuffer(measurements.get(i), 
values.get(i)));
+    }
+    req.setValuesList(buffers);
+    TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
+    TSnappyElasticFramedTransport trans = new 
TSnappyElasticFramedTransport(null);
+    TProtocol protocol = protocolFactory.getProtocol(trans);
+    startTime = System.currentTimeMillis();
+    req.write(protocol);
+    int snappyCompressedSize = trans.getCompressedSize();
+
+    System.out.println(
+        "Thrift serialize time and compression time is "
+            + (System.currentTimeMillis() - startTime));
+    System.out.println("Thrift trans compressed size is " + 
snappyCompressedSize);
+    System.out.println("Thrift compression rate is " + ((double) totalSize / 
snappyCompressedSize));
+  }
+
+  private static int genOneRow(List<Object> value, List<TSDataType> types) {
     int singleRowSize = 0;
     for (int j = 0; j < 100; ++j) {
-      switch (((byte) j % 6)) {
+      switch (new Random().nextInt(6)) {
         case 0:
           types.add(TSDataType.BOOLEAN);
           value.add(new Random().nextBoolean());
@@ -577,55 +642,31 @@ public class SessionRPCUtils {
           break;
         case 1:
           types.add(TSDataType.INT32);
-          value.add(new Random().nextInt());
+          value.add(100 + new Random().nextInt(10));
           singleRowSize += 5;
           break;
         case 2:
           types.add(TSDataType.INT64);
-          value.add(new Random().nextLong());
+          value.add(599L + new Random().nextInt(10));
           singleRowSize += 9;
           break;
         case 3:
           types.add(TSDataType.FLOAT);
-          value.add(new Random().nextFloat());
-          singleRowSize += 5;
+          value.add(10.0f + (new Random().nextInt(100) * 0.01f));
           break;
         case 4:
           types.add(TSDataType.DOUBLE);
-          value.add(new Random().nextDouble());
-          singleRowSize += 9;
+          value.add(50.0 + new Random().nextInt(200) * 0.01);
           break;
         case 5:
           types.add(TSDataType.TEXT);
-          value.add(String.valueOf(new Random().nextInt(100)));
-          singleRowSize += 7;
+          String s = String.valueOf(new Random().nextInt(10));
+          value.add(s);
+          singleRowSize += (s.getBytes().length + 1);
           break;
       }
     }
-    singleRowSize += 8;
-    int totalSize = singleRowSize * 10000;
-    for (int i = 0; i < 10000; ++i) {
-      timestamps.add(System.currentTimeMillis());
-      measurements.add(types);
-      values.add(value);
-    }
-    long startTime = System.currentTimeMillis();
-    ByteBuffer buffer = serializeValue(timestamps, measurements, values);
-    System.out.println("Serialize time cost is " + (System.currentTimeMillis() 
- startTime));
-    System.out.println("Original Size is " + totalSize);
-    System.out.println("Serialized Size is " + buffer.limit());
-    System.out.println("Compression rate is " + (double) totalSize / 
buffer.limit());
-    List<Long> decodedTime = new ArrayList<>();
-    List<List<TSDataType>> decodedTypes = new ArrayList<>();
-    List<List<Object>> decodedValues = new ArrayList<>();
-    startTime = System.currentTimeMillis();
-    deserializeValueBuffer(buffer, decodedTime, decodedTypes, decodedValues);
-    System.out.println("Time cost is " + (System.currentTimeMillis() - 
startTime));
-    if (!decodedTime.equals(timestamps)
-        || !decodedTypes.equals(measurements)
-        || !decodedValues.equals(values)) {
-      System.out.println("ERROR");
-    }
+    return singleRowSize;
   }
 
   private static void testSchema() throws IOException {
@@ -661,4 +702,22 @@ public class SessionRPCUtils {
       System.out.println("Test failed");
     }
   }
+
+  private static void testThrift() throws Exception {
+    List<Long> timestamps = new ArrayList<>();
+    List<List<TSDataType>> measurements = new ArrayList<>();
+    List<List<Object>> values = new ArrayList<>();
+    List<TSDataType> types = new ArrayList<>();
+    List<Object> value = new ArrayList<>();
+    int totalSize = 0;
+    for (int i = 0; i < 10000; ++i) {
+      timestamps.add(System.currentTimeMillis());
+      totalSize += genOneRow(value, types);
+      totalSize += 8;
+      measurements.add(types);
+      values.add(value);
+      value = new ArrayList<>();
+      types = new ArrayList<>();
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 0983c700fec..ae86f7aed87 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -164,6 +164,8 @@ import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
 import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
+import org.apache.iotdb.session.req.InsertRecordsRequest;
+import org.apache.iotdb.session.util.SessionRPCUtils;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -1706,7 +1708,15 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
   @Override
   public TSStatus insertRecordsV2(TSInsertRecordsReqV2 req) {
-    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    try {
+      InsertRecordsRequest request =
+          SessionRPCUtils.deserializeInsertRecordsReq(req.schemaBuffer, 
req.valueBuffer, null);
+
+      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    } catch (IOException e) {
+      return onNpeOrUnexpectedException(
+          e, OperationType.INSERT_RECORDS, TSStatusCode.INTERNAL_SERVER_ERROR);
+    }
   }
 
   @Override
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index 22ec15b5151..7ae4a811554 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -259,6 +259,7 @@ struct TSInsertRecordsReqV2 {
   1: required binary schemaBuffer
   2: required binary valueBuffer
   3: required binary auxiliaryBuffer
+  4: optional i64 sessionId
 }
 
 struct TSInsertRecordsOfOneDeviceReq {

Reply via email to