This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch insert-records-compress in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5598debd89e5c5c819dca5e0525a5fefb8fe05c6 Author: Liu Xuxin <[email protected]> AuthorDate: Thu Jun 20 16:33:52 2024 +0800 Finish InsertRecordsReqSerializeUtils --- .../org/apache/iotdb/isession/SessionConfig.java | 6 + .../iotdb/isession/req/InsertRecordsRequest.java | 97 +++ .../util/InsertRecordsReqSerializeUtils.java | 661 +++++++++++++++++++++ .../protocol/thrift/impl/ClientRPCServiceImpl.java | 6 + .../thrift-datanode/src/main/thrift/client.thrift | 9 + 5 files changed, 779 insertions(+) diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java index a095b5e0188..fc3c5fdc76b 100644 --- a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java +++ b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java @@ -21,6 +21,8 @@ package org.apache.iotdb.isession; import org.apache.iotdb.isession.util.Version; +import org.apache.tsfile.file.metadata.enums.CompressionType; + public class SessionConfig { public static final String DEFAULT_HOST = "localhost"; @@ -54,5 +56,9 @@ public class SessionConfig { public static final long RETRY_INTERVAL_IN_MS = 500; + public static boolean enableRPCCompression = true; + + public static CompressionType rpcCompressionType = CompressionType.GZIP; + private SessionConfig() {} } diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/req/InsertRecordsRequest.java b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/req/InsertRecordsRequest.java new file mode 100644 index 00000000000..415cd6c96b2 --- /dev/null +++ b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/req/InsertRecordsRequest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.isession.req; + +import org.apache.tsfile.enums.TSDataType; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class InsertRecordsRequest { + private List<String> deviceIds; + private List<List<String>> measurementsIdsList; + private List<Long> timestamps; + private List<List<TSDataType>> dataTypesList; + private List<List<Object>> valuesList; + private Map<String, Object> info; + + public InsertRecordsRequest( + List<String> deviceIds, + List<List<String>> measurementsIdsList, + List<Long> timestamps, + List<List<TSDataType>> dataTypesList, + List<List<Object>> valuesList, + Map<String, Object> info) { + this.deviceIds = deviceIds; + this.measurementsIdsList = measurementsIdsList; + this.timestamps = timestamps; + this.dataTypesList = dataTypesList; + this.valuesList = valuesList; + this.info = info; + } + + public InsertRecordsRequest() { + this.deviceIds = new ArrayList<>(); + this.measurementsIdsList = new ArrayList<>(); + this.timestamps = new ArrayList<>(); + this.dataTypesList = new ArrayList<>(); + this.valuesList = new ArrayList<>(); + } + + public List<String> getDeviceIds() { + return deviceIds; + } + + public List<List<String>> getMeasurementsIdsList() { + return measurementsIdsList; + } + + public List<Long> getTimestamps() { + return timestamps; + } + + public List<List<TSDataType>> getDataTypesList() { + return dataTypesList; + } + + public List<List<Object>> getValuesList() { + return valuesList; + } + + public void setDeviceIds(List<String> deviceIds) { + this.deviceIds = deviceIds; + } + + public void setMeasurementsIdsList(List<List<String>> measurementsIdsList) { + this.measurementsIdsList = measurementsIdsList; + } + + public void setTimestamps(List<Long> timestamps) { + this.timestamps = timestamps; + } + + public void setDataTypesList(List<List<TSDataType>> dataTypesList) { + this.dataTypesList = dataTypesList; + } + + public void setValuesList(List<List<Object>> valuesList) { + this.valuesList = valuesList; + } +} diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/util/InsertRecordsReqSerializeUtils.java b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/util/InsertRecordsReqSerializeUtils.java new file mode 100644 index 00000000000..74df744692c --- /dev/null +++ b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/util/InsertRecordsReqSerializeUtils.java @@ -0,0 +1,661 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.isession.util; + +import org.apache.iotdb.isession.SessionConfig; +import org.apache.iotdb.isession.req.InsertRecordsRequest; + +import org.apache.tsfile.compress.ICompressor; +import org.apache.tsfile.compress.IUnCompressor; +import org.apache.tsfile.encoding.decoder.Decoder; +import org.apache.tsfile.encoding.decoder.IntRleDecoder; +import org.apache.tsfile.encoding.encoder.Encoder; +import org.apache.tsfile.encoding.encoder.IntRleEncoder; +import org.apache.tsfile.encoding.encoder.TSEncodingBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.PublicBAOS; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +public class InsertRecordsReqSerializeUtils { + private static final Logger logger = LoggerFactory.getLogger(InsertRecordsReqSerializeUtils.class); + + /** + * Serialize InsertRecordsRequest to ByteBuffer array. + * + * @param deviceIds device ids + * @param measurementIdsList measurement ids list + * @param timestamps timestamps + * @param typesList data types list + * @param valuesList values list + * @param writeInfo write info + * @return ByteBuffer array, the first element is schema buffer, the second element is data + * buffer, the third element is info buffer + * @throws IOException if an I/O error occurs + */ + public static ByteBuffer[] serializeInsertRecordsReq( + List<String> deviceIds, + List<List<String>> measurementIdsList, + List<Long> timestamps, + List<List<TSDataType>> typesList, + List<List<Object>> valuesList, + Map<String, Object> writeInfo) + throws IOException { + ByteBuffer schemaBuffer = serializeSchema(deviceIds, measurementIdsList); + ByteBuffer dataBuffer = serializeValue(timestamps, typesList, valuesList); + ByteBuffer infoBuffer = serializeInfo(writeInfo); + return new ByteBuffer[] {schemaBuffer, dataBuffer, infoBuffer}; + } + + /** + * Deserialize InsertRecordsRequest from ByteBuffer array. + * + * @param schemaBuffer schema buffer, containing device ids and measurement ids + * @param valueBuffer value buffer, containing timestamps, data types and values + * @param infoBuffer info buffer, containing write info + * @return deserialized InsertRecordsRequest + * @throws IOException if an I/O error occurs + */ + public static InsertRecordsRequest deserializeInsertRecordsReq( + ByteBuffer schemaBuffer, ByteBuffer valueBuffer, ByteBuffer infoBuffer) throws IOException { + List<String> deviceIds = new ArrayList<>(); + List<Long> timestamps = new ArrayList<>(); + List<List<String>> measurementIds = new ArrayList<>(); + List<List<TSDataType>> dataTypes = new ArrayList<>(); + List<List<Object>> values = new ArrayList<>(); + deserializeSchema(schemaBuffer, deviceIds, measurementIds); + deserializeValueBuffer(valueBuffer, timestamps, dataTypes, values); + Map<String, Object> info = deserializeInfo(infoBuffer); + return new InsertRecordsRequest(deviceIds, measurementIds, timestamps, dataTypes, values, info); + } + + private static ByteBuffer serializeSchema( + List<String> deviceIds, List<List<String>> measurementIdsList) throws IOException { + // creating dictionary + Map<String, Integer> dictionary = getDictionary(deviceIds, measurementIdsList); + // serializing dictionary + PublicBAOS schemaBufferOS = new PublicBAOS(); + serializeDictionary(dictionary, schemaBufferOS); + dictionaryEncoding(dictionary, deviceIds, measurementIdsList, schemaBufferOS); + ByteBuffer schemaBuffer; + if (SessionConfig.enableRPCCompression) { + ICompressor compressor = ICompressor.getCompressor(SessionConfig.rpcCompressionType); + byte[] compressedData = + compressor.compress(schemaBufferOS.getBuf(), 0, schemaBufferOS.size()); + schemaBuffer = ByteBuffer.allocate(compressedData.length + 5); + schemaBuffer.put((byte) 1); + ReadWriteIOUtils.write(schemaBufferOS.size(), schemaBuffer); + schemaBuffer.put(compressedData); + } else { + schemaBuffer = ByteBuffer.allocate(schemaBufferOS.size() + 1); + schemaBuffer.put((byte) 0); + schemaBuffer.put(schemaBufferOS.getBuf(), 0, schemaBufferOS.size()); + } + schemaBuffer.flip(); + return schemaBuffer; + } + + private static void deserializeSchema( + ByteBuffer schemaBuffer, + List<String> decodedDeviceIds, + List<List<String>> decodedMeasurementIdsList) + throws IOException { + boolean compressed = ReadWriteIOUtils.readBool(schemaBuffer); + ByteBuffer buffer = schemaBuffer; + if (compressed) { + int uncompressedLength = ReadWriteIOUtils.readInt(schemaBuffer); + IUnCompressor unCompressor = IUnCompressor.getUnCompressor(SessionConfig.rpcCompressionType); + buffer = ByteBuffer.allocate(uncompressedLength); + unCompressor.uncompress(schemaBuffer, buffer); + buffer.flip(); + } + + String[] dictionary = getDictionary(buffer); + IntRleDecoder decoder = new IntRleDecoder(); + deserializeDevices(buffer, decoder, dictionary, decodedDeviceIds); + deserializeMeasurementIds(buffer, decoder, dictionary, decodedMeasurementIdsList); + } + + private static String[] getDictionary(ByteBuffer schemaBuffer) { + int dictionarySize = ReadWriteIOUtils.readInt(schemaBuffer); + String[] dictionary = new String[dictionarySize]; + for (int i = 0; i < dictionarySize; ++i) { + dictionary[i] = ReadWriteIOUtils.readString(schemaBuffer); + } + return dictionary; + } + + private static void deserializeDevices( + ByteBuffer buffer, + IntRleDecoder decoder, + String[] dictionary, + List<String> decodedDeviceIds) { + int deviceSize = decoder.readInt(buffer); + for (int i = 0; i < deviceSize; ++i) { + StringBuilder builder = new StringBuilder(); + int wordSize = decoder.readInt(buffer); + for (int j = 0; j < wordSize; ++j) { + builder.append(dictionary[decoder.readInt(buffer)]); + if (j != wordSize - 1) { + builder.append("."); + } + } + decodedDeviceIds.add(builder.toString()); + } + } + + private static void deserializeMeasurementIds( + ByteBuffer buffer, + IntRleDecoder decoder, + String[] dictionary, + List<List<String>> decodedMeasurementIdsList) { + int listNum = decoder.readInt(buffer); + for (int i = 0; i < listNum; ++i) { + int currListSize = decoder.readInt(buffer); + List<String> currList = new ArrayList<>(currListSize); + for (int j = 0; j < currListSize; ++j) { + currList.add(dictionary[decoder.readInt(buffer)]); + } + decodedMeasurementIdsList.add(currList); + } + } + + private static Map<String, Integer> getDictionary( + List<String> deviceId, List<List<String>> measurementIdsList) { + Map<String, Integer> dictionary = new LinkedHashMap<>(); + AtomicInteger count = new AtomicInteger(0); + deviceId.forEach( + s -> { + Arrays.stream(s.split("\\.")) + .forEach( + word -> { + dictionary.computeIfAbsent(word, k -> count.getAndIncrement()); + }); + }); + measurementIdsList.forEach( + measurementIds -> { + measurementIds.forEach( + measurementId -> { + dictionary.computeIfAbsent(measurementId, k -> count.getAndIncrement()); + }); + }); + return dictionary; + } + + private static void serializeDictionary(Map<String, Integer> dictionary, PublicBAOS buffer) + throws IOException { + ReadWriteIOUtils.write(dictionary.size(), buffer); + for (Map.Entry<String, Integer> entry : dictionary.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), buffer); + } + } + + private static void dictionaryEncoding( + Map<String, Integer> dictionary, + List<String> deviceIds, + List<List<String>> measurementIdsList, + PublicBAOS buffer) + throws IOException { + IntRleEncoder rleEncoder = new IntRleEncoder(); + rleEncoder.encode(deviceIds.size(), buffer); + for (String deviceId : deviceIds) { + String[] words = deviceId.split("\\."); + rleEncoder.encode(words.length, buffer); + Arrays.stream(words) + .forEach( + word -> { + rleEncoder.encode(dictionary.get(word), buffer); + }); + } + rleEncoder.encode(measurementIdsList.size(), buffer); + for (List<String> measurements : measurementIdsList) { + rleEncoder.encode(measurements.size(), buffer); + for (String measurement : measurements) { + rleEncoder.encode(dictionary.get(measurement), buffer); + } + } + rleEncoder.flush(buffer); + } + + private static ByteBuffer serializeValue( + List<Long> timestamps, List<List<TSDataType>> typesList, List<List<Object>> valuesList) + throws IOException { + PublicBAOS valueSerializeBuffer = new PublicBAOS(); + serializeTimestamps(timestamps, valueSerializeBuffer); + serializeTypesAndValues(typesList, valuesList, valueSerializeBuffer); + ByteBuffer outBuffer = null; + if (SessionConfig.enableRPCCompression) { + ICompressor compressor = ICompressor.getCompressor(SessionConfig.rpcCompressionType); + byte[] compressed = + compressor.compress(valueSerializeBuffer.getBuf(), 0, valueSerializeBuffer.size()); + outBuffer = ByteBuffer.allocate(compressed.length + 5); + outBuffer.put((byte) 1); + ReadWriteIOUtils.write(valueSerializeBuffer.size(), outBuffer); + outBuffer.put(compressed); + } else { + outBuffer = ByteBuffer.allocate(valueSerializeBuffer.size() + 1); + outBuffer.put((byte) 0); + outBuffer.put(valueSerializeBuffer.getBuf(), 0, valueSerializeBuffer.size()); + } + outBuffer.flip(); + return outBuffer; + } + + private static void serializeTimestamps(List<Long> timestamps, PublicBAOS valueBuffer) + throws IOException { + Encoder timeEncoder = + TSEncodingBuilder.getEncodingBuilder(TSEncoding.GORILLA).getEncoder(TSDataType.INT64); + PublicBAOS timeBuffer = new PublicBAOS(); + for (long timestamp : timestamps) { + timeEncoder.encode(timestamp, timeBuffer); + } + timeEncoder.flush(timeBuffer); + valueBuffer.write(timeBuffer.getBuf(), 0, timeBuffer.size()); + } + + private static void serializeTypesAndValues( + List<List<TSDataType>> typesList, List<List<Object>> valuesList, PublicBAOS buffer) + throws IOException { + int recordCount = typesList.size(); + ReadWriteIOUtils.write(recordCount, buffer); + List<Integer> intList = new ArrayList<>(); + List<Long> longList = new ArrayList<>(); + List<Float> floatList = new ArrayList<>(); + List<Double> doubleList = new ArrayList<>(); + List<Boolean> booleanList = new ArrayList<>(); + List<String> stringList = new ArrayList<>(); + PublicBAOS typeBuffer = new PublicBAOS(); + for (int i = 0; i < recordCount; ++i) { + List<TSDataType> types = typesList.get(i); + List<Object> values = valuesList.get(i); + int size = types.size(); + ReadWriteIOUtils.write(size, buffer); + for (int j = 0; j < size; ++j) { + switch (types.get(j)) { + case INT32: + intList.add((Integer) values.get(j)); + break; + case INT64: + longList.add((Long) values.get(j)); + break; + case FLOAT: + floatList.add((Float) values.get(j)); + break; + case DOUBLE: + doubleList.add((Double) values.get(j)); + break; + case BOOLEAN: + booleanList.add((Boolean) values.get(j)); + break; + case TEXT: + stringList.add((String) values.get(j)); + break; + default: + throw new IOException("Unsupported data type " + types.get(j)); + } + ReadWriteIOUtils.write(types.get(j), typeBuffer); + } + } + serializeIntList(intList, buffer); + serializeLongList(longList, buffer); + serializeFloatList(floatList, buffer); + serializeDoubleList(doubleList, buffer); + serializeBooleanList(booleanList, buffer); + serializeStringList(stringList, buffer); + buffer.write(typeBuffer.getBuf(), 0, typeBuffer.size()); + } + + private static void serializeIntList(List<Integer> values, PublicBAOS buffer) throws IOException { + ReadWriteIOUtils.write(values.size(), buffer); + if (values.isEmpty()) { + return; + } + Encoder encoder = + TSEncodingBuilder.getEncodingBuilder(TSEncoding.GORILLA).getEncoder(TSDataType.INT32); + PublicBAOS intBuffer = new PublicBAOS(); + for (int value : values) { + encoder.encode(value, intBuffer); + } + encoder.flush(intBuffer); + buffer.write(intBuffer.getBuf(), 0, intBuffer.size()); + } + + private static void serializeLongList(List<Long> values, PublicBAOS buffer) throws IOException { + ReadWriteIOUtils.write(values.size(), buffer); + if (values.isEmpty()) { + return; + } + Encoder encoder = + TSEncodingBuilder.getEncodingBuilder(TSEncoding.GORILLA).getEncoder(TSDataType.INT64); + PublicBAOS longBuffer = new PublicBAOS(); + for (long value : values) { + encoder.encode(value, longBuffer); + } + encoder.flush(longBuffer); + buffer.write(longBuffer.getBuf(), 0, longBuffer.size()); + } + + private static void serializeFloatList(List<Float> values, PublicBAOS buffer) throws IOException { + ReadWriteIOUtils.write(values.size(), buffer); + if (values.isEmpty()) { + return; + } + Encoder encoder = + TSEncodingBuilder.getEncodingBuilder(TSEncoding.GORILLA).getEncoder(TSDataType.FLOAT); + PublicBAOS floatBuffer = new PublicBAOS(); + for (float value : values) { + encoder.encode(value, floatBuffer); + } + encoder.flush(floatBuffer); + buffer.write(floatBuffer.getBuf(), 0, floatBuffer.size()); + } + + private static void serializeDoubleList(List<Double> values, PublicBAOS buffer) + throws IOException { + ReadWriteIOUtils.write(values.size(), buffer); + if (values.isEmpty()) { + return; + } + Encoder encoder = + TSEncodingBuilder.getEncodingBuilder(TSEncoding.GORILLA).getEncoder(TSDataType.DOUBLE); + PublicBAOS doubleBuffer = new PublicBAOS(); + for (double value : values) { + encoder.encode(value, doubleBuffer); + } + encoder.flush(doubleBuffer); + buffer.write(doubleBuffer.getBuf(), 0, doubleBuffer.size()); + } + + private static void serializeBooleanList(List<Boolean> values, PublicBAOS buffer) + throws IOException { + ReadWriteIOUtils.write(values.size(), buffer); + if (values.isEmpty()) { + return; + } + final int bitsPerByte = 8; + + int size = (values.size() + bitsPerByte - 1) / bitsPerByte; + byte[] byteArray = new byte[size]; + + for (int i = 0; i < values.size(); i++) { + if (values.get(i)) { + int arrayIndex = i / bitsPerByte; + int bitPosition = i % bitsPerByte; + byteArray[arrayIndex] |= (byte) (1 << bitPosition); + } + } + + ReadWriteIOUtils.write(size, buffer); + buffer.write(byteArray); + } + + private static void serializeStringList(List<String> values, PublicBAOS buffer) + throws IOException { + ReadWriteIOUtils.write(values.size(), buffer); + if (values.isEmpty()) { + return; + } + Encoder encoder = + TSEncodingBuilder.getEncodingBuilder(TSEncoding.DICTIONARY).getEncoder(TSDataType.TEXT); + PublicBAOS stringBuffer = new PublicBAOS(); + for (String value : values) { + encoder.encode(new Binary(value.getBytes()), stringBuffer); + } + encoder.flush(stringBuffer); + buffer.write(stringBuffer.getBuf(), 0, stringBuffer.size()); + } + + private static void deserializeValueBuffer( + ByteBuffer buffer, + List<Long> outputTimestamps, + List<List<TSDataType>> outputTypesList, + List<List<Object>> outputValuesList) + throws IOException { + boolean compressed = ReadWriteIOUtils.readBool(buffer); + ByteBuffer dataBuffer = buffer; + if (compressed) { + int uncompressedSize = ReadWriteIOUtils.readInt(buffer); + dataBuffer = ByteBuffer.allocate(uncompressedSize); + IUnCompressor unCompressor = IUnCompressor.getUnCompressor(SessionConfig.rpcCompressionType); + unCompressor.uncompress(buffer, dataBuffer); + dataBuffer.flip(); + } + deserializeTime(dataBuffer, outputTimestamps); + deserializeTypesAndValues(dataBuffer, outputTypesList, outputValuesList); + } + + private static void deserializeTime(ByteBuffer buffer, List<Long> timestamps) throws IOException { + Decoder decoder = Decoder.getDecoderByType(TSEncoding.GORILLA, TSDataType.INT64); + while (decoder.hasNext(buffer)) { + timestamps.add(decoder.readLong(buffer)); + } + } + + private static void deserializeTypesAndValues( + ByteBuffer buffer, List<List<TSDataType>> typesList, List<List<Object>> valuesList) + throws IOException { + int recordCount = ReadWriteIOUtils.readInt(buffer); + int[] recordSizeArray = new int[recordCount]; + for (int i = 0; i < recordCount; ++i) { + recordSizeArray[i] = ReadWriteIOUtils.readInt(buffer); + } + int[] intArray = deserializeIntList(buffer); + long[] longArray = deserializeLongList(buffer); + float[] floatArray = deserializeFloatList(buffer); + double[] doubleArray = deserializeDoubleList(buffer); + boolean[] booleanArray = deserializeBooleanList(buffer); + Binary[] stringArray = deserializeStringList(buffer); + int[] indexes = new int[6]; + for (int i = 0; i < recordCount; ++i) { + List<TSDataType> types = new ArrayList<>(recordSizeArray[i]); + List<Object> values = new ArrayList<>(recordSizeArray[i]); + for (int j = 0; j < recordSizeArray[i]; ++j) { + byte b = buffer.get(); + types.add(TSDataType.deserialize(b)); + switch (types.get(j)) { + case INT32: + values.add(intArray[indexes[0]++]); + break; + case INT64: + values.add(longArray[indexes[1]++]); + break; + case FLOAT: + values.add(floatArray[indexes[2]++]); + break; + case DOUBLE: + values.add(doubleArray[indexes[3]++]); + break; + case BOOLEAN: + values.add(booleanArray[indexes[4]++]); + break; + case TEXT: + values.add(stringArray[indexes[5]++]); + break; + default: + throw new IOException("Unsupported data type " + types.get(j)); + } + } + typesList.add(types); + valuesList.add(values); + } + } + + private static int[] deserializeIntList(ByteBuffer buffer) throws IOException { + int size = ReadWriteIOUtils.readInt(buffer); + if (size == 0) { + return new int[0]; + } + int[] intArray = new int[size]; + Decoder decoder = Decoder.getDecoderByType(TSEncoding.GORILLA, TSDataType.INT32); + for (int i = 0; i < size; ++i) { + intArray[i] = decoder.readInt(buffer); + } + if (decoder.hasNext(buffer)) { + System.out.println("ERROR"); + } + return intArray; + } + + private static long[] deserializeLongList(ByteBuffer buffer) throws IOException { + int size = ReadWriteIOUtils.readInt(buffer); + if (size == 0) { + return new long[0]; + } + long[] longArray = new long[size]; + Decoder decoder = Decoder.getDecoderByType(TSEncoding.GORILLA, TSDataType.INT64); + for (int i = 0; i < size; ++i) { + longArray[i] = decoder.readLong(buffer); + } + return longArray; + } + + private static float[] deserializeFloatList(ByteBuffer buffer) throws IOException { + int size = ReadWriteIOUtils.readInt(buffer); + if (size == 0) { + return new float[0]; + } + float[] floatArray = new float[size]; + Decoder decoder = Decoder.getDecoderByType(TSEncoding.GORILLA, TSDataType.FLOAT); + for (int i = 0; i < size; ++i) { + floatArray[i] = decoder.readFloat(buffer); + } + return floatArray; + } + + private static double[] deserializeDoubleList(ByteBuffer buffer) throws IOException { + int size = ReadWriteIOUtils.readInt(buffer); + if (size == 0) { + return new double[0]; + } + double[] doubleArray = new double[size]; + Decoder decoder = Decoder.getDecoderByType(TSEncoding.GORILLA, TSDataType.DOUBLE); + for (int i = 0; i < size; ++i) { + doubleArray[i] = decoder.readDouble(buffer); + } + return doubleArray; + } + + private static boolean[] deserializeBooleanList(ByteBuffer buffer) throws IOException { + int size = ReadWriteIOUtils.readInt(buffer); + if (size == 0) { + return new boolean[0]; + } + int byteSize = ReadWriteIOUtils.readInt(buffer); + byte[] byteArray = new byte[byteSize]; + buffer.get(byteArray); + boolean[] booleanArray = new boolean[size]; + for (int i = 0; i < size; i++) { + int arrayIndex = i / 8; + int bitPosition = i % 8; + booleanArray[i] = (byteArray[arrayIndex] & (1 << bitPosition)) != 0; + } + return booleanArray; + } + + private static Binary[] deserializeStringList(ByteBuffer buffer) throws IOException { + int size = ReadWriteIOUtils.readInt(buffer); + if (size == 0) { + return new Binary[0]; + } + Binary[] stringArray = new Binary[size]; + Decoder decoder = Decoder.getDecoderByType(TSEncoding.DICTIONARY, TSDataType.TEXT); + for (int i = 0; i < size; ++i) { + stringArray[i] = decoder.readBinary(buffer); + } + return stringArray; + } + + private static ByteBuffer serializeInfo(Map<String, Object> info) throws IOException { + PublicBAOS baos = new PublicBAOS(); + ReadWriteIOUtils.write(info.size(), baos); + for (Map.Entry<String, Object> entry : info.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), baos); + Object value = entry.getValue(); + if (value instanceof Integer) { + ReadWriteIOUtils.write(TSDataType.INT32.getType(), baos); + ReadWriteIOUtils.write((Integer) value, baos); + } else if (value instanceof Long) { + ReadWriteIOUtils.write(TSDataType.INT64.getType(), baos); + ReadWriteIOUtils.write((Long) value, baos); + } else if (value instanceof Float) { + ReadWriteIOUtils.write(TSDataType.FLOAT.getType(), baos); + ReadWriteIOUtils.write((Float) value, baos); + } else if (value instanceof Double) { + ReadWriteIOUtils.write(TSDataType.DOUBLE.getType(), baos); + ReadWriteIOUtils.write((Double) value, baos); + } else if (value instanceof Boolean) { + ReadWriteIOUtils.write(TSDataType.BOOLEAN.getType(), baos); + ReadWriteIOUtils.write((Boolean) value, baos); + } else if (value instanceof String) { + ReadWriteIOUtils.write(TSDataType.TEXT.getType(), baos); + ReadWriteIOUtils.write((String) value, baos); + } else { + throw new IOException("Unsupported data type " + value.getClass()); + } + } + ByteBuffer buffer = ByteBuffer.allocate(baos.size()); + buffer.put(baos.getBuf(), 0, baos.size()); + buffer.flip(); + return buffer; + } + + private static Map<String, Object> deserializeInfo(ByteBuffer buffer) throws IOException { + int size = ReadWriteIOUtils.readInt(buffer); + Map<String, Object> info = new LinkedHashMap<>(); + for (int i = 0; i < size; ++i) { + String key = ReadWriteIOUtils.readString(buffer); + TSDataType type = TSDataType.deserialize(buffer.get()); + switch (type) { + case INT32: + info.put(key, ReadWriteIOUtils.readInt(buffer)); + break; + case INT64: + info.put(key, ReadWriteIOUtils.readLong(buffer)); + break; + case FLOAT: + info.put(key, ReadWriteIOUtils.readFloat(buffer)); + break; + case DOUBLE: + info.put(key, ReadWriteIOUtils.readDouble(buffer)); + break; + case BOOLEAN: + info.put(key, ReadWriteIOUtils.readBool(buffer)); + break; + case TEXT: + info.put(key, ReadWriteIOUtils.readString(buffer)); + break; + default: + throw new IOException("Unsupported data type " + type); + } + } + return info; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 81f1bc105f3..9efd4cd5ba3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -133,6 +133,7 @@ import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp; import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq; import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq; import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq; +import org.apache.iotdb.service.rpc.thrift.TSCompressedInsertRecordsReq; import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp; import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq; import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq; @@ -1731,6 +1732,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } } + @Override + public TSStatus insertCompressedRecords(TSCompressedInsertRecordsReq req) { + return null; + } + @Override public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) { long t1 = System.nanoTime(); diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift index 917a9a57900..faeba5bcdbb 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift @@ -255,6 +255,13 @@ struct TSInsertRecordsReq { 6: optional bool isAligned } +struct TSCompressedInsertRecordsReq { + 1: required binary schemaBuffer + 2: required binary valueBuffer + 3: required binary auxiliaryBuffer + 4: optional i64 sessionId +} + struct TSInsertRecordsOfOneDeviceReq { 1: required i64 sessionId 2: required string prefixPath @@ -596,6 +603,8 @@ service IClientRPCService { common.TSStatus insertRecords(1:TSInsertRecordsReq req); + common.TSStatus insertCompressedRecords(1:TSCompressedInsertRecordsReq req); + common.TSStatus insertRecordsOfOneDevice(1:TSInsertRecordsOfOneDeviceReq req); common.TSStatus insertStringRecordsOfOneDevice(1:TSInsertStringRecordsOfOneDeviceReq req);
