This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f7069baf2f Fix sync stuck problem of IoTConsensus and WAL (#12955)
2f7069baf2f is described below

commit 2f7069baf2ff5f58e89063480af57e7f072b43c2
Author: Li Yu Heng <[email protected]>
AuthorDate: Wed Jul 17 21:38:35 2024 +0800

    Fix sync stuck problem of IoTConsensus and WAL (#12955)
    
    * now i'm master of WALNode
    
    * improve
    
    * tan review
    
    * rename
---
 .../consensus/iot/log/ConsensusReqReader.java      |  2 +-
 .../dataregion/DataExecutionVisitor.java           |  6 +++
 .../dataregion/DataRegionStateMachine.java         |  5 ++-
 .../plan/planner/plan/node/PlanNodeType.java       |  2 +
 .../ContinuousSameSearchIndexSeparatorNode.java    | 43 ++++++++++++++++++++++
 .../db/storageengine/dataregion/DataRegion.java    | 14 +++++++
 .../dataregion/wal/buffer/WALEntry.java            |  3 ++
 .../dataregion/wal/buffer/WALEntryType.java        |  1 +
 .../dataregion/wal/buffer/WALInfoEntry.java        |  1 +
 .../dataregion/wal/node/IWALNode.java              |  4 ++
 .../dataregion/wal/node/WALFakeNode.java           |  7 ++++
 .../storageengine/dataregion/wal/node/WALNode.java | 15 ++++++--
 12 files changed, 98 insertions(+), 5 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/log/ConsensusReqReader.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/log/ConsensusReqReader.java
index d7e9f246318..6959b56b674 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/log/ConsensusReqReader.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/log/ConsensusReqReader.java
@@ -46,7 +46,7 @@ public interface ConsensusReqReader {
 
   /** This iterator provides blocking and non-blocking interfaces to read 
consensus request. */
   interface ReqIterator {
-    // Like {@link Iterator#hasNext()}
+    /** Like {@link Iterator#hasNext()} */
     boolean hasNext();
 
     /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 14b2229ed78..efd9859588c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -60,6 +60,7 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
   public TSStatus visitInsertRow(InsertRowNode node, DataRegion dataRegion) {
     try {
       dataRegion.insert(node);
+      dataRegion.insertSeparatorToWAL();
       return StatusUtils.OK;
     } catch (OutOfTTLException e) {
       LOGGER.warn("Error in executing plan node: {}, caused by {}", node, 
e.getMessage());
@@ -77,6 +78,7 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
   public TSStatus visitInsertTablet(InsertTabletNode node, DataRegion 
dataRegion) {
     try {
       dataRegion.insertTablet(node);
+      dataRegion.insertSeparatorToWAL();
       return StatusUtils.OK;
     } catch (OutOfTTLException e) {
       LOGGER.warn("Error in executing plan node: {}, caused by {}", node, 
e.getMessage());
@@ -113,6 +115,7 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
   public TSStatus visitInsertRows(InsertRowsNode node, DataRegion dataRegion) {
     try {
       dataRegion.insert(node);
+      dataRegion.insertSeparatorToWAL();
       return StatusUtils.OK;
     } catch (WriteProcessRejectException e) {
       LOGGER.warn("Reject in executing plan node: {}, caused by {}", node, 
e.getMessage());
@@ -146,6 +149,7 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
   public TSStatus visitInsertMultiTablets(InsertMultiTabletsNode node, 
DataRegion dataRegion) {
     try {
       dataRegion.insertTablets(node);
+      dataRegion.insertSeparatorToWAL();
       return StatusUtils.OK;
     } catch (BatchProcessException e) {
       LOGGER.warn("Batch failure in executing a InsertMultiTabletsNode.");
@@ -177,6 +181,7 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
       InsertRowsOfOneDeviceNode node, DataRegion dataRegion) {
     try {
       dataRegion.insert(node);
+      dataRegion.insertSeparatorToWAL();
       return StatusUtils.OK;
     } catch (WriteProcessRejectException e) {
       LOGGER.warn("Reject in executing plan node: {}, caused by {}", node, 
e.getMessage());
@@ -235,6 +240,7 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
               path, node.getDeleteStartTime(), node.getDeleteEndTime(), 
node.getSearchIndex());
         }
       }
+      dataRegion.insertSeparatorToWAL();
       PipeInsertionDataNodeListener.getInstance().listenToDeleteData(node);
       return StatusUtils.OK;
     } catch (IOException | IllegalPathException e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index f60ae0bf9a4..2cc420e1f86 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -36,6 +36,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache;
 import org.apache.iotdb.db.storageengine.buffer.ChunkCache;
@@ -148,9 +149,11 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
     for (IConsensusRequest req : indexedRequest.getRequests()) {
       // PlanNode in IndexedConsensusRequest should always be InsertNode
       PlanNode planNode = getPlanNode(req);
+      if (planNode instanceof SearchNode) {
+        ((SearchNode) 
planNode).setSearchIndex(indexedRequest.getSearchIndex());
+      }
       if (planNode instanceof InsertNode) {
         InsertNode innerNode = (InsertNode) planNode;
-        innerNode.setSearchIndex(indexedRequest.getSearchIndex());
         insertNodes.add(innerNode);
       } else if (indexedRequest.getRequests().size() == 1) {
         // If the planNode is not InsertNode, it is expected that the 
IndexedConsensusRequest only
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index c1432bf1451..061f22f171d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -222,6 +222,8 @@ public enum PlanNodeType {
   TIMESERIES_REGION_SCAN((short) 94),
   REGION_MERGE((short) 95),
   DEVICE_SCHEMA_FETCH_SCAN((short) 96),
+
+  CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR((short) 97),
   ;
 
   public static final int BYTES = Short.BYTES;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ContinuousSameSearchIndexSeparatorNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ContinuousSameSearchIndexSeparatorNode.java
new file mode 100644
index 00000000000..78bf312bf9e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ContinuousSameSearchIndexSeparatorNode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
+
+/**
+ * For IoTConsensus sync. See <a 
href="https://github.com/apache/iotdb/pull/12955">github pull
+ * request</a> for details.
+ */
+public class ContinuousSameSearchIndexSeparatorNode implements WALEntryValue {
+
+  @Override
+  public void serializeToWAL(IWALByteBufferView buffer) {
+    
buffer.putShort(PlanNodeType.CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR.getNodeType());
+    // search index is always -1
+    buffer.putLong(-1);
+  }
+
+  @Override
+  public int serializedSize() {
+    return Short.BYTES + Long.BYTES;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index de15317527d..1396ab501f4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -53,6 +53,7 @@ import 
org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
@@ -2163,6 +2164,19 @@ public class DataRegion implements IDataRegionForQuery {
     return walFlushListeners;
   }
 
+  /**
+   * For IoTConsensus sync. See <a 
href="https://github.com/apache/iotdb/pull/12955">github pull
+   * request</a> for details.
+   */
+  public void insertSeparatorToWAL() {
+    getWALNode()
+        .ifPresent(
+            walNode ->
+                walNode.log(
+                    TsFileProcessor.MEMTABLE_NOT_EXIST,
+                    new ContinuousSameSearchIndexSeparatorNode()));
+  }
+
   private boolean canSkipDelete(
       TsFileResource tsFileResource,
       Set<PartialPath> devicePaths,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java
index dff07dae0ef..f9ea8fdb333 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer;
 
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -71,6 +72,8 @@ public abstract class WALEntry implements SerializedSize {
       this.type = WALEntryType.DELETE_DATA_NODE;
     } else if (value instanceof Checkpoint) {
       this.type = WALEntryType.MEMORY_TABLE_CHECKPOINT;
+    } else if (value instanceof ContinuousSameSearchIndexSeparatorNode) {
+      this.type = WALEntryType.CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE;
     } else {
       throw new RuntimeException("Unknown WALEntry type");
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
index 5a2785798f0..914a2e90663 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
@@ -40,6 +40,7 @@ public enum WALEntryType {
   MEMORY_TABLE_CHECKPOINT((byte) 7),
   /** {@link 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode} */
   INSERT_ROWS_NODE((byte) 8),
+  CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE((byte) 9),
   // endregion
   // region signal entry type
   // signal wal buffer has been closed
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
index 966361857f5..8b327b4880b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
@@ -76,6 +76,7 @@ public class WALInfoEntry extends WALEntry {
       case INSERT_ROWS_NODE:
       case DELETE_DATA_NODE:
       case MEMORY_TABLE_SNAPSHOT:
+      case CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE:
         value.serializeToWAL(buffer);
         break;
       case MEMORY_TABLE_CHECKPOINT:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java
index 7e10c3a70c6..2baf45f3ae5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.node;
 
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -44,6 +45,9 @@ public interface IWALNode extends FlushListener, 
AutoCloseable, ConsensusReqRead
   /** Log DeleteDataNode. */
   WALFlushListener log(long memTableId, DeleteDataNode deleteDataNode);
 
+  /** Log ContinuousSameSearchIndexSeparatorNode. */
+  WALFlushListener log(long memTableId, ContinuousSameSearchIndexSeparatorNode 
separatorNode);
+
   /** Callback when memTable created. */
   void onMemTableCreated(IMemTable memTable, String targetTsFile);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java
index 2902a6dcae9..38b69f1162b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.wal.node;
 
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -67,6 +68,12 @@ public class WALFakeNode implements IWALNode {
     return getResult();
   }
 
+  @Override
+  public WALFlushListener log(
+      long memTableId, ContinuousSameSearchIndexSeparatorNode separatorNode) {
+    return getResult();
+  }
+
   private WALFlushListener getResult() {
     switch (status) {
       case SUCCESS:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index 79d5d26673b..755de60016b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.consensus.common.request.IoTConsensusRequest;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -167,6 +168,13 @@ public class WALNode implements IWALNode {
     return log(walEntry);
   }
 
+  @Override
+  public WALFlushListener log(
+      long memTableId, ContinuousSameSearchIndexSeparatorNode separatorNode) {
+    WALEntry walEntry = new WALInfoEntry(memTableId, separatorNode);
+    return log(walEntry);
+  }
+
   private WALFlushListener log(WALEntry walEntry) {
 
     buffer.write(walEntry);
@@ -709,7 +717,8 @@ public class WALNode implements IWALNode {
             buffer.clear();
             if (currentIndex == targetIndex) {
               tmpNodes.add(new IoTConsensusRequest(buffer));
-            } else { // different search index, all slices found
+            } else {
+              // different search index, all slices found
               if (!tmpNodes.isEmpty()) {
                 insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
                 tmpNodes = new ArrayList<>();
@@ -720,8 +729,8 @@ public class WALNode implements IWALNode {
                 targetIndex = currentIndex;
               }
             }
-          } else if (!tmpNodes
-              .isEmpty()) { // next entry doesn't need to be searched, all 
slices found
+          } else if (!tmpNodes.isEmpty()) {
+            // next entry doesn't need to be searched, all slices found
             insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
             targetIndex++;
             tmpNodes = new ArrayList<>();

Reply via email to