This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch new-rpc-format-impl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 689ab4408a203f943f296c0db0def405c3437cb3
Author: Liu Xuxin <[email protected]>
AuthorDate: Mon Mar 11 15:29:10 2024 +0800

    pass schema serialize and deserialize test
---
 .../iotdb/session/req/InsertRecordsRequest.java    |  86 +++++++++
 .../apache/iotdb/session/util/SessionRPCUtils.java | 210 +++++++++++++++++++++
 .../protocol/thrift/impl/ClientRPCServiceImpl.java |   6 +
 .../thrift-datanode/src/main/thrift/client.thrift  |   8 +
 4 files changed, 310 insertions(+)

diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/req/InsertRecordsRequest.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/req/InsertRecordsRequest.java
new file mode 100644
index 00000000000..407859efa08
--- /dev/null
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/req/InsertRecordsRequest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.req;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.List;
+
+public class InsertRecordsRequest {
+  private List<String> deviceIds;
+  private List<List<String>> measurementsIdsList;
+  private List<Long> timestamps;
+  private List<List<TSDataType>> dataTypesList;
+  private List<List<Object>> valuesList;
+
+  public InsertRecordsRequest(
+      List<String> deviceIds,
+      List<List<String>> measurementsIdsList,
+      List<Long> timestamps,
+      List<List<TSDataType>> dataTypesList,
+      List<List<Object>> valuesList) {
+    this.deviceIds = deviceIds;
+    this.measurementsIdsList = measurementsIdsList;
+    this.timestamps = timestamps;
+    this.dataTypesList = dataTypesList;
+    this.valuesList = valuesList;
+  }
+
+  public InsertRecordsRequest() {}
+
+  public List<String> getDeviceIds() {
+    return deviceIds;
+  }
+
+  public List<List<String>> getMeasurementsIdsList() {
+    return measurementsIdsList;
+  }
+
+  public List<Long> getTimestamps() {
+    return timestamps;
+  }
+
+  public List<List<TSDataType>> getDataTypesList() {
+    return dataTypesList;
+  }
+
+  public List<List<Object>> getValuesList() {
+    return valuesList;
+  }
+
+  public void setDeviceIds(List<String> deviceIds) {
+    this.deviceIds = deviceIds;
+  }
+
+  public void setMeasurementsIdsList(List<List<String>> measurementsIdsList) {
+    this.measurementsIdsList = measurementsIdsList;
+  }
+
+  public void setTimestamps(List<Long> timestamps) {
+    this.timestamps = timestamps;
+  }
+
+  public void setDataTypesList(List<List<TSDataType>> dataTypesList) {
+    this.dataTypesList = dataTypesList;
+  }
+
+  public void setValuesList(List<List<Object>> valuesList) {
+    this.valuesList = valuesList;
+  }
+}
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
new file mode 100644
index 00000000000..5431b7dbb95
--- /dev/null
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.util;
+
+import org.apache.iotdb.session.req.InsertRecordsRequest;
+import org.apache.iotdb.tsfile.encoding.decoder.IntRleDecoder;
+import org.apache.iotdb.tsfile.encoding.encoder.IntRleEncoder;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SessionRPCUtils {
+  public static ByteBuffer[] serializeInsertRecordsReq(
+      List<String> deviceIds,
+      List<List<String>> measurementIdsList,
+      List<Long> timestamp,
+      List<List<TSDataType>> tpyesList,
+      List<List<Object>> valuesList)
+      throws IOException {
+
+    return null;
+  }
+
+  private static ByteBuffer serializeSchema(
+      List<String> deviceIds, List<List<String>> measurementIdsList) throws IOException {
+    // creating dictionary
+    Map<String, Integer> dictionary = getDictionary(deviceIds, measurementIdsList);
+    // serializing dictionary
+    PublicBAOS schemaBufferOS = new PublicBAOS();
+    serializeDictionary(dictionary, schemaBufferOS);
+    dictionaryEncoding(dictionary, deviceIds, measurementIdsList, schemaBufferOS);
+    ByteBuffer schemaBuffer = ByteBuffer.allocate(schemaBufferOS.size());
+    schemaBuffer.put(schemaBufferOS.getBuf(), 0, schemaBufferOS.size());
+    schemaBuffer.flip();
+    return schemaBuffer;
+  }
+
+  private static void deserializeSchema(
+      ByteBuffer schemaBuffer,
+      List<String> decodedDeviceIds,
+      List<List<String>> decodedMeasurementIdsList) {
+    String[] dictionary = getDictionary(schemaBuffer);
+    IntRleDecoder decoder = new IntRleDecoder();
+    deserializeDevices(schemaBuffer, decoder, dictionary, decodedDeviceIds);
+    deserializeMeasurementIds(schemaBuffer, decoder, dictionary, decodedMeasurementIdsList);
+  }
+
+  private static String[] getDictionary(ByteBuffer schemaBuffer) {
+    int dictionarySize = ReadWriteIOUtils.readInt(schemaBuffer);
+    String[] dictionary = new String[dictionarySize];
+    for (int i = 0; i < dictionarySize; ++i) {
+      dictionary[i] = ReadWriteIOUtils.readString(schemaBuffer);
+    }
+    return dictionary;
+  }
+
+  private static void deserializeDevices(
+      ByteBuffer buffer,
+      IntRleDecoder decoder,
+      String[] dictionary,
+      List<String> decodedDeviceIds) {
+    int deviceSize = decoder.readInt(buffer);
+    for (int i = 0; i < deviceSize; ++i) {
+      StringBuilder builder = new StringBuilder();
+      int wordSize = decoder.readInt(buffer);
+      for (int j = 0; j < wordSize; ++j) {
+        builder.append(dictionary[decoder.readInt(buffer)]);
+        if (j != wordSize - 1) {
+          builder.append(".");
+        }
+      }
+      decodedDeviceIds.add(builder.toString());
+    }
+  }
+
+  private static void deserializeMeasurementIds(
+      ByteBuffer buffer,
+      IntRleDecoder decoder,
+      String[] dictionary,
+      List<List<String>> decodedMeasurementIdsList) {
+    int listNum = decoder.readInt(buffer);
+    for (int i = 0; i < listNum; ++i) {
+      int currListSize = decoder.readInt(buffer);
+      List<String> currList = new ArrayList<>(currListSize);
+      for (int j = 0; j < currListSize; ++j) {
+        currList.add(dictionary[decoder.readInt(buffer)]);
+      }
+      decodedMeasurementIdsList.add(currList);
+    }
+  }
+
+  private static Map<String, Integer> getDictionary(
+      List<String> deviceId, List<List<String>> measurementIdsList) {
+    Map<String, Integer> dictionary = new LinkedHashMap<>();
+    AtomicInteger count = new AtomicInteger(0);
+    deviceId.forEach(
+        s -> {
+          Arrays.stream(s.split("\\."))
+              .forEach(
+                  word -> {
+                    dictionary.computeIfAbsent(word, k -> count.getAndIncrement());
+                  });
+        });
+    measurementIdsList.forEach(
+        measurementIds -> {
+          measurementIds.forEach(
+              measurementId -> {
+                dictionary.computeIfAbsent(measurementId, k -> count.getAndIncrement());
+              });
+        });
+    return dictionary;
+  }
+
+  private static void serializeDictionary(Map<String, Integer> dictionary, PublicBAOS buffer)
+      throws IOException {
+    ReadWriteIOUtils.write(dictionary.size(), buffer);
+    for (Map.Entry<String, Integer> entry : dictionary.entrySet()) {
+      ReadWriteIOUtils.write(entry.getKey(), buffer);
+    }
+  }
+
+  private static void dictionaryEncoding(
+      Map<String, Integer> dictionary,
+      List<String> deviceIds,
+      List<List<String>> measurementIdsList,
+      PublicBAOS buffer)
+      throws IOException {
+    IntRleEncoder rleEncoder = new IntRleEncoder();
+    rleEncoder.encode(deviceIds.size(), buffer);
+    for (String deviceId : deviceIds) {
+      String[] words = deviceId.split("\\.");
+      rleEncoder.encode(words.length, buffer);
+      Arrays.stream(words)
+          .forEach(
+              word -> {
+                rleEncoder.encode(dictionary.get(word), buffer);
+              });
+    }
+    rleEncoder.encode(measurementIdsList.size(), buffer);
+    for (List<String> measurements : measurementIdsList) {
+      rleEncoder.encode(measurements.size(), buffer);
+      for (String measurement : measurements) {
+        rleEncoder.encode(dictionary.get(measurement), buffer);
+      }
+    }
+    rleEncoder.flush(buffer);
+  }
+
+  public static InsertRecordsRequest deserializeInsertRecordsReq(
+      ByteBuffer schemaBuffer, ByteBuffer valueBuffer, ByteBuffer auxiliaryBuffer) {
+    return null;
+  }
+
+  public static void main(String[] args) throws Exception {
+    List<String> deviceIds = new ArrayList<>();
+    List<List<String>> measurementIdsList = new ArrayList<>();
+    List<String> measurementIds = new ArrayList<>();
+    int size = 0;
+    int measurementSize = 0;
+    for (int i = 0; i < 100; ++i) {
+      String deviceId = "root.sg1.s10.d" + i;
+      size += deviceId.getBytes().length;
+      deviceIds.add("root.sg1.s10.d" + i);
+      measurementSize += ("s" + i).getBytes().length;
+      measurementIds.add("s" + i);
+    }
+    for (int i = 0; i < 100; ++i) {
+      measurementIdsList.add(measurementIds);
+    }
+    size += measurementSize * 100;
+
+    ByteBuffer buffer = serializeSchema(deviceIds, measurementIdsList);
+    System.out.println("Original Size is " + size);
+    System.out.println("Serialized Size is " + buffer.remaining());
+    List<String> decodedDeviceIds = new ArrayList<>();
+    List<List<String>> decodedMeasurementIdsList = new ArrayList<>();
+    deserializeSchema(buffer, decodedDeviceIds, decodedMeasurementIdsList);
+    if (deviceIds.equals(decodedDeviceIds)
+        && measurementIdsList.equals(decodedMeasurementIdsList)) {
+      System.out.println("Test passed");
+    } else {
+      System.out.println("Test failed");
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 6d20d6f7d49..0983c700fec 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -144,6 +144,7 @@ import org.apache.iotdb.service.rpc.thrift.TSGroupByQueryIntervalReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReqV2;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
@@ -1703,6 +1704,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
     }
   }
 
+  @Override
+  public TSStatus insertRecordsV2(TSInsertRecordsReqV2 req) {
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
   @Override
   public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
     long t1 = System.nanoTime();
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index d96a04d7b21..22ec15b5151 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -255,6 +255,12 @@ struct TSInsertRecordsReq {
   6: optional bool isAligned
 }
 
+struct TSInsertRecordsReqV2 {
+  1: required binary schemaBuffer
+  2: required binary valueBuffer
+  3: required binary auxiliaryBuffer
+}
+
 struct TSInsertRecordsOfOneDeviceReq {
     1: required i64 sessionId
     2: required string prefixPath
@@ -596,6 +602,8 @@ service IClientRPCService {
 
   common.TSStatus insertRecords(1:TSInsertRecordsReq req);
 
+  common.TSStatus insertRecordsV2(2:TSInsertRecordsReqV2 req);
+
   common.TSStatus insertRecordsOfOneDevice(1:TSInsertRecordsOfOneDeviceReq req);
 
   common.TSStatus insertStringRecordsOfOneDevice(1:TSInsertStringRecordsOfOneDeviceReq req);

Reply via email to