This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch new-rpc-format-impl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 689ab4408a203f943f296c0db0def405c3437cb3 Author: Liu Xuxin <[email protected]> AuthorDate: Mon Mar 11 15:29:10 2024 +0800 pass schema serialize and deserialize test --- .../iotdb/session/req/InsertRecordsRequest.java | 86 +++++++++ .../apache/iotdb/session/util/SessionRPCUtils.java | 210 +++++++++++++++++++++ .../protocol/thrift/impl/ClientRPCServiceImpl.java | 6 + .../thrift-datanode/src/main/thrift/client.thrift | 8 + 4 files changed, 310 insertions(+) diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/req/InsertRecordsRequest.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/req/InsertRecordsRequest.java new file mode 100644 index 00000000000..407859efa08 --- /dev/null +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/req/InsertRecordsRequest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.session.req; + +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +import java.util.List; + +public class InsertRecordsRequest { + private List<String> deviceIds; + private List<List<String>> measurementsIdsList; + private List<Long> timestamps; + private List<List<TSDataType>> dataTypesList; + private List<List<Object>> valuesList; + + public InsertRecordsRequest( + List<String> deviceIds, + List<List<String>> measurementsIdsList, + List<Long> timestamps, + List<List<TSDataType>> dataTypesList, + List<List<Object>> valuesList) { + this.deviceIds = deviceIds; + this.measurementsIdsList = measurementsIdsList; + this.timestamps = timestamps; + this.dataTypesList = dataTypesList; + this.valuesList = valuesList; + } + + public InsertRecordsRequest() {} + + public List<String> getDeviceIds() { + return deviceIds; + } + + public List<List<String>> getMeasurementsIdsList() { + return measurementsIdsList; + } + + public List<Long> getTimestamps() { + return timestamps; + } + + public List<List<TSDataType>> getDataTypesList() { + return dataTypesList; + } + + public List<List<Object>> getValuesList() { + return valuesList; + } + + public void setDeviceIds(List<String> deviceIds) { + this.deviceIds = deviceIds; + } + + public void setMeasurementsIdsList(List<List<String>> measurementsIdsList) { + this.measurementsIdsList = measurementsIdsList; + } + + public void setTimestamps(List<Long> timestamps) { + this.timestamps = timestamps; + } + + public void setDataTypesList(List<List<TSDataType>> dataTypesList) { + this.dataTypesList = dataTypesList; + } + + public void setValuesList(List<List<Object>> valuesList) { + this.valuesList = valuesList; + } +} diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java new file mode 100644 index 00000000000..5431b7dbb95 --- /dev/null +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.session.util; + +import org.apache.iotdb.session.req.InsertRecordsRequest; +import org.apache.iotdb.tsfile.encoding.decoder.IntRleDecoder; +import org.apache.iotdb.tsfile.encoding.encoder.IntRleEncoder; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.utils.PublicBAOS; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +public class SessionRPCUtils { + public static ByteBuffer[] serializeInsertRecordsReq( + List<String> deviceIds, + List<List<String>> measurementIdsList, + List<Long> timestamp, + List<List<TSDataType>> tpyesList, + List<List<Object>> valuesList) + throws IOException { + + return null; + } + + private static ByteBuffer serializeSchema( + List<String> deviceIds, List<List<String>> measurementIdsList) throws IOException { + // creating dictionary + Map<String, Integer> dictionary = getDictionary(deviceIds, measurementIdsList); + // serializing dictionary + PublicBAOS schemaBufferOS = new PublicBAOS(); + serializeDictionary(dictionary, schemaBufferOS); + dictionaryEncoding(dictionary, deviceIds, measurementIdsList, schemaBufferOS); + ByteBuffer schemaBuffer = ByteBuffer.allocate(schemaBufferOS.size()); + schemaBuffer.put(schemaBufferOS.getBuf(), 0, schemaBufferOS.size()); + schemaBuffer.flip(); + return schemaBuffer; + } + + private static void deserializeSchema( + ByteBuffer schemaBuffer, + List<String> decodedDeviceIds, + List<List<String>> decodedMeasurementIdsList) { + String[] dictionary = getDictionary(schemaBuffer); + IntRleDecoder decoder = new IntRleDecoder(); + deserializeDevices(schemaBuffer, decoder, dictionary, decodedDeviceIds); + deserializeMeasurementIds(schemaBuffer, decoder, dictionary, decodedMeasurementIdsList); + } + + private static String[] getDictionary(ByteBuffer schemaBuffer) { + int dictionarySize = ReadWriteIOUtils.readInt(schemaBuffer); + String[] dictionary = new String[dictionarySize]; + for (int i = 0; i < dictionarySize; ++i) { + dictionary[i] = ReadWriteIOUtils.readString(schemaBuffer); + } + return dictionary; + } + + private static void deserializeDevices( + ByteBuffer buffer, + IntRleDecoder decoder, + String[] dictionary, + List<String> decodedDeviceIds) { + int deviceSize = decoder.readInt(buffer); + for (int i = 0; i < deviceSize; ++i) { + StringBuilder builder = new StringBuilder(); + int wordSize = decoder.readInt(buffer); + for (int j = 0; j < wordSize; ++j) { + builder.append(dictionary[decoder.readInt(buffer)]); + if (j != wordSize - 1) { + builder.append("."); + } + } + decodedDeviceIds.add(builder.toString()); + } + } + + private static void deserializeMeasurementIds( + ByteBuffer buffer, + IntRleDecoder decoder, + String[] dictionary, + List<List<String>> decodedMeasurementIdsList) { + int listNum = decoder.readInt(buffer); + for (int i = 0; i < listNum; ++i) { + int currListSize = decoder.readInt(buffer); + List<String> currList = new ArrayList<>(currListSize); + for (int j = 0; j < currListSize; ++j) { + currList.add(dictionary[decoder.readInt(buffer)]); + } + decodedMeasurementIdsList.add(currList); + } + } + + private static Map<String, Integer> getDictionary( + List<String> deviceId, List<List<String>> measurementIdsList) { + Map<String, Integer> dictionary = new LinkedHashMap<>(); + AtomicInteger count = new AtomicInteger(0); + deviceId.forEach( + s -> { + Arrays.stream(s.split("\\.")) + .forEach( + word -> { + dictionary.computeIfAbsent(word, k -> count.getAndIncrement()); + }); + }); + measurementIdsList.forEach( + measurementIds -> { + measurementIds.forEach( + measurementId -> { + dictionary.computeIfAbsent(measurementId, k -> count.getAndIncrement()); + }); + }); + return dictionary; + } + + private static void serializeDictionary(Map<String, Integer> dictionary, PublicBAOS buffer) + throws IOException { + ReadWriteIOUtils.write(dictionary.size(), buffer); + for (Map.Entry<String, Integer> entry : dictionary.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), buffer); + } + } + + private static void dictionaryEncoding( + Map<String, Integer> dictionary, + List<String> deviceIds, + List<List<String>> measurementIdsList, + PublicBAOS buffer) + throws IOException { + IntRleEncoder rleEncoder = new IntRleEncoder(); + rleEncoder.encode(deviceIds.size(), buffer); + for (String deviceId : deviceIds) { + String[] words = deviceId.split("\\."); + rleEncoder.encode(words.length, buffer); + Arrays.stream(words) + .forEach( + word -> { + rleEncoder.encode(dictionary.get(word), buffer); + }); + } + rleEncoder.encode(measurementIdsList.size(), buffer); + for (List<String> measurements : measurementIdsList) { + rleEncoder.encode(measurements.size(), buffer); + for (String measurement : measurements) { + rleEncoder.encode(dictionary.get(measurement), buffer); + } + } + rleEncoder.flush(buffer); + } + + public static InsertRecordsRequest deserializeInsertRecordsReq( + ByteBuffer schemaBuffer, ByteBuffer valueBuffer, ByteBuffer auxiliaryBuffer) { + return null; + } + + public static void main(String[] args) throws Exception { + List<String> deviceIds = new ArrayList<>(); + List<List<String>> measurementIdsList = new ArrayList<>(); + List<String> measurementIds = new ArrayList<>(); + int size = 0; + int measurementSize = 0; + for (int i = 0; i < 100; ++i) { + String deviceId = "root.sg1.s10.d" + i; + size += deviceId.getBytes().length; + deviceIds.add("root.sg1.s10.d" + i); + measurementSize += ("s" + i).getBytes().length; + measurementIds.add("s" + i); + } + for (int i = 0; i < 100; ++i) { + measurementIdsList.add(measurementIds); + } + size += measurementSize * 100; + + ByteBuffer buffer = serializeSchema(deviceIds, measurementIdsList); + System.out.println("Original Size is " + size); + System.out.println("Serialized Size is " + buffer.remaining()); + List<String> decodedDeviceIds = new ArrayList<>(); + List<List<String>> decodedMeasurementIdsList = new ArrayList<>(); + deserializeSchema(buffer, decodedDeviceIds, decodedMeasurementIdsList); + if (deviceIds.equals(decodedDeviceIds) + && measurementIdsList.equals(decodedMeasurementIdsList)) { + System.out.println("Test passed"); + } else { + System.out.println("Test failed"); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 6d20d6f7d49..0983c700fec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -144,6 +144,7 @@ import org.apache.iotdb.service.rpc.thrift.TSGroupByQueryIntervalReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReqV2; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq; @@ -1703,6 +1704,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } } + @Override + public TSStatus insertRecordsV2(TSInsertRecordsReqV2 req) { + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + @Override public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) { long t1 = System.nanoTime(); diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift index d96a04d7b21..22ec15b5151 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift @@ -255,6 +255,12 @@ struct TSInsertRecordsReq { 6: optional bool isAligned } +struct TSInsertRecordsReqV2 { + 1: required binary schemaBuffer + 2: required binary valueBuffer + 3: required binary auxiliaryBuffer +} + struct TSInsertRecordsOfOneDeviceReq { 1: required i64 sessionId 2: required string prefixPath @@ -596,6 +602,8 @@ service IClientRPCService { common.TSStatus insertRecords(1:TSInsertRecordsReq req); + common.TSStatus insertRecordsV2(2:TSInsertRecordsReqV2 req); + common.TSStatus insertRecordsOfOneDevice(1:TSInsertRecordsOfOneDeviceReq req); common.TSStatus insertStringRecordsOfOneDevice(1:TSInsertStringRecordsOfOneDeviceReq req);
