This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch fix_follower_out_of_order in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 12dce5af6cc9cd7fa66d98eccf0eb2e9cbea0ff8 Author: HTHou <[email protected]> AuthorDate: Mon Nov 27 19:01:59 2023 +0800 Fix follower data may out of order --- .../dataregion/DataRegionStateMachine.java | 13 +--- .../dataregion/DataRegionStateMachineTest.java | 78 ++++++++++++++++++++++ 2 files changed, 80 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java index be85b350d0b..3feed8a9bf8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java @@ -36,7 +36,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiT import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache; @@ -192,25 +191,17 @@ public class DataRegionStateMachine extends BaseStateMachine { } result = new InsertMultiTabletsNode(insertNodes.get(0).getPlanNodeId(), index, insertTabletNodes); - } else { // merge to InsertRowsNode or InsertRowsOfOneDeviceNode - boolean sameDevice = true; + } else { // merge to InsertRowsNode PartialPath device = insertNodes.get(0).getDevicePath(); List<Integer> index = new ArrayList<>(size); List<InsertRowNode> insertRowNodes = new ArrayList<>(size); int i = 0; for (InsertNode insertNode : insertNodes) { - if (sameDevice && !insertNode.getDevicePath().equals(device)) { - sameDevice = false; - } insertRowNodes.add((InsertRowNode) insertNode); index.add(i); i++; } - result = - sameDevice - ? new InsertRowsOfOneDeviceNode( - insertNodes.get(0).getPlanNodeId(), index, insertRowNodes) - : new InsertRowsNode(insertNodes.get(0).getPlanNodeId(), index, insertRowNodes); + result = new InsertRowsNode(insertNodes.get(0).getPlanNodeId(), index, insertRowNodes); } result.setSearchIndex(insertNodes.get(0).getSearchIndex()); result.setDevicePath(insertNodes.get(0).getDevicePath()); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachineTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachineTest.java new file mode 100644 index 00000000000..e7846de219d --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachineTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.consensus.statemachine.dataregion; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class DataRegionStateMachineTest { + + @Test + public void testMergeInsertNodesToInsertNodesOfOneDevice() throws IllegalPathException { + List<InsertNode> list = new ArrayList<>(); + InsertRowNode node = + new InsertRowNode( + new PlanNodeId("plan node 1"), + new PartialPath("root.sg.d1"), + false, + new String[] {"s1", "s2", "s3"}, + new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT64}, + 1000L, + new Object[] {1.0, 2f, 300L}, + false); + list.add(node); + node = + new InsertRowNode( + new PlanNodeId("plan node 1"), + new PartialPath("root.sg.d1"), + false, + new String[] {"s1", "s2", "s3"}, + new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT64}, + 999L, + new Object[] {1.0, 2f, 300L}, + false); + list.add(node); + node = + new InsertRowNode( + new PlanNodeId("plan node 1"), + new PartialPath("root.sg.d1"), + false, + new String[] {"s1", "s2", "s3"}, + new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT64}, + 998L, + new Object[] {1.0, 2f, 300L}, + false); + list.add(node); + DataRegionStateMachine fakeStateMachine = new DataRegionStateMachine(null); + InsertNode mergedNode = fakeStateMachine.mergeInsertNodes(list); + Assert.assertTrue(mergedNode instanceof InsertRowsNode); + } +}
