This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 05060cb4cc4 [IOTDB-5935] Pipe: RecoverProgressIndex and 
HybridProgressIndex (#9975)
05060cb4cc4 is described below

commit 05060cb4cc42337b0fb8b397c547d52e97e48194
Author: yschengzi <[email protected]>
AuthorDate: Tue May 30 22:16:25 2023 +0800

    [IOTDB-5935] Pipe: RecoverProgressIndex and HybridProgressIndex (#9975)
    
    * Implementation of RecoverProgressIndex, used for recovery of StorageEngine
    
    * Implementation of HybridProgressIndex, used for hybrid progress index update 
and comparison.
    
    * Update progress index for recovering TsFileResource.
    
    ---------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../consensus/iot/IoTConsensusServerImpl.java      |   4 +-
 .../commons/consensus/index/ProgressIndex.java     |  45 +++++
 .../commons/consensus/index/ProgressIndexType.java |  18 +-
 .../consensus/index/impl/HybridProgressIndex.java  | 221 +++++++++++++++++++++
 .../consensus/index/impl/IoTProgressIndex.java     | 144 +++++++++-----
 .../consensus/index/impl/MinimumProgressIndex.java |   9 +-
 .../consensus/index/impl/RecoverProgressIndex.java | 207 +++++++++++++++++++
 .../consensus/index/impl/SimpleProgressIndex.java  | 145 +++++++++-----
 .../IoTConsensusDataRegionStateMachine.java        |   5 +-
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |   4 +
 .../SimpleConsensusProgressIndexAssigner.java      |  17 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |   2 +-
 .../file/UnsealedTsFileRecoverPerformer.java       |   4 +
 .../TsFileResourceProgressIndexTest.java           |   6 +
 14 files changed, 705 insertions(+), 126 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index ba9d2709ad2..62c498f4b18 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -649,8 +649,8 @@ public class IoTConsensusServerImpl {
   public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
       IConsensusRequest request) {
     if (request instanceof ComparableConsensusRequest) {
-      final IoTProgressIndex iotProgressIndex = new IoTProgressIndex();
-      iotProgressIndex.addSearchIndex(thisNode.getNodeId(), searchIndex.get() 
+ 1);
+      final IoTProgressIndex iotProgressIndex =
+          new IoTProgressIndex(thisNode.getNodeId(), searchIndex.get() + 1);
       ((ComparableConsensusRequest) 
request).setProgressIndex(iotProgressIndex);
     }
     return new IndexedConsensusRequest(searchIndex.get() + 1, 
Collections.singletonList(request));
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
index b4a1f6aeca7..b6929a2fb91 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.commons.consensus.index;
 
+import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -76,4 +79,46 @@ public interface ProgressIndex {
    * @return the minimum progress index after the given progress index and 
this progress index
    */
   ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex 
progressIndex);
+
+  /** @return the type of this progress index */
+  ProgressIndexType getType();
+
+  /**
+   * Blends two progress indexes together. The resulting progress index should 
satisfy:
+   *
+   * <p>(result.equals(progressIndex1) || result.isAfter(progressIndex1)) is 
true
+   *
+   * <p>(result.equals(progressIndex2) || result.isAfter(progressIndex2)) is 
true
+   *
+   * <p>There is no R, such that R satisfies the above conditions and 
result.isAfter(R) is true
+   *
+   * @param progressIndex1 the first progress index. if it is null, the result 
progress index should
+   *     be the second progress index. if it is a minimum progress index, the 
result progress index
+   *     should be the second progress index. (if the second progress index is 
null, the result
+   *     should be a minimum progress index). if it is a hybrid progress 
index, the result progress
+   *     index should be the minimum progress index after the second progress 
index and the first
+   *     progress index
+   * @param progressIndex2 the second progress index. if it is null, the 
result progress index
+   *     should be the first progress index. if it is a minimum progress 
index, the result progress
+   *     index should be the first progress index. (if the first progress 
index is null, the result
+   *     should be a minimum progress index). if it is a hybrid progress 
index, the result progress
+   *     index should be the minimum progress index after the first progress 
index and the second
+   *     progress index
+   * @return the minimum progress index after the first progress index and the 
second progress index
+   */
+  static ProgressIndex blendProgressIndex(
+      ProgressIndex progressIndex1, ProgressIndex progressIndex2) {
+    if (progressIndex1 == null && progressIndex2 == null) {
+      return new MinimumProgressIndex();
+    }
+    if (progressIndex1 == null || progressIndex1 instanceof 
MinimumProgressIndex) {
+      return progressIndex2 == null ? new MinimumProgressIndex() : 
progressIndex2;
+    }
+    if (progressIndex2 == null || progressIndex2 instanceof 
MinimumProgressIndex) {
+      return progressIndex1; // progressIndex1 is not null
+    }
+
+    return new HybridProgressIndex(progressIndex1.getType().getType(), 
progressIndex1)
+        .updateToMinimumIsAfterProgressIndex(progressIndex2);
+  }
 }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
index 02afa4045df..615ce1336d9 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
@@ -19,8 +19,10 @@
 
 package org.apache.iotdb.commons.consensus.index;
 
+import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -30,9 +32,11 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 public enum ProgressIndexType {
-  MINIMUM_CONSENSUS_INDEX((short) 1),
-  IOT_CONSENSUS_INDEX((short) 2),
-  SIMPLE_CONSENSUS_INDEX((short) 3),
+  MINIMUM_PROGRESS_INDEX((short) 1),
+  IOT_PROGRESS_INDEX((short) 2),
+  SIMPLE_PROGRESS_INDEX((short) 3),
+  RECOVER_PROGRESS_INDEX((short) 4),
+  HYBRID_PROGRESS_INDEX((short) 5),
   ;
 
   private final short type;
@@ -62,6 +66,10 @@ public enum ProgressIndexType {
         return IoTProgressIndex.deserializeFrom(byteBuffer);
       case 3:
         return SimpleProgressIndex.deserializeFrom(byteBuffer);
+      case 4:
+        return RecoverProgressIndex.deserializeFrom(byteBuffer);
+      case 5:
+        return HybridProgressIndex.deserializeFrom(byteBuffer);
       default:
         throw new UnsupportedOperationException(
             String.format("Unsupported progress index type %s.", indexType));
@@ -77,6 +85,10 @@ public enum ProgressIndexType {
         return IoTProgressIndex.deserializeFrom(stream);
       case 3:
         return SimpleProgressIndex.deserializeFrom(stream);
+      case 4:
+        return RecoverProgressIndex.deserializeFrom(stream);
+      case 5:
+        return HybridProgressIndex.deserializeFrom(stream);
       default:
         throw new UnsupportedOperationException(
             String.format("Unsupported progress index type %s.", indexType));
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
new file mode 100644
index 00000000000..dcd701cfbe6
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index.impl;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class HybridProgressIndex implements ProgressIndex {
+
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+  private final Map<Short, ProgressIndex> type2Index;
+
+  public HybridProgressIndex() {
+    this.type2Index = new HashMap<>();
+  }
+
+  public HybridProgressIndex(short type, ProgressIndex progressIndex) {
+    this.type2Index = new HashMap<>();
+    type2Index.put(type, progressIndex);
+  }
+
+  @Override
+  public void serialize(ByteBuffer byteBuffer) {
+    lock.readLock().lock();
+    try {
+      ProgressIndexType.HYBRID_PROGRESS_INDEX.serialize(byteBuffer);
+
+      ReadWriteIOUtils.write(type2Index.size(), byteBuffer);
+      for (final Map.Entry<Short, ProgressIndex> entry : 
type2Index.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
+        entry.getValue().serialize(byteBuffer);
+      }
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public void serialize(OutputStream stream) throws IOException {
+    lock.readLock().lock();
+    try {
+      ProgressIndexType.HYBRID_PROGRESS_INDEX.serialize(stream);
+
+      ReadWriteIOUtils.write(type2Index.size(), stream);
+      for (final Map.Entry<Short, ProgressIndex> entry : 
type2Index.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), stream);
+        entry.getValue().serialize(stream);
+      }
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public boolean isAfter(ProgressIndex progressIndex) {
+    lock.readLock().lock();
+    try {
+      if (progressIndex instanceof MinimumProgressIndex) {
+        return true;
+      }
+
+      if (!(progressIndex instanceof HybridProgressIndex)) {
+        final short type = progressIndex.getType().getType();
+        return type2Index.containsKey(type) && 
type2Index.get(type).isAfter(progressIndex);
+      }
+
+      final HybridProgressIndex thisHybridProgressIndex = this;
+      final HybridProgressIndex thatHybridProgressIndex = 
(HybridProgressIndex) progressIndex;
+      return thatHybridProgressIndex.type2Index.entrySet().stream()
+          .noneMatch(
+              entry ->
+                  
!thisHybridProgressIndex.type2Index.containsKey(entry.getKey())
+                      || !thisHybridProgressIndex
+                          .type2Index
+                          .get(entry.getKey())
+                          .isAfter(entry.getValue()));
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  public boolean isGivenProgressIndexAfterSelf(ProgressIndex progressIndex) {
+    return type2Index.size() == 1
+        && type2Index.containsKey(progressIndex.getType().getType())
+        && 
progressIndex.isAfter(type2Index.get(progressIndex.getType().getType()));
+  }
+
+  @Override
+  public boolean equals(ProgressIndex progressIndex) {
+    lock.readLock().lock();
+    try {
+      if (!(progressIndex instanceof HybridProgressIndex)) {
+        return false;
+      }
+
+      final HybridProgressIndex thisHybridProgressIndex = this;
+      final HybridProgressIndex thatHybridProgressIndex = 
(HybridProgressIndex) progressIndex;
+      return thisHybridProgressIndex.type2Index.size() == 
thatHybridProgressIndex.type2Index.size()
+          && thatHybridProgressIndex.type2Index.entrySet().stream()
+              .allMatch(
+                  entry ->
+                      
thisHybridProgressIndex.type2Index.containsKey(entry.getKey())
+                          && thisHybridProgressIndex
+                              .type2Index
+                              .get(entry.getKey())
+                              .equals(entry.getValue()));
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof HybridProgressIndex)) {
+      return false;
+    }
+    return this.equals((HybridProgressIndex) obj);
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+  @Override
+  public ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex 
progressIndex) {
+    lock.writeLock().lock();
+    try {
+      if (progressIndex instanceof MinimumProgressIndex) {
+        return this;
+      }
+
+      if (!(progressIndex instanceof HybridProgressIndex)) {
+        type2Index.compute(
+            progressIndex.getType().getType(),
+            (thisK, thisV) ->
+                (thisV == null
+                    ? progressIndex
+                    : 
thisV.updateToMinimumIsAfterProgressIndex(progressIndex)));
+        return this;
+      }
+
+      final HybridProgressIndex thisHybridProgressIndex = this;
+      final HybridProgressIndex thatHybridProgressIndex = 
(HybridProgressIndex) progressIndex;
+      thatHybridProgressIndex.type2Index.forEach(
+          (thatK, thatV) ->
+              thisHybridProgressIndex.type2Index.compute(
+                  thatK,
+                  (thisK, thisV) ->
+                      (thisV == null ? thatV : 
thisV.updateToMinimumIsAfterProgressIndex(thatV))));
+      return this;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public ProgressIndexType getType() {
+    return ProgressIndexType.HYBRID_PROGRESS_INDEX;
+  }
+
+  public static HybridProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
+    final HybridProgressIndex hybridProgressIndex = new HybridProgressIndex();
+    final int size = ReadWriteIOUtils.readInt(byteBuffer);
+    for (int i = 0; i < size; i++) {
+      final short type = ReadWriteIOUtils.readShort(byteBuffer);
+      final ProgressIndex progressIndex = 
ProgressIndexType.deserializeFrom(byteBuffer);
+      hybridProgressIndex.type2Index.put(type, progressIndex);
+    }
+    return hybridProgressIndex;
+  }
+
+  public static HybridProgressIndex deserializeFrom(InputStream stream) throws 
IOException {
+    final HybridProgressIndex hybridProgressIndex = new HybridProgressIndex();
+    final int size = ReadWriteIOUtils.readInt(stream);
+    for (int i = 0; i < size; i++) {
+      final short type = ReadWriteIOUtils.readShort(stream);
+      final ProgressIndex progressIndex = 
ProgressIndexType.deserializeFrom(stream);
+      hybridProgressIndex.type2Index.put(type, progressIndex);
+    }
+    return hybridProgressIndex;
+  }
+
+  @Override
+  public String toString() {
+    return "HybridProgressIndex{" + "type2Index=" + type2Index + '}';
+  }
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
index d84ef20394f..60c4092f760 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
@@ -29,79 +29,107 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class IoTProgressIndex implements ProgressIndex {
 
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
   private final Map<Integer, Long> peerId2SearchIndex;
 
   public IoTProgressIndex() {
     peerId2SearchIndex = new HashMap<>();
   }
 
-  public void addSearchIndex(Integer peerId, Long searchIndex) {
+  public IoTProgressIndex(Integer peerId, Long searchIndex) {
+    peerId2SearchIndex = new HashMap<>();
     peerId2SearchIndex.put(peerId, searchIndex);
   }
 
   @Override
   public void serialize(ByteBuffer byteBuffer) {
-    ProgressIndexType.IOT_CONSENSUS_INDEX.serialize(byteBuffer);
-
-    ReadWriteIOUtils.write(peerId2SearchIndex.size(), byteBuffer);
-    for (final Map.Entry<Integer, Long> entry : peerId2SearchIndex.entrySet()) 
{
-      ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
-      ReadWriteIOUtils.write(entry.getValue(), byteBuffer);
+    lock.readLock().lock();
+    try {
+      ProgressIndexType.IOT_PROGRESS_INDEX.serialize(byteBuffer);
+
+      ReadWriteIOUtils.write(peerId2SearchIndex.size(), byteBuffer);
+      for (final Map.Entry<Integer, Long> entry : 
peerId2SearchIndex.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
+        ReadWriteIOUtils.write(entry.getValue(), byteBuffer);
+      }
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
   @Override
   public void serialize(OutputStream stream) throws IOException {
-    ProgressIndexType.IOT_CONSENSUS_INDEX.serialize(stream);
-
-    ReadWriteIOUtils.write(peerId2SearchIndex.size(), stream);
-    for (final Map.Entry<Integer, Long> entry : peerId2SearchIndex.entrySet()) 
{
-      ReadWriteIOUtils.write(entry.getKey(), stream);
-      ReadWriteIOUtils.write(entry.getValue(), stream);
+    lock.readLock().lock();
+    try {
+      ProgressIndexType.IOT_PROGRESS_INDEX.serialize(stream);
+
+      ReadWriteIOUtils.write(peerId2SearchIndex.size(), stream);
+      for (final Map.Entry<Integer, Long> entry : 
peerId2SearchIndex.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), stream);
+        ReadWriteIOUtils.write(entry.getValue(), stream);
+      }
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
   @Override
   public boolean isAfter(ProgressIndex progressIndex) {
-    if (progressIndex instanceof MinimumProgressIndex) {
-      return true;
+    lock.readLock().lock();
+    try {
+      if (progressIndex instanceof MinimumProgressIndex) {
+        return true;
+      }
+
+      if (progressIndex instanceof HybridProgressIndex) {
+        return ((HybridProgressIndex) 
progressIndex).isGivenProgressIndexAfterSelf(this);
+      }
+
+      if (!(progressIndex instanceof IoTProgressIndex)) {
+        return false;
+      }
+
+      final IoTProgressIndex thisIoTProgressIndex = this;
+      final IoTProgressIndex thatIoTProgressIndex = (IoTProgressIndex) 
progressIndex;
+      return thatIoTProgressIndex.peerId2SearchIndex.entrySet().stream()
+          .noneMatch(
+              entry ->
+                  
!thisIoTProgressIndex.peerId2SearchIndex.containsKey(entry.getKey())
+                      || 
thisIoTProgressIndex.peerId2SearchIndex.get(entry.getKey())
+                          <= entry.getValue());
+    } finally {
+      lock.readLock().unlock();
     }
-
-    if (!(progressIndex instanceof IoTProgressIndex)) {
-      return false;
-    }
-
-    final IoTProgressIndex thisIoTProgressIndex = this;
-    final IoTProgressIndex thatIoTProgressIndex = (IoTProgressIndex) 
progressIndex;
-    return thatIoTProgressIndex.peerId2SearchIndex.entrySet().stream()
-        .noneMatch(
-            entry ->
-                
!thisIoTProgressIndex.peerId2SearchIndex.containsKey(entry.getKey())
-                    || 
thisIoTProgressIndex.peerId2SearchIndex.get(entry.getKey())
-                        <= entry.getValue());
   }
 
   @Override
   public boolean equals(ProgressIndex progressIndex) {
-    if (!(progressIndex instanceof IoTProgressIndex)) {
-      return false;
+    lock.readLock().lock();
+    try {
+      if (!(progressIndex instanceof IoTProgressIndex)) {
+        return false;
+      }
+
+      final IoTProgressIndex thisIoTProgressIndex = this;
+      final IoTProgressIndex thatIoTProgressIndex = (IoTProgressIndex) 
progressIndex;
+      return thisIoTProgressIndex.peerId2SearchIndex.size()
+              == thatIoTProgressIndex.peerId2SearchIndex.size()
+          && thatIoTProgressIndex.peerId2SearchIndex.entrySet().stream()
+              .allMatch(
+                  entry ->
+                      
thisIoTProgressIndex.peerId2SearchIndex.containsKey(entry.getKey())
+                          && thisIoTProgressIndex
+                              .peerId2SearchIndex
+                              .get(entry.getKey())
+                              .equals(entry.getValue()));
+    } finally {
+      lock.readLock().unlock();
     }
-
-    final IoTProgressIndex thisIoTProgressIndex = this;
-    final IoTProgressIndex thatIoTProgressIndex = (IoTProgressIndex) 
progressIndex;
-    return thisIoTProgressIndex.peerId2SearchIndex.size()
-            == thatIoTProgressIndex.peerId2SearchIndex.size()
-        && thatIoTProgressIndex.peerId2SearchIndex.entrySet().stream()
-            .allMatch(
-                entry ->
-                    
thisIoTProgressIndex.peerId2SearchIndex.containsKey(entry.getKey())
-                        && thisIoTProgressIndex
-                            .peerId2SearchIndex
-                            .get(entry.getKey())
-                            .equals(entry.getValue()));
   }
 
   @Override
@@ -125,17 +153,27 @@ public class IoTProgressIndex implements ProgressIndex {
 
   @Override
   public ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex 
progressIndex) {
-    if (!(progressIndex instanceof IoTProgressIndex)) {
+    lock.writeLock().lock();
+    try {
+      if (!(progressIndex instanceof IoTProgressIndex)) {
+        return ProgressIndex.blendProgressIndex(this, progressIndex);
+      }
+
+      final IoTProgressIndex thisIoTProgressIndex = this;
+      final IoTProgressIndex thatIoTProgressIndex = (IoTProgressIndex) 
progressIndex;
+      thatIoTProgressIndex.peerId2SearchIndex.forEach(
+          (thatK, thatV) ->
+              thisIoTProgressIndex.peerId2SearchIndex.compute(
+                  thatK, (thisK, thisV) -> (thisV == null ? thatV : 
Math.max(thisV, thatV))));
       return this;
+    } finally {
+      lock.writeLock().unlock();
     }
+  }
 
-    final IoTProgressIndex thisIoTProgressIndex = this;
-    final IoTProgressIndex thatIoTProgressIndex = (IoTProgressIndex) 
progressIndex;
-    thatIoTProgressIndex.peerId2SearchIndex.forEach(
-        (thatK, thatV) ->
-            thisIoTProgressIndex.peerId2SearchIndex.compute(
-                thatK, (thisK, thisV) -> (thisV == null ? thatV : 
Math.max(thisV, thatV))));
-    return this;
+  @Override
+  public ProgressIndexType getType() {
+    return ProgressIndexType.IOT_PROGRESS_INDEX;
   }
 
   public static IoTProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
@@ -144,7 +182,7 @@ public class IoTProgressIndex implements ProgressIndex {
     for (int i = 0; i < size; i++) {
       final int peerId = ReadWriteIOUtils.readInt(byteBuffer);
       final long searchIndex = ReadWriteIOUtils.readLong(byteBuffer);
-      ioTProgressIndex.addSearchIndex(peerId, searchIndex);
+      ioTProgressIndex.peerId2SearchIndex.put(peerId, searchIndex);
     }
     return ioTProgressIndex;
   }
@@ -155,7 +193,7 @@ public class IoTProgressIndex implements ProgressIndex {
     for (int i = 0; i < size; i++) {
       final int peerId = ReadWriteIOUtils.readInt(stream);
       final long searchIndex = ReadWriteIOUtils.readLong(stream);
-      ioTProgressIndex.addSearchIndex(peerId, searchIndex);
+      ioTProgressIndex.peerId2SearchIndex.put(peerId, searchIndex);
     }
     return ioTProgressIndex;
   }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
index e36b990eae4..6b9c37ec5e1 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
@@ -33,12 +33,12 @@ public class MinimumProgressIndex implements ProgressIndex {
 
   @Override
   public void serialize(ByteBuffer byteBuffer) {
-    ProgressIndexType.MINIMUM_CONSENSUS_INDEX.serialize(byteBuffer);
+    ProgressIndexType.MINIMUM_PROGRESS_INDEX.serialize(byteBuffer);
   }
 
   @Override
   public void serialize(OutputStream stream) throws IOException {
-    ProgressIndexType.MINIMUM_CONSENSUS_INDEX.serialize(stream);
+    ProgressIndexType.MINIMUM_PROGRESS_INDEX.serialize(stream);
   }
 
   @Override
@@ -72,6 +72,11 @@ public class MinimumProgressIndex implements ProgressIndex {
     return progressIndex == null ? this : progressIndex;
   }
 
+  @Override
+  public ProgressIndexType getType() {
+    return ProgressIndexType.MINIMUM_PROGRESS_INDEX;
+  }
+
   public static MinimumProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
     return new MinimumProgressIndex();
   }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
new file mode 100644
index 00000000000..d2742acdb71
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index.impl;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RecoverProgressIndex implements ProgressIndex {
+
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+  private final Map<Integer, SimpleProgressIndex> dataNodeId2LocalIndex;
+
+  public RecoverProgressIndex() {
+    this.dataNodeId2LocalIndex = new HashMap<>();
+  }
+
+  @Override
+  public void serialize(ByteBuffer byteBuffer) {
+    lock.readLock().lock();
+    try {
+      ProgressIndexType.RECOVER_PROGRESS_INDEX.serialize(byteBuffer);
+
+      ReadWriteIOUtils.write(dataNodeId2LocalIndex.size(), byteBuffer);
+      for (final Map.Entry<Integer, SimpleProgressIndex> entry : 
dataNodeId2LocalIndex.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
+        entry.getValue().serialize(byteBuffer);
+      }
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public void serialize(OutputStream stream) throws IOException {
+    lock.readLock().lock();
+    try {
+      ProgressIndexType.RECOVER_PROGRESS_INDEX.serialize(stream);
+
+      ReadWriteIOUtils.write(dataNodeId2LocalIndex.size(), stream);
+      for (final Map.Entry<Integer, SimpleProgressIndex> entry : 
dataNodeId2LocalIndex.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), stream);
+        entry.getValue().serialize(stream);
+      }
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public boolean isAfter(ProgressIndex progressIndex) {
+    lock.readLock().lock();
+    try {
+      if (progressIndex instanceof MinimumProgressIndex) {
+        return true;
+      }
+
+      if (progressIndex instanceof HybridProgressIndex) {
+        return ((HybridProgressIndex) 
progressIndex).isGivenProgressIndexAfterSelf(this);
+      }
+
+      if (!(progressIndex instanceof RecoverProgressIndex)) {
+        return false;
+      }
+
+      final RecoverProgressIndex thisRecoverProgressIndex = this;
+      final RecoverProgressIndex thatRecoverProgressIndex = 
(RecoverProgressIndex) progressIndex;
+      return thatRecoverProgressIndex.dataNodeId2LocalIndex.entrySet().stream()
+          .noneMatch(
+              entry ->
+                  
!thisRecoverProgressIndex.dataNodeId2LocalIndex.containsKey(entry.getKey())
+                      || !thisRecoverProgressIndex
+                          .dataNodeId2LocalIndex
+                          .get(entry.getKey())
+                          .isAfter(entry.getValue()));
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public boolean equals(ProgressIndex progressIndex) {
+    lock.readLock().lock();
+    try {
+      if (!(progressIndex instanceof RecoverProgressIndex)) {
+        return false;
+      }
+
+      final RecoverProgressIndex thisRecoverProgressIndex = this;
+      final RecoverProgressIndex thatRecoverProgressIndex = 
(RecoverProgressIndex) progressIndex;
+      return thisRecoverProgressIndex.dataNodeId2LocalIndex.size()
+              == thatRecoverProgressIndex.dataNodeId2LocalIndex.size()
+          && thatRecoverProgressIndex.dataNodeId2LocalIndex.entrySet().stream()
+              .allMatch(
+                  entry ->
+                      
thisRecoverProgressIndex.dataNodeId2LocalIndex.containsKey(entry.getKey())
+                          && thisRecoverProgressIndex
+                              .dataNodeId2LocalIndex
+                              .get(entry.getKey())
+                              .equals(entry.getValue()));
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof RecoverProgressIndex)) {
+      return false;
+    }
+    return this.equals((RecoverProgressIndex) obj);
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+  @Override
+  public ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex 
progressIndex) {
+    lock.writeLock().lock();
+    try {
+      if (!(progressIndex instanceof RecoverProgressIndex)) {
+        return ProgressIndex.blendProgressIndex(this, progressIndex);
+      }
+
+      final RecoverProgressIndex thisRecoverProgressIndex = this;
+      final RecoverProgressIndex thatRecoverProgressIndex = 
(RecoverProgressIndex) progressIndex;
+      thatRecoverProgressIndex.dataNodeId2LocalIndex.forEach(
+          (thatK, thatV) ->
+              thisRecoverProgressIndex.dataNodeId2LocalIndex.compute(
+                  thatK,
+                  (thisK, thisV) ->
+                      (thisV == null
+                          ? thatV
+                          : (SimpleProgressIndex)
+                              
thisV.updateToMinimumIsAfterProgressIndex(thatV))));
+      return this;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  public ProgressIndexType getType() {
+    return ProgressIndexType.RECOVER_PROGRESS_INDEX;
+  }
+
+  public static RecoverProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
+    final RecoverProgressIndex recoverProgressIndex = new 
RecoverProgressIndex();
+    final int size = ReadWriteIOUtils.readInt(byteBuffer);
+    for (int i = 0; i < size; i++) {
+      final int dataNodeId = ReadWriteIOUtils.readInt(byteBuffer);
+      final SimpleProgressIndex simpleProgressIndex =
+          SimpleProgressIndex.deserializeFrom(byteBuffer);
+      recoverProgressIndex.dataNodeId2LocalIndex.put(dataNodeId, 
simpleProgressIndex);
+    }
+    return recoverProgressIndex;
+  }
+
+  public static RecoverProgressIndex deserializeFrom(InputStream stream) 
throws IOException {
+    final RecoverProgressIndex recoverProgressIndex = new 
RecoverProgressIndex();
+    final int size = ReadWriteIOUtils.readInt(stream);
+    for (int i = 0; i < size; i++) {
+      final int dataNodeId = ReadWriteIOUtils.readInt(stream);
+      final SimpleProgressIndex simpleProgressIndex = 
SimpleProgressIndex.deserializeFrom(stream);
+      recoverProgressIndex.dataNodeId2LocalIndex.put(dataNodeId, 
simpleProgressIndex);
+    }
+    return recoverProgressIndex;
+  }
+
+  @Override
+  public String toString() {
+    return "RecoverProgressIndex{" + "dataNodeId2LocalIndex=" + 
dataNodeId2LocalIndex + '}';
+  }
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
index 6571d90759a..eaa80096823 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
@@ -27,9 +27,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class SimpleProgressIndex implements ProgressIndex {
 
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
   private final int rebootTimes;
   private final long memtableFlushOrderId;
 
@@ -40,54 +43,78 @@ public class SimpleProgressIndex implements ProgressIndex {
 
   @Override
   public void serialize(ByteBuffer byteBuffer) {
-    ProgressIndexType.SIMPLE_CONSENSUS_INDEX.serialize(byteBuffer);
-
-    ReadWriteIOUtils.write(rebootTimes, byteBuffer);
-    ReadWriteIOUtils.write(memtableFlushOrderId, byteBuffer);
+    lock.readLock().lock();
+    try {
+      ProgressIndexType.SIMPLE_PROGRESS_INDEX.serialize(byteBuffer);
+
+      ReadWriteIOUtils.write(rebootTimes, byteBuffer);
+      ReadWriteIOUtils.write(memtableFlushOrderId, byteBuffer);
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
   @Override
   public void serialize(OutputStream stream) throws IOException {
-    ProgressIndexType.SIMPLE_CONSENSUS_INDEX.serialize(stream);
-
-    ReadWriteIOUtils.write(rebootTimes, stream);
-    ReadWriteIOUtils.write(memtableFlushOrderId, stream);
+    lock.readLock().lock();
+    try {
+      ProgressIndexType.SIMPLE_PROGRESS_INDEX.serialize(stream);
+
+      ReadWriteIOUtils.write(rebootTimes, stream);
+      ReadWriteIOUtils.write(memtableFlushOrderId, stream);
+    } finally {
+      lock.readLock().unlock();
+    }
   }
 
   @Override
   public boolean isAfter(ProgressIndex progressIndex) {
-    if (progressIndex instanceof MinimumProgressIndex) {
-      return true;
-    }
-
-    if (!(progressIndex instanceof SimpleProgressIndex)) {
-      return false;
-    }
-
-    final SimpleProgressIndex thisSimpleProgressIndex = this;
-    final SimpleProgressIndex thatSimpleProgressIndex = (SimpleProgressIndex) 
progressIndex;
-    if (thisSimpleProgressIndex.rebootTimes > 
thatSimpleProgressIndex.rebootTimes) {
-      return true;
+    lock.readLock().lock();
+    try {
+      if (progressIndex instanceof MinimumProgressIndex) {
+        return true;
+      }
+
+      if (progressIndex instanceof HybridProgressIndex) {
+        return ((HybridProgressIndex) 
progressIndex).isGivenProgressIndexAfterSelf(this);
+      }
+
+      if (!(progressIndex instanceof SimpleProgressIndex)) {
+        return false;
+      }
+
+      final SimpleProgressIndex thisSimpleProgressIndex = this;
+      final SimpleProgressIndex thatSimpleProgressIndex = 
(SimpleProgressIndex) progressIndex;
+      if (thisSimpleProgressIndex.rebootTimes > 
thatSimpleProgressIndex.rebootTimes) {
+        return true;
+      }
+      if (thisSimpleProgressIndex.rebootTimes < 
thatSimpleProgressIndex.rebootTimes) {
+        return false;
+      }
+      // thisSimpleProgressIndex.rebootTimes == 
thatSimpleProgressIndex.rebootTimes
+      return thisSimpleProgressIndex.memtableFlushOrderId
+          > thatSimpleProgressIndex.memtableFlushOrderId;
+    } finally {
+      lock.readLock().unlock();
     }
-    if (thisSimpleProgressIndex.rebootTimes < 
thatSimpleProgressIndex.rebootTimes) {
-      return false;
-    }
-    // thisSimpleProgressIndex.rebootTimes == 
thatSimpleProgressIndex.rebootTimes
-    return thisSimpleProgressIndex.memtableFlushOrderId
-        > thatSimpleProgressIndex.memtableFlushOrderId;
   }
 
   @Override
   public boolean equals(ProgressIndex progressIndex) {
-    if (!(progressIndex instanceof SimpleProgressIndex)) {
-      return false;
+    lock.readLock().lock();
+    try {
+      if (!(progressIndex instanceof SimpleProgressIndex)) {
+        return false;
+      }
+
+      final SimpleProgressIndex thisSimpleProgressIndex = this;
+      final SimpleProgressIndex thatSimpleProgressIndex = 
(SimpleProgressIndex) progressIndex;
+      return thisSimpleProgressIndex.rebootTimes == 
thatSimpleProgressIndex.rebootTimes
+          && thisSimpleProgressIndex.memtableFlushOrderId
+              == thatSimpleProgressIndex.memtableFlushOrderId;
+    } finally {
+      lock.readLock().unlock();
     }
-
-    final SimpleProgressIndex thisSimpleProgressIndex = this;
-    final SimpleProgressIndex thatSimpleProgressIndex = (SimpleProgressIndex) 
progressIndex;
-    return thisSimpleProgressIndex.rebootTimes == 
thatSimpleProgressIndex.rebootTimes
-        && thisSimpleProgressIndex.memtableFlushOrderId
-            == thatSimpleProgressIndex.memtableFlushOrderId;
   }
 
   @Override
@@ -111,29 +138,39 @@ public class SimpleProgressIndex implements ProgressIndex 
{
 
   @Override
   public ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex 
progressIndex) {
-    if (!(progressIndex instanceof SimpleProgressIndex)) {
+    lock.writeLock().lock();
+    try {
+      if (!(progressIndex instanceof SimpleProgressIndex)) {
+        return ProgressIndex.blendProgressIndex(this, progressIndex);
+      }
+
+      final SimpleProgressIndex thisSimpleProgressIndex = this;
+      final SimpleProgressIndex thatSimpleProgressIndex = 
(SimpleProgressIndex) progressIndex;
+      if (thisSimpleProgressIndex.rebootTimes > 
thatSimpleProgressIndex.rebootTimes) {
+        return this;
+      }
+      if (thisSimpleProgressIndex.rebootTimes < 
thatSimpleProgressIndex.rebootTimes) {
+        return progressIndex;
+      }
+      // thisSimpleProgressIndex.rebootTimes == 
thatSimpleProgressIndex.rebootTimes
+      if (thisSimpleProgressIndex.memtableFlushOrderId
+          > thatSimpleProgressIndex.memtableFlushOrderId) {
+        return this;
+      }
+      if (thisSimpleProgressIndex.memtableFlushOrderId
+          < thatSimpleProgressIndex.memtableFlushOrderId) {
+        return progressIndex;
+      }
+      // thisSimpleProgressIndex.memtableFlushOrderId ==
+      // thatSimpleProgressIndex.memtableFlushOrderId
       return this;
+    } finally {
+      lock.writeLock().unlock(); // release (not re-acquire) the write lock taken above
     }
+  }
 
-    final SimpleProgressIndex thisSimpleProgressIndex = this;
-    final SimpleProgressIndex thatSimpleProgressIndex = (SimpleProgressIndex) 
progressIndex;
-    if (thisSimpleProgressIndex.rebootTimes > 
thatSimpleProgressIndex.rebootTimes) {
-      return this;
-    }
-    if (thisSimpleProgressIndex.rebootTimes < 
thatSimpleProgressIndex.rebootTimes) {
-      return progressIndex;
-    }
-    // thisSimpleProgressIndex.rebootTimes == 
thatSimpleProgressIndex.rebootTimes
-    if (thisSimpleProgressIndex.memtableFlushOrderId
-        > thatSimpleProgressIndex.memtableFlushOrderId) {
-      return this;
-    }
-    if (thisSimpleProgressIndex.memtableFlushOrderId
-        < thatSimpleProgressIndex.memtableFlushOrderId) {
-      return progressIndex;
-    }
-    // thisSimpleProgressIndex.memtableFlushOrderId == 
thatSimpleProgressIndex.memtableFlushOrderId
-    return this;
+  public ProgressIndexType getType() {
+    return ProgressIndexType.SIMPLE_PROGRESS_INDEX;
   }
 
   public static SimpleProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
index 04cb676d633..979a8bc550b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
@@ -88,9 +88,8 @@ public class IoTConsensusDataRegionStateMachine extends 
DataRegionStateMachine {
       for (IndexedConsensusRequest indexedRequest : 
batchRequest.getRequests()) {
         final PlanNode planNode = grabInsertNode(indexedRequest);
         if (planNode instanceof ComparableConsensusRequest) {
-          final IoTProgressIndex ioTProgressIndex = new IoTProgressIndex();
-          ioTProgressIndex.addSearchIndex(
-              batchRequest.getSourcePeerId(), indexedRequest.getSearchIndex());
+          final IoTProgressIndex ioTProgressIndex =
+              new IoTProgressIndex(batchRequest.getSourcePeerId(), 
indexedRequest.getSearchIndex());
           ((ComparableConsensusRequest) 
planNode).setProgressIndex(ioTProgressIndex);
         }
         deserializedRequest.add(planNode);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 2c1d5cc85b5..155653dc877 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -86,6 +86,10 @@ public class PipeRuntimeAgent implements IService {
     simpleConsensusProgressIndexAssigner.assignIfNeeded(tsFileResource);
   }
 
+  public void assignSimpleProgressIndexForTsFileRecovery(TsFileResource 
tsFileResource) {
+    
simpleConsensusProgressIndexAssigner.assignSimpleProgressIndexForTsFileRecovery(tsFileResource);
+  }
+
   //////////////////////////// Runtime Exception Handlers 
////////////////////////////
 
   public void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException 
pipeRuntimeException) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
index 648c4994f07..e3c54c6eb94 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
@@ -50,18 +50,14 @@ public class SimpleConsensusProgressIndexAssigner {
           + File.separator;
   private static final String REBOOT_TIMES_FILE_NAME = "reboot_times.txt";
 
-  private boolean isEnable = false;
+  private boolean isSimpleConsensusEnable = false;
 
   private int rebootTimes = 0;
   private final AtomicLong memtableFlushOrderId = new AtomicLong(0);
 
   public void start() throws StartupException {
-    // only works for simple consensus
-    if 
(!IOTDB_CONFIG.getDataRegionConsensusProtocolClass().equals(SIMPLE_CONSENSUS)) {
-      return;
-    }
-
-    isEnable = true;
+    isSimpleConsensusEnable =
+        
IOTDB_CONFIG.getDataRegionConsensusProtocolClass().equals(SIMPLE_CONSENSUS);
     LOGGER.info("Start SimpleConsensusProgressIndexAssigner ...");
 
     try {
@@ -102,11 +98,16 @@ public class SimpleConsensusProgressIndexAssigner {
   }
 
   public void assignIfNeeded(TsFileResource tsFileResource) {
-    if (!isEnable) {
+    if (!isSimpleConsensusEnable) {
       return;
     }
 
     tsFileResource.updateProgressIndex(
         new SimpleProgressIndex(rebootTimes, 
memtableFlushOrderId.getAndIncrement()));
   }
+
+  public void assignSimpleProgressIndexForTsFileRecovery(TsFileResource 
tsFileResource) {
+    tsFileResource.updateProgressIndex(
+        new SimpleProgressIndex(rebootTimes, 
memtableFlushOrderId.getAndIncrement()));
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index af67df5e1dc..fd9311738b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -495,7 +495,7 @@ public class DataNode implements DataNodeMBean {
     registerManager.register(new JMXService());
     JMXService.registerMBean(getInstance(), mbeanName);
 
-    // get resources for trigger,udf...
+    // get resources for trigger,udf,pipe...
     prepareResources();
 
     Runtime.getRuntime().addShutdownHook(new IoTDBShutdownHook());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
 
b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
index d7629d79cb2..e3e675d2457 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
 import org.apache.iotdb.db.wal.exception.WALRecoverException;
 import org.apache.iotdb.db.wal.utils.listener.WALRecoverListener;
@@ -242,6 +243,9 @@ public class UnsealedTsFileRecoverPerformer extends 
AbstractTsFileRecoverPerform
           tsFileResource.updatePlanIndexes(recoveryMemTable.getMaxPlanIndex());
         }
 
+        // set recover progress index for pipe
+        
PipeAgent.runtime().assignSimpleProgressIndexForTsFileRecovery(tsFileResource);
+
         // if we put following codes in if clause above, this file can be 
continued writing into it
         // currently, we close this file anyway
         writer.endFile();
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceProgressIndexTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceProgressIndexTest.java
index 8a9957cf47a..8300091dba1 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceProgressIndexTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceProgressIndexTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.engine.storagegroup;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -169,5 +170,10 @@ public class TsFileResourceProgressIndexTest {
       }
       return this;
     }
+
+    @Override
+    public ProgressIndexType getType() {
+      throw new UnsupportedOperationException("method not implemented.");
+    }
   }
 }

Reply via email to