This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch test-new-record-rpc-format in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e10c8554aecc3554d87d657557bac851da1cb4b9 Author: Liu Xuxin <[email protected]> AuthorDate: Tue Jan 23 10:26:19 2024 +0800 support new rpc format insertion --- iotdb-client/session/pom.xml | 5 ++++ .../java/org/apache/iotdb/session/Session.java | 25 ++++++++++++++++- .../apache/iotdb/session/SessionConnection.java | 10 +++++++ .../protocol/thrift/impl/ClientRPCServiceImpl.java | 31 ++++++++++++++++++++++ .../plan/parser/StatementGenerator.java | 23 ++++++++++++++++ .../thrift-datanode/src/main/thrift/client.thrift | 7 +++++ 6 files changed, 100 insertions(+), 1 deletion(-) diff --git a/iotdb-client/session/pom.xml b/iotdb-client/session/pom.xml index dcaa3515914..1c77ca417a7 100644 --- a/iotdb-client/session/pom.xml +++ b/iotdb-client/session/pom.xml @@ -72,6 +72,11 @@ <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> </dependency> + <dependency> + <groupId>org.mine.rpcutils</groupId> + <artifactId>RPC-Utils</artifactId> + <version>1.0-SNAPSHOT</version> + </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java index 4d0ad33efd7..66c137fc8f8 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java @@ -45,6 +45,7 @@ import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReqV2ColumnFormat; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq; @@ -72,6 +73,8 @@ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.thrift.TException; +import org.mine.rpc.InsertRecordsReq; +import org.mine.rpc.InsertRecordsSerializeInColumnUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,6 +126,8 @@ public class Session implements ISession { protected boolean useSSL; protected String trustStore; protected String trustStorePwd; + public static boolean useNewFormat = true; + /** * Timeout of query can be set by users. A negative number means using the default configuration * of server. And value 0 will disable the function of query timeout. @@ -1904,7 +1909,7 @@ public class Session implements ISession { if (enableRedirection) { insertRecordsWithLeaderCache( deviceIds, times, measurementsList, typesList, valuesList, false); - } else { + } else if (!useNewFormat) { TSInsertRecordsReq request; try { request = @@ -1918,6 +1923,24 @@ public class Session implements ISession { defaultSessionConnection.insertRecords(request); } catch (RedirectException ignored) { } + } else { + // insert using new column rpc format + InsertRecordsReq req = + new InsertRecordsReq(deviceIds, measurementsList, typesList, valuesList, times); + ByteBuffer buffer = null; + try { + buffer = InsertRecordsSerializeInColumnUtils.encode(req); + } catch (IOException e) { + logger.error("Meets Exception when serializing buffer", e); + return; + } + TSInsertRecordsReqV2ColumnFormat request = new TSInsertRecordsReqV2ColumnFormat(); + request.setBuffer(buffer); + try { + defaultSessionConnection.insertRecords(request); + } catch (Exception e) { + logger.error("Meets exception when insert new records", e); + } } } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java index 68e221807a6..57286acd01c 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java @@ -48,6 +48,7 @@ import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReqV2ColumnFormat; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq; @@ -877,6 +878,15 @@ public class SessionConnection { return client.insertRecords(request); } + protected void insertRecords(TSInsertRecordsReqV2ColumnFormat request) throws TException { + request.setSessionId(sessionId); + try { + client.insertRecordsV2ColumnFormat(request); + } catch (TException e) { + throw e; + } + } + protected void insertRecords(TSInsertStringRecordsReq request) throws IoTDBConnectionException, StatementExecutionException, RedirectException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 27bda2168f8..1129bc436fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -142,6 +142,7 @@ import org.apache.iotdb.service.rpc.thrift.TSGroupByQueryIntervalReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReqV2ColumnFormat; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq; @@ -181,6 +182,8 @@ import io.airlift.units.Duration; import io.jsonwebtoken.lang.Strings; import org.apache.commons.lang3.StringUtils; import org.apache.thrift.TException; +import org.mine.rpc.InsertRecordsReq; +import org.mine.rpc.InsertRecordsSerializeInColumnUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1643,6 +1646,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 1: transfer from TSInsertRecordsReq to Statement InsertRowsStatement statement = StatementGenerator.createStatement(req); + req = null; // return success when this statement is empty because server doesn't need to execute it if (statement.isEmpty()) { return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); @@ -1694,6 +1698,33 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } } + @Override + public TSStatus insertRecordsV2ColumnFormat(TSInsertRecordsReqV2ColumnFormat req) { + byte[] buffer = req.getBuffer(); + if (buffer == null) { + return RpcUtils.getStatus(TSStatusCode.ILLEGAL_PARAMETER, "Buffer is null"); + } + try { + InsertRecordsReq originalReq = + InsertRecordsSerializeInColumnUtils.decode(ByteBuffer.wrap(buffer)); + InsertRowsStatement statement = StatementGenerator.createStatement(originalReq); + // originalReq = null; + long queryId = SESSION_MANAGER.requestQueryId(); + ExecutionResult result = + COORDINATOR.execute( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()), + "", + partitionFetcher, + schemaFetcher); + return result.status; + } catch (Exception e) { + LOGGER.error("Meet error while inserting will new request", e); + return RpcUtils.getStatus(TSStatusCode.ILLEGAL_PARAMETER, e.getMessage()); + } + } + @Override public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) { long t1 = System.nanoTime(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java index 3a7ac2c0cbe..830228246d4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java @@ -100,6 +100,7 @@ import org.antlr.v4.runtime.CharStreams; import org.antlr.v4.runtime.CommonTokenStream; import org.antlr.v4.runtime.atn.PredictionMode; import org.antlr.v4.runtime.tree.ParseTree; +import org.mine.rpc.InsertRecordsReq; import java.nio.ByteBuffer; import java.time.ZoneId; @@ -408,6 +409,28 @@ public class StatementGenerator { return insertStatement; } + public static InsertRowsStatement createStatement(InsertRecordsReq req) + throws IllegalPathException { + final long startTime = System.nanoTime(); + // construct insert statement + InsertRowsStatement insertStatement = new InsertRowsStatement(); + List<InsertRowStatement> insertRowStatementList = new ArrayList<>(); + List<String> devicePath = req.getPrefixPath(); + for (int i = 0, size = devicePath.size(); i < size; ++i) { + InsertRowStatement statement = new InsertRowStatement(); + statement.setDevicePath(DEVICE_PATH_CACHE.getPartialPath(devicePath.get(i))); + statement.setMeasurements(req.getMeasurements().get(i).toArray(new String[0])); + TimestampPrecisionUtils.checkTimestampPrecision(req.getTimestamp().get(i)); + statement.setTime(req.getTimestamp().get(i)); + statement.setValues(req.getValues().get(i).toArray(new Object[0])); + statement.setDataTypes(req.getTypes().get(i).toArray(new TSDataType[0])); + insertRowStatementList.add(statement); + } + insertStatement.setInsertRowStatementList(insertRowStatementList); + PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() - startTime); + return insertStatement; + } + public static InsertRowsStatement createStatement(TSInsertStringRecordsReq req) throws IllegalPathException { final long startTime = System.nanoTime(); diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift index 2b2f5585726..6dbb05a5654 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift @@ -255,6 +255,11 @@ struct TSInsertRecordsReq { 6: optional bool isAligned } +struct TSInsertRecordsReqV2ColumnFormat { + 1: required i64 sessionId + 2: required binary buffer +} + struct TSInsertRecordsOfOneDeviceReq { 1: required i64 sessionId 2: required string prefixPath @@ -583,6 +588,8 @@ service IClientRPCService { common.TSStatus insertRecords(1:TSInsertRecordsReq req); + common.TSStatus insertRecordsV2ColumnFormat(1:TSInsertRecordsReqV2ColumnFormat req); + common.TSStatus insertRecordsOfOneDevice(1:TSInsertRecordsOfOneDeviceReq req); common.TSStatus insertStringRecordsOfOneDevice(1:TSInsertStringRecordsOfOneDeviceReq req);
