This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch new-rpc-format-impl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d3713d9255002902fe8a2bea7d67be69b65d0531 Author: Liu Xuxin <[email protected]> AuthorDate: Thu Mar 14 11:27:06 2024 +0800 temp --- .../rpc/TCompressedElasticFramedTransport.java | 10 ++ .../apache/iotdb/session/SessionConnection.java | 1 + .../apache/iotdb/session/util/SessionRPCUtils.java | 131 +++++++++++++++------ .../protocol/thrift/impl/ClientRPCServiceImpl.java | 12 +- .../thrift-datanode/src/main/thrift/client.thrift | 1 + 5 files changed, 118 insertions(+), 37 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java index a3b4f38064a..8e6d8eb1c38 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java @@ -99,4 +99,14 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr protected abstract void uncompress(byte[] input, int inOff, int size, byte[] output, int outOff) throws IOException; + + public int getCompressedSize() throws IOException { + int length = writeBuffer.getPos(); + RpcStat.writeBytes.addAndGet(length); + int maxCompressedLength = maxCompressedLength(length); + writeCompressBuffer.resizeIfNecessary(maxCompressedLength); + int compressedLength = + compress(writeBuffer.getBuffer(), 0, length, writeCompressBuffer.getBuffer(), 0); + return compressedLength; + } } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java index f9fa976d1e7..11aeb5db0c3 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java @@ -914,6 +914,7 @@ public class SessionConnection { } private TSStatus insertRecordsV2Internal(TSInsertRecordsReqV2 request) throws TException { + request.setSessionId(sessionId); return client.insertRecordsV2(request); } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java index c3eed8befa9..9a45e946b94 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java @@ -19,6 +19,9 @@ package org.apache.iotdb.session.util; import org.apache.iotdb.isession.SessionConfig; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.TSnappyElasticFramedTransport; +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; import org.apache.iotdb.session.req.InsertRecordsRequest; import org.apache.iotdb.tsfile.compress.ICompressor; import org.apache.iotdb.tsfile.compress.IUnCompressor; @@ -33,7 +36,13 @@ import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TTransport; + import java.io.IOException; +import java.io.ObjectOutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -553,23 +562,79 @@ public class SessionRPCUtils { } public static InsertRecordsRequest deserializeInsertRecordsReq( - ByteBuffer schemaBuffer, ByteBuffer valueBuffer, ByteBuffer auxiliaryBuffer) { - return null; + ByteBuffer schemaBuffer, ByteBuffer valueBuffer, ByteBuffer auxiliaryBuffer) + throws IOException { + List<String> deviceIds = new ArrayList<>(); + List<Long> timestamps = new ArrayList<>(); + List<List<String>> measurementIds = new ArrayList<>(); + List<List<TSDataType>> dataTypes = new ArrayList<>(); + List<List<Object>> values = new ArrayList<>(); + deserializeSchema(schemaBuffer, deviceIds, measurementIds); + deserializeValueBuffer(valueBuffer, timestamps, dataTypes, values); + return new InsertRecordsRequest(deviceIds, measurementIds, timestamps, dataTypes, values); } public static void main(String[] args) throws Exception { testValue(); } - private static void testValue() throws IOException, ClassNotFoundException { + private static void testValue() + throws IOException, ClassNotFoundException, IoTDBConnectionException, TException { List<Long> timestamps = new ArrayList<>(); List<List<TSDataType>> measurements = new ArrayList<>(); List<List<Object>> values = new ArrayList<>(); List<TSDataType> types = new ArrayList<>(); List<Object> value = new ArrayList<>(); + int totalSize = 0; + for (int i = 0; i < 10000; ++i) { + timestamps.add(System.currentTimeMillis()); + totalSize += genOneRow(value, types); + totalSize += 8; + measurements.add(types); + values.add(value); + value = new ArrayList<>(); + types = new ArrayList<>(); + } + long startTime = System.currentTimeMillis(); + ByteBuffer buffer = serializeValue(timestamps, measurements, values); + PublicBAOS baos = new PublicBAOS(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(timestamps); + oos.writeObject(measurements); + oos.writeObject(values); + oos.flush(); + System.out.println("Serialize time cost is " + (System.currentTimeMillis() - startTime)); + System.out.println("Original Size is " + totalSize); + System.out.println("Serialized Size is " + buffer.limit()); + System.out.println("Compression rate is " + (double) totalSize / buffer.limit()); + System.out.println("Java serialize size is " + baos.size()); + TSInsertRecordsReq req = new TSInsertRecordsReq(); + req.setPrefixPaths(new ArrayList<>()); + req.setMeasurementsList(new ArrayList<>()); + req.setTimestamps(timestamps); + List<ByteBuffer> buffers = new ArrayList<>(); + for (int i = 0; i < values.size(); ++i) { + buffers.add(SessionUtils.getValueBuffer(measurements.get(i), values.get(i))); + } + req.setValuesList(buffers); + TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory(); + TSnappyElasticFramedTransport trans = new TSnappyElasticFramedTransport(null); + TProtocol protocol = protocolFactory.getProtocol(trans); + startTime = System.currentTimeMillis(); + req.write(protocol); + int snappyCompressedSize = trans.getCompressedSize(); + + System.out.println( + "Thrift serialize time and compression time is " + + (System.currentTimeMillis() - startTime)); + System.out.println("Thrift trans compressed size is " + snappyCompressedSize); + System.out.println("Thrift compression rate is " + ((double) totalSize / snappyCompressedSize)); + } + + private static int genOneRow(List<Object> value, List<TSDataType> types) { int singleRowSize = 0; for (int j = 0; j < 100; ++j) { - switch (((byte) j % 6)) { + switch (new Random().nextInt(6)) { case 0: types.add(TSDataType.BOOLEAN); value.add(new Random().nextBoolean()); @@ -577,55 +642,31 @@ public class SessionRPCUtils { break; case 1: types.add(TSDataType.INT32); - value.add(new Random().nextInt()); + value.add(100 + new Random().nextInt(10)); singleRowSize += 5; break; case 2: types.add(TSDataType.INT64); - value.add(new Random().nextLong()); + value.add(599L + new Random().nextInt(10)); singleRowSize += 9; break; case 3: types.add(TSDataType.FLOAT); - value.add(new Random().nextFloat()); - singleRowSize += 5; + value.add(10.0f + (new Random().nextInt(100) * 0.01f)); break; case 4: types.add(TSDataType.DOUBLE); - value.add(new Random().nextDouble()); - singleRowSize += 9; + value.add(50.0 + new Random().nextInt(200) * 0.01); break; case 5: types.add(TSDataType.TEXT); - value.add(String.valueOf(new Random().nextInt(100))); - singleRowSize += 7; + String s = String.valueOf(new Random().nextInt(10)); + value.add(s); + singleRowSize += (s.getBytes().length + 1); break; } } - singleRowSize += 8; - int totalSize = singleRowSize * 10000; - for (int i = 0; i < 10000; ++i) { - timestamps.add(System.currentTimeMillis()); - measurements.add(types); - values.add(value); - } - long startTime = System.currentTimeMillis(); - ByteBuffer buffer = serializeValue(timestamps, measurements, values); - System.out.println("Serialize time cost is " + (System.currentTimeMillis() - startTime)); - System.out.println("Original Size is " + totalSize); - System.out.println("Serialized Size is " + buffer.limit()); - System.out.println("Compression rate is " + (double) totalSize / buffer.limit()); - List<Long> decodedTime = new ArrayList<>(); - List<List<TSDataType>> decodedTypes = new ArrayList<>(); - List<List<Object>> decodedValues = new ArrayList<>(); - startTime = System.currentTimeMillis(); - deserializeValueBuffer(buffer, decodedTime, decodedTypes, decodedValues); - System.out.println("Time cost is " + (System.currentTimeMillis() - startTime)); - if (!decodedTime.equals(timestamps) - || !decodedTypes.equals(measurements) - || !decodedValues.equals(values)) { - System.out.println("ERROR"); - } + return singleRowSize; } private static void testSchema() throws IOException { @@ -661,4 +702,22 @@ public class SessionRPCUtils { System.out.println("Test failed"); } } + + private static void testThrift() throws Exception { + List<Long> timestamps = new ArrayList<>(); + List<List<TSDataType>> measurements = new ArrayList<>(); + List<List<Object>> values = new ArrayList<>(); + List<TSDataType> types = new ArrayList<>(); + List<Object> value = new ArrayList<>(); + int totalSize = 0; + for (int i = 0; i < 10000; ++i) { + timestamps.add(System.currentTimeMillis()); + totalSize += genOneRow(value, types); + totalSize += 8; + measurements.add(types); + values.add(value); + value = new ArrayList<>(); + types = new ArrayList<>(); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 0983c700fec..ae86f7aed87 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -164,6 +164,8 @@ import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq; import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo; import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo; +import org.apache.iotdb.session.req.InsertRecordsRequest; +import org.apache.iotdb.session.util.SessionRPCUtils; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -1706,7 +1708,15 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSStatus insertRecordsV2(TSInsertRecordsReqV2 req) { - return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + try { + InsertRecordsRequest request = + SessionRPCUtils.deserializeInsertRecordsReq(req.schemaBuffer, req.valueBuffer, null); + + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } catch (IOException e) { + return onNpeOrUnexpectedException( + e, OperationType.INSERT_RECORDS, TSStatusCode.INTERNAL_SERVER_ERROR); + } } @Override diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift index 22ec15b5151..7ae4a811554 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift @@ -259,6 +259,7 @@ struct TSInsertRecordsReqV2 { 1: required binary schemaBuffer 2: required binary valueBuffer 3: required binary auxiliaryBuffer + 4: optional i64 sessionId } struct TSInsertRecordsOfOneDeviceReq {
