This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch new-rpc-format-impl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 034a54fffa782337faefb95e4bd7cde83568cc72
Author: Liu Xuxin <[email protected]>
AuthorDate: Mon Mar 11 23:23:34 2024 +0800

    finish
---
 .../apache/iotdb/session/util/SessionRPCUtils.java | 227 +++++++++++++++++----
 .../iotdb/session/util/ValueBufferBuilder.java     |   4 +-
 2 files changed, 189 insertions(+), 42 deletions(-)

diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
index ad207be780d..cf9b344dd8b 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.isession.SessionConfig;
 import org.apache.iotdb.session.req.InsertRecordsRequest;
 import org.apache.iotdb.tsfile.compress.ICompressor;
 import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
 import org.apache.iotdb.tsfile.encoding.decoder.IntRleDecoder;
 import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
 import org.apache.iotdb.tsfile.encoding.encoder.IntRleEncoder;
@@ -30,19 +31,16 @@ import 
org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
-import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class SessionRPCUtils {
@@ -253,54 +251,45 @@ public class SessionRPCUtils {
     List<Double> doubleList = new ArrayList<>();
     List<Boolean> booleanList = new ArrayList<>();
     List<String> stringList = new ArrayList<>();
-    Encoder encoder =
-        
TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN).getEncoder(TSDataType.INT32);
+    PublicBAOS typeBuffer = new PublicBAOS();
     for (int i = 0; i < recordCount; ++i) {
       List<TSDataType> types = typesList.get(i);
       List<Object> values = valuesList.get(i);
       int size = types.size();
-      encoder.encode(size, buffer);
+      ReadWriteIOUtils.write(size, buffer);
       for (int j = 0; j < size; ++j) {
-        int offset = -1;
         switch (types.get(j)) {
           case INT32:
-            offset = intList.size();
             intList.add((Integer) values.get(j));
             break;
           case INT64:
-            offset = longList.size();
             longList.add((Long) values.get(j));
             break;
           case FLOAT:
-            offset = floatList.size();
             floatList.add((Float) values.get(j));
             break;
           case DOUBLE:
-            offset = doubleList.size();
             doubleList.add((Double) values.get(j));
             break;
           case BOOLEAN:
-            offset = booleanList.size();
             booleanList.add((Boolean) values.get(j));
             break;
           case TEXT:
-            offset = stringList.size();
             stringList.add((String) values.get(j));
             break;
           default:
             throw new IOException("Unsupported data type " + types.get(j));
         }
-        encoder.encode((int) (types.get(j).getType()), buffer);
-        encoder.encode(offset, buffer);
+        ReadWriteIOUtils.write(types.get(j), typeBuffer);
       }
     }
-    encoder.flush(buffer);
     serializeIntList(intList, buffer);
     serializeLongList(longList, buffer);
     serializeFloatList(floatList, buffer);
     serializeDoubleList(doubleList, buffer);
     serializeBooleanList(booleanList, buffer);
     serializeStringList(stringList, buffer);
+    buffer.write(typeBuffer.getBuf(), 0, typeBuffer.size());
   }
 
   private static void serializeIntList(List<Integer> values, PublicBAOS 
buffer) throws IOException {
@@ -315,7 +304,6 @@ public class SessionRPCUtils {
       encoder.encode(value, intBuffer);
     }
     encoder.flush(intBuffer);
-    ReadWriteIOUtils.write(intBuffer.getBuf().length, buffer);
     buffer.write(intBuffer.getBuf(), 0, intBuffer.size());
   }
 
@@ -331,7 +319,6 @@ public class SessionRPCUtils {
       encoder.encode(value, longBuffer);
     }
     encoder.flush(longBuffer);
-    ReadWriteIOUtils.write(longBuffer.getBuf().length, buffer);
     buffer.write(longBuffer.getBuf(), 0, longBuffer.size());
   }
 
@@ -347,7 +334,6 @@ public class SessionRPCUtils {
       encoder.encode(value, floatBuffer);
     }
     encoder.flush(floatBuffer);
-    ReadWriteIOUtils.write(floatBuffer.getBuf().length, buffer);
     buffer.write(floatBuffer.getBuf(), 0, floatBuffer.size());
   }
 
@@ -364,7 +350,6 @@ public class SessionRPCUtils {
       encoder.encode(value, doubleBuffer);
     }
     encoder.flush(doubleBuffer);
-    ReadWriteIOUtils.write(doubleBuffer.getBuf().length, buffer);
     buffer.write(doubleBuffer.getBuf(), 0, doubleBuffer.size());
   }
 
@@ -404,10 +389,169 @@ public class SessionRPCUtils {
       encoder.encode(new Binary(value.getBytes()), stringBuffer);
     }
     encoder.flush(stringBuffer);
-    ReadWriteIOUtils.write(stringBuffer.getBuf().length, buffer);
     buffer.write(stringBuffer.getBuf(), 0, stringBuffer.size());
   }
 
+  private static void deserializeValueBuffer(
+      ByteBuffer buffer,
+      List<Long> outputTimestamps,
+      List<List<TSDataType>> outputTypesList,
+      List<List<Object>> outputValuesList)
+      throws IOException {
+    boolean compressed = ReadWriteIOUtils.readBool(buffer);
+    ByteBuffer dataBuffer = buffer;
+    if (compressed) {
+      int uncompressedSize = ReadWriteIOUtils.readInt(buffer);
+      byte[] uncompressed = new byte[uncompressedSize];
+      IUnCompressor unCompressor = 
IUnCompressor.getUnCompressor(SessionConfig.rpcCompressionType);
+      unCompressor.uncompress(buffer.array(), 5, buffer.limit() - 5, 
uncompressed, 0);
+      dataBuffer = ByteBuffer.wrap(uncompressed);
+    }
+    deserializeTime(dataBuffer, outputTimestamps);
+    deserializeTypesAndValues(dataBuffer, outputTypesList, outputValuesList);
+  }
+
+  private static void deserializeTime(ByteBuffer buffer, List<Long> 
timestamps) throws IOException {
+    Decoder decoder = Decoder.getDecoderByType(TSEncoding.GORILLA, 
TSDataType.INT64);
+    while (decoder.hasNext(buffer)) {
+      timestamps.add(decoder.readLong(buffer));
+    }
+  }
+
+  private static void deserializeTypesAndValues(
+      ByteBuffer buffer, List<List<TSDataType>> typesList, List<List<Object>> 
valuesList)
+      throws IOException {
+    int recordCount = ReadWriteIOUtils.readInt(buffer);
+    int[] recordSizeArray = new int[recordCount];
+    for (int i = 0; i < recordCount; ++i) {
+      recordSizeArray[i] = ReadWriteIOUtils.readInt(buffer);
+    }
+    int[] intArray = deserializeIntList(buffer);
+    long[] longArray = deserializeLongList(buffer);
+    float[] floatArray = deserializeFloatList(buffer);
+    double[] doubleArray = deserializeDoubleList(buffer);
+    boolean[] booleanArray = deserializeBooleanList(buffer);
+    String[] stringArray = deserializeStringList(buffer);
+    int[] indexes = new int[6];
+    for (int i = 0; i < recordCount; ++i) {
+      List<TSDataType> types = new ArrayList<>(recordSizeArray[i]);
+      List<Object> values = new ArrayList<>(recordSizeArray[i]);
+      for (int j = 0; j < recordSizeArray[i]; ++j) {
+        byte b = buffer.get();
+        types.add(TSDataType.deserialize(b));
+        switch (types.get(j)) {
+          case INT32:
+            values.add(intArray[indexes[0]++]);
+            break;
+          case INT64:
+            values.add(longArray[indexes[1]++]);
+            break;
+          case FLOAT:
+            values.add(floatArray[indexes[2]++]);
+            break;
+          case DOUBLE:
+            values.add(doubleArray[indexes[3]++]);
+            break;
+          case BOOLEAN:
+            values.add(booleanArray[indexes[4]++]);
+            break;
+          case TEXT:
+            values.add(stringArray[indexes[5]++]);
+            break;
+          default:
+            throw new IOException("Unsupported data type " + types.get(j));
+        }
+      }
+      typesList.add(types);
+      valuesList.add(values);
+    }
+  }
+
+  private static int[] deserializeIntList(ByteBuffer buffer) throws 
IOException {
+    int size = ReadWriteIOUtils.readInt(buffer);
+    if (size == 0) {
+      return new int[0];
+    }
+    int[] intArray = new int[size];
+    Decoder decoder = Decoder.getDecoderByType(TSEncoding.GORILLA, 
TSDataType.INT32);
+    for (int i = 0; i < size; ++i) {
+      intArray[i] = decoder.readInt(buffer);
+    }
+    if (decoder.hasNext(buffer)) {
+      System.out.println("ERROR");
+    }
+    return intArray;
+  }
+
+  private static long[] deserializeLongList(ByteBuffer buffer) throws 
IOException {
+    int size = ReadWriteIOUtils.readInt(buffer);
+    if (size == 0) {
+      return new long[0];
+    }
+    long[] longArray = new long[size];
+    Decoder decoder = Decoder.getDecoderByType(TSEncoding.GORILLA, 
TSDataType.INT64);
+    for (int i = 0; i < size; ++i) {
+      longArray[i] = decoder.readLong(buffer);
+    }
+    return longArray;
+  }
+
+  private static float[] deserializeFloatList(ByteBuffer buffer) throws 
IOException {
+    int size = ReadWriteIOUtils.readInt(buffer);
+    if (size == 0) {
+      return new float[0];
+    }
+    float[] floatArray = new float[size];
+    Decoder decoder = Decoder.getDecoderByType(TSEncoding.GORILLA, 
TSDataType.FLOAT);
+    for (int i = 0; i < size; ++i) {
+      floatArray[i] = decoder.readFloat(buffer);
+    }
+    return floatArray;
+  }
+
+  private static double[] deserializeDoubleList(ByteBuffer buffer) throws 
IOException {
+    int size = ReadWriteIOUtils.readInt(buffer);
+    if (size == 0) {
+      return new double[0];
+    }
+    double[] doubleArray = new double[size];
+    Decoder decoder = Decoder.getDecoderByType(TSEncoding.GORILLA, 
TSDataType.DOUBLE);
+    for (int i = 0; i < size; ++i) {
+      doubleArray[i] = decoder.readDouble(buffer);
+    }
+    return doubleArray;
+  }
+
+  private static boolean[] deserializeBooleanList(ByteBuffer buffer) throws 
IOException {
+    int size = ReadWriteIOUtils.readInt(buffer);
+    if (size == 0) {
+      return new boolean[0];
+    }
+    int byteSize = ReadWriteIOUtils.readInt(buffer);
+    byte[] byteArray = new byte[byteSize];
+    buffer.get(byteArray);
+    boolean[] booleanArray = new boolean[size];
+    for (int i = 0; i < size; i++) {
+      int arrayIndex = i / 8;
+      int bitPosition = i % 8;
+      booleanArray[i] = (byteArray[arrayIndex] & (1 << bitPosition)) != 0;
+    }
+    return booleanArray;
+  }
+
+  private static String[] deserializeStringList(ByteBuffer buffer) throws 
IOException {
+    int size = ReadWriteIOUtils.readInt(buffer);
+    if (size == 0) {
+      return new String[0];
+    }
+    String[] stringArray = new String[size];
+    Decoder decoder = Decoder.getDecoderByType(TSEncoding.DICTIONARY, 
TSDataType.TEXT);
+    for (int i = 0; i < size; ++i) {
+      stringArray[i] = decoder.readBinary(buffer).toString();
+    }
+    return stringArray;
+  }
+
   public static InsertRecordsRequest deserializeInsertRecordsReq(
       ByteBuffer schemaBuffer, ByteBuffer valueBuffer, ByteBuffer 
auxiliaryBuffer) {
     return null;
@@ -428,55 +572,60 @@ public class SessionRPCUtils {
       switch (((byte) j % 6)) {
         case 0:
           types.add(TSDataType.BOOLEAN);
-          value.add(true);
+          value.add(new Random().nextBoolean());
           singleRowSize += 2;
           break;
         case 1:
           types.add(TSDataType.INT32);
-          value.add(1);
+          value.add(new Random().nextInt());
           singleRowSize += 5;
           break;
         case 2:
           types.add(TSDataType.INT64);
-          value.add(1L);
+          value.add(new Random().nextLong());
           singleRowSize += 9;
           break;
         case 3:
           types.add(TSDataType.FLOAT);
-          value.add(1.0f);
+          value.add(new Random().nextFloat());
           singleRowSize += 5;
           break;
         case 4:
           types.add(TSDataType.DOUBLE);
-          value.add(1.0);
+          value.add(new Random().nextDouble());
           singleRowSize += 9;
           break;
         case 5:
           types.add(TSDataType.TEXT);
-          value.add("123");
+          value.add(String.valueOf(new Random().nextInt(100)));
           singleRowSize += 7;
           break;
       }
     }
     singleRowSize += 8;
-    int totalSize = singleRowSize * 1000;
-    for (int i = 0; i < 1000; ++i) {
+    int totalSize = singleRowSize * 10000;
+    for (int i = 0; i < 10000; ++i) {
       timestamps.add(System.currentTimeMillis());
       measurements.add(types);
       values.add(value);
     }
-    PublicBAOS baos = new PublicBAOS();
-    ObjectOutputStream oos = new ObjectOutputStream(baos);
-    oos.writeObject(values);
-    //    oos.writeObject(measurements);
-    //    oos.writeObject(timestamps);
-    oos.flush();
-    oos.close();
+    long startTime = System.currentTimeMillis();
     ByteBuffer buffer = serializeValue(timestamps, measurements, values);
+    System.out.println("Serialize time cost is " + (System.currentTimeMillis() 
- startTime));
     System.out.println("Original Size is " + totalSize);
     System.out.println("Serialized Size is " + buffer.limit());
-    System.out.println("Java serialize size is " + baos.size());
-    System.out.println();
+    System.out.println("Compression rate is " + (double) totalSize / 
buffer.limit());
+    List<Long> decodedTime = new ArrayList<>();
+    List<List<TSDataType>> decodedTypes = new ArrayList<>();
+    List<List<Object>> decodedValues = new ArrayList<>();
+    startTime = System.currentTimeMillis();
+    deserializeValueBuffer(buffer, decodedTime, decodedTypes, decodedValues);
+    System.out.println("Time cost is " + (System.currentTimeMillis() - 
startTime));
+    if (!decodedTime.equals(timestamps)
+        || !decodedTypes.equals(measurements)
+        || !decodedValues.equals(values)) {
+      System.out.println("ERROR");
+    }
   }
 
   private static void testSchema() throws IOException {
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/ValueBufferBuilder.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/ValueBufferBuilder.java
index fe02c4b529b..ada9c409112 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/ValueBufferBuilder.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/ValueBufferBuilder.java
@@ -18,6 +18,4 @@
  */
 package org.apache.iotdb.session.util;
 
-public class ValueBufferBuilder {
-
-}
+public class ValueBufferBuilder {}

Reply via email to