This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch new-rpc-format-impl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 034a54fffa782337faefb95e4bd7cde83568cc72 Author: Liu Xuxin <[email protected]> AuthorDate: Mon Mar 11 23:23:34 2024 +0800 finish --- .../apache/iotdb/session/util/SessionRPCUtils.java | 227 +++++++++++++++++---- .../iotdb/session/util/ValueBufferBuilder.java | 4 +- 2 files changed, 189 insertions(+), 42 deletions(-) diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java index ad207be780d..cf9b344dd8b 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java @@ -22,6 +22,7 @@ import org.apache.iotdb.isession.SessionConfig; import org.apache.iotdb.session.req.InsertRecordsRequest; import org.apache.iotdb.tsfile.compress.ICompressor; import org.apache.iotdb.tsfile.compress.IUnCompressor; +import org.apache.iotdb.tsfile.encoding.decoder.Decoder; import org.apache.iotdb.tsfile.encoding.decoder.IntRleDecoder; import org.apache.iotdb.tsfile.encoding.encoder.Encoder; import org.apache.iotdb.tsfile.encoding.encoder.IntRleEncoder; @@ -30,19 +31,16 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.PublicBAOS; -import org.apache.iotdb.tsfile.utils.RamUsageEstimator; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; public class SessionRPCUtils { @@ -253,54 +251,45 @@ public class SessionRPCUtils { List<Double> doubleList = new ArrayList<>(); List<Boolean> booleanList = new ArrayList<>(); List<String> stringList = new ArrayList<>(); - Encoder encoder = - TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN).getEncoder(TSDataType.INT32); + PublicBAOS typeBuffer = new PublicBAOS(); for (int i = 0; i < recordCount; ++i) { List<TSDataType> types = typesList.get(i); List<Object> values = valuesList.get(i); int size = types.size(); - encoder.encode(size, buffer); + ReadWriteIOUtils.write(size, buffer); for (int j = 0; j < size; ++j) { - int offset = -1; switch (types.get(j)) { case INT32: - offset = intList.size(); intList.add((Integer) values.get(j)); break; case INT64: - offset = longList.size(); longList.add((Long) values.get(j)); break; case FLOAT: - offset = floatList.size(); floatList.add((Float) values.get(j)); break; case DOUBLE: - offset = doubleList.size(); doubleList.add((Double) values.get(j)); break; case BOOLEAN: - offset = booleanList.size(); booleanList.add((Boolean) values.get(j)); break; case TEXT: - offset = stringList.size(); stringList.add((String) values.get(j)); break; default: throw new IOException("Unsupported data type " + types.get(j)); } - encoder.encode((int) (types.get(j).getType()), buffer); - encoder.encode(offset, buffer); + ReadWriteIOUtils.write(types.get(j), typeBuffer); } } - encoder.flush(buffer); serializeIntList(intList, buffer); serializeLongList(longList, buffer); serializeFloatList(floatList, buffer); serializeDoubleList(doubleList, buffer); serializeBooleanList(booleanList, buffer); serializeStringList(stringList, buffer); + buffer.write(typeBuffer.getBuf(), 0, typeBuffer.size()); } private static void serializeIntList(List<Integer> values, PublicBAOS buffer) throws IOException { @@ -315,7 +304,6 @@ public class SessionRPCUtils { encoder.encode(value, intBuffer); } encoder.flush(intBuffer); - ReadWriteIOUtils.write(intBuffer.getBuf().length, buffer); buffer.write(intBuffer.getBuf(), 0, intBuffer.size()); } @@ -331,7 +319,6 @@ public class SessionRPCUtils { encoder.encode(value, longBuffer); } encoder.flush(longBuffer); - ReadWriteIOUtils.write(longBuffer.getBuf().length, buffer); buffer.write(longBuffer.getBuf(), 0, longBuffer.size()); } @@ -347,7 +334,6 @@ public class SessionRPCUtils { encoder.encode(value, floatBuffer); } encoder.flush(floatBuffer); - ReadWriteIOUtils.write(floatBuffer.getBuf().length, buffer); buffer.write(floatBuffer.getBuf(), 0, floatBuffer.size()); } @@ -364,7 +350,6 @@ public class SessionRPCUtils { encoder.encode(value, doubleBuffer); } encoder.flush(doubleBuffer); - ReadWriteIOUtils.write(doubleBuffer.getBuf().length, buffer); buffer.write(doubleBuffer.getBuf(), 0, doubleBuffer.size()); } @@ -404,10 +389,169 @@ public class SessionRPCUtils { encoder.encode(new Binary(value.getBytes()), stringBuffer); } encoder.flush(stringBuffer); - ReadWriteIOUtils.write(stringBuffer.getBuf().length, buffer); buffer.write(stringBuffer.getBuf(), 0, stringBuffer.size()); } + private static void deserializeValueBuffer( + ByteBuffer buffer, + List<Long> outputTimestamps, + List<List<TSDataType>> outputTypesList, + List<List<Object>> outputValuesList) + throws IOException { + boolean compressed = ReadWriteIOUtils.readBool(buffer); + ByteBuffer dataBuffer = buffer; + if (compressed) { + int uncompressedSize = ReadWriteIOUtils.readInt(buffer); + byte[] uncompressed = new byte[uncompressedSize]; + IUnCompressor unCompressor = IUnCompressor.getUnCompressor(SessionConfig.rpcCompressionType); + unCompressor.uncompress(buffer.array(), 5, buffer.limit() - 5, uncompressed, 0); + dataBuffer = ByteBuffer.wrap(uncompressed); + } + deserializeTime(dataBuffer, outputTimestamps); + deserializeTypesAndValues(dataBuffer, outputTypesList, outputValuesList); + } + + private static void deserializeTime(ByteBuffer buffer, List<Long> timestamps) throws IOException { + Decoder decoder = Decoder.getDecoderByType(TSEncoding.GORILLA, TSDataType.INT64); + while (decoder.hasNext(buffer)) { + timestamps.add(decoder.readLong(buffer)); + } + } + + private static void deserializeTypesAndValues( + ByteBuffer buffer, List<List<TSDataType>> typesList, List<List<Object>> valuesList) + throws IOException { + int recordCount = ReadWriteIOUtils.readInt(buffer); + int[] recordSizeArray = new int[recordCount]; + for (int i = 0; i < recordCount; ++i) { + recordSizeArray[i] = ReadWriteIOUtils.readInt(buffer); + } + int[] intArray = deserializeIntList(buffer); + long[] longArray = deserializeLongList(buffer); + float[] floatArray = deserializeFloatList(buffer); + double[] doubleArray = deserializeDoubleList(buffer); + boolean[] booleanArray = deserializeBooleanList(buffer); + String[] stringArray = deserializeStringList(buffer); + int[] indexes = new int[6]; + for (int i = 0; i < recordCount; ++i) { + List<TSDataType> types = new ArrayList<>(recordSizeArray[i]); + List<Object> values = new ArrayList<>(recordSizeArray[i]); + for (int j = 0; j < recordSizeArray[i]; ++j) { + byte b = buffer.get(); + types.add(TSDataType.deserialize(b)); + switch (types.get(j)) { + case INT32: + values.add(intArray[indexes[0]++]); + break; + case INT64: + values.add(longArray[indexes[1]++]); + break; + case FLOAT: + values.add(floatArray[indexes[2]++]); + break; + case DOUBLE: + values.add(doubleArray[indexes[3]++]); + break; + case BOOLEAN: + values.add(booleanArray[indexes[4]++]); + break; + case TEXT: + values.add(stringArray[indexes[5]++]); + break; + default: + throw new IOException("Unsupported data type " + types.get(j)); + } + } + typesList.add(types); + valuesList.add(values); + } + } + + private static int[] deserializeIntList(ByteBuffer buffer) throws IOException { + int size = ReadWriteIOUtils.readInt(buffer); + if (size == 0) { + return new int[0]; + } + int[] intArray = new int[size]; + Decoder decoder = Decoder.getDecoderByType(TSEncoding.GORILLA, TSDataType.INT32); + for (int i = 0; i < size; ++i) { + intArray[i] = decoder.readInt(buffer); + } + if (decoder.hasNext(buffer)) { + System.out.println("ERROR"); + } + return intArray; + } + + private static long[] deserializeLongList(ByteBuffer buffer) throws IOException { + int size = ReadWriteIOUtils.readInt(buffer); + if (size == 0) { + return new long[0]; + } + long[] longArray = new long[size]; + Decoder decoder = Decoder.getDecoderByType(TSEncoding.GORILLA, TSDataType.INT64); + for (int i = 0; i < size; ++i) { + longArray[i] = decoder.readLong(buffer); + } + return longArray; + } + + private static float[] deserializeFloatList(ByteBuffer buffer) throws IOException { + int size = ReadWriteIOUtils.readInt(buffer); + if (size == 0) { + return new float[0]; + } + float[] floatArray = new float[size]; + Decoder decoder = Decoder.getDecoderByType(TSEncoding.GORILLA, TSDataType.FLOAT); + for (int i = 0; i < size; ++i) { + floatArray[i] = decoder.readFloat(buffer); + } + return floatArray; + } + + private static double[] deserializeDoubleList(ByteBuffer buffer) throws IOException { + int size = ReadWriteIOUtils.readInt(buffer); + if (size == 0) { + return new double[0]; + } + double[] doubleArray = new double[size]; + Decoder decoder = Decoder.getDecoderByType(TSEncoding.GORILLA, TSDataType.DOUBLE); + for (int i = 0; i < size; ++i) { + doubleArray[i] = decoder.readDouble(buffer); + } + return doubleArray; + } + + private static boolean[] deserializeBooleanList(ByteBuffer buffer) throws IOException { + int size = ReadWriteIOUtils.readInt(buffer); + if (size == 0) { + return new boolean[0]; + } + int byteSize = ReadWriteIOUtils.readInt(buffer); + byte[] byteArray = new byte[byteSize]; + buffer.get(byteArray); + boolean[] booleanArray = new boolean[size]; + for (int i = 0; i < size; i++) { + int arrayIndex = i / 8; + int bitPosition = i % 8; + booleanArray[i] = (byteArray[arrayIndex] & (1 << bitPosition)) != 0; + } + return booleanArray; + } + + private static String[] deserializeStringList(ByteBuffer buffer) throws IOException { + int size = ReadWriteIOUtils.readInt(buffer); + if (size == 0) { + return new String[0]; + } + String[] stringArray = new String[size]; + Decoder decoder = Decoder.getDecoderByType(TSEncoding.DICTIONARY, TSDataType.TEXT); + for (int i = 0; i < size; ++i) { + stringArray[i] = decoder.readBinary(buffer).toString(); + } + return stringArray; + } + public static InsertRecordsRequest deserializeInsertRecordsReq( ByteBuffer schemaBuffer, ByteBuffer valueBuffer, ByteBuffer auxiliaryBuffer) { return null; @@ -428,55 +572,60 @@ public class SessionRPCUtils { switch (((byte) j % 6)) { case 0: types.add(TSDataType.BOOLEAN); - value.add(true); + value.add(new Random().nextBoolean()); singleRowSize += 2; break; case 1: types.add(TSDataType.INT32); - value.add(1); + value.add(new Random().nextInt()); singleRowSize += 5; break; case 2: types.add(TSDataType.INT64); - value.add(1L); + value.add(new Random().nextLong()); singleRowSize += 9; break; case 3: types.add(TSDataType.FLOAT); - value.add(1.0f); + value.add(new Random().nextFloat()); singleRowSize += 5; break; case 4: types.add(TSDataType.DOUBLE); - value.add(1.0); + value.add(new Random().nextDouble()); singleRowSize += 9; break; case 5: types.add(TSDataType.TEXT); - value.add("123"); + value.add(String.valueOf(new Random().nextInt(100))); singleRowSize += 7; break; } } singleRowSize += 8; - int totalSize = singleRowSize * 1000; - for (int i = 0; i < 1000; ++i) { + int totalSize = singleRowSize * 10000; + for (int i = 0; i < 10000; ++i) { timestamps.add(System.currentTimeMillis()); measurements.add(types); values.add(value); } - PublicBAOS baos = new PublicBAOS(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(values); - // oos.writeObject(measurements); - // oos.writeObject(timestamps); - oos.flush(); - oos.close(); + long startTime = System.currentTimeMillis(); ByteBuffer buffer = serializeValue(timestamps, measurements, values); + System.out.println("Serialize time cost is " + (System.currentTimeMillis() - startTime)); System.out.println("Original Size is " + totalSize); System.out.println("Serialized Size is " + buffer.limit()); - System.out.println("Java serialize size is " + baos.size()); - System.out.println(); + System.out.println("Compression rate is " + (double) totalSize / buffer.limit()); + List<Long> decodedTime = new ArrayList<>(); + List<List<TSDataType>> decodedTypes = new ArrayList<>(); + List<List<Object>> decodedValues = new ArrayList<>(); + startTime = System.currentTimeMillis(); + deserializeValueBuffer(buffer, decodedTime, decodedTypes, decodedValues); + System.out.println("Time cost is " + (System.currentTimeMillis() - startTime)); + if (!decodedTime.equals(timestamps) + || !decodedTypes.equals(measurements) + || !decodedValues.equals(values)) { + System.out.println("ERROR"); + } } private static void testSchema() throws IOException { diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/ValueBufferBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/ValueBufferBuilder.java index fe02c4b529b..ada9c409112 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/ValueBufferBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/ValueBufferBuilder.java @@ -18,6 +18,4 @@ */ package org.apache.iotdb.session.util; -public class ValueBufferBuilder { - -} +public class ValueBufferBuilder {}
