This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch new-rpc-format-impl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6a6fb2c2282755ea28f261c457de28e26c189c56 Author: Liu Xuxin <[email protected]> AuthorDate: Mon Mar 11 16:09:33 2024 +0800 add compression support --- .../org/apache/iotdb/isession/SessionConfig.java | 5 +++ .../apache/iotdb/session/util/SessionRPCUtils.java | 40 ++++++++++++++++++---- .../iotdb/tsfile/compress/IUnCompressor.java | 2 +- 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java index ac14a99c80f..076120192fd 100644 --- a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java +++ b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java @@ -20,6 +20,7 @@ package org.apache.iotdb.isession; import org.apache.iotdb.isession.util.Version; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; public class SessionConfig { @@ -56,5 +57,9 @@ public class SessionConfig { public static final long RETRY_INTERVAL_IN_MS = 500; + public static boolean enableRPCCompression = true; + + public static CompressionType rpcCompressionType = CompressionType.GZIP; + private SessionConfig() {} } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java index 5431b7dbb95..bcba9566ab3 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java @@ -18,7 +18,10 @@ */ package org.apache.iotdb.session.util; +import org.apache.iotdb.isession.SessionConfig; import org.apache.iotdb.session.req.InsertRecordsRequest; +import org.apache.iotdb.tsfile.compress.ICompressor; +import org.apache.iotdb.tsfile.compress.IUnCompressor; import org.apache.iotdb.tsfile.encoding.decoder.IntRleDecoder; import org.apache.iotdb.tsfile.encoding.encoder.IntRleEncoder; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -54,8 +57,20 @@ public class SessionRPCUtils { PublicBAOS schemaBufferOS = new PublicBAOS(); serializeDictionary(dictionary, schemaBufferOS); dictionaryEncoding(dictionary, deviceIds, measurementIdsList, schemaBufferOS); - ByteBuffer schemaBuffer = ByteBuffer.allocate(schemaBufferOS.size()); - schemaBuffer.put(schemaBufferOS.getBuf(), 0, schemaBufferOS.size()); + ByteBuffer schemaBuffer = null; + if (SessionConfig.enableRPCCompression) { + ICompressor compressor = ICompressor.getCompressor(SessionConfig.rpcCompressionType); + byte[] compressedData = + compressor.compress(schemaBufferOS.getBuf(), 0, schemaBufferOS.size()); + schemaBuffer = ByteBuffer.allocate(compressedData.length + 5); + schemaBuffer.put((byte) 1); + ReadWriteIOUtils.write(schemaBufferOS.size(), schemaBuffer); + schemaBuffer.put(compressedData); + } else { + schemaBuffer = ByteBuffer.allocate(schemaBufferOS.size() + 1); + schemaBuffer.put((byte) 0); + schemaBuffer.put(schemaBufferOS.getBuf(), 0, schemaBufferOS.size()); + } schemaBuffer.flip(); return schemaBuffer; } @@ -63,11 +78,22 @@ public class SessionRPCUtils { private static void deserializeSchema( ByteBuffer schemaBuffer, List<String> decodedDeviceIds, - List<List<String>> decodedMeasurementIdsList) { - String[] dictionary = getDictionary(schemaBuffer); + List<List<String>> decodedMeasurementIdsList) + throws IOException { + boolean compressed = ReadWriteIOUtils.readBool(schemaBuffer); + ByteBuffer buffer = schemaBuffer; + if (compressed) { + int uncompressedLength = ReadWriteIOUtils.readInt(schemaBuffer); + IUnCompressor unCompressor = IUnCompressor.getUnCompressor(SessionConfig.rpcCompressionType); + byte[] uncompressed = new byte[uncompressedLength]; + unCompressor.uncompress(schemaBuffer.array(), 5, schemaBuffer.limit() - 5, uncompressed, 0); + buffer = ByteBuffer.wrap(uncompressed); + } + + String[] dictionary = getDictionary(buffer); IntRleDecoder decoder = new IntRleDecoder(); - deserializeDevices(schemaBuffer, decoder, dictionary, decodedDeviceIds); - deserializeMeasurementIds(schemaBuffer, decoder, dictionary, decodedMeasurementIdsList); + deserializeDevices(buffer, decoder, dictionary, decodedDeviceIds); + deserializeMeasurementIds(buffer, decoder, dictionary, decodedMeasurementIdsList); } private static String[] getDictionary(ByteBuffer schemaBuffer) { @@ -194,7 +220,9 @@ public class SessionRPCUtils { } size += measurementSize * 100; + long startTime = System.currentTimeMillis(); ByteBuffer buffer = serializeSchema(deviceIds, measurementIdsList); + System.out.println("Time cost is " + (System.currentTimeMillis() - startTime)); System.out.println("Original Size is " + size); System.out.println("Serialized Size is " + buffer.remaining()); List<String> decodedDeviceIds = new ArrayList<>(); diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java index f4302227f92..f2d40dc0d7a 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java @@ -238,7 +238,7 @@ public interface IUnCompressor { public int uncompress(byte[] byteArray, int offset, int length, byte[] output, int outOffset) throws IOException { try { - return decompressor.decompress(byteArray, offset, length, output, offset); + return decompressor.decompress(byteArray, offset, length, output, outOffset); } catch (RuntimeException e) { logger.error(UNCOMPRESS_INPUT_ERROR, e); throw new IOException(e);
