This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch fast_write_test_0423
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/fast_write_test_0423 by this
push:
new ab0f0a0b22 add interface of session (#9677)
ab0f0a0b22 is described below
commit ab0f0a0b22dde67d8a23adc751ce993d9a11f1f4
Author: Zhijia Cao <[email protected]>
AuthorDate: Sun Apr 23 19:51:24 2023 +0800
add interface of session (#9677)
---
.../java/org/apache/iotdb/FastInsertExample.java | 93 ++++++++++++++++++++++
.../db/mpp/plan/parser/StatementGenerator.java | 3 -
.../planner/plan/node/write/FastInsertRowNode.java | 12 +--
.../plan/node/write/FastInsertRowsNode.java | 4 +-
.../statement/crud/FastInsertRowStatement.java | 4 +-
.../service/thrift/impl/ClientRPCServiceImpl.java | 20 ++---
.../java/org/apache/iotdb/session/Session.java | 81 ++++++++++++++++++-
.../apache/iotdb/session/SessionConnection.java | 21 +++++
.../apache/iotdb/session/util/SessionUtils.java | 38 ++++++++-
9 files changed, 244 insertions(+), 32 deletions(-)
diff --git
a/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
b/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
new file mode 100644
index 0000000000..5e2a3a9efc
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/FastInsertExample.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+import org.apache.iotdb.isession.util.Version;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@SuppressWarnings("squid:S106")
+public class FastInsertExample { // Example client: opens a Session, sends 500 rows through fastInsertRecords, then closes the Session.
+
+ private static Session session;
+ private static Session sessionEnableRedirect; // NOTE(review): never assigned or read anywhere in this class
+ private static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1"; // NOTE(review): the S1..S5 path constants are not referenced in this class
+ private static final String ROOT_SG1_D1_S2 = "root.sg1.d1.s2";
+ private static final String ROOT_SG1_D1_S3 = "root.sg1.d1.s3";
+ private static final String ROOT_SG1_D1_S4 = "root.sg1.d1.s4";
+ private static final String ROOT_SG1_D1_S5 = "root.sg1.d1.s5";
+ private static final String ROOT_SG1_D1 = "root.sg1.d1"; // the only device path written by this example
+ private static final String LOCAL_HOST = "127.0.0.1";
+
+ public static void main(String[] args)
+ throws IoTDBConnectionException, StatementExecutionException {
+ session =
+ new Session.Builder()
+ .host(LOCAL_HOST)
+ .port(6667)
+ .username("root")
+ .password("root")
+ .version(Version.V_1_0)
+ .build();
+ session.open(false);
+
+ fastInsertRecords();
+ session.close(); // NOTE(review): not reached if fastInsertRecords() throws; there is no try/finally here
+ }
+
+ private static void fastInsertRecords()
+ throws IoTDBConnectionException, StatementExecutionException {
+ String deviceId = ROOT_SG1_D1;
+ List<String> deviceIds = new ArrayList<>();
+ List<List<Object>> valuesList = new ArrayList<>();
+ List<Long> timestamps = new ArrayList<>();
+ List<List<TSDataType>> typesList = new ArrayList<>(); // no measurement-name list is built; the call below takes only devices, times, types and values
+
+ for (long time = 0; time < 500; time++) {
+ List<Object> values = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ values.add(1L); // every row carries the same three INT64 values: 1, 2, 3
+ values.add(2L);
+ values.add(3L);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+
+ deviceIds.add(deviceId);
+ valuesList.add(values);
+ typesList.add(types);
+ timestamps.add(time);
+ if (time != 0 && time % 100 == 0) { // flushes at time 100, 200, 300, 400: the first batch holds 101 rows (0..100), the later ones 100 each
+ session.fastInsertRecords(deviceIds, timestamps, typesList,
valuesList);
+ deviceIds.clear();
+ valuesList.clear();
+ typesList.clear();
+ timestamps.clear();
+ }
+ }
+
+ session.fastInsertRecords(deviceIds, timestamps, typesList, valuesList); // sends the 99 rows left after the last in-loop flush (times 401..499)
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index a8982d373c..a5d9e25c96 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -394,9 +394,6 @@ public class StatementGenerator {
statement.setTime(req.getTimestamps().get(i));
statement.setValues(req.valuesList.get(i));
// skip empty statement
- if (statement.isEmpty()) {
- continue;
- }
insertRowStatementList.add(statement);
}
insertStatement.setInsertRowStatementList(insertRowStatementList);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
index a04f833bac..0667f84028 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
@@ -19,26 +19,20 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
-import java.nio.ByteBuffer;
-import java.util.List;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.nio.ByteBuffer;
public class FastInsertRowNode extends InsertRowNode {
private ByteBuffer rawValues;
-
public FastInsertRowNode(PlanNodeId id) {
super(id);
}
- public FastInsertRowNode(
- PlanNodeId id,
- PartialPath devicePath,
- long time,
- ByteBuffer values) {
+ public FastInsertRowNode(PlanNodeId id, PartialPath devicePath, long time,
ByteBuffer values) {
super(id, devicePath, true, null, null, time, null, false);
this.rawValues = values;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
index 9cbc5ba349..e47fd9434b 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
@@ -29,7 +29,9 @@ public class FastInsertRowsNode extends InsertRowsNode {
}
public FastInsertRowsNode(
- PlanNodeId id, List<Integer> insertRowNodeIndexList, List<InsertRowNode>
fastInsertRowNodeList) {
+ PlanNodeId id,
+ List<Integer> insertRowNodeIndexList,
+ List<InsertRowNode> fastInsertRowNodeList) {
super(id, insertRowNodeIndexList, fastInsertRowNodeList);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowStatement.java
index b2a93fe502..0ece663002 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FastInsertRowStatement.java
@@ -17,12 +17,11 @@
* under the License.
*/
-
package org.apache.iotdb.db.mpp.plan.statement.crud;
import java.nio.ByteBuffer;
-public class FastInsertRowStatement extends InsertRowStatement{
+public class FastInsertRowStatement extends InsertRowStatement {
private ByteBuffer rawValues;
@@ -33,5 +32,4 @@ public class FastInsertRowStatement extends
InsertRowStatement{
public ByteBuffer getRawValues() {
return rawValues;
}
-
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index a18ccaf280..f515f729b7 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -1216,8 +1216,8 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
}
// check whether measurement is legal according to syntax convention
-// req.setMeasurementsList(
-//
PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList()));
+ // req.setMeasurementsList(
+ //
PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList()));
// Step 1: transfer from TSInsertRecordsReq to Statement
InsertRowsStatement statement = StatementGenerator.createStatement(req);
@@ -1237,14 +1237,14 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// permission check
// CANNOT checkAuthority
-// TSStatus status = AuthorityChecker.checkAuthority(statement,
clientSession);
-// if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-// return status;
-// }
-//
-// quota =
-// DataNodeThrottleQuotaManager.getInstance()
-// .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(),
statement);
+ // TSStatus status = AuthorityChecker.checkAuthority(statement,
clientSession);
+ // if (status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // return status;
+ // }
+ //
+ // quota =
+ // DataNodeThrottleQuotaManager.getInstance()
+ //
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement);
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java
b/session/src/main/java/org/apache/iotdb/session/Session.java
index c73a9c7c55..e39b23629f 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSFastInsertRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -1746,14 +1747,36 @@ public class Session implements ISession {
request.addToValuesList(values);
}
- // TODO: (FASTWRITE) (曹志杰) 实现该接口
@Override
public void fastInsertRecords( // Inserts rows given as parallel lists; the signature takes no measurement names.
List<String> deviceIds,
List<Long> times,
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
- throws IoTDBConnectionException, StatementExecutionException {}
+ throws IoTDBConnectionException, StatementExecutionException {
+ int len = deviceIds.size();
+ if (len != times.size() || len != valuesList.size()) { // NOTE(review): typesList.size() is not part of this check
+ throw new IllegalArgumentException("deviceIds, times and valuesList's
size should be equal");
+ }
+ if (enableRedirection) { // redirection on: delegate to the per-connection grouping path
+ fastInsertRecordsWithLeaderCache(deviceIds, times, typesList,
valuesList);
+ } else { // redirection off: one request is sent through defaultSessionConnection
+ TSFastInsertRecordsReq request;
+ try {
+ request = genTSFastInsertRecordsReq(deviceIds, times, typesList,
valuesList);
+ } catch (NoValidValueException e) { // the whole call is dropped with a warning; NOTE(review): genTSFastInsertRecordsReq has no explicit throw of this type in its visible body — confirm it can occur
+ logger.warn(
+ "All values are null and this submission is ignored,deviceIds are
[{}],times are [{}]",
+ deviceIds.toString(),
+ times.toString());
+ return;
+ }
+ try {
+ defaultSessionConnection.fastInsertRecords(request);
+ } catch (RedirectException ignored) { // redirect hints are discarded on this branch (enableRedirection is false here)
+ }
+ }
+ }
/**
* Insert multiple rows, which can reduce the overhead of network. This
method is just like jdbc
@@ -2335,6 +2358,31 @@ public class Session implements ISession {
insertByGroup(recordsGroup, SessionConnection::insertRecords);
}
+ private void fastInsertRecordsWithLeaderCache( // Buckets rows into one TSFastInsertRecordsReq per SessionConnection, then hands the buckets to insertByGroup.
+ List<String> deviceIds,
+ List<Long> times,
+ List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList)
+ throws IoTDBConnectionException, StatementExecutionException {
+ Map<SessionConnection, TSFastInsertRecordsReq> recordsGroup = new
HashMap<>();
+ for (int i = 0; i < deviceIds.size(); i++) {
+ final SessionConnection connection =
getSessionConnection(deviceIds.get(i));
+ TSFastInsertRecordsReq request =
+ recordsGroup.getOrDefault(connection, new TSFastInsertRecordsReq()); // reuse this connection's request, or start a fresh one that is not yet in the map
+ try {
+ updateTSFastInsertRecordsReq(
+ request, deviceIds.get(i), times.get(i), typesList.get(i),
valuesList.get(i));
+ recordsGroup.putIfAbsent(connection, request); // a fresh request is registered only after the row was appended without an exception
+ } catch (NoValidValueException e) { // this row is skipped with a warning; the loop continues with the next row
+ logger.warn(
+ "All values are null and this submission is ignored,deviceId is
[{}],time is [{}]",
+ deviceIds.get(i),
+ times.get(i));
+ }
+ }
+ insertByGroup(recordsGroup, SessionConnection::fastInsertRecords); // insertByGroup is defined outside this hunk
+ }
+
private TSInsertRecordsReq filterAndGenTSInsertRecordsReq(
List<String> deviceIds,
List<Long> times,
@@ -2373,6 +2421,20 @@ public class Session implements ISession {
return request;
}
+ private TSFastInsertRecordsReq genTSFastInsertRecordsReq( // Builds a single request that carries all rows: device paths, timestamps and value buffers.
+ List<String> deviceIds,
+ List<Long> times,
+ List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList)
+ throws IoTDBConnectionException {
+ TSFastInsertRecordsReq request = new TSFastInsertRecordsReq();
+ request.setPrefixPaths(deviceIds); // the caller's lists are passed through; this method makes no copy
+ request.setTimestamps(times);
+ List<ByteBuffer> buffersList =
objectValuesListToByteBufferList(valuesList, typesList);
+ request.setValuesList(buffersList); // buffers come from objectValuesListToByteBufferList (defined outside this hunk); no measurement list is set here
+ return request;
+ }
+
private void filterAndUpdateTSInsertRecordsReq(
TSInsertRecordsReq request,
String deviceId,
@@ -2404,12 +2466,25 @@ public class Session implements ISession {
throws IoTDBConnectionException {
request.addToPrefixPaths(deviceId);
request.addToTimestamps(time);
- // TODO: (FASTWRITE) 不需要再添加 measurement 的信息
request.addToMeasurementsList(measurements);
ByteBuffer buffer = SessionUtils.getValueBuffer(types, values);
request.addToValuesList(buffer);
}
+ private void updateTSFastInsertRecordsReq( // Appends one row (device path, timestamp, value buffer) to an existing request.
+ TSFastInsertRecordsReq request,
+ String deviceId,
+ Long time,
+ List<TSDataType> types,
+ List<Object> values)
+ throws IoTDBConnectionException {
+ request.addToPrefixPaths(deviceId);
+ request.addToTimestamps(time);
+ ByteBuffer buffer =
+ ByteBuffer.allocate(SessionUtils.calculateLengthForFastInsert(types,
values));
+ request.addToValuesList(buffer); // NOTE(review): nothing in this method writes `values` into the buffer before it is added — confirm this is intended
+ }
+
/**
* insert the data of a device. For each timestamp, the number of
measurements is the same.
*
diff --git
a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index d5d90c4e23..58f2c40213 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSFastInsertRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -639,6 +640,26 @@ public class SessionConnection {
}
}
+ protected void fastInsertRecords(TSFastInsertRecordsReq request) // Sends the request through `client`; after a TException it retries once if reconnect() succeeds.
+ throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
+ request.setSessionId(sessionId);
+ try {
+ RpcUtils.verifySuccessWithRedirectionForMultiDevices(
+ client.fastInsertRecords(request), request.getPrefixPaths());
+ } catch (TException e) {
+ if (reconnect()) {
+ try {
+ request.setSessionId(sessionId); // set again after reconnect(); presumably the session id changes on reconnect — TODO confirm
+ RpcUtils.verifySuccess(client.fastInsertRecords(request)); // the retry uses plain verifySuccess, not the redirection-aware check of the first attempt
+ } catch (TException tException) {
+ throw new IoTDBConnectionException(tException); // a second TException is wrapped and thrown; there is no further retry
+ }
+ } else {
+ throw new IoTDBConnectionException(logForReconnectionFailure());
+ }
+ }
+ }
+
protected void insertRecords(TSInsertStringRecordsReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
request.setSessionId(sessionId);
diff --git
a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
index 57ea8e8f0b..5f55031b4f 100644
--- a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
+++ b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
@@ -79,8 +79,6 @@ public class SessionUtils {
public static ByteBuffer getValueBuffer(List<TSDataType> types, List<Object>
values)
throws IoTDBConnectionException { // Allocates a buffer sized by calculateLength(types, values) and returns it.
ByteBuffer buffer =
ByteBuffer.allocate(SessionUtils.calculateLength(types, values));
- // TODO: (FASTWRITE) putValues can write one byte less by omitting the Type
- SessionUtils.putValues(types, values, buffer);
return buffer; // NOTE(review): with the putValues call removed by this change, nothing in this method writes `values` into the buffer — confirm this is intended
}
@@ -121,6 +119,41 @@ public class SessionUtils {
return res;
}
+ public static int calculateLengthForFastInsert(List<TSDataType> types,
List<Object> values)
+ throws IoTDBConnectionException { // Returns the summed byte size of one row's values; throws for a type not listed in the switch.
+ int res = 0; // running total in bytes
+ for (int i = 0; i < types.size(); i++) {
+ // add the size of value i according to its declared type; `values` is only inspected for TEXT
+ switch (types.get(i)) {
+ case BOOLEAN:
+ res += 1;
+ break;
+ case INT32:
+ res += Integer.BYTES;
+ break;
+ case INT64:
+ res += Long.BYTES;
+ break;
+ case FLOAT:
+ res += Float.BYTES;
+ break;
+ case DOUBLE:
+ res += Double.BYTES;
+ break;
+ case TEXT:
+ res += Integer.BYTES; // 4 bytes counted in addition to the text bytes (presumably a length prefix — TODO confirm against the writer)
+ if (values.get(i) instanceof Binary) {
+ res += ((Binary) values.get(i)).getValues().length;
+ } else { // NOTE(review): a null value lands here and fails with NullPointerException on getBytes
+ res += ((String)
values.get(i)).getBytes(TSFileConfig.STRING_CHARSET).length;
+ }
+ break;
+ default:
+ throw new IoTDBConnectionException(MSG_UNSUPPORTED_DATA_TYPE +
types.get(i));
+ }
+ }
+ return res;
+ }
/**
* put value in buffer
*
@@ -136,7 +169,6 @@ public class SessionUtils {
ReadWriteIOUtils.write(TYPE_NULL, buffer);
continue;
}
- ReadWriteIOUtils.write(types.get(i), buffer);
switch (types.get(i)) {
case BOOLEAN:
ReadWriteIOUtils.write((Boolean) values.get(i), buffer);