This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch fast_write_test_with_guoneng in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 10b5b87836e1d3a199abd7fcd11628018096c4fd Author: Jinrui.Zhang <[email protected]> AuthorDate: Mon Apr 24 17:17:51 2023 +0800 tmp save --- .../java/org/apache/iotdb/WriteTestFixParallel.java | 19 ++++++++++++------- .../planner/plan/node/write/FastInsertRowNode.java | 10 ++++++++++ 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java b/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java index ddbb4a7c16..0bbfd8bd84 100644 --- a/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java +++ b/example/session/src/main/java/org/apache/iotdb/WriteTestFixParallel.java @@ -56,15 +56,17 @@ public class WriteTestFixParallel { private static Random r; + private static float[] floatData = new float[10000]; + /** Build a custom SessionPool for this example */ /** Build a redirect-able SessionPool for this example */ private static void constructRedirectSessionPool() { List<String> nodeUrls = new ArrayList<>(); // nodeUrls.add("127.0.0.1:6667"); - nodeUrls.add("10.24.58.58:6667"); - nodeUrls.add("10.24.58.67:6667"); - nodeUrls.add("10.24.58.69:6667"); + nodeUrls.add("192.168.130.16:6667"); + nodeUrls.add("192.168.130.17:6667"); + nodeUrls.add("192.168.130.18:6667"); sessionPool = new SessionPool.Builder() .nodeUrls(nodeUrls) @@ -155,6 +157,11 @@ public class WriteTestFixParallel { types.add(TSDataType.FLOAT); } + r = new Random(); + for (int i = 0; i < floatData.length; i++) { + floatData[i] = r.nextFloat(); + } + Thread[] threads = new Thread[THREAD_NUMBER]; SyncWriteSignal signal = new SyncWriteSignal(THREAD_NUMBER); @@ -201,7 +208,6 @@ public class WriteTestFixParallel { throws StatementExecutionException, IoTDBConnectionException { List<String> deviceIds = new ArrayList<>(); List<Long> times = new ArrayList<>(); - List<List<String>> measurementsList = new ArrayList<>(); List<List<TSDataType>> typesList = new ArrayList<>(); List<List<Object>> valuesList = new ArrayList<>(); int deviceCount = 0; @@ -211,15 +217,14 @@ public class WriteTestFixParallel { times.add(timestamp); List<Object> values = new ArrayList<>(); for (int i = 0; i < SENSOR_NUMBER; i++) { - values.add(r.nextFloat()); + values.add(floatData[(int) ((i + j + timestamp) % floatData.length)]); } valuesList.add(values); - measurementsList.add(measurements); typesList.add(types); deviceCount++; } - sessionPool.insertAlignedRecords(deviceIds, times, measurementsList, typesList, valuesList); + sessionPool.fastInsertRecords(deviceIds, times, typesList, valuesList); return deviceCount; } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java index 378d6a29c9..da5c4cb475 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java @@ -65,6 +65,7 @@ public class FastInsertRowNode extends InsertRowNode { } // TODO: (FASTWRITE) (侯昊男) 增加 byteBuffer 字段后,相应的序列化反序列化方法要改一下 + // 这个方法貌似没有被用到 void subSerialize(ByteBuffer buffer) { ReadWriteIOUtils.write(getTime(), buffer); ReadWriteIOUtils.write(devicePath.getFullPath(), buffer); @@ -75,6 +76,11 @@ public class FastInsertRowNode extends InsertRowNode { ReadWriteIOUtils.write(getTime(), stream); ReadWriteIOUtils.write(devicePath.getFullPath(), stream); serializeValues(stream); + // 如果有值,则将其序列化下去,为了给共识层用,共识层的 follower 收到的 insertNode 需要携带所有信息 + ReadWriteIOUtils.write(measurementSchemas != null, stream); + if (measurementSchemas != null) { + serializeMeasurementsAndValues(stream); + } } /** Serialize measurements and values, ignoring failed time series */ @@ -103,6 +109,10 @@ public class FastInsertRowNode extends InsertRowNode { throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e); } deserializeValues(byteBuffer); + boolean hasSchema = ReadWriteIOUtils.readBool(byteBuffer); + if (hasSchema) { + deserializeMeasurementsAndValues(byteBuffer); + } } void deserializeValues(ByteBuffer byteBuffer) {
