This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cac7a525776 Fix follower data may out of order (#11625)
cac7a525776 is described below
commit cac7a525776311e5347c65f96edc0d9c5671d128
Author: Haonan <[email protected]>
AuthorDate: Mon Nov 27 20:32:10 2023 +0800
Fix follower data may out of order (#11625)
---
.../dataregion/DataRegionStateMachine.java | 13 +---
.../dataregion/DataRegionStateMachineTest.java | 78 ++++++++++++++++++++++
2 files changed, 80 insertions(+), 11 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index be85b350d0b..3feed8a9bf8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -36,7 +36,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiT
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache;
@@ -192,25 +191,17 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
result =
new InsertMultiTabletsNode(insertNodes.get(0).getPlanNodeId(),
index, insertTabletNodes);
- } else { // merge to InsertRowsNode or InsertRowsOfOneDeviceNode
- boolean sameDevice = true;
+ } else { // merge to InsertRowsNode
PartialPath device = insertNodes.get(0).getDevicePath();
List<Integer> index = new ArrayList<>(size);
List<InsertRowNode> insertRowNodes = new ArrayList<>(size);
int i = 0;
for (InsertNode insertNode : insertNodes) {
- if (sameDevice && !insertNode.getDevicePath().equals(device)) {
- sameDevice = false;
- }
insertRowNodes.add((InsertRowNode) insertNode);
index.add(i);
i++;
}
- result =
- sameDevice
- ? new InsertRowsOfOneDeviceNode(
- insertNodes.get(0).getPlanNodeId(), index, insertRowNodes)
- : new InsertRowsNode(insertNodes.get(0).getPlanNodeId(), index,
insertRowNodes);
+ result = new InsertRowsNode(insertNodes.get(0).getPlanNodeId(), index,
insertRowNodes);
}
result.setSearchIndex(insertNodes.get(0).getSearchIndex());
result.setDevicePath(insertNodes.get(0).getDevicePath());
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachineTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachineTest.java
new file mode 100644
index 00000000000..e7846de219d
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachineTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.consensus.statemachine.dataregion;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DataRegionStateMachineTest {
+
+ @Test
+ public void testMergeInsertNodesToInsertNodesOfOneDevice() throws
IllegalPathException {
+ List<InsertNode> list = new ArrayList<>();
+ InsertRowNode node =
+ new InsertRowNode(
+ new PlanNodeId("plan node 1"),
+ new PartialPath("root.sg.d1"),
+ false,
+ new String[] {"s1", "s2", "s3"},
+ new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT,
TSDataType.INT64},
+ 1000L,
+ new Object[] {1.0, 2f, 300L},
+ false);
+ list.add(node);
+ node =
+ new InsertRowNode(
+ new PlanNodeId("plan node 1"),
+ new PartialPath("root.sg.d1"),
+ false,
+ new String[] {"s1", "s2", "s3"},
+ new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT,
TSDataType.INT64},
+ 999L,
+ new Object[] {1.0, 2f, 300L},
+ false);
+ list.add(node);
+ node =
+ new InsertRowNode(
+ new PlanNodeId("plan node 1"),
+ new PartialPath("root.sg.d1"),
+ false,
+ new String[] {"s1", "s2", "s3"},
+ new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT,
TSDataType.INT64},
+ 998L,
+ new Object[] {1.0, 2f, 300L},
+ false);
+ list.add(node);
+ DataRegionStateMachine fakeStateMachine = new DataRegionStateMachine(null);
+ InsertNode mergedNode = fakeStateMachine.mergeInsertNodes(list);
+ Assert.assertTrue(mergedNode instanceof InsertRowsNode);
+ }
+}