This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch new-rpc-format-impl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 62d9ab584dc3a0fd91050ca3fe1956c2ca8f063d Author: Liu Xuxin <[email protected]> AuthorDate: Tue Mar 12 13:24:48 2024 +0800 assemble with session --- .../org/apache/iotdb/isession/SessionConfig.java | 2 + .../java/org/apache/iotdb/session/Session.java | 132 +++++++++++++++++---- .../apache/iotdb/session/SessionConnection.java | 47 ++++++++ .../iotdb/session/req/InsertRecordsRequest.java | 9 +- .../apache/iotdb/session/util/SessionRPCUtils.java | 6 +- 5 files changed, 172 insertions(+), 24 deletions(-) diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java index 076120192fd..73e08fa22a5 100644 --- a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java +++ b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java @@ -61,5 +61,7 @@ public class SessionConfig { public static CompressionType rpcCompressionType = CompressionType.GZIP; + public static boolean enableInsertRecordsV2 = true; + private SessionConfig() {} } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java index 4d0ad33efd7..979ef760e64 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java @@ -45,6 +45,7 @@ import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReqV2; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq; @@ -56,8 +57,10 @@ import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateResp; import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq; +import org.apache.iotdb.session.req.InsertRecordsRequest; import org.apache.iotdb.session.template.MeasurementNode; import org.apache.iotdb.session.template.TemplateQueryType; +import org.apache.iotdb.session.util.SessionRPCUtils; import org.apache.iotdb.session.util.SessionUtils; import org.apache.iotdb.session.util.ThreadUtils; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; @@ -2405,29 +2408,118 @@ public class Session implements ISession { List<List<Object>> valuesList, boolean isAligned) throws IoTDBConnectionException, StatementExecutionException { - Map<SessionConnection, TSInsertRecordsReq> recordsGroup = new HashMap<>(); - for (int i = 0; i < deviceIds.size(); i++) { - final SessionConnection connection = getSessionConnection(deviceIds.get(i)); - TSInsertRecordsReq request = recordsGroup.getOrDefault(connection, new TSInsertRecordsReq()); - request.setIsAligned(isAligned); + if (!SessionConfig.enableInsertRecordsV2) { + Map<SessionConnection, TSInsertRecordsReq> recordsGroup = new HashMap<>(); + for (int i = 0; i < deviceIds.size(); i++) { + final SessionConnection connection = getSessionConnection(deviceIds.get(i)); + TSInsertRecordsReq request = + recordsGroup.getOrDefault(connection, new TSInsertRecordsReq()); + request.setIsAligned(isAligned); + try { + filterAndUpdateTSInsertRecordsReq( + request, + deviceIds.get(i), + times.get(i), + measurementsList.get(i), + typesList.get(i), + valuesList.get(i)); + recordsGroup.putIfAbsent(connection, request); + } catch (NoValidValueException e) { + logger.warn( + "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements are [{}]", + deviceIds.get(i), + times.get(i), + measurementsList.get(i)); + } + } + insertByGroup(recordsGroup, SessionConnection::insertRecords); + } else { + logger.info("Insert using insertRecordsV2"); + Map<SessionConnection, InsertRecordsRequest> requestMap = new HashMap<>(); + for (int i = 0; i < deviceIds.size(); ++i) { + final SessionConnection connection = getSessionConnection(deviceIds.get(i)); + InsertRecordsRequest request = + requestMap.getOrDefault(connection, new InsertRecordsRequest()); + request.getDeviceIds().add(deviceIds.get(i)); + request.getTimestamps().add(times.get(i)); + request.getDataTypesList().add(typesList.get(i)); + request.getValuesList().add(valuesList.get(i)); + request.getMeasurementsIdsList().add(measurementsList.get(i)); + } + insertRecordsV2ByGroupInParallel(requestMap); + } + } + + private void insertRecordsV2ByGroupInParallel( + Map<SessionConnection, InsertRecordsRequest> requestMap) + throws IoTDBConnectionException, StatementExecutionException { + List<CompletableFuture<Void>> completableFutures = + requestMap.entrySet().stream() + .map( + entry -> { + try { + SessionConnection connection = entry.getKey(); + InsertRecordsRequest recordsReq = entry.getValue(); + TSInsertRecordsReqV2 req = genInsertRecordsReqV2(recordsReq); + return CompletableFuture.runAsync( + () -> { + try { + connection.insertRecordsV2(req); + } catch (RedirectException e) { + e.getDeviceEndPointMap().forEach(this::handleRedirection); + } catch (StatementExecutionException e) { + throw new CompletionException(e); + } catch (IoTDBConnectionException e) { + // remove the broken session + removeBrokenSessionConnection(connection); + try { + defaultSessionConnection.insertRecordsV2(req); + } catch (IoTDBConnectionException | StatementExecutionException ex) { + throw new CompletionException(ex); + } catch (RedirectException ignored) { + } + } + }, + OPERATION_EXECUTOR); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + + StringBuilder errMsgBuilder = new StringBuilder(); + for (CompletableFuture<Void> completableFuture : completableFutures) { try { - filterAndUpdateTSInsertRecordsReq( - request, - deviceIds.get(i), - times.get(i), - measurementsList.get(i), - typesList.get(i), - valuesList.get(i)); - recordsGroup.putIfAbsent(connection, request); - } catch (NoValidValueException e) { - logger.warn( - "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements are [{}]", - deviceIds.get(i), - times.get(i), - measurementsList.get(i)); + completableFuture.join(); + } catch (CompletionException completionException) { + Throwable cause = completionException.getCause(); + logger.error("Meet error when async insert!", cause); + if (cause instanceof IoTDBConnectionException) { + throw (IoTDBConnectionException) cause; + } else { + errMsgBuilder.append(cause.getMessage()); + } } } - insertByGroup(recordsGroup, SessionConnection::insertRecords); + if (errMsgBuilder.length() > 0) { + throw new StatementExecutionException(errMsgBuilder.toString()); + } + } + + private TSInsertRecordsReqV2 genInsertRecordsReqV2(InsertRecordsRequest request) + throws IOException { + TSInsertRecordsReqV2 req = new TSInsertRecordsReqV2(); + ByteBuffer[] buffers = + SessionRPCUtils.serializeInsertRecordsReq( + request.getDeviceIds(), + request.getMeasurementsIdsList(), + request.getTimestamps(), + request.getDataTypesList(), + request.getValuesList()); + req.schemaBuffer = buffers[0]; + req.valueBuffer = buffers[1]; + req.auxiliaryBuffer = ByteBuffer.wrap(new byte[0]); + return req; } private TSInsertRecordsReq filterAndGenTSInsertRecordsReq( diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java index fb701a65277..f9fa976d1e7 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java @@ -48,6 +48,7 @@ import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReqV2; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq; @@ -865,11 +866,57 @@ public class SessionConnection { } } + public void insertRecordsV2(TSInsertRecordsReqV2 request) + throws IoTDBConnectionException, StatementExecutionException, RedirectException { + TException lastTException = null; + TSStatus status = null; + for (int i = 0; i <= maxRetryCount; i++) { + if (i > 0) { + // re-init the TException and TSStatus + lastTException = null; + status = null; + // not first time, we need to sleep and then reconnect + try { + TimeUnit.MILLISECONDS.sleep(retryIntervalInMs); + } catch (InterruptedException e) { + // just ignore + } + if (!reconnect()) { + // reconnect failed, just continue to make another retry. + continue; + } + } + try { + status = insertRecordsV2Internal(request); + // need retry + if (status.isSetNeedRetry() && status.isNeedRetry()) { + continue; + } + RpcUtils.verifySuccess(status); + } catch (TException e) { + // all network exception need retry until reaching maxRetryCount + lastTException = e; + } + } + + if (status != null) { + RpcUtils.verifySuccess(status); + } else if (lastTException != null) { + throw new IoTDBConnectionException(lastTException); + } else { + throw new IoTDBConnectionException(logForReconnectionFailure()); + } + } + private TSStatus insertRecordsInternal(TSInsertRecordsReq request) throws TException { request.setSessionId(sessionId); return client.insertRecords(request); } + private TSStatus insertRecordsV2Internal(TSInsertRecordsReqV2 request) throws TException { + return client.insertRecordsV2(request); + } + protected void insertRecords(TSInsertStringRecordsReq request) throws IoTDBConnectionException, StatementExecutionException, RedirectException { diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/req/InsertRecordsRequest.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/req/InsertRecordsRequest.java index 407859efa08..be721d36076 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/req/InsertRecordsRequest.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/req/InsertRecordsRequest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.session.req; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import java.util.ArrayList; import java.util.List; public class InsertRecordsRequest { @@ -42,7 +43,13 @@ public class InsertRecordsRequest { this.valuesList = valuesList; } - public InsertRecordsRequest() {} + public InsertRecordsRequest() { + this.deviceIds = new ArrayList<>(); + this.measurementsIdsList = new ArrayList<>(); + this.timestamps = new ArrayList<>(); + this.dataTypesList = new ArrayList<>(); + this.valuesList = new ArrayList<>(); + } public List<String> getDeviceIds() { return deviceIds; diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java index cf9b344dd8b..c3eed8befa9 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/SessionRPCUtils.java @@ -48,12 +48,12 @@ public class SessionRPCUtils { List<String> deviceIds, List<List<String>> measurementIdsList, List<Long> timestamp, - List<List<TSDataType>> tpyesList, + List<List<TSDataType>> typesList, List<List<Object>> valuesList) throws IOException { ByteBuffer schemaBuffer = serializeSchema(deviceIds, measurementIdsList); - - return null; + ByteBuffer dataBuffer = serializeValue(timestamp, typesList, valuesList); + return new ByteBuffer[] {schemaBuffer, dataBuffer}; } private static ByteBuffer serializeSchema(
