This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cac7a525776 Fix follower data may out of order (#11625)
cac7a525776 is described below

commit cac7a525776311e5347c65f96edc0d9c5671d128
Author: Haonan <[email protected]>
AuthorDate: Mon Nov 27 20:32:10 2023 +0800

    Fix follower data may out of order (#11625)
---
 .../dataregion/DataRegionStateMachine.java         | 13 +---
 .../dataregion/DataRegionStateMachineTest.java     | 78 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 11 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index be85b350d0b..3feed8a9bf8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -36,7 +36,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiT
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache;
@@ -192,25 +191,17 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
       }
       result =
           new InsertMultiTabletsNode(insertNodes.get(0).getPlanNodeId(), 
index, insertTabletNodes);
-    } else { // merge to InsertRowsNode or InsertRowsOfOneDeviceNode
-      boolean sameDevice = true;
+    } else { // merge to InsertRowsNode
       PartialPath device = insertNodes.get(0).getDevicePath();
       List<Integer> index = new ArrayList<>(size);
       List<InsertRowNode> insertRowNodes = new ArrayList<>(size);
       int i = 0;
       for (InsertNode insertNode : insertNodes) {
-        if (sameDevice && !insertNode.getDevicePath().equals(device)) {
-          sameDevice = false;
-        }
         insertRowNodes.add((InsertRowNode) insertNode);
         index.add(i);
         i++;
       }
-      result =
-          sameDevice
-              ? new InsertRowsOfOneDeviceNode(
-                  insertNodes.get(0).getPlanNodeId(), index, insertRowNodes)
-              : new InsertRowsNode(insertNodes.get(0).getPlanNodeId(), index, 
insertRowNodes);
+      result = new InsertRowsNode(insertNodes.get(0).getPlanNodeId(), index, 
insertRowNodes);
     }
     result.setSearchIndex(insertNodes.get(0).getSearchIndex());
     result.setDevicePath(insertNodes.get(0).getDevicePath());
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachineTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachineTest.java
new file mode 100644
index 00000000000..e7846de219d
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachineTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.consensus.statemachine.dataregion;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DataRegionStateMachineTest { // unit test for DataRegionStateMachine.mergeInsertNodes
+
+  @Test
+  public void testMergeInsertNodesToInsertNodesOfOneDevice() throws 
IllegalPathException {
+    List<InsertNode> list = new ArrayList<>(); // input rows passed to mergeInsertNodes below
+    InsertRowNode node =
+        new InsertRowNode(
+            new PlanNodeId("plan node 1"),
+            new PartialPath("root.sg.d1"), // all three rows use this same device path
+            false,
+            new String[] {"s1", "s2", "s3"},
+            new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT, 
TSDataType.INT64},
+            1000L, // differs per row (1000, 999, 998); presumably the timestamp - TODO confirm
+            new Object[] {1.0, 2f, 300L},
+            false);
+    list.add(node);
+    node =
+        new InsertRowNode(
+            new PlanNodeId("plan node 1"),
+            new PartialPath("root.sg.d1"),
+            false,
+            new String[] {"s1", "s2", "s3"},
+            new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT, 
TSDataType.INT64},
+            999L, // lower than the first row's value, so the rows are added in descending order
+            new Object[] {1.0, 2f, 300L},
+            false);
+    list.add(node);
+    node =
+        new InsertRowNode(
+            new PlanNodeId("plan node 1"),
+            new PartialPath("root.sg.d1"),
+            false,
+            new String[] {"s1", "s2", "s3"},
+            new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT, 
TSDataType.INT64},
+            998L, // lowest of the three values
+            new Object[] {1.0, 2f, 300L},
+            false);
+    list.add(node);
+    DataRegionStateMachine fakeStateMachine = new DataRegionStateMachine(null);
+    InsertNode mergedNode = fakeStateMachine.mergeInsertNodes(list); // merge the three rows
+    Assert.assertTrue(mergedNode instanceof InsertRowsNode); // expected type even for one device
+  }
+}

Reply via email to