This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch new-rpc-format-impl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7b88f906233bc4e7f5c44ebfa38d5c9bb567a9ac
Author: Liu Xuxin <[email protected]>
AuthorDate: Tue Mar 19 13:32:37 2024 +0800

    pass test
---
 .../java/org/apache/iotdb/session/Session.java     |  3 +-
 .../apache/iotdb/session/SessionConnection.java    |  2 ++
 .../apache/iotdb/session/util/SessionRPCUtils.java | 29 ++++++++++---------
 .../protocol/thrift/impl/ClientRPCServiceImpl.java | 33 +++++++++++++++++++++-
 .../plan/parser/StatementGenerator.java            | 25 ++++++++++++++++
 .../plan/statement/crud/InsertRowStatement.java    |  5 ++++
 6 files changed, 81 insertions(+), 16 deletions(-)

diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 979ef760e64..52b0143497f 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -2434,12 +2434,11 @@ public class Session implements ISession {
       }
       insertByGroup(recordsGroup, SessionConnection::insertRecords);
     } else {
-      logger.info("Insert using insertRecordsV2");
       Map<SessionConnection, InsertRecordsRequest> requestMap = new HashMap<>();
       for (int i = 0; i < deviceIds.size(); ++i) {
         final SessionConnection connection = getSessionConnection(deviceIds.get(i));
         InsertRecordsRequest request =
-            requestMap.getOrDefault(connection, new InsertRecordsRequest());
+            requestMap.computeIfAbsent(connection, x -> new InsertRecordsRequest());
         request.getDeviceIds().add(deviceIds.get(i));
         request.getTimestamps().add(times.get(i));
         request.getDataTypesList().add(typesList.get(i));
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 11aeb5db0c3..48d2eae38a5 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -892,7 +892,9 @@ public class SessionConnection {
         if (status.isSetNeedRetry() && status.isNeedRetry()) {
           continue;
         }
+        // succeed or don't need to retry
         RpcUtils.verifySuccess(status);
+        return;
       } catch (TException e) {
         // all network exception need retry until reaching maxRetryCount
         lastTException = e;
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
index 9a45e946b94..4a290d27bda 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
@@ -39,7 +39,8 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.ObjectOutputStream;
@@ -53,6 +54,8 @@ import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class SessionRPCUtils {
+  private static final Logger logger = LoggerFactory.getLogger(SessionRPCUtils.class);
+
   public static ByteBuffer[] serializeInsertRecordsReq(
       List<String> deviceIds,
       List<List<String>> measurementIdsList,
@@ -73,7 +76,7 @@ public class SessionRPCUtils {
     PublicBAOS schemaBufferOS = new PublicBAOS();
     serializeDictionary(dictionary, schemaBufferOS);
     dictionaryEncoding(dictionary, deviceIds, measurementIdsList, schemaBufferOS);
-    ByteBuffer schemaBuffer = null;
+    ByteBuffer schemaBuffer;
     if (SessionConfig.enableRPCCompression) {
       ICompressor compressor = ICompressor.getCompressor(SessionConfig.rpcCompressionType);
       byte[] compressedData =
@@ -101,9 +104,9 @@ public class SessionRPCUtils {
     if (compressed) {
       int uncompressedLength = ReadWriteIOUtils.readInt(schemaBuffer);
       IUnCompressor unCompressor = IUnCompressor.getUnCompressor(SessionConfig.rpcCompressionType);
-      byte[] uncompressed = new byte[uncompressedLength];
-      unCompressor.uncompress(schemaBuffer.array(), 5, schemaBuffer.limit() - 5, uncompressed, 0);
-      buffer = ByteBuffer.wrap(uncompressed);
+      buffer = ByteBuffer.allocate(uncompressedLength);
+      unCompressor.uncompress(schemaBuffer, buffer);
+      buffer.flip();
     }
 
     String[] dictionary = getDictionary(buffer);
@@ -411,10 +414,10 @@ public class SessionRPCUtils {
     ByteBuffer dataBuffer = buffer;
     if (compressed) {
       int uncompressedSize = ReadWriteIOUtils.readInt(buffer);
-      byte[] uncompressed = new byte[uncompressedSize];
+      dataBuffer = ByteBuffer.allocate(uncompressedSize);
       IUnCompressor unCompressor = IUnCompressor.getUnCompressor(SessionConfig.rpcCompressionType);
-      unCompressor.uncompress(buffer.array(), 5, buffer.limit() - 5, uncompressed, 0);
-      dataBuffer = ByteBuffer.wrap(uncompressed);
+      unCompressor.uncompress(buffer, dataBuffer);
+      dataBuffer.flip();
     }
     deserializeTime(dataBuffer, outputTimestamps);
     deserializeTypesAndValues(dataBuffer, outputTypesList, outputValuesList);
@@ -440,7 +443,7 @@ public class SessionRPCUtils {
     float[] floatArray = deserializeFloatList(buffer);
     double[] doubleArray = deserializeDoubleList(buffer);
     boolean[] booleanArray = deserializeBooleanList(buffer);
-    String[] stringArray = deserializeStringList(buffer);
+    Binary[] stringArray = deserializeStringList(buffer);
     int[] indexes = new int[6];
     for (int i = 0; i < recordCount; ++i) {
       List<TSDataType> types = new ArrayList<>(recordSizeArray[i]);
@@ -548,15 +551,15 @@ public class SessionRPCUtils {
     return booleanArray;
   }
 
-  private static String[] deserializeStringList(ByteBuffer buffer) throws IOException {
+  private static Binary[] deserializeStringList(ByteBuffer buffer) throws IOException {
     int size = ReadWriteIOUtils.readInt(buffer);
     if (size == 0) {
-      return new String[0];
+      return new Binary[0];
     }
-    String[] stringArray = new String[size];
+    Binary[] stringArray = new Binary[size];
     Decoder decoder = Decoder.getDecoderByType(TSEncoding.DICTIONARY, TSDataType.TEXT);
     for (int i = 0; i < size; ++i) {
-      stringArray[i] = decoder.readBinary(buffer).toString();
+      stringArray[i] = decoder.readBinary(buffer);
     }
     return stringArray;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index ae86f7aed87..ca61a216057 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -1709,13 +1709,44 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus insertRecordsV2(TSInsertRecordsReqV2 req) {
     try {
+      IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+      if (!SESSION_MANAGER.checkLogin(clientSession)) {
+        return getNotLoggedInStatus();
+      }
       InsertRecordsRequest request =
           SessionRPCUtils.deserializeInsertRecordsReq(req.schemaBuffer, req.valueBuffer, null);
+      req.auxiliaryBuffer = null;
+      req.valueBuffer = null;
+      req.schemaBuffer = null;
+
+      request.setMeasurementsIdsList(
+          PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(request.getMeasurementsIdsList()));
+      InsertRowsStatement statement = StatementGenerator.createStatement(request);
+      if (statement.isEmpty()) {
+        return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+      }
+
+      TSStatus status = AuthorityChecker.checkAuthority(statement, clientSession);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return status;
+      }
 
-      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      long queryId = SESSION_MANAGER.requestQueryId();
+      ExecutionResult result =
+          COORDINATOR.executeForTreeModel(
+              statement,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              "",
+              partitionFetcher,
+              schemaFetcher);
+
+      return result.status;
     } catch (IOException e) {
       return onNpeOrUnexpectedException(
           e, OperationType.INSERT_RECORDS, TSStatusCode.INTERNAL_SERVER_ERROR);
+    } catch (IoTDBException e) {
+      return onIoTDBException(e, OperationType.INSERT_RECORDS, e.getErrorCode());
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
index 12b0734d24a..29a3326b606 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
@@ -88,6 +88,7 @@ import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
+import org.apache.iotdb.session.req.InsertRecordsRequest;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -407,6 +408,30 @@ public class StatementGenerator {
     return insertStatement;
   }
 
+  public static InsertRowsStatement createStatement(InsertRecordsRequest req)
+      throws IllegalPathException {
+    final long startTime = System.nanoTime();
+    InsertRowsStatement insertRowsStatement = new InsertRowsStatement();
+    List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
+    for (int i = 0, size = req.getTimestamps().size(); i < size; ++i) {
+      InsertRowStatement statement = new InsertRowStatement();
+      statement.setDevicePath(DEVICE_PATH_CACHE.getPartialPath(req.getDeviceIds().get(i)));
+      statement.setMeasurements(req.getMeasurementsIdsList().get(i).toArray(new String[0]));
+      TimestampPrecisionUtils.checkTimestampPrecision(req.getTimestamps().get(i));
+      statement.setTime(req.getTimestamps().get(i));
+      statement.setValuesAndTypes(
+          req.getValuesList().get(i).toArray(),
+          req.getDataTypesList().get(i).toArray(new TSDataType[0]));
+      if (statement.isEmpty()) {
+        continue;
+      }
+      insertRowStatementList.add(statement);
+    }
+    insertRowsStatement.setInsertRowStatementList(insertRowStatementList);
+    PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() - startTime);
+    return insertRowsStatement;
+  }
+
   public static InsertRowsStatement createStatement(TSInsertStringRecordsReq req)
       throws IllegalPathException {
     final long startTime = System.nanoTime();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
index 4e16ff3d3be..b4136588c03 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
@@ -153,6 +153,11 @@ public class InsertRowStatement extends InsertBaseStatement implements ISchemaVa
     }
   }
 
+  public void setValuesAndTypes(Object[] values, TSDataType[] types) {
+    this.values = values;
+    this.dataTypes = types;
+  }
+
   public TTimePartitionSlot getTimePartitionSlot() {
     return TimePartitionUtils.getTimePartitionSlot(time);
   }

Reply via email to