This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch new-rpc-format-impl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 62d9ab584dc3a0fd91050ca3fe1956c2ca8f063d
Author: Liu Xuxin <[email protected]>
AuthorDate: Tue Mar 12 13:24:48 2024 +0800

    assemble with session
---
 .../org/apache/iotdb/isession/SessionConfig.java   |   2 +
 .../java/org/apache/iotdb/session/Session.java     | 132 +++++++++++++++++----
 .../apache/iotdb/session/SessionConnection.java    |  47 ++++++++
 .../iotdb/session/req/InsertRecordsRequest.java    |   9 +-
 .../apache/iotdb/session/util/SessionRPCUtils.java |   6 +-
 5 files changed, 172 insertions(+), 24 deletions(-)

diff --git 
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java
 
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java
index 076120192fd..73e08fa22a5 100644
--- 
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java
+++ 
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java
@@ -61,5 +61,7 @@ public class SessionConfig {
 
   public static CompressionType rpcCompressionType = CompressionType.GZIP;
 
+  public static boolean enableInsertRecordsV2 = true;
+
   private SessionConfig() {}
 }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 4d0ad33efd7..979ef760e64 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -45,6 +45,7 @@ import 
org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReqV2;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
@@ -56,8 +57,10 @@ import 
org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateResp;
 import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
+import org.apache.iotdb.session.req.InsertRecordsRequest;
 import org.apache.iotdb.session.template.MeasurementNode;
 import org.apache.iotdb.session.template.TemplateQueryType;
+import org.apache.iotdb.session.util.SessionRPCUtils;
 import org.apache.iotdb.session.util.SessionUtils;
 import org.apache.iotdb.session.util.ThreadUtils;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -2405,29 +2408,118 @@ public class Session implements ISession {
       List<List<Object>> valuesList,
       boolean isAligned)
       throws IoTDBConnectionException, StatementExecutionException {
-    Map<SessionConnection, TSInsertRecordsReq> recordsGroup = new HashMap<>();
-    for (int i = 0; i < deviceIds.size(); i++) {
-      final SessionConnection connection = 
getSessionConnection(deviceIds.get(i));
-      TSInsertRecordsReq request = recordsGroup.getOrDefault(connection, new 
TSInsertRecordsReq());
-      request.setIsAligned(isAligned);
+    if (!SessionConfig.enableInsertRecordsV2) {
+      Map<SessionConnection, TSInsertRecordsReq> recordsGroup = new 
HashMap<>();
+      for (int i = 0; i < deviceIds.size(); i++) {
+        final SessionConnection connection = 
getSessionConnection(deviceIds.get(i));
+        TSInsertRecordsReq request =
+            recordsGroup.getOrDefault(connection, new TSInsertRecordsReq());
+        request.setIsAligned(isAligned);
+        try {
+          filterAndUpdateTSInsertRecordsReq(
+              request,
+              deviceIds.get(i),
+              times.get(i),
+              measurementsList.get(i),
+              typesList.get(i),
+              valuesList.get(i));
+          recordsGroup.putIfAbsent(connection, request);
+        } catch (NoValidValueException e) {
+          logger.warn(
+              "All values are null and this submission is ignored,deviceId is 
[{}],time is [{}],measurements are [{}]",
+              deviceIds.get(i),
+              times.get(i),
+              measurementsList.get(i));
+        }
+      }
+      insertByGroup(recordsGroup, SessionConnection::insertRecords);
+    } else {
+      logger.info("Insert using insertRecordsV2");
+      Map<SessionConnection, InsertRecordsRequest> requestMap = new 
HashMap<>();
+      for (int i = 0; i < deviceIds.size(); ++i) {
+        final SessionConnection connection = 
getSessionConnection(deviceIds.get(i));
+        InsertRecordsRequest request =
+            requestMap.getOrDefault(connection, new InsertRecordsRequest());
+        request.getDeviceIds().add(deviceIds.get(i));
+        request.getTimestamps().add(times.get(i));
+        request.getDataTypesList().add(typesList.get(i));
+        request.getValuesList().add(valuesList.get(i));
+        request.getMeasurementsIdsList().add(measurementsList.get(i));
+      }
+      insertRecordsV2ByGroupInParallel(requestMap);
+    }
+  }
+
+  private void insertRecordsV2ByGroupInParallel(
+      Map<SessionConnection, InsertRecordsRequest> requestMap)
+      throws IoTDBConnectionException, StatementExecutionException {
+    List<CompletableFuture<Void>> completableFutures =
+        requestMap.entrySet().stream()
+            .map(
+                entry -> {
+                  try {
+                    SessionConnection connection = entry.getKey();
+                    InsertRecordsRequest recordsReq = entry.getValue();
+                    TSInsertRecordsReqV2 req = 
genInsertRecordsReqV2(recordsReq);
+                    return CompletableFuture.runAsync(
+                        () -> {
+                          try {
+                            connection.insertRecordsV2(req);
+                          } catch (RedirectException e) {
+                            
e.getDeviceEndPointMap().forEach(this::handleRedirection);
+                          } catch (StatementExecutionException e) {
+                            throw new CompletionException(e);
+                          } catch (IoTDBConnectionException e) {
+                            // remove the broken session
+                            removeBrokenSessionConnection(connection);
+                            try {
+                              defaultSessionConnection.insertRecordsV2(req);
+                            } catch (IoTDBConnectionException | 
StatementExecutionException ex) {
+                              throw new CompletionException(ex);
+                            } catch (RedirectException ignored) {
+                            }
+                          }
+                        },
+                        OPERATION_EXECUTOR);
+                  } catch (IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                })
+            .collect(Collectors.toList());
+
+    StringBuilder errMsgBuilder = new StringBuilder();
+    for (CompletableFuture<Void> completableFuture : completableFutures) {
       try {
-        filterAndUpdateTSInsertRecordsReq(
-            request,
-            deviceIds.get(i),
-            times.get(i),
-            measurementsList.get(i),
-            typesList.get(i),
-            valuesList.get(i));
-        recordsGroup.putIfAbsent(connection, request);
-      } catch (NoValidValueException e) {
-        logger.warn(
-            "All values are null and this submission is ignored,deviceId is 
[{}],time is [{}],measurements are [{}]",
-            deviceIds.get(i),
-            times.get(i),
-            measurementsList.get(i));
+        completableFuture.join();
+      } catch (CompletionException completionException) {
+        Throwable cause = completionException.getCause();
+        logger.error("Meet error when async insert!", cause);
+        if (cause instanceof IoTDBConnectionException) {
+          throw (IoTDBConnectionException) cause;
+        } else {
+          errMsgBuilder.append(cause.getMessage());
+        }
       }
     }
-    insertByGroup(recordsGroup, SessionConnection::insertRecords);
+    if (errMsgBuilder.length() > 0) {
+      throw new StatementExecutionException(errMsgBuilder.toString());
+    }
+  }
+
+  private TSInsertRecordsReqV2 genInsertRecordsReqV2(InsertRecordsRequest 
request)
+      throws IOException {
+    TSInsertRecordsReqV2 req = new TSInsertRecordsReqV2();
+    ByteBuffer[] buffers =
+        SessionRPCUtils.serializeInsertRecordsReq(
+            request.getDeviceIds(),
+            request.getMeasurementsIdsList(),
+            request.getTimestamps(),
+            request.getDataTypesList(),
+            request.getValuesList());
+    req.schemaBuffer = buffers[0];
+    req.valueBuffer = buffers[1];
+    req.auxiliaryBuffer = ByteBuffer.wrap(new byte[0]);
+    return req;
   }
 
   private TSInsertRecordsReq filterAndGenTSInsertRecordsReq(
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index fb701a65277..f9fa976d1e7 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -48,6 +48,7 @@ import 
org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReqV2;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
@@ -865,11 +866,57 @@ public class SessionConnection {
     }
   }
 
+  public void insertRecordsV2(TSInsertRecordsReqV2 request)
+      throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
+    TException lastTException = null;
+    TSStatus status = null;
+    for (int i = 0; i <= maxRetryCount; i++) {
+      if (i > 0) {
+        // re-init the TException and TSStatus
+        lastTException = null;
+        status = null;
+        // not first time, we need to sleep and then reconnect
+        try {
+          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+        } catch (InterruptedException e) {
+          // just ignore
+        }
+        if (!reconnect()) {
+          // reconnect failed, just continue to make another retry.
+          continue;
+        }
+      }
+      try {
+        status = insertRecordsV2Internal(request);
+        // need retry
+        if (status.isSetNeedRetry() && status.isNeedRetry()) {
+          continue;
+        }
+        RpcUtils.verifySuccess(status);
+      } catch (TException e) {
+        // all network exception need retry until reaching maxRetryCount
+        lastTException = e;
+      }
+    }
+
+    if (status != null) {
+      RpcUtils.verifySuccess(status);
+    } else if (lastTException != null) {
+      throw new IoTDBConnectionException(lastTException);
+    } else {
+      throw new IoTDBConnectionException(logForReconnectionFailure());
+    }
+  }
+
   private TSStatus insertRecordsInternal(TSInsertRecordsReq request) throws 
TException {
     request.setSessionId(sessionId);
     return client.insertRecords(request);
   }
 
+  private TSStatus insertRecordsV2Internal(TSInsertRecordsReqV2 request) 
throws TException {
+    return client.insertRecordsV2(request);
+  }
+
   protected void insertRecords(TSInsertStringRecordsReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
 
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/req/InsertRecordsRequest.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/req/InsertRecordsRequest.java
index 407859efa08..be721d36076 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/req/InsertRecordsRequest.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/req/InsertRecordsRequest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.session.req;
 
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import java.util.ArrayList;
 import java.util.List;
 
 public class InsertRecordsRequest {
@@ -42,7 +43,13 @@ public class InsertRecordsRequest {
     this.valuesList = valuesList;
   }
 
-  public InsertRecordsRequest() {}
+  public InsertRecordsRequest() {
+    this.deviceIds = new ArrayList<>();
+    this.measurementsIdsList = new ArrayList<>();
+    this.timestamps = new ArrayList<>();
+    this.dataTypesList = new ArrayList<>();
+    this.valuesList = new ArrayList<>();
+  }
 
   public List<String> getDeviceIds() {
     return deviceIds;
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
index cf9b344dd8b..c3eed8befa9 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java
@@ -48,12 +48,12 @@ public class SessionRPCUtils {
       List<String> deviceIds,
       List<List<String>> measurementIdsList,
       List<Long> timestamp,
-      List<List<TSDataType>> tpyesList,
+      List<List<TSDataType>> typesList,
       List<List<Object>> valuesList)
       throws IOException {
     ByteBuffer schemaBuffer = serializeSchema(deviceIds, measurementIdsList);
-
-    return null;
+    ByteBuffer dataBuffer = serializeValue(timestamp, typesList, valuesList);
+    return new ByteBuffer[] {schemaBuffer, dataBuffer};
   }
 
   private static ByteBuffer serializeSchema(

Reply via email to