This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch fast_write_test_with_guoneng
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 10b5b87836e1d3a199abd7fcd11628018096c4fd
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon Apr 24 17:17:51 2023 +0800

    tmp save
---
 .../java/org/apache/iotdb/WriteTestFixParallel.java   | 19 ++++++++++++-------
 .../planner/plan/node/write/FastInsertRowNode.java    | 10 ++++++++++
 2 files changed, 22 insertions(+), 7 deletions(-)

diff --git 
a/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java 
b/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java
index ddbb4a7c16..0bbfd8bd84 100644
--- a/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java
+++ b/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java
@@ -56,15 +56,17 @@ public class WriteTestFixParallel {
 
   private static Random r;
 
+  private static float[] floatData = new float[10000];
+
   /** Build a custom SessionPool for this example */
 
   /** Build a redirect-able SessionPool for this example */
   private static void constructRedirectSessionPool() {
     List<String> nodeUrls = new ArrayList<>();
     //    nodeUrls.add("127.0.0.1:6667");
-    nodeUrls.add("10.24.58.58:6667");
-    nodeUrls.add("10.24.58.67:6667");
-    nodeUrls.add("10.24.58.69:6667");
+    nodeUrls.add("192.168.130.16:6667");
+    nodeUrls.add("192.168.130.17:6667");
+    nodeUrls.add("192.168.130.18:6667");
     sessionPool =
         new SessionPool.Builder()
             .nodeUrls(nodeUrls)
@@ -155,6 +157,11 @@ public class WriteTestFixParallel {
       types.add(TSDataType.FLOAT);
     }
 
+    r = new Random();
+    for (int i = 0; i < floatData.length; i++) {
+      floatData[i] = r.nextFloat();
+    }
+
     Thread[] threads = new Thread[THREAD_NUMBER];
 
     SyncWriteSignal signal = new SyncWriteSignal(THREAD_NUMBER);
@@ -201,7 +208,6 @@ public class WriteTestFixParallel {
       throws StatementExecutionException, IoTDBConnectionException {
     List<String> deviceIds = new ArrayList<>();
     List<Long> times = new ArrayList<>();
-    List<List<String>> measurementsList = new ArrayList<>();
     List<List<TSDataType>> typesList = new ArrayList<>();
     List<List<Object>> valuesList = new ArrayList<>();
     int deviceCount = 0;
@@ -211,15 +217,14 @@ public class WriteTestFixParallel {
       times.add(timestamp);
       List<Object> values = new ArrayList<>();
       for (int i = 0; i < SENSOR_NUMBER; i++) {
-        values.add(r.nextFloat());
+        values.add(floatData[(int) ((i + j + timestamp) % floatData.length)]);
       }
       valuesList.add(values);
-      measurementsList.add(measurements);
       typesList.add(types);
       deviceCount++;
     }
 
-    sessionPool.insertAlignedRecords(deviceIds, times, measurementsList, 
typesList, valuesList);
+    sessionPool.fastInsertRecords(deviceIds, times, typesList, valuesList);
     return deviceCount;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
index 378d6a29c9..da5c4cb475 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
@@ -65,6 +65,7 @@ public class FastInsertRowNode extends InsertRowNode {
   }
 
   // TODO: (FASTWRITE) (侯昊男) 增加 byteBuffer 字段后,相应的序列化反序列化方法要改一下
+  // 这个方法貌似没有被用到
   void subSerialize(ByteBuffer buffer) {
     ReadWriteIOUtils.write(getTime(), buffer);
     ReadWriteIOUtils.write(devicePath.getFullPath(), buffer);
@@ -75,6 +76,11 @@ public class FastInsertRowNode extends InsertRowNode {
     ReadWriteIOUtils.write(getTime(), stream);
     ReadWriteIOUtils.write(devicePath.getFullPath(), stream);
     serializeValues(stream);
+    // 如果有值,则将其序列化下去,为了给共识层用,共识层的 follower 收到的 insertNode 需要携带所有信息
+    ReadWriteIOUtils.write(measurementSchemas != null, stream);
+    if (measurementSchemas != null) {
+      serializeMeasurementsAndValues(stream);
+    }
   }
 
   /** Serialize measurements and values, ignoring failed time series */
@@ -103,6 +109,10 @@ public class FastInsertRowNode extends InsertRowNode {
       throw new IllegalArgumentException("Cannot deserialize InsertRowNode", 
e);
     }
     deserializeValues(byteBuffer);
+    boolean hasSchema = ReadWriteIOUtils.readBool(byteBuffer);
+    if (hasSchema) {
+      deserializeMeasurementsAndValues(byteBuffer);
+    }
   }
 
   void deserializeValues(ByteBuffer byteBuffer) {

Reply via email to