This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 88ba093562b PipeConsensus: keep consistent with IoTConsensus in WAL, lastCache and other scenarios (#12822)
88ba093562b is described below

commit 88ba093562b92c3a6f98b47b9b1cb7e53e91c646
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Jun 28 01:52:27 2024 -0500

    PipeConsensus: keep consistent with IoTConsensus in WAL, lastCache and other scenarios (#12822)
---
 .../org/apache/iotdb/db/conf/IoTDBStartCheck.java  |  5 +++-
 .../pipeconsensus/PipeConsensusReceiver.java       |  2 ++
 .../plan/planner/plan/node/PlanNode.java           | 10 -------
 .../plan/node/pipe/PipeEnrichedInsertNode.java     |  5 ----
 .../plan/planner/plan/node/write/InsertNode.java   | 34 ++++++++++++++++------
 .../db/storageengine/dataregion/DataRegion.java    | 16 +++++-----
 .../storageengine/dataregion/wal/WALManager.java   | 27 ++++++++++++++---
 .../dataregion/wal/recover/WALNodeRecoverTask.java | 31 +++++++++++++++-----
 8 files changed, 85 insertions(+), 45 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
index ca08c7bbb23..989a3328aa7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
@@ -216,7 +216,10 @@ public class IoTDBStartCheck {
     properties = systemPropertiesHandler.read();
 
     if (systemPropertiesHandler.isFirstStart()) {
-      if 
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
+      if 
((config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
+              || config
+                  .getDataRegionConsensusProtocolClass()
+                  .equals(ConsensusFactory.IOT_CONSENSUS_V2))
           && config.getWalMode().equals(WALMode.DISABLE)) {
         throw new ConfigurationException(
             "Configuring the WALMode as disable is not supported under 
IoTConsensus");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 9d45f96062c..87f3da19285 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -285,6 +285,7 @@ public class PipeConsensusReceiver {
         Optional.ofNullable(pipeConsensus.getImpl(consensusGroupId))
             .orElseThrow(() -> new 
ConsensusGroupNotExistException(consensusGroupId));
     final InsertNode insertNode = req.getInsertNode();
+    insertNode.markAsGeneratedByRemoteConsensusLeader();
     insertNode.setProgressIndex(
         
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex())));
     return new 
TPipeConsensusTransferResp(impl.writeOnFollowerReplica(insertNode));
@@ -297,6 +298,7 @@ public class PipeConsensusReceiver {
         Optional.ofNullable(pipeConsensus.getImpl(consensusGroupId))
             .orElseThrow(() -> new 
ConsensusGroupNotExistException(consensusGroupId));
     final InsertNode insertNode = req.convertToInsertNode();
+    insertNode.markAsGeneratedByRemoteConsensusLeader();
     insertNode.setProgressIndex(
         
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex())));
     return new 
TPipeConsensusTransferResp(impl.writeOnFollowerReplica(insertNode));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
index 29dd52df29a..2bbf723fd30 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
@@ -49,7 +49,6 @@ public abstract class PlanNode implements IConsensusRequest {
   protected PlanNodeId id;
 
   protected boolean isGeneratedByPipe = false;
-  protected boolean isGeneratedByRemoteConsensusLeader = false;
 
   protected PlanNode(PlanNodeId id) {
     requireNonNull(id, "id is null");
@@ -68,19 +67,10 @@ public abstract class PlanNode implements IConsensusRequest 
{
     return isGeneratedByPipe;
   }
 
-  public boolean isGeneratedByRemoteConsensusLeader() {
-    return isGeneratedByRemoteConsensusLeader;
-  }
-
   public void markAsGeneratedByPipe() {
     isGeneratedByPipe = true;
   }
 
-  @Override
-  public void markAsGeneratedByRemoteConsensusLeader() {
-    isGeneratedByRemoteConsensusLeader = true;
-  }
-
   public abstract List<PlanNode> getChildren();
 
   public abstract void addChild(PlanNode child);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
index 855fe79de5d..6ea566053ac 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
@@ -250,11 +250,6 @@ public class PipeEnrichedInsertNode extends InsertNode {
     return insertNode.getMinTime();
   }
 
-  @Override
-  public boolean isSyncFromLeaderWhenUsingIoTConsensus() {
-    return insertNode.isSyncFromLeaderWhenUsingIoTConsensus();
-  }
-
   @Override
   public void markFailedMeasurement(int index) {
     insertNode.markFailedMeasurement(index);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index 054770c30ed..a572f689e8c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -24,7 +24,10 @@ import 
org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
@@ -48,6 +51,8 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
   /** this insert node doesn't need to participate in iot consensus */
   public static final long NO_CONSENSUS_INDEX = 
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
 
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+
   /**
    * if use id table, this filed is id form of device path <br>
    * if not, this filed is device path<br>
@@ -73,6 +78,8 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
    */
   protected long searchIndex = NO_CONSENSUS_INDEX;
 
+  protected boolean isGeneratedByRemoteConsensusLeader = false;
+
   /** Physical address of data region after splitting */
   protected TRegionReplicaSet dataRegionReplicaSet;
 
@@ -169,6 +176,24 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
     this.searchIndex = searchIndex;
   }
 
+  public boolean isGeneratedByRemoteConsensusLeader() {
+    switch (config.getDataRegionConsensusProtocolClass()) {
+      case ConsensusFactory.IOT_CONSENSUS:
+      case ConsensusFactory.IOT_CONSENSUS_V2:
+      case ConsensusFactory.FAST_IOT_CONSENSUS:
+      case ConsensusFactory.RATIS_CONSENSUS:
+        return isGeneratedByRemoteConsensusLeader;
+      case ConsensusFactory.SIMPLE_CONSENSUS:
+        return false;
+    }
+    return false;
+  }
+
+  @Override
+  public void markAsGeneratedByRemoteConsensusLeader() {
+    isGeneratedByRemoteConsensusLeader = true;
+  }
+
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     throw new NotImplementedException("serializeAttributes of InsertNode is 
not implemented");
@@ -231,15 +256,6 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
 
   public abstract long getMinTime();
 
-  /**
-   * Notice: Call this method ONLY when using IOT_CONSENSUS, other consensus 
protocol cannot
-   * distinguish whether the insertNode sync from leader by this method.
-   * isSyncFromLeaderWhenUsingIoTConsensus == true means this node is a 
follower
-   */
-  public boolean isSyncFromLeaderWhenUsingIoTConsensus() {
-    return searchIndex == ConsensusReqReader.DEFAULT_SEARCH_INDEX;
-  }
-
   // region partial insert
   @TestOnly
   public void markFailedMeasurement(int index) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 9f7252bd98c..733eca443d7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1097,8 +1097,7 @@ public class DataRegion implements IDataRegionForQuery {
 
   private void tryToUpdateInsertTabletLastCache(InsertTabletNode node) {
     if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
-        || 
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
-            && node.isSyncFromLeaderWhenUsingIoTConsensus())) {
+        || node.isGeneratedByRemoteConsensusLeader()) {
       // disable updating last cache on follower
       return;
     }
@@ -1142,8 +1141,7 @@ public class DataRegion implements IDataRegionForQuery {
     
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
 
     if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
-      if 
((config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
-          && insertRowNode.isSyncFromLeaderWhenUsingIoTConsensus())) {
+      if (insertRowNode.isGeneratedByRemoteConsensusLeader()) {
         return tsFileProcessor;
       }
       // disable updating last cache on follower
@@ -1242,8 +1240,7 @@ public class DataRegion implements IDataRegionForQuery {
     
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
 
     if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
-      if 
((config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
-          && insertRowsNode.isSyncFromLeaderWhenUsingIoTConsensus())) {
+      if (insertRowsNode.isGeneratedByRemoteConsensusLeader()) {
         return;
       }
       // disable updating last cache on follower
@@ -3343,8 +3340,7 @@ public class DataRegion implements IDataRegionForQuery {
       PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
       
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
       if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
-        if 
((config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
-            && 
insertRowsOfOneDeviceNode.isSyncFromLeaderWhenUsingIoTConsensus())) {
+        if (insertRowsOfOneDeviceNode.isGeneratedByRemoteConsensusLeader()) {
           return;
         }
         // disable updating last cache on follower
@@ -3623,7 +3619,9 @@ public class DataRegion implements IDataRegionForQuery {
 
   private void acquireDirectBufferMemory() throws DataRegionException {
     long acquireDirectBufferMemCost = 0;
-    if 
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS))
 {
+    if 
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
+        || 
config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.FAST_IOT_CONSENSUS)
+        || 
config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2))
 {
       acquireDirectBufferMemCost = config.getWalBufferSize();
     } else if (config
         .getDataRegionConsensusProtocolClass()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
index f3911c9b196..37f6330f2b3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
@@ -69,7 +69,11 @@ public class WALManager implements IService {
   private final AtomicLong totalFileNum = new AtomicLong();
 
   private WALManager() {
-    if 
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS))
 {
+    if 
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
+        || 
config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2)
+        || config
+            .getDataRegionConsensusProtocolClass()
+            .equals(ConsensusFactory.FAST_IOT_CONSENSUS)) {
       walNodesManager = new FirstCreateStrategy();
     } else if (config.getMaxWalNodesNum() == 0) {
       walNodesManager = new ElasticStrategy();
@@ -80,6 +84,12 @@ public class WALManager implements IService {
 
   public static String getApplicantUniqueId(String storageGroupName, boolean 
sequence) {
     return 
config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
+            || config
+                .getDataRegionConsensusProtocolClass()
+                .equals(ConsensusFactory.IOT_CONSENSUS_V2)
+            || config
+                .getDataRegionConsensusProtocolClass()
+                .equals(ConsensusFactory.FAST_IOT_CONSENSUS)
         ? storageGroupName
         : storageGroupName
             + IoTDBConstant.FILE_NAME_SEPARATOR
@@ -95,11 +105,17 @@ public class WALManager implements IService {
     return walNodesManager.applyForWALNode(applicantUniqueId);
   }
 
-  /** WAL node will be registered only when using iot consensus protocol. */
+  /** WAL node will be registered only when using iot series consensus 
protocol. */
   public void registerWALNode(
       String applicantUniqueId, String logDirectory, long startFileVersion, 
long startSearchIndex) {
     if (config.getWalMode() == WALMode.DISABLE
-        || 
!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS))
 {
+        || 
(!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
+            && !config
+                .getDataRegionConsensusProtocolClass()
+                .equals(ConsensusFactory.IOT_CONSENSUS_V2)
+            && !config
+                .getDataRegionConsensusProtocolClass()
+                .equals(ConsensusFactory.FAST_IOT_CONSENSUS))) {
       return;
     }
 
@@ -114,7 +130,10 @@ public class WALManager implements IService {
         || 
(!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
             && !config
                 .getDataRegionConsensusProtocolClass()
-                .equals(ConsensusFactory.IOT_CONSENSUS_V2))) {
+                .equals(ConsensusFactory.IOT_CONSENSUS_V2)
+            && !config
+                .getDataRegionConsensusProtocolClass()
+                .equals(ConsensusFactory.FAST_IOT_CONSENSUS))) {
       return;
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
index 07a06448359..1419f9c6136 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
@@ -107,13 +107,7 @@ public class WALNodeRecoverTask implements Runnable {
     }
 
     try {
-      if 
(!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS))
 {
-        // delete this wal node folder
-        FileUtils.deleteFileOrDirectory(logDirectory);
-        logger.info(
-            "Successfully recover WAL node in the directory {}, so delete 
these wal files.",
-            logDirectory);
-      } else {
+      if 
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS))
 {
         // delete checkpoint info to avoid repeated recover
         File[] checkpointFiles = 
CheckpointFileUtils.listAllCheckpointFiles(logDirectory);
         for (File checkpointFile : checkpointFiles) {
@@ -133,6 +127,29 @@ public class WALNodeRecoverTask implements Runnable {
         logger.info(
             "Successfully recover WAL node in the directory {}, add this node 
to WALManger.",
             logDirectory);
+      } else {
+        // delete this wal node folder
+        FileUtils.deleteFileOrDirectory(logDirectory);
+        logger.info(
+            "Successfully recover WAL node in the directory {}, so delete 
these wal files.",
+            logDirectory);
+      }
+
+      // PipeConsensus will not only delete WAL node folder, but also register 
WAL node.
+      if 
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.FAST_IOT_CONSENSUS)
+          || config
+              .getDataRegionConsensusProtocolClass()
+              .equals(ConsensusFactory.IOT_CONSENSUS_V2)) {
+        // register wal node
+        WALManager.getInstance()
+            .registerWALNode(
+                logDirectory.getName(),
+                logDirectory.getAbsolutePath(),
+                lastVersionId + 1,
+                lastSearchIndex);
+        logger.info(
+            "Successfully recover WAL node in the directory {}, add this node 
to WALManger.",
+            logDirectory);
       }
     } finally {
       allNodesRecoveredLatch.countDown();

Reply via email to