This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch new-rpc-format-impl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7b88f906233bc4e7f5c44ebfa38d5c9bb567a9ac Author: Liu Xuxin <[email protected]> AuthorDate: Tue Mar 19 13:32:37 2024 +0800 pass test --- .../java/org/apache/iotdb/session/Session.java | 3 +- .../apache/iotdb/session/SessionConnection.java | 2 ++ .../apache/iotdb/session/util/SessionRPCUtils.java | 29 ++++++++++--------- .../protocol/thrift/impl/ClientRPCServiceImpl.java | 33 +++++++++++++++++++++- .../plan/parser/StatementGenerator.java | 25 ++++++++++++++++ .../plan/statement/crud/InsertRowStatement.java | 5 ++++ 6 files changed, 81 insertions(+), 16 deletions(-) diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java index 979ef760e64..52b0143497f 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java @@ -2434,12 +2434,11 @@ public class Session implements ISession { } insertByGroup(recordsGroup, SessionConnection::insertRecords); } else { - logger.info("Insert using insertRecordsV2"); Map<SessionConnection, InsertRecordsRequest> requestMap = new HashMap<>(); for (int i = 0; i < deviceIds.size(); ++i) { final SessionConnection connection = getSessionConnection(deviceIds.get(i)); InsertRecordsRequest request = - requestMap.getOrDefault(connection, new InsertRecordsRequest()); + requestMap.computeIfAbsent(connection, x -> new InsertRecordsRequest()); request.getDeviceIds().add(deviceIds.get(i)); request.getTimestamps().add(times.get(i)); request.getDataTypesList().add(typesList.get(i)); diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java index 11aeb5db0c3..48d2eae38a5 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java @@ -892,7 +892,9 @@ public class SessionConnection { if (status.isSetNeedRetry() && status.isNeedRetry()) { continue; } + // succeed or don't need to retry RpcUtils.verifySuccess(status); + return; } catch (TException e) { // all network exception need retry until reaching maxRetryCount lastTException = e; diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java index 9a45e946b94..4a290d27bda 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java @@ -39,7 +39,8 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.ObjectOutputStream; @@ -53,6 +54,8 @@ import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; public class SessionRPCUtils { + private static final Logger logger = LoggerFactory.getLogger(SessionRPCUtils.class); + public static ByteBuffer[] serializeInsertRecordsReq( List<String> deviceIds, List<List<String>> measurementIdsList, @@ -73,7 +76,7 @@ public class SessionRPCUtils { PublicBAOS schemaBufferOS = new PublicBAOS(); serializeDictionary(dictionary, schemaBufferOS); dictionaryEncoding(dictionary, deviceIds, measurementIdsList, schemaBufferOS); - ByteBuffer schemaBuffer = null; + ByteBuffer schemaBuffer; if (SessionConfig.enableRPCCompression) { ICompressor compressor = ICompressor.getCompressor(SessionConfig.rpcCompressionType); byte[] compressedData = @@ -101,9 +104,9 @@ public class SessionRPCUtils { if (compressed) { int uncompressedLength = ReadWriteIOUtils.readInt(schemaBuffer); IUnCompressor unCompressor = IUnCompressor.getUnCompressor(SessionConfig.rpcCompressionType); - byte[] uncompressed = new byte[uncompressedLength]; - unCompressor.uncompress(schemaBuffer.array(), 5, schemaBuffer.limit() - 5, uncompressed, 0); - buffer = ByteBuffer.wrap(uncompressed); + buffer = ByteBuffer.allocate(uncompressedLength); + unCompressor.uncompress(schemaBuffer, buffer); + buffer.flip(); } String[] dictionary = getDictionary(buffer); @@ -411,10 +414,10 @@ public class SessionRPCUtils { ByteBuffer dataBuffer = buffer; if (compressed) { int uncompressedSize = ReadWriteIOUtils.readInt(buffer); - byte[] uncompressed = new byte[uncompressedSize]; + dataBuffer = ByteBuffer.allocate(uncompressedSize); IUnCompressor unCompressor = IUnCompressor.getUnCompressor(SessionConfig.rpcCompressionType); - unCompressor.uncompress(buffer.array(), 5, buffer.limit() - 5, uncompressed, 0); - dataBuffer = ByteBuffer.wrap(uncompressed); + unCompressor.uncompress(buffer, dataBuffer); + dataBuffer.flip(); } deserializeTime(dataBuffer, outputTimestamps); deserializeTypesAndValues(dataBuffer, outputTypesList, outputValuesList); @@ -440,7 +443,7 @@ public class SessionRPCUtils { float[] floatArray = deserializeFloatList(buffer); double[] doubleArray = deserializeDoubleList(buffer); boolean[] booleanArray = deserializeBooleanList(buffer); - String[] stringArray = deserializeStringList(buffer); + Binary[] stringArray = deserializeStringList(buffer); int[] indexes = new int[6]; for (int i = 0; i < recordCount; ++i) { List<TSDataType> types = new ArrayList<>(recordSizeArray[i]); @@ -548,15 +551,15 @@ public class SessionRPCUtils { return booleanArray; } - private static String[] deserializeStringList(ByteBuffer buffer) throws IOException { + private static Binary[] deserializeStringList(ByteBuffer buffer) throws IOException { int size = ReadWriteIOUtils.readInt(buffer); if (size == 0) { - return new String[0]; + return new Binary[0]; } - String[] stringArray = new String[size]; + Binary[] stringArray = new Binary[size]; Decoder decoder = Decoder.getDecoderByType(TSEncoding.DICTIONARY, TSDataType.TEXT); for (int i = 0; i < size; ++i) { - stringArray[i] = decoder.readBinary(buffer).toString(); + stringArray[i] = decoder.readBinary(buffer); } return stringArray; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index ae86f7aed87..ca61a216057 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -1709,13 +1709,44 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSStatus insertRecordsV2(TSInsertRecordsReqV2 req) { try { + IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return getNotLoggedInStatus(); + } InsertRecordsRequest request = SessionRPCUtils.deserializeInsertRecordsReq(req.schemaBuffer, req.valueBuffer, null); + req.auxiliaryBuffer = null; + req.valueBuffer = null; + req.schemaBuffer = null; + + request.setMeasurementsIdsList( + PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(request.getMeasurementsIdsList())); + InsertRowsStatement statement = StatementGenerator.createStatement(request); + if (statement.isEmpty()) { + return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); + } + + TSStatus status = AuthorityChecker.checkAuthority(statement, clientSession); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } - return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + long queryId = SESSION_MANAGER.requestQueryId(); + ExecutionResult result = + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); + + return result.status; } catch (IOException e) { return onNpeOrUnexpectedException( e, OperationType.INSERT_RECORDS, TSStatusCode.INTERNAL_SERVER_ERROR); + } catch (IoTDBException e) { + return onIoTDBException(e, OperationType.INSERT_RECORDS, e.getErrorCode()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java index 12b0734d24a..29a3326b606 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java @@ -88,6 +88,7 @@ import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq; import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq; +import org.apache.iotdb.session.req.InsertRecordsRequest; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -407,6 +408,30 @@ public class StatementGenerator { return insertStatement; } + public static InsertRowsStatement createStatement(InsertRecordsRequest req) + throws IllegalPathException { + final long startTime = System.nanoTime(); + InsertRowsStatement insertRowsStatement = new InsertRowsStatement(); + List<InsertRowStatement> insertRowStatementList = new ArrayList<>(); + for (int i = 0, size = req.getTimestamps().size(); i < size; ++i) { + InsertRowStatement statement = new InsertRowStatement(); + statement.setDevicePath(DEVICE_PATH_CACHE.getPartialPath(req.getDeviceIds().get(i))); + statement.setMeasurements(req.getMeasurementsIdsList().get(i).toArray(new String[0])); + TimestampPrecisionUtils.checkTimestampPrecision(req.getTimestamps().get(i)); + statement.setTime(req.getTimestamps().get(i)); + statement.setValuesAndTypes( + req.getValuesList().get(i).toArray(), + req.getDataTypesList().get(i).toArray(new TSDataType[0])); + if (statement.isEmpty()) { + continue; + } + insertRowStatementList.add(statement); + } + insertRowsStatement.setInsertRowStatementList(insertRowStatementList); + PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() - startTime); + return insertRowsStatement; + } + public static InsertRowsStatement createStatement(TSInsertStringRecordsReq req) throws IllegalPathException { final long startTime = System.nanoTime(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java index 4e16ff3d3be..b4136588c03 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java @@ -153,6 +153,11 @@ public class InsertRowStatement extends InsertBaseStatement implements ISchemaVa } } + public void setValuesAndTypes(Object[] values, TSDataType[] types) { + this.values = values; + this.dataTypes = types; + } + public TTimePartitionSlot getTimePartitionSlot() { return TimePartitionUtils.getTimePartitionSlot(time); }
