This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch new-rpc-format-impl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 503beb0733a713c9349dcba7679589fff03c141e Author: Liu Xuxin <[email protected]> AuthorDate: Mon Mar 11 17:24:31 2024 +0800 temp --- .../apache/iotdb/session/util/SessionRPCUtils.java | 277 +++++++++++++++++++++ .../org/apache/iotdb/session/util/TestSchema.java | 26 ++ .../org/apache/iotdb/session/util/TestValue.java | 30 +++ .../iotdb/session/util/ValueBufferBuilder.java | 23 ++ 4 files changed, 356 insertions(+) diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java index bcba9566ab3..ad207be780d 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java @@ -23,12 +23,20 @@ import org.apache.iotdb.session.req.InsertRecordsRequest; import org.apache.iotdb.tsfile.compress.ICompressor; import org.apache.iotdb.tsfile.compress.IUnCompressor; import org.apache.iotdb.tsfile.encoding.decoder.IntRleDecoder; +import org.apache.iotdb.tsfile.encoding.encoder.Encoder; import org.apache.iotdb.tsfile.encoding.encoder.IntRleEncoder; +import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.PublicBAOS; +import org.apache.iotdb.tsfile.utils.RamUsageEstimator; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -45,6 +53,7 @@ public class SessionRPCUtils { List<List<TSDataType>> tpyesList, List<List<Object>> valuesList) throws IOException { + ByteBuffer schemaBuffer = serializeSchema(deviceIds, measurementIdsList); return null; } @@ -197,12 +206,280 @@ public class SessionRPCUtils { rleEncoder.flush(buffer); } + private static ByteBuffer serializeValue( + List<Long> timestamps, List<List<TSDataType>> typesList, List<List<Object>> valuesList) + throws IOException { + PublicBAOS valueSerializeBuffer = new PublicBAOS(); + serializeTimestamps(timestamps, valueSerializeBuffer); + serializeTypesAndValues(typesList, valuesList, valueSerializeBuffer); + ByteBuffer outBuffer = null; + if (SessionConfig.enableRPCCompression) { + ICompressor compressor = ICompressor.getCompressor(SessionConfig.rpcCompressionType); + byte[] compressed = + compressor.compress(valueSerializeBuffer.getBuf(), 0, valueSerializeBuffer.size()); + outBuffer = ByteBuffer.allocate(compressed.length + 5); + outBuffer.put((byte) 1); + ReadWriteIOUtils.write(valueSerializeBuffer.size(), outBuffer); + outBuffer.put(compressed); + } else { + outBuffer = ByteBuffer.allocate(valueSerializeBuffer.size() + 1); + outBuffer.put((byte) 0); + outBuffer.put(valueSerializeBuffer.getBuf(), 0, valueSerializeBuffer.size()); + } + outBuffer.flip(); + return outBuffer; + } + + private static void serializeTimestamps(List<Long> timestamps, PublicBAOS valueBuffer) + throws IOException { + Encoder timeEncoder = + TSEncodingBuilder.getEncodingBuilder(TSEncoding.GORILLA).getEncoder(TSDataType.INT64); + PublicBAOS timeBuffer = new PublicBAOS(); + for (long timestamp : timestamps) { + timeEncoder.encode(timestamp, timeBuffer); + } + timeEncoder.flush(timeBuffer); + valueBuffer.write(timeBuffer.getBuf(), 0, timeBuffer.size()); + } + + private static void serializeTypesAndValues( + List<List<TSDataType>> typesList, List<List<Object>> valuesList, PublicBAOS buffer) + throws IOException { + int recordCount = typesList.size(); + ReadWriteIOUtils.write(recordCount, buffer); + List<Integer> intList = new ArrayList<>(); + List<Long> longList = new ArrayList<>(); + List<Float> floatList = new ArrayList<>(); + List<Double> doubleList = new ArrayList<>(); + List<Boolean> booleanList = new ArrayList<>(); + List<String> stringList = new ArrayList<>(); + Encoder encoder = + TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN).getEncoder(TSDataType.INT32); + for (int i = 0; i < recordCount; ++i) { + List<TSDataType> types = typesList.get(i); + List<Object> values = valuesList.get(i); + int size = types.size(); + encoder.encode(size, buffer); + for (int j = 0; j < size; ++j) { + int offset = -1; + switch (types.get(j)) { + case INT32: + offset = intList.size(); + intList.add((Integer) values.get(j)); + break; + case INT64: + offset = longList.size(); + longList.add((Long) values.get(j)); + break; + case FLOAT: + offset = floatList.size(); + floatList.add((Float) values.get(j)); + break; + case DOUBLE: + offset = doubleList.size(); + doubleList.add((Double) values.get(j)); + break; + case BOOLEAN: + offset = booleanList.size(); + booleanList.add((Boolean) values.get(j)); + break; + case TEXT: + offset = stringList.size(); + stringList.add((String) values.get(j)); + break; + default: + throw new IOException("Unsupported data type " + types.get(j)); + } + encoder.encode((int) (types.get(j).getType()), buffer); + encoder.encode(offset, buffer); + } + } + encoder.flush(buffer); + serializeIntList(intList, buffer); + serializeLongList(longList, buffer); + serializeFloatList(floatList, buffer); + serializeDoubleList(doubleList, buffer); + serializeBooleanList(booleanList, buffer); + serializeStringList(stringList, buffer); + } + + private static void serializeIntList(List<Integer> values, PublicBAOS buffer) throws IOException { + ReadWriteIOUtils.write(values.size(), buffer); + if (values.isEmpty()) { + return; + } + Encoder encoder = + TSEncodingBuilder.getEncodingBuilder(TSEncoding.GORILLA).getEncoder(TSDataType.INT32); + PublicBAOS intBuffer = new PublicBAOS(); + for (int value : values) { + encoder.encode(value, intBuffer); + } + encoder.flush(intBuffer); + ReadWriteIOUtils.write(intBuffer.getBuf().length, buffer); + buffer.write(intBuffer.getBuf(), 0, intBuffer.size()); + } + + private static void serializeLongList(List<Long> values, PublicBAOS buffer) throws IOException { + ReadWriteIOUtils.write(values.size(), buffer); + if (values.isEmpty()) { + return; + } + Encoder encoder = + TSEncodingBuilder.getEncodingBuilder(TSEncoding.GORILLA).getEncoder(TSDataType.INT64); + PublicBAOS longBuffer = new PublicBAOS(); + for (long value : values) { + encoder.encode(value, longBuffer); + } + encoder.flush(longBuffer); + ReadWriteIOUtils.write(longBuffer.getBuf().length, buffer); + buffer.write(longBuffer.getBuf(), 0, longBuffer.size()); + } + + private static void serializeFloatList(List<Float> values, PublicBAOS buffer) throws IOException { + ReadWriteIOUtils.write(values.size(), buffer); + if (values.isEmpty()) { + return; + } + Encoder encoder = + TSEncodingBuilder.getEncodingBuilder(TSEncoding.GORILLA).getEncoder(TSDataType.FLOAT); + PublicBAOS floatBuffer = new PublicBAOS(); + for (float value : values) { + encoder.encode(value, floatBuffer); + } + encoder.flush(floatBuffer); + ReadWriteIOUtils.write(floatBuffer.getBuf().length, buffer); + buffer.write(floatBuffer.getBuf(), 0, floatBuffer.size()); + } + + private static void serializeDoubleList(List<Double> values, PublicBAOS buffer) + throws IOException { + ReadWriteIOUtils.write(values.size(), buffer); + if (values.isEmpty()) { + return; + } + Encoder encoder = + TSEncodingBuilder.getEncodingBuilder(TSEncoding.GORILLA).getEncoder(TSDataType.DOUBLE); + PublicBAOS doubleBuffer = new PublicBAOS(); + for (double value : values) { + encoder.encode(value, doubleBuffer); + } + encoder.flush(doubleBuffer); + ReadWriteIOUtils.write(doubleBuffer.getBuf().length, buffer); + buffer.write(doubleBuffer.getBuf(), 0, doubleBuffer.size()); + } + + private static void serializeBooleanList(List<Boolean> values, PublicBAOS buffer) + throws IOException { + ReadWriteIOUtils.write(values.size(), buffer); + if (values.isEmpty()) { + return; + } + final int bitsPerByte = 8; + + int size = (values.size() + bitsPerByte - 1) / bitsPerByte; + byte[] byteArray = new byte[size]; + + for (int i = 0; i < values.size(); i++) { + if (values.get(i)) { + int arrayIndex = i / bitsPerByte; + int bitPosition = i % bitsPerByte; + byteArray[arrayIndex] |= (byte) (1 << bitPosition); + } + } + + ReadWriteIOUtils.write(size, buffer); + buffer.write(byteArray); + } + + private static void serializeStringList(List<String> values, PublicBAOS buffer) + throws IOException { + ReadWriteIOUtils.write(values.size(), buffer); + if (values.isEmpty()) { + return; + } + Encoder encoder = + TSEncodingBuilder.getEncodingBuilder(TSEncoding.DICTIONARY).getEncoder(TSDataType.TEXT); + PublicBAOS stringBuffer = new PublicBAOS(); + for (String value : values) { + encoder.encode(new Binary(value.getBytes()), stringBuffer); + } + encoder.flush(stringBuffer); + ReadWriteIOUtils.write(stringBuffer.getBuf().length, buffer); + buffer.write(stringBuffer.getBuf(), 0, stringBuffer.size()); + } + public static InsertRecordsRequest deserializeInsertRecordsReq( ByteBuffer schemaBuffer, ByteBuffer valueBuffer, ByteBuffer auxiliaryBuffer) { return null; } public static void main(String[] args) throws Exception { + testValue(); + } + + private static void testValue() throws IOException, ClassNotFoundException { + List<Long> timestamps = new ArrayList<>(); + List<List<TSDataType>> measurements = new ArrayList<>(); + List<List<Object>> values = new ArrayList<>(); + List<TSDataType> types = new ArrayList<>(); + List<Object> value = new ArrayList<>(); + int singleRowSize = 0; + for (int j = 0; j < 100; ++j) { + switch (((byte) j % 6)) { + case 0: + types.add(TSDataType.BOOLEAN); + value.add(true); + singleRowSize += 2; + break; + case 1: + types.add(TSDataType.INT32); + value.add(1); + singleRowSize += 5; + break; + case 2: + types.add(TSDataType.INT64); + value.add(1L); + singleRowSize += 9; + break; + case 3: + types.add(TSDataType.FLOAT); + value.add(1.0f); + singleRowSize += 5; + break; + case 4: + types.add(TSDataType.DOUBLE); + value.add(1.0); + singleRowSize += 9; + break; + case 5: + types.add(TSDataType.TEXT); + value.add("123"); + singleRowSize += 7; + break; + } + } + singleRowSize += 8; + int totalSize = singleRowSize * 1000; + for (int i = 0; i < 1000; ++i) { + timestamps.add(System.currentTimeMillis()); + measurements.add(types); + values.add(value); + } + PublicBAOS baos = new PublicBAOS(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(values); + // oos.writeObject(measurements); + // oos.writeObject(timestamps); + oos.flush(); + oos.close(); + ByteBuffer buffer = serializeValue(timestamps, measurements, values); + System.out.println("Original Size is " + totalSize); + System.out.println("Serialized Size is " + buffer.limit()); + System.out.println("Java serialize size is " + baos.size()); + System.out.println(); + } + + private static void testSchema() throws IOException { List<String> deviceIds = new ArrayList<>(); List<List<String>> measurementIdsList = new ArrayList<>(); List<String> measurementIds = new ArrayList<>(); diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/TestSchema.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/TestSchema.java new file mode 100644 index 00000000000..99675c2926e --- /dev/null +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/TestSchema.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.session.util; + +import java.util.List; + +public class TestSchema { + public List<String> deviceIds; + public List<List<String>> measurementIds; +} diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/TestValue.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/TestValue.java new file mode 100644 index 00000000000..2ac8fdb389c --- /dev/null +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/TestValue.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.session.util; + +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +import java.io.Serializable; +import java.util.List; + +public class TestValue implements Serializable { + public List<Long> timestamps; + public List<List<TSDataType>> types; + public List<List<Object>> values; +} diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/ValueBufferBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/ValueBufferBuilder.java new file mode 100644 index 00000000000..fe02c4b529b --- /dev/null +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/ValueBufferBuilder.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.session.util; + +public class ValueBufferBuilder { + +}
