This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch new-rpc-format-impl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6a6fb2c2282755ea28f261c457de28e26c189c56
Author: Liu Xuxin <[email protected]>
AuthorDate: Mon Mar 11 16:09:33 2024 +0800

    add compression support
---
 .../org/apache/iotdb/isession/SessionConfig.java   |  5 +++
 .../apache/iotdb/session/util/SessionRPCUtils.java | 40 ++++++++++++++++++----
 .../iotdb/tsfile/compress/IUnCompressor.java       |  2 +-
 3 files changed, 40 insertions(+), 7 deletions(-)

diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java
index ac14a99c80f..076120192fd 100644
--- a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java
+++ b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.isession;
 
 import org.apache.iotdb.isession.util.Version;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 
 public class SessionConfig {
 
@@ -56,5 +57,9 @@ public class SessionConfig {
 
   public static final long RETRY_INTERVAL_IN_MS = 500;
 
+  public static boolean enableRPCCompression = true;
+
+  public static CompressionType rpcCompressionType = CompressionType.GZIP;
+
   private SessionConfig() {}
 }
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
index 5431b7dbb95..bcba9566ab3 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
@@ -18,7 +18,10 @@
  */
 package org.apache.iotdb.session.util;
 
+import org.apache.iotdb.isession.SessionConfig;
 import org.apache.iotdb.session.req.InsertRecordsRequest;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
 import org.apache.iotdb.tsfile.encoding.decoder.IntRleDecoder;
 import org.apache.iotdb.tsfile.encoding.encoder.IntRleEncoder;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -54,8 +57,20 @@ public class SessionRPCUtils {
     PublicBAOS schemaBufferOS = new PublicBAOS();
     serializeDictionary(dictionary, schemaBufferOS);
     dictionaryEncoding(dictionary, deviceIds, measurementIdsList, schemaBufferOS);
-    ByteBuffer schemaBuffer = ByteBuffer.allocate(schemaBufferOS.size());
-    schemaBuffer.put(schemaBufferOS.getBuf(), 0, schemaBufferOS.size());
+    ByteBuffer schemaBuffer = null;
+    if (SessionConfig.enableRPCCompression) {
+      ICompressor compressor = ICompressor.getCompressor(SessionConfig.rpcCompressionType);
+      byte[] compressedData =
+          compressor.compress(schemaBufferOS.getBuf(), 0, schemaBufferOS.size());
+      schemaBuffer = ByteBuffer.allocate(compressedData.length + 5);
+      schemaBuffer.put((byte) 1);
+      ReadWriteIOUtils.write(schemaBufferOS.size(), schemaBuffer);
+      schemaBuffer.put(compressedData);
+    } else {
+      schemaBuffer = ByteBuffer.allocate(schemaBufferOS.size() + 1);
+      schemaBuffer.put((byte) 0);
+      schemaBuffer.put(schemaBufferOS.getBuf(), 0, schemaBufferOS.size());
+    }
     schemaBuffer.flip();
     return schemaBuffer;
   }
@@ -63,11 +78,22 @@ public class SessionRPCUtils {
   private static void deserializeSchema(
       ByteBuffer schemaBuffer,
       List<String> decodedDeviceIds,
-      List<List<String>> decodedMeasurementIdsList) {
-    String[] dictionary = getDictionary(schemaBuffer);
+      List<List<String>> decodedMeasurementIdsList)
+      throws IOException {
+    boolean compressed = ReadWriteIOUtils.readBool(schemaBuffer);
+    ByteBuffer buffer = schemaBuffer;
+    if (compressed) {
+      int uncompressedLength = ReadWriteIOUtils.readInt(schemaBuffer);
+      IUnCompressor unCompressor = IUnCompressor.getUnCompressor(SessionConfig.rpcCompressionType);
+      byte[] uncompressed = new byte[uncompressedLength];
+      unCompressor.uncompress(schemaBuffer.array(), 5, schemaBuffer.limit() - 5, uncompressed, 0);
+      buffer = ByteBuffer.wrap(uncompressed);
+    }
+
+    String[] dictionary = getDictionary(buffer);
     IntRleDecoder decoder = new IntRleDecoder();
-    deserializeDevices(schemaBuffer, decoder, dictionary, decodedDeviceIds);
-    deserializeMeasurementIds(schemaBuffer, decoder, dictionary, decodedMeasurementIdsList);
+    deserializeDevices(buffer, decoder, dictionary, decodedDeviceIds);
+    deserializeMeasurementIds(buffer, decoder, dictionary, decodedMeasurementIdsList);
   }
 
   private static String[] getDictionary(ByteBuffer schemaBuffer) {
@@ -194,7 +220,9 @@ public class SessionRPCUtils {
     }
     size += measurementSize * 100;
 
+    long startTime = System.currentTimeMillis();
     ByteBuffer buffer = serializeSchema(deviceIds, measurementIdsList);
+    System.out.println("Time cost is " + (System.currentTimeMillis() - startTime));
     System.out.println("Original Size is " + size);
     System.out.println("Serialized Size is " + buffer.remaining());
     List<String> decodedDeviceIds = new ArrayList<>();
diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java
index f4302227f92..f2d40dc0d7a 100644
--- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java
+++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java
@@ -238,7 +238,7 @@ public interface IUnCompressor {
     public int uncompress(byte[] byteArray, int offset, int length, byte[] output, int outOffset)
         throws IOException {
       try {
-        return decompressor.decompress(byteArray, offset, length, output, offset);
+        return decompressor.decompress(byteArray, offset, length, output, outOffset);
       } catch (RuntimeException e) {
         logger.error(UNCOMPRESS_INPUT_ERROR, e);
         throw new IOException(e);

Reply via email to