This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch fast_write_test_0423
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/fast_write_test_0423 by this 
push:
     new b20bbfdc25 add comments and basic classes for FAST WRITE TEST
b20bbfdc25 is described below

commit b20bbfdc25b95b5034a0fd4a98b9e470a8472850
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Sun Apr 23 16:28:43 2023 +0800

    add comments and basic classes for FAST WRITE TEST
---
 .../java/org/apache/iotdb/isession/ISession.java   |  7 ++++
 .../execution/executor/RegionWriteExecutor.java    |  5 +++
 .../planner/plan/node/write/FastInsertRowNode.java | 42 ++++++++++++++++++++++
 .../plan/node/write/FastInsertRowsNode.java        | 35 ++++++++++++++++++
 .../planner/plan/node/write/InsertRowNode.java     |  9 +++++
 .../java/org/apache/iotdb/session/Session.java     | 10 ++++++
 .../apache/iotdb/session/util/SessionUtils.java    |  1 +
 thrift/src/main/thrift/client.thrift               |  8 +++++
 8 files changed, 117 insertions(+)

diff --git a/isession/src/main/java/org/apache/iotdb/isession/ISession.java 
b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 4095afe368..afae8e1e49 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -236,6 +236,13 @@ public interface ISession extends AutoCloseable {
       List<List<String>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException;
 
+  void fastInsertRecords(
+      List<String> deviceIds,
+      List<Long> times,
+      List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException;
+
   void insertRecords(
       List<String> deviceIds,
       List<Long> times,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index c035edabe5..f87669774d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -245,6 +245,8 @@ public class RegionWriteExecutor {
           PERFORMANCE_OVERVIEW_METRICS.recordScheduleSchemaValidateCost(
               System.nanoTime() - startTime);
         }
+
+        // TODO: (FASTWRITE) We believe this branch will not be reached on the fast-write path
         boolean hasFailedMeasurement = insertNode.hasFailedMeasurements();
         String partialInsertMessage = null;
         if (hasFailedMeasurement) {
@@ -255,6 +257,9 @@ public class RegionWriteExecutor {
           LOGGER.warn(partialInsertMessage);
         }
 
+        // TODO: (FASTWRITE) (侯昊男) Deserialize values according to their data types
+
+        // TODO: (FASTWRITE) Then proceed to this step
         ConsensusWriteResponse writeResponse =
             fireTriggerAndInsert(context.getRegionId(), insertNode);
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
new file mode 100644
index 0000000000..c3d843e58c
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class FastInsertRowNode extends InsertRowNode {
+  public FastInsertRowNode(PlanNodeId id) {
+    super(id);
+  }
+
+  public FastInsertRowNode(
+      PlanNodeId id,
+      PartialPath devicePath,
+      boolean isAligned,
+      String[] measurements,
+      TSDataType[] dataTypes,
+      long time,
+      Object[] values,
+      boolean isNeedInferType) {
+    super(id, devicePath, isAligned, measurements, dataTypes, time, values, 
isNeedInferType);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
new file mode 100644
index 0000000000..57232b01ee
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowsNode.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+
+import java.util.List;
+
+public class FastInsertRowsNode extends InsertRowsNode {
+  public FastInsertRowsNode(PlanNodeId id) {
+    super(id);
+  }
+
+  public FastInsertRowsNode(
+      PlanNodeId id, List<Integer> insertRowNodeIndexList, List<InsertRowNode> 
insertRowNodeList) {
+    super(id, insertRowNodeIndexList, insertRowNodeList);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
index e599e26344..74a35f5576 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
@@ -75,6 +75,8 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue, ISchemaV
   private static final byte TYPE_NULL = -2;
 
   private long time;
+
+  // TODO: (FASTWRITE) (侯昊男) Add a byteBuffer field
   private Object[] values;
 
   private boolean isNeedInferType = false;
@@ -285,6 +287,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue, ISchemaV
     subSerialize(stream);
   }
 
+  // TODO: (FASTWRITE) (侯昊男) After adding the byteBuffer field, update the serialization/deserialization methods accordingly
   void subSerialize(ByteBuffer buffer) {
     ReadWriteIOUtils.write(time, buffer);
     ReadWriteIOUtils.write(devicePath.getFullPath(), buffer);
@@ -809,6 +812,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue, ISchemaV
 
   @Override
   public void validateDeviceSchema(boolean isAligned) {
+    // TODO: [FASTWRITE] (周钰坤) Populate whether the device is aligned
     if (this.isAligned != isAligned) {
       throw new SemanticException(
           new AlignedTimeseriesException(
@@ -833,6 +837,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue, ISchemaV
 
   @Override
   public void validateMeasurementSchema(int index, IMeasurementSchemaInfo 
measurementSchemaInfo) {
+    // TODO: [FASTWRITE] (周钰坤) Populate measurements and dataTypes during schema validation
     if (measurementSchemas == null) {
       measurementSchemas = new MeasurementSchema[measurements.length];
     }
@@ -846,9 +851,13 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue, ISchemaV
     }
 
     try {
+      // TODO: [FASTWRITE] (周钰坤) This may need to be removed outright so that no schema validation is performed
       selfCheckDataTypes(index);
     } catch (DataTypeMismatchException | PathNotExistException e) {
       throw new SemanticException(e);
     }
+
+    measurements[index] = measurementSchemas[index].getMeasurementId();
+    dataTypes[index] = measurementSchemas[index].getType();
   }
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java 
b/session/src/main/java/org/apache/iotdb/session/Session.java
index ef54c71948..c73a9c7c55 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -1746,6 +1746,15 @@ public class Session implements ISession {
     request.addToValuesList(values);
   }
 
+  // TODO: (FASTWRITE) (曹志杰) Implement this interface method
+  @Override
+  public void fastInsertRecords(
+      List<String> deviceIds,
+      List<Long> times,
+      List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {}
+
   /**
    * Insert multiple rows, which can reduce the overhead of network. This 
method is just like jdbc
    * executeBatch, we pack some insert request in batch and send them to 
server. If you want improve
@@ -2395,6 +2404,7 @@ public class Session implements ISession {
       throws IoTDBConnectionException {
     request.addToPrefixPaths(deviceId);
     request.addToTimestamps(time);
+    // TODO: (FASTWRITE) Measurement information no longer needs to be added
     request.addToMeasurementsList(measurements);
     ByteBuffer buffer = SessionUtils.getValueBuffer(types, values);
     request.addToValuesList(buffer);
diff --git 
a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java 
b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
index 32ed19add9..57ea8e8f0b 100644
--- a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
+++ b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
@@ -79,6 +79,7 @@ public class SessionUtils {
   public static ByteBuffer getValueBuffer(List<TSDataType> types, List<Object> 
values)
       throws IoTDBConnectionException {
     ByteBuffer buffer = 
ByteBuffer.allocate(SessionUtils.calculateLength(types, values));
+    // TODO: (FASTWRITE) putValues can omit the one-byte Type written per value
     SessionUtils.putValues(types, values, buffer);
     return buffer;
   }
diff --git a/thrift/src/main/thrift/client.thrift 
b/thrift/src/main/thrift/client.thrift
index ebb1ef1880..25ab31f9a1 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -246,6 +246,14 @@ struct TSInsertTabletsReq {
   8: optional bool isAligned
 }
 
+// TODO: (FASTWRITE) Define a new interface
+struct TSFastInsertRecordsReq {
+  1: required i64 sessionId
+  2: required list<string> prefixPaths
+  5: required list<i64> timestamps
+  4: required list<binary> valuesList
+}
+
 struct TSInsertRecordsReq {
   1: required i64 sessionId
   2: required list<string> prefixPaths

Reply via email to