This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch fast_write_test_0423
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/fast_write_test_0423 by this
push:
new b20bbfdc25 add comments and basic classes for FAST WRITE TEST
b20bbfdc25 is described below
commit b20bbfdc25b95b5034a0fd4a98b9e470a8472850
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Sun Apr 23 16:28:43 2023 +0800
add comments and basic classes for FAST WRITE TEST
---
.../java/org/apache/iotdb/isession/ISession.java | 7 ++++
.../execution/executor/RegionWriteExecutor.java | 5 +++
.../planner/plan/node/write/FastInsertRowNode.java | 42 ++++++++++++++++++++++
.../plan/node/write/FastInsertRowsNode.java | 35 ++++++++++++++++++
.../planner/plan/node/write/InsertRowNode.java | 9 +++++
.../java/org/apache/iotdb/session/Session.java | 10 ++++++
.../apache/iotdb/session/util/SessionUtils.java | 1 +
thrift/src/main/thrift/client.thrift | 8 +++++
8 files changed, 117 insertions(+)
diff --git a/isession/src/main/java/org/apache/iotdb/isession/ISession.java
b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 4095afe368..afae8e1e49 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -236,6 +236,13 @@ public interface ISession extends AutoCloseable {
List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException;
+ void fastInsertRecords(
+ List<String> deviceIds,
+ List<Long> times,
+ List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList)
+ throws IoTDBConnectionException, StatementExecutionException;
+
void insertRecords(
List<String> deviceIds,
List<Long> times,
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index c035edabe5..f87669774d 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -245,6 +245,8 @@ public class RegionWriteExecutor {
PERFORMANCE_OVERVIEW_METRICS.recordScheduleSchemaValidateCost(
System.nanoTime() - startTime);
}
+
+ // TODO: (FASTWRITE) 我们认为这块不会走到
boolean hasFailedMeasurement = insertNode.hasFailedMeasurements();
String partialInsertMessage = null;
if (hasFailedMeasurement) {
@@ -255,6 +257,9 @@ public class RegionWriteExecutor {
LOGGER.warn(partialInsertMessage);
}
+ // TODO: (FAStWRITE) (侯昊男) 根据数据类型把 values 反序列化出来
+
+ // TODO: (FAStWRITE) 然后再进入到这一步
ConsensusWriteResponse writeResponse =
fireTriggerAndInsert(context.getRegionId(), insertNode);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
new file mode 100644
index 0000000000..c3d843e58c
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class FastInsertRowNode extends InsertRowNode {
+ public FastInsertRowNode(PlanNodeId id) {
+ super(id);
+ }
+
+ public FastInsertRowNode(
+ PlanNodeId id,
+ PartialPath devicePath,
+ boolean isAligned,
+ String[] measurements,
+ TSDataType[] dataTypes,
+ long time,
+ Object[] values,
+ boolean isNeedInferType) {
+ super(id, devicePath, isAligned, measurements, dataTypes, time, values,
isNeedInferType);
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
new file mode 100644
index 0000000000..57232b01ee
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+
+import java.util.List;
+
+public class FastInsertRowsNode extends InsertRowsNode {
+ public FastInsertRowsNode(PlanNodeId id) {
+ super(id);
+ }
+
+ public FastInsertRowsNode(
+ PlanNodeId id, List<Integer> insertRowNodeIndexList, List<InsertRowNode>
insertRowNodeList) {
+ super(id, insertRowNodeIndexList, insertRowNodeList);
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
index e599e26344..74a35f5576 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
@@ -75,6 +75,8 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue, ISchemaV
private static final byte TYPE_NULL = -2;
private long time;
+
+ // TODO: (FASTWRITE) (侯昊男) 增加 byteBuffer 字段
private Object[] values;
private boolean isNeedInferType = false;
@@ -285,6 +287,7 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue, ISchemaV
subSerialize(stream);
}
+ // TODO: (FASTWRITE) (侯昊男) 增加 byteBuffer 字段后,相应的序列化反序列化方法要改一下
void subSerialize(ByteBuffer buffer) {
ReadWriteIOUtils.write(time, buffer);
ReadWriteIOUtils.write(devicePath.getFullPath(), buffer);
@@ -809,6 +812,7 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue, ISchemaV
@Override
public void validateDeviceSchema(boolean isAligned) {
+ // TODO: [FASTWRITE] (周钰坤) 填充是否要 align
if (this.isAligned != isAligned) {
throw new SemanticException(
new AlignedTimeseriesException(
@@ -833,6 +837,7 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue, ISchemaV
@Override
public void validateMeasurementSchema(int index, IMeasurementSchemaInfo
measurementSchemaInfo) {
+ // TODO: [FASTWRITE] (周钰坤) 在元数据校验时将 measurements、dataTypes 填充好
if (measurementSchemas == null) {
measurementSchemas = new MeasurementSchema[measurements.length];
}
@@ -846,9 +851,13 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue, ISchemaV
}
try {
+ // TODO: [FASTWRITE] (周钰坤) 这块可能需要直接去掉,不去做元数据的校验
selfCheckDataTypes(index);
} catch (DataTypeMismatchException | PathNotExistException e) {
throw new SemanticException(e);
}
+
+ measurements[index] = measurementSchemas[index].getMeasurementId();
+ dataTypes[index] = measurementSchemas[index].getType();
}
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java
b/session/src/main/java/org/apache/iotdb/session/Session.java
index ef54c71948..c73a9c7c55 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -1746,6 +1746,15 @@ public class Session implements ISession {
request.addToValuesList(values);
}
+ // TODO: (FASTWRITE) (曹志杰) 实现该接口
+ @Override
+ public void fastInsertRecords(
+ List<String> deviceIds,
+ List<Long> times,
+ List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList)
+ throws IoTDBConnectionException, StatementExecutionException {}
+
/**
* Insert multiple rows, which can reduce the overhead of network. This
method is just like jdbc
* executeBatch, we pack some insert request in batch and send them to
server. If you want improve
@@ -2395,6 +2404,7 @@ public class Session implements ISession {
throws IoTDBConnectionException {
request.addToPrefixPaths(deviceId);
request.addToTimestamps(time);
+ // TODO: (FASTWRITE) 不需要再添加 measurement 的信息
request.addToMeasurementsList(measurements);
ByteBuffer buffer = SessionUtils.getValueBuffer(types, values);
request.addToValuesList(buffer);
diff --git
a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
index 32ed19add9..57ea8e8f0b 100644
--- a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
+++ b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
@@ -79,6 +79,7 @@ public class SessionUtils {
public static ByteBuffer getValueBuffer(List<TSDataType> types, List<Object>
values)
throws IoTDBConnectionException {
ByteBuffer buffer =
ByteBuffer.allocate(SessionUtils.calculateLength(types, values));
+ // TODO: (FASTWRITE) putValues 时可以少写一个字节的 Type
SessionUtils.putValues(types, values, buffer);
return buffer;
}
diff --git a/thrift/src/main/thrift/client.thrift
b/thrift/src/main/thrift/client.thrift
index ebb1ef1880..25ab31f9a1 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -246,6 +246,14 @@ struct TSInsertTabletsReq {
8: optional bool isAligned
}
+// TODO: (FASTWRITE)定义一个新的接口
+struct TSFastInsertRecordsReq {
+ 1: required i64 sessionId
+ 2: required list<string> prefixPaths
+ 5: required list<i64> timestamps
+ 4: required list<binary> valuesList
+}
+
struct TSInsertRecordsReq {
1: required i64 sessionId
2: required list<string> prefixPaths