This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 84780e2d3f3 [IOTDB-5723] Pipe: progress index (#9446)(#9950)
84780e2d3f3 is described below

commit 84780e2d3f318c58ae24dd7358051f9296b1c5fe
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sun May 28 07:56:57 2023 +0800

    [IOTDB-5723] Pipe: progress index (#9446)(#9950)
    
    Co-authored-by: yschengzi <[email protected]>
    Co-authored-by: Xiangpeng Hu <[email protected]>
---
 .../pipe/runtime/PipeRuntimeCoordinator.java       |   5 +-
 .../confignode/persistence/pipe/PipeTaskInfo.java  |   9 +-
 .../runtime/PipeHandleMetaChangeProcedure.java     |  10 +-
 .../impl/pipe/task/CreatePipeProcedureV2.java      |   7 +-
 .../request/ConfigPhysicalPlanSerDeTest.java       |   9 +-
 .../iotdb/confignode/persistence/PipeInfoTest.java |   3 +-
 .../runtime/PipeHandleMetaChangeProcedureTest.java |   7 +-
 .../consensus/iot/IoTConsensusServerImpl.java      |   7 +
 .../index/ComparableConsensusRequest.java          |  26 ++++
 .../commons/consensus/index/ProgressIndex.java     |  79 ++++++++++
 .../commons/consensus/index/ProgressIndexType.java |  79 ++++++++++
 .../consensus/index/impl/IoTProgressIndex.java     | 167 ++++++++++++++++++++
 .../consensus/index/impl/MinimumProgressIndex.java |  87 +++++++++++
 .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java |  67 ++++----
 .../commons/pipe/task/meta/PipeMetaDeSerTest.java  |   7 +-
 .../IoTConsensusDataRegionStateMachine.java        |  11 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |   6 +
 .../db/engine/storagegroup/TsFileResource.java     |  58 +++++++
 .../plan/node/write/InsertMultiTabletsNode.java    |   7 +
 .../plan/planner/plan/node/write/InsertNode.java   |  20 ++-
 .../planner/plan/node/write/InsertRowsNode.java    |   7 +
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |   7 +
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |  41 +++--
 .../core/collector/IoTDBDataRegionCollector.java   |   9 +-
 .../PipeHistoricalDataRegionTsFileCollector.java   |  22 ++-
 .../realtime/PipeRealtimeDataRegionCollector.java  |  11 ++
 .../PipeRealtimeDataRegionHybridCollector.java     |   4 +-
 .../realtime/assigner/PipeDataRegionAssigner.java  |   7 +-
 .../iotdb/db/pipe/core/event/EnrichedEvent.java    |  76 ++++++++-
 .../core/event/impl/PipeTabletInsertionEvent.java  |  69 +++++---
 .../core/event/impl/PipeTsFileInsertionEvent.java  |  71 +++++++--
 .../event/realtime/PipeRealtimeCollectEvent.java   |  49 ++++--
 .../realtime/PipeRealtimeCollectEventFactory.java  |   4 +-
 .../org/apache/iotdb/db/pipe/task/PipeBuilder.java |  26 ++--
 .../org/apache/iotdb/db/pipe/task/PipeTask.java    |   7 +-
 .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java |  15 +-
 .../db/pipe/task/stage/PipeTaskCollectorStage.java |  11 +-
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |   3 +-
 .../TsFileResourceProgressIndexTest.java           | 173 +++++++++++++++++++++
 .../collector/CachedSchemaPatternMatcherTest.java  |  14 +-
 .../core/collector/PipeRealtimeCollectTest.java    |  12 +-
 41 files changed, 1136 insertions(+), 173 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
index 3455a6c1369..2992f6dd31e 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
@@ -71,7 +71,10 @@ public class PipeRuntimeCoordinator implements 
IClusterStatusSubscriber {
         .forEach(
             (regionId, pair) -> {
               if (regionId.getType().equals(TConsensusGroupType.DataRegion)) {
-                dataRegionGroupToOldAndNewLeaderPairMap.put(regionId, pair);
+                dataRegionGroupToOldAndNewLeaderPairMap.put(
+                    regionId,
+                    new Pair<>( // null or -1 means empty origin leader
+                        pair.left == null ? -1 : pair.left, pair.right == null 
? -1 : pair.right));
               }
             });
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 76ec9007ffa..2c9485c29fc 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.persistence.pipe;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
@@ -210,7 +211,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
                             if (newDataRegionLeader != -1) {
                               consensusGroupIdToTaskMetaMap
                                   .get(dataRegionGroupId)
-                                  .setRegionLeader(newDataRegionLeader);
+                                  .setLeaderDataNodeId(newDataRegionLeader);
                             } else {
                               
consensusGroupIdToTaskMetaMap.remove(dataRegionGroupId);
                             }
@@ -219,9 +220,9 @@ public class PipeTaskInfo implements SnapshotProcessor {
                             // region group is newly added.
                             if (newDataRegionLeader != -1) {
                               consensusGroupIdToTaskMetaMap.put(
-                                  // TODO: the progress index should be passed 
from the leader
-                                  // correctly
-                                  dataRegionGroupId, new PipeTaskMeta(0, 
newDataRegionLeader));
+                                  dataRegionGroupId,
+                                  new PipeTaskMeta(
+                                      new MinimumProgressIndex(), 
newDataRegionLeader));
                             } else {
                               LOGGER.warn(
                                   "The pipe task meta does not contain the 
data region group {} or the data region group has already been removed",
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index db04d909f7d..b63e157f096 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -117,7 +117,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
           
pipeMetaFromDataNode.getRuntimeMeta().getConsensusGroupIdToTaskMetaMap();
       for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> 
runtimeMetaOnConfigNode :
           pipeTaskMetaMapOnConfigNode.entrySet()) {
-        if (runtimeMetaOnConfigNode.getValue().getRegionLeader() != 
dataNodeId) {
+        if (runtimeMetaOnConfigNode.getValue().getLeaderDataNodeId() != 
dataNodeId) {
           continue;
         }
 
@@ -132,11 +132,13 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
         }
 
         // update progress index
-        if (runtimeMetaOnConfigNode.getValue().getProgressIndex()
-            < runtimeMetaFromDataNode.getProgressIndex()) {
+        if (!runtimeMetaOnConfigNode
+            .getValue()
+            .getProgressIndex()
+            .isAfter(runtimeMetaFromDataNode.getProgressIndex())) {
           runtimeMetaOnConfigNode
               .getValue()
-              .setProgressIndex(runtimeMetaFromDataNode.getProgressIndex());
+              .updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex());
           needWriteConsensusOnConfigNodes = true;
         }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index e0de835fb07..b7ab5ef62d5 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -102,9 +103,9 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
         .getLoadManager()
         .getRegionLeaderMap()
         .forEach(
-            (region, leader) ->
-                // TODO: make index configurable
-                consensusGroupIdToTaskMetaMap.put(region, new PipeTaskMeta(0, 
leader)));
+            (regionGroup, regionLeaderNodeId) ->
+                consensusGroupIdToTaskMetaMap.put(
+                    regionGroup, new PipeTaskMeta(new MinimumProgressIndex(), 
regionLeaderNodeId)));
     pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
   }
 
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 9a83e00acd7..9c3be2ac3a1 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimedQuota;
 import org.apache.iotdb.common.rpc.thrift.ThrottleType;
 import org.apache.iotdb.commons.auth.AuthException;
 import org.apache.iotdb.commons.auth.entity.PrivilegeType;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
@@ -1050,7 +1051,7 @@ public class ConfigPhysicalPlanSerDeTest {
     collectorAttributes.put("collector", 
"org.apache.iotdb.pipe.collector.DefaultCollector");
     processorAttributes.put("processor", 
"org.apache.iotdb.pipe.processor.SDTFilterProcessor");
     connectorAttributes.put("connector", 
"org.apache.iotdb.pipe.protocal.ThriftTransporter");
-    PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(0, 1);
+    PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(new MinimumProgressIndex(), 
1);
     Map<TConsensusGroupId, PipeTaskMeta> pipeTasks = new HashMap<>();
     pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta);
     PipeStaticMeta pipeStaticMeta =
@@ -1163,10 +1164,12 @@ public class ConfigPhysicalPlanSerDeTest {
               {
                 put(
                     new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
-                    new PipeTaskMeta(789, 987));
+                    new PipeTaskMeta(
+                        new MinimumProgressIndex(), 987)); // TODO: replace 
with IoTConsensus
                 put(
                     new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
-                    new PipeTaskMeta(456, 789));
+                    new PipeTaskMeta(
+                        new MinimumProgressIndex(), 789)); // TODO: replace 
with IoTConsensus
               }
             });
     pipeMetaList.add(new PipeMeta(pipeStaticMeta, pipeRuntimeMeta));
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
index 0246759c98c..51b771026c1 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.persistence;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
@@ -72,7 +73,7 @@ public class PipeInfoTest {
     collectorAttributes.put("collector", 
"org.apache.iotdb.pipe.collector.DefaultCollector");
     processorAttributes.put("processor", 
"org.apache.iotdb.pipe.processor.SDTFilterProcessor");
     connectorAttributes.put("connector", 
"org.apache.iotdb.pipe.protocal.ThriftTransporter");
-    PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(0, 1);
+    PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(new MinimumProgressIndex(), 
1);
     Map<TConsensusGroupId, PipeTaskMeta> pipeTasks = new HashMap<>();
     pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta);
     PipeStaticMeta pipeStaticMeta =
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
index 07d74e2d9ff..38783ea2e53 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
@@ -68,10 +69,12 @@ public class PipeHandleMetaChangeProcedureTest {
               {
                 put(
                     new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
-                    new PipeTaskMeta(789, 987));
+                    new PipeTaskMeta(
+                        new MinimumProgressIndex(), 987)); // TODO: replace 
with IoTConsensus
                 put(
                     new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
-                    new PipeTaskMeta(456, 789));
+                    new PipeTaskMeta(
+                        new MinimumProgressIndex(), 789)); // TODO: replace 
with IoTConsensus
               }
             });
 
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 1cb06ba4d3f..8c8badf4be6 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
+import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
 import org.apache.iotdb.consensus.IStateMachine;
@@ -646,6 +648,11 @@ public class IoTConsensusServerImpl {
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
       IConsensusRequest request) {
+    if (request instanceof ComparableConsensusRequest) {
+      final IoTProgressIndex iotProgressIndex = new IoTProgressIndex();
+      iotProgressIndex.addSearchIndex(thisNode.getNodeId(), searchIndex.get() 
+ 1);
+      ((ComparableConsensusRequest) 
request).setProgressIndex(iotProgressIndex);
+    }
     return new IndexedConsensusRequest(searchIndex.get() + 1, 
Collections.singletonList(request));
   }
 
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ComparableConsensusRequest.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ComparableConsensusRequest.java
new file mode 100644
index 00000000000..ff08623df38
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ComparableConsensusRequest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index;
+
+public interface ComparableConsensusRequest {
+  ProgressIndex getProgressIndex();
+
+  void setProgressIndex(ProgressIndex progressIndex);
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
new file mode 100644
index 00000000000..b4a1f6aeca7
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public interface ProgressIndex {
+
+  /** serialize this progress index to the given byte buffer */
+  void serialize(ByteBuffer byteBuffer);
+
+  /** serialize this progress index to the given output stream */
+  void serialize(OutputStream stream) throws IOException;
+
+  /**
+   * A.isAfter(B) is true if and only if A is strictly greater than B.
+   *
+   * @param progressIndex the progress index to be compared
+   * @return true if and only if this progress index is strictly greater than 
the given consensus
+   *     index
+   */
+  boolean isAfter(ProgressIndex progressIndex);
+
+  /**
+   * A.equals(B) is true if and only if A is equal to B
+   *
+   * @param progressIndex the progress index to be compared
+   * @return true if and only if this progress index is equal to the given 
progress index
+   */
+  boolean equals(ProgressIndex progressIndex);
+
+  /**
+   * A.equals(B) is true if and only if A is equal to B
+   *
+   * @param obj the object to be compared
+   * @return true if and only if this progress index is equal to the given 
object
+   */
+  @Override
+  boolean equals(Object obj);
+
+  /**
+   * C = A.updateToMinimumIsAfterProgressIndex(B) where C should satisfy:
+   *
+   * <p>(C.equals(A) || C.isAfter(A)) is true
+   *
+   * <p>(C.equals(B) || C.isAfter(B)) is true
+   *
+   * <p>There is no D, such that D satisfies the above conditions and 
C.isAfter(D) is true
+   *
+   * <p>The implementation of this function should be reflexive, that is
+   * 
A.updateToMinimumIsAfterProgressIndex(B).equals(B.updateToMinimumIsAfterProgressIndex(A))
 is
+   * true
+   *
+   * <p>Note: this function may modify the caller.
+   *
+   * @param progressIndex the progress index to be compared
+   * @return the minimum progress index after the given progress index and 
this progress index
+   */
+  ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex 
progressIndex);
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
new file mode 100644
index 00000000000..37d83a52efb
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndexType.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index;
+
+import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public enum ProgressIndexType {
+  MINIMUM_CONSENSUS_INDEX((short) 1),
+  IOT_CONSENSUS_INDEX((short) 2),
+  ;
+
+  private final short type;
+
+  ProgressIndexType(short type) {
+    this.type = type;
+  }
+
+  public short getType() {
+    return type;
+  }
+
+  public void serialize(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(type, byteBuffer);
+  }
+
+  public void serialize(OutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(type, stream);
+  }
+
+  public static ProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
+    final short indexType = ReadWriteIOUtils.readShort(byteBuffer);
+    switch (indexType) {
+      case 1:
+        return MinimumProgressIndex.deserializeFrom(byteBuffer);
+      case 2:
+        return IoTProgressIndex.deserializeFrom(byteBuffer);
+      default:
+        throw new UnsupportedOperationException(
+            String.format("Unsupported progress index type %s.", indexType));
+    }
+  }
+
+  public static ProgressIndex deserializeFrom(InputStream stream) throws 
IOException {
+    final short indexType = ReadWriteIOUtils.readShort(stream);
+    switch (indexType) {
+      case 1:
+        return MinimumProgressIndex.deserializeFrom(stream);
+      case 2:
+        return IoTProgressIndex.deserializeFrom(stream);
+      default:
+        throw new UnsupportedOperationException(
+            String.format("Unsupported progress index type %s.", indexType));
+    }
+  }
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
new file mode 100644
index 00000000000..d84ef20394f
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index.impl;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public class IoTProgressIndex implements ProgressIndex {
+
+  private final Map<Integer, Long> peerId2SearchIndex;
+
+  public IoTProgressIndex() {
+    peerId2SearchIndex = new HashMap<>();
+  }
+
+  public void addSearchIndex(Integer peerId, Long searchIndex) {
+    peerId2SearchIndex.put(peerId, searchIndex);
+  }
+
+  @Override
+  public void serialize(ByteBuffer byteBuffer) {
+    ProgressIndexType.IOT_CONSENSUS_INDEX.serialize(byteBuffer);
+
+    ReadWriteIOUtils.write(peerId2SearchIndex.size(), byteBuffer);
+    for (final Map.Entry<Integer, Long> entry : peerId2SearchIndex.entrySet()) 
{
+      ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
+      ReadWriteIOUtils.write(entry.getValue(), byteBuffer);
+    }
+  }
+
+  @Override
+  public void serialize(OutputStream stream) throws IOException {
+    ProgressIndexType.IOT_CONSENSUS_INDEX.serialize(stream);
+
+    ReadWriteIOUtils.write(peerId2SearchIndex.size(), stream);
+    for (final Map.Entry<Integer, Long> entry : peerId2SearchIndex.entrySet()) 
{
+      ReadWriteIOUtils.write(entry.getKey(), stream);
+      ReadWriteIOUtils.write(entry.getValue(), stream);
+    }
+  }
+
+  @Override
+  public boolean isAfter(ProgressIndex progressIndex) {
+    if (progressIndex instanceof MinimumProgressIndex) {
+      return true;
+    }
+
+    if (!(progressIndex instanceof IoTProgressIndex)) {
+      return false;
+    }
+
+    final IoTProgressIndex thisIoTProgressIndex = this;
+    final IoTProgressIndex thatIoTProgressIndex = (IoTProgressIndex) 
progressIndex;
+    return thatIoTProgressIndex.peerId2SearchIndex.entrySet().stream()
+        .noneMatch(
+            entry ->
+                
!thisIoTProgressIndex.peerId2SearchIndex.containsKey(entry.getKey())
+                    || 
thisIoTProgressIndex.peerId2SearchIndex.get(entry.getKey())
+                        <= entry.getValue());
+  }
+
+  @Override
+  public boolean equals(ProgressIndex progressIndex) {
+    if (!(progressIndex instanceof IoTProgressIndex)) {
+      return false;
+    }
+
+    final IoTProgressIndex thisIoTProgressIndex = this;
+    final IoTProgressIndex thatIoTProgressIndex = (IoTProgressIndex) 
progressIndex;
+    return thisIoTProgressIndex.peerId2SearchIndex.size()
+            == thatIoTProgressIndex.peerId2SearchIndex.size()
+        && thatIoTProgressIndex.peerId2SearchIndex.entrySet().stream()
+            .allMatch(
+                entry ->
+                    
thisIoTProgressIndex.peerId2SearchIndex.containsKey(entry.getKey())
+                        && thisIoTProgressIndex
+                            .peerId2SearchIndex
+                            .get(entry.getKey())
+                            .equals(entry.getValue()));
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof IoTProgressIndex)) {
+      return false;
+    }
+    return this.equals((IoTProgressIndex) obj);
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+  @Override
+  public ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex 
progressIndex) {
+    if (!(progressIndex instanceof IoTProgressIndex)) {
+      return this;
+    }
+
+    final IoTProgressIndex thisIoTProgressIndex = this;
+    final IoTProgressIndex thatIoTProgressIndex = (IoTProgressIndex) 
progressIndex;
+    thatIoTProgressIndex.peerId2SearchIndex.forEach(
+        (thatK, thatV) ->
+            thisIoTProgressIndex.peerId2SearchIndex.compute(
+                thatK, (thisK, thisV) -> (thisV == null ? thatV : 
Math.max(thisV, thatV))));
+    return this;
+  }
+
+  public static IoTProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
+    final IoTProgressIndex ioTProgressIndex = new IoTProgressIndex();
+    final int size = ReadWriteIOUtils.readInt(byteBuffer);
+    for (int i = 0; i < size; i++) {
+      final int peerId = ReadWriteIOUtils.readInt(byteBuffer);
+      final long searchIndex = ReadWriteIOUtils.readLong(byteBuffer);
+      ioTProgressIndex.addSearchIndex(peerId, searchIndex);
+    }
+    return ioTProgressIndex;
+  }
+
+  public static IoTProgressIndex deserializeFrom(InputStream stream) throws 
IOException {
+    final IoTProgressIndex ioTProgressIndex = new IoTProgressIndex();
+    final int size = ReadWriteIOUtils.readInt(stream);
+    for (int i = 0; i < size; i++) {
+      final int peerId = ReadWriteIOUtils.readInt(stream);
+      final long searchIndex = ReadWriteIOUtils.readLong(stream);
+      ioTProgressIndex.addSearchIndex(peerId, searchIndex);
+    }
+    return ioTProgressIndex;
+  }
+
+  @Override
+  public String toString() {
+    return "IoTProgressIndex{" + "peerId2SearchIndex=" + peerId2SearchIndex + 
'}';
+  }
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
new file mode 100644
index 00000000000..e36b990eae4
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index.impl;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class MinimumProgressIndex implements ProgressIndex {
+
+  public MinimumProgressIndex() {}
+
+  @Override
+  public void serialize(ByteBuffer byteBuffer) {
+    ProgressIndexType.MINIMUM_CONSENSUS_INDEX.serialize(byteBuffer);
+  }
+
+  @Override
+  public void serialize(OutputStream stream) throws IOException {
+    ProgressIndexType.MINIMUM_CONSENSUS_INDEX.serialize(stream);
+  }
+
+  @Override
+  public boolean isAfter(ProgressIndex progressIndex) {
+    return false;
+  }
+
+  @Override
+  public boolean equals(ProgressIndex progressIndex) {
+    return progressIndex instanceof MinimumProgressIndex;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (this == obj) {
+      return true;
+    }
+    return getClass() == obj.getClass();
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+  @Override
+  public ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex 
progressIndex) {
+    return progressIndex == null ? this : progressIndex;
+  }
+
+  public static MinimumProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
+    return new MinimumProgressIndex();
+  }
+
+  public static MinimumProgressIndex deserializeFrom(InputStream stream) {
+    return new MinimumProgressIndex();
+  }
+
+  @Override
+  public String toString() {
+    return "MinimumProgressIndex{}";
+  }
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index bb5149fb1f2..d4f43d15475 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.commons.pipe.task.meta;
 
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
 import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
 import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
@@ -34,28 +36,33 @@ import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeTaskMeta {
 
-  // TODO: replace it with consensus index
-  private final AtomicLong progressIndex = new AtomicLong(0L);
-  private final AtomicInteger regionLeader = new AtomicInteger(0);
+  private final AtomicReference<ProgressIndex> progressIndex = new 
AtomicReference<>();
+  private final AtomicInteger leaderDataNodeId = new AtomicInteger(0);
   private final Queue<PipeRuntimeException> exceptionMessages = new 
ConcurrentLinkedQueue<>();
 
-  private PipeTaskMeta() {}
-
-  public PipeTaskMeta(long progressIndex, int regionLeader) {
+  public PipeTaskMeta(/* @NotNull */ ProgressIndex progressIndex, int 
leaderDataNodeId) {
     this.progressIndex.set(progressIndex);
-    this.regionLeader.set(regionLeader);
+    this.leaderDataNodeId.set(leaderDataNodeId);
   }
 
-  public long getProgressIndex() {
+  public ProgressIndex getProgressIndex() {
     return progressIndex.get();
   }
 
-  public int getRegionLeader() {
-    return regionLeader.get();
+  public void updateProgressIndex(ProgressIndex updateIndex) {
+    progressIndex.updateAndGet(index -> 
index.updateToMinimumIsAfterProgressIndex(updateIndex));
+  }
+
+  public int getLeaderDataNodeId() {
+    return leaderDataNodeId.get();
+  }
+
+  public void setLeaderDataNodeId(int leaderDataNodeId) {
+    this.leaderDataNodeId.set(leaderDataNodeId);
   }
 
   public Iterable<PipeRuntimeException> getExceptionMessages() {
@@ -70,17 +77,9 @@ public class PipeTaskMeta {
     exceptionMessages.clear();
   }
 
-  public void setProgressIndex(long progressIndex) {
-    this.progressIndex.set(progressIndex);
-  }
-
-  public void setRegionLeader(int regionLeader) {
-    this.regionLeader.set(regionLeader);
-  }
-
   public void serialize(DataOutputStream outputStream) throws IOException {
-    ReadWriteIOUtils.write(progressIndex.get(), outputStream);
-    ReadWriteIOUtils.write(regionLeader.get(), outputStream);
+    progressIndex.get().serialize(outputStream);
+    ReadWriteIOUtils.write(leaderDataNodeId.get(), outputStream);
     ReadWriteIOUtils.write(exceptionMessages.size(), outputStream);
     for (final PipeRuntimeException exceptionMessage : exceptionMessages) {
       ReadWriteIOUtils.write(
@@ -90,8 +89,8 @@ public class PipeTaskMeta {
   }
 
   public void serialize(FileOutputStream outputStream) throws IOException {
-    ReadWriteIOUtils.write(progressIndex.get(), outputStream);
-    ReadWriteIOUtils.write(regionLeader.get(), outputStream);
+    progressIndex.get().serialize(outputStream);
+    ReadWriteIOUtils.write(leaderDataNodeId.get(), outputStream);
     ReadWriteIOUtils.write(exceptionMessages.size(), outputStream);
     for (final PipeRuntimeException exceptionMessage : exceptionMessages) {
       ReadWriteIOUtils.write(
@@ -101,9 +100,9 @@ public class PipeTaskMeta {
   }
 
   public static PipeTaskMeta deserialize(ByteBuffer byteBuffer) {
-    final PipeTaskMeta PipeTaskMeta = new PipeTaskMeta();
-    PipeTaskMeta.progressIndex.set(ReadWriteIOUtils.readLong(byteBuffer));
-    PipeTaskMeta.regionLeader.set(ReadWriteIOUtils.readInt(byteBuffer));
+    final ProgressIndex progressIndex = 
ProgressIndexType.deserializeFrom(byteBuffer);
+    final int leaderDataNodeId = ReadWriteIOUtils.readInt(byteBuffer);
+    final PipeTaskMeta PipeTaskMeta = new PipeTaskMeta(progressIndex, 
leaderDataNodeId);
     final int size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
       final boolean critical = ReadWriteIOUtils.readBool(byteBuffer);
@@ -117,9 +116,9 @@ public class PipeTaskMeta {
   }
 
   public static PipeTaskMeta deserialize(InputStream inputStream) throws 
IOException {
-    final PipeTaskMeta PipeTaskMeta = new PipeTaskMeta();
-    PipeTaskMeta.progressIndex.set(ReadWriteIOUtils.readLong(inputStream));
-    PipeTaskMeta.regionLeader.set(ReadWriteIOUtils.readInt(inputStream));
+    final ProgressIndex progressIndex = 
ProgressIndexType.deserializeFrom(inputStream);
+    final int leaderDataNodeId = ReadWriteIOUtils.readInt(inputStream);
+    final PipeTaskMeta PipeTaskMeta = new PipeTaskMeta(progressIndex, 
leaderDataNodeId);
     final int size = ReadWriteIOUtils.readInt(inputStream);
     for (int i = 0; i < size; ++i) {
       final boolean critical = ReadWriteIOUtils.readBool(inputStream);
@@ -141,14 +140,14 @@ public class PipeTaskMeta {
       return false;
     }
     PipeTaskMeta that = (PipeTaskMeta) obj;
-    return progressIndex.get() == that.progressIndex.get()
-        && regionLeader.get() == that.regionLeader.get()
+    return progressIndex.get().equals(that.progressIndex.get())
+        && leaderDataNodeId.get() == that.leaderDataNodeId.get()
         && Arrays.equals(exceptionMessages.toArray(), 
that.exceptionMessages.toArray());
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(progressIndex, regionLeader, exceptionMessages);
+    return Objects.hash(progressIndex, leaderDataNodeId, exceptionMessages);
   }
 
   @Override
@@ -157,8 +156,8 @@ public class PipeTaskMeta {
         + "progressIndex='"
         + progressIndex
         + '\''
-        + ", regionLeader='"
-        + regionLeader
+        + ", leaderDataNodeId='"
+        + leaderDataNodeId
         + '\''
         + ", exceptionMessages="
         + exceptionMessages
diff --git 
a/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
 
b/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
index 08e7f86007c..b9fc0ef7e5d 100644
--- 
a/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
+++ 
b/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.task.meta;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -59,10 +60,12 @@ public class PipeMetaDeSerTest {
               {
                 put(
                     new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
-                    new PipeTaskMeta(789, 987));
+                    new PipeTaskMeta(
+                        new MinimumProgressIndex(), 987)); // TODO: replace 
with IoTProgressIndex;
                 put(
                     new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
-                    new PipeTaskMeta(456, 789));
+                    new PipeTaskMeta(
+                        new MinimumProgressIndex(), 789)); // TODO: replace 
with IoTProgressIndex;
               }
             });
     ByteBuffer runtimeByteBuffer = pipeRuntimeMeta.serialize();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
index 26d52eed22f..8fa8cb7c12b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.consensus.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
+import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
 import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import 
org.apache.iotdb.consensus.common.request.DeserializedBatchIndexedConsensusRequest;
@@ -84,7 +86,14 @@ public class IoTConsensusDataRegionStateMachine extends 
DataRegionStateMachine {
               batchRequest.getEndSyncIndex(),
               batchRequest.getRequests().size());
       for (IndexedConsensusRequest indexedRequest : 
batchRequest.getRequests()) {
-        deserializedRequest.add(grabInsertNode(indexedRequest));
+        final PlanNode planNode = grabInsertNode(indexedRequest);
+        if (planNode instanceof ComparableConsensusRequest) {
+          final IoTProgressIndex ioTProgressIndex = new IoTProgressIndex();
+          ioTProgressIndex.addSearchIndex(
+              Integer.parseInt(batchRequest.getSourcePeerId()), 
indexedRequest.getSearchIndex());
+          ((ComparableConsensusRequest) 
planNode).setProgressIndex(ioTProgressIndex);
+        }
+        deserializedRequest.add(planNode);
       }
       result = deserializedRequest;
     } else {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index f89f28f9b2f..a124a8aa2c8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -301,6 +301,9 @@ public class TsFileProcessor {
       tsFileResource.updateEndTime(
           insertRowNode.getDeviceID().toStringID(), insertRowNode.getTime());
     }
+
+    tsFileResource.updateProgressIndex(insertRowNode.getProgressIndex());
+
     PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(System.nanoTime() 
- startTime);
   }
 
@@ -413,6 +416,9 @@ public class TsFileProcessor {
       tsFileResource.updateEndTime(
           insertTabletNode.getDeviceID().toStringID(), 
insertTabletNode.getTimes()[end - 1]);
     }
+
+    tsFileResource.updateProgressIndex(insertTabletNode.getProgressIndex());
+
     PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(System.nanoTime() 
- startTime);
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 0e5949f41b4..774106dd593 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -153,6 +156,8 @@ public class TsFileResource {
    */
   private TsFileResource originTsFileResource;
 
+  private ProgressIndex maxProgressIndex;
+
   public TsFileResource() {}
 
   public TsFileResource(TsFileResource other) throws IOException {
@@ -170,6 +175,7 @@ public class TsFileResource {
     this.minPlanIndex = other.minPlanIndex;
     this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
     this.tsFileSize = other.tsFileSize;
+    this.maxProgressIndex = other.maxProgressIndex;
   }
 
   /** for sealed TsFile, call setClosed to close TsFileResource */
@@ -242,8 +248,27 @@ public class TsFileResource {
       if (modFile != null && modFile.exists()) {
         String modFileName = new File(modFile.getFilePath()).getName();
         ReadWriteIOUtils.write(modFileName, outputStream);
+      } else {
+        // make the first "inputStream.available() > 0" in deserialize() happy.
+        //
+        // if modFile not exist, write null (-1). the first 
"inputStream.available() > 0" in
+        // deserialize() and deserializeFromOldFile() detect -1 and 
deserialize modFileName as null
+        // and skip the modFile deserialize.
+        //
+        // this make sure the first and the second "inputStream.available() > 
0" in deserialize()
+        // will always be called... which is a bit ugly but allows the 
following variable
+        // maxProgressIndex to be deserialized correctly.
+        ReadWriteIOUtils.write((String) null, outputStream);
+      }
+
+      if (maxProgressIndex != null) {
+        ReadWriteIOUtils.write(true, outputStream);
+        maxProgressIndex.serialize(outputStream);
+      } else {
+        ReadWriteIOUtils.write(false, outputStream);
       }
     }
+
     File src = fsFactory.getFile(file + RESOURCE_SUFFIX + TEMP_SUFFIX);
     File dest = fsFactory.getFile(file + RESOURCE_SUFFIX);
     fsFactory.deleteIfExists(dest);
@@ -258,6 +283,7 @@ public class TsFileResource {
       timeIndex = ITimeIndex.createTimeIndex(inputStream);
       maxPlanIndex = ReadWriteIOUtils.readLong(inputStream);
       minPlanIndex = ReadWriteIOUtils.readLong(inputStream);
+
       if (inputStream.available() > 0) {
         String modFileName = ReadWriteIOUtils.readString(inputStream);
         if (modFileName != null) {
@@ -265,6 +291,13 @@ public class TsFileResource {
           modFile = new ModificationFile(modF.getPath());
         }
       }
+
+      if (inputStream.available() > 0) {
+        final boolean hasMaxProgressIndex = 
ReadWriteIOUtils.readBool(inputStream);
+        if (hasMaxProgressIndex) {
+          maxProgressIndex = ProgressIndexType.deserializeFrom(inputStream);
+        }
+      }
     }
 
     // upgrade from v0.12 to v0.13, we need to rewrite the TsFileResource if 
the previous time index
@@ -310,6 +343,12 @@ public class TsFileResource {
           modFile = new ModificationFile(modF.getPath());
         }
       }
+      if (inputStream.available() > 0) {
+        final boolean hasMaxProgressIndex = 
ReadWriteIOUtils.readBool(inputStream);
+        if (hasMaxProgressIndex) {
+          maxProgressIndex = ProgressIndexType.deserializeFrom(inputStream);
+        }
+      }
     }
   }
 
@@ -1122,4 +1161,23 @@ public class TsFileResource {
   public boolean isFileInList() {
     return prev != null || next != null;
   }
+
+  public void updateProgressIndex(ProgressIndex progressIndex) {
+    if (progressIndex == null) {
+      return;
+    }
+
+    maxProgressIndex =
+        (maxProgressIndex == null
+            ? progressIndex
+            : 
maxProgressIndex.updateToMinimumIsAfterProgressIndex(progressIndex));
+  }
+
+  public ProgressIndex getMaxProgressIndexAfterClose() throws 
IllegalStateException {
+    if (status.equals(TsFileResourceStatus.UNCLOSED)) {
+      throw new IllegalStateException(
+          "Should not get progress index from a unclosing TsFileResource.");
+    }
+    return maxProgressIndex == null ? new MinimumProgressIndex() : 
maxProgressIndex;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 5a4e198b77c..73308092ecb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -263,4 +264,10 @@ public class InsertMultiTabletsNode extends InsertNode {
   public long getMinTime() {
     throw new NotImplementedException();
   }
+
+  @Override
+  public void setProgressIndex(ProgressIndex progressIndex) {
+    this.progressIndex = progressIndex;
+    insertTabletNodeList.forEach(node -> node.setProgressIndex(progressIndex));
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index c191f03837a..df0a01c6271 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -19,6 +19,8 @@
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
@@ -40,7 +42,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Objects;
 
-public abstract class InsertNode extends WritePlanNode {
+public abstract class InsertNode extends WritePlanNode implements 
ComparableConsensusRequest {
 
   /** this insert node doesn't need to participate in iot consensus */
   public static final long NO_CONSENSUS_INDEX = 
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
@@ -73,6 +75,8 @@ public abstract class InsertNode extends WritePlanNode {
   /** Physical address of data region after splitting */
   protected TRegionReplicaSet dataRegionReplicaSet;
 
+  protected ProgressIndex progressIndex;
+
   protected InsertNode(PlanNodeId id) {
     super(id);
   }
@@ -267,6 +271,20 @@ public abstract class InsertNode extends WritePlanNode {
   }
   // endregion
 
+  // region progress index
+
+  @Override
+  public final ProgressIndex getProgressIndex() {
+    return progressIndex;
+  }
+
+  @Override
+  public void setProgressIndex(ProgressIndex progressIndex) {
+    this.progressIndex = progressIndex;
+  }
+
+  // endregion
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
index 050839f6428..5fcaf1b44ac 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -240,4 +241,10 @@ public class InsertRowsNode extends InsertNode {
   public long getMinTime() {
     throw new NotImplementedException();
   }
+
+  @Override
+  public void setProgressIndex(ProgressIndex progressIndex) {
+    this.progressIndex = progressIndex;
+    insertRowNodeList.forEach(insertRowNode -> 
insertRowNode.setProgressIndex(progressIndex));
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index dd788ea4db0..5cdbf43b23c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.StatusUtils;
@@ -292,4 +293,10 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
   public long getMinTime() {
     throw new NotImplementedException();
   }
+
+  @Override
+  public void setProgressIndex(ProgressIndex progressIndex) {
+    this.progressIndex = progressIndex;
+    insertRowNodeList.forEach(insertRowNode -> 
insertRowNode.setProgressIndex(progressIndex));
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 2ff33b450c7..10e8679ac83 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -26,6 +26,8 @@ import 
org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.task.PipeBuilder;
 import org.apache.iotdb.db.pipe.task.PipeTask;
@@ -65,6 +67,7 @@ import java.util.stream.Collectors;
 public class PipeTaskAgent {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTaskAgent.class);
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
 
   private final PipeMetaKeeper pipeMetaKeeper;
   private final PipeTaskManager pipeTaskManager;
@@ -76,7 +79,6 @@ public class PipeTaskAgent {
 
   ////////////////////////// Pipe Task Management Entry 
//////////////////////////
 
-  // TODO: handle progress index
   public synchronized void handlePipeMetaChanges(List<PipeMeta> 
pipeMetaListFromConfigNode) {
     // do nothing if data node is removing or removed
     if (PipeAgent.runtime().isShutdown()) {
@@ -156,11 +158,7 @@ public class PipeTaskAgent {
 
       // if task meta does not exist on data node, create a new task
       if (taskMetaOnDataNode == null) {
-        createPipeTask(
-            consensusGroupIdFromConfigNode,
-            pipeStaticMeta,
-            taskMetaFromConfigNode.getProgressIndex(),
-            taskMetaFromConfigNode.getRegionLeader());
+        createPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta, 
taskMetaFromConfigNode);
         // we keep the new created task's status consistent with the status 
recorded in data node's
         // pipe runtime meta. please note that the status recorded in data 
node's pipe runtime meta
         // is not reliable, but we will have a check later to make sure the 
status is correct.
@@ -171,16 +169,12 @@ public class PipeTaskAgent {
       }
 
       // if task meta exists on data node, check if it has changed
-      final int regionLeaderFromConfigNode = 
taskMetaFromConfigNode.getRegionLeader();
-      final int regionLeaderOnDataNode = taskMetaOnDataNode.getRegionLeader();
+      final int dataNodeIdFromConfigNode = 
taskMetaFromConfigNode.getLeaderDataNodeId();
+      final int dataNodeIdOnDataNode = 
taskMetaOnDataNode.getLeaderDataNodeId();
 
-      if (regionLeaderFromConfigNode != regionLeaderOnDataNode) {
+      if (dataNodeIdFromConfigNode != dataNodeIdOnDataNode) {
         dropPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
-        createPipeTask(
-            consensusGroupIdFromConfigNode,
-            pipeStaticMeta,
-            taskMetaFromConfigNode.getProgressIndex(),
-            taskMetaFromConfigNode.getRegionLeader());
+        createPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta, 
taskMetaFromConfigNode);
         // we keep the new created task's status consistent with the status 
recorded in data node's
         // pipe runtime meta. please note that the status recorded in data 
node's pipe runtime meta
         // is not reliable, but we will have a check later to make sure the 
status is correct.
@@ -511,19 +505,22 @@ public class PipeTaskAgent {
   ///////////////////////// Manage by dataRegionGroupId 
/////////////////////////
 
   private void createPipeTask(
-      TConsensusGroupId dataRegionGroupId,
+      TConsensusGroupId consensusGroupId,
       PipeStaticMeta pipeStaticMeta,
-      long progressIndex,
-      int dataRegionId) {
-    final PipeTask pipeTask =
-        new PipeTaskBuilder(Integer.toString(dataRegionId), 
pipeStaticMeta).build();
-    pipeTask.create();
-    pipeTaskManager.addPipeTask(pipeStaticMeta, dataRegionGroupId, pipeTask);
+      PipeTaskMeta pipeTaskMeta) {
+    if (pipeTaskMeta.getLeaderDataNodeId() == CONFIG.getDataNodeId()) {
+      final PipeTask pipeTask =
+          new PipeTaskBuilder(consensusGroupId, pipeTaskMeta, 
pipeStaticMeta).build();
+      pipeTask.create();
+      pipeTaskManager.addPipeTask(pipeStaticMeta, consensusGroupId, pipeTask);
+    }
     pipeMetaKeeper
         .getPipeMeta(pipeStaticMeta.getPipeName())
         .getRuntimeMeta()
         .getConsensusGroupIdToTaskMetaMap()
-        .put(dataRegionGroupId, new PipeTaskMeta(progressIndex, dataRegionId));
+        .put(
+            consensusGroupId,
+            new PipeTaskMeta(pipeTaskMeta.getProgressIndex(), 
pipeTaskMeta.getLeaderDataNodeId()));
   }
 
   private void dropPipeTask(TConsensusGroupId dataRegionGroupId, 
PipeStaticMeta pipeStaticMeta) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
index fe1c92638be..fa7b07c182b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.core.collector;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import 
org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionTsFileCollector;
@@ -44,10 +45,12 @@ public class IoTDBDataRegionCollector implements 
PipeCollector {
 
   private int dataRegionId;
 
-  public IoTDBDataRegionCollector(ListenableUnblockingPendingQueue<Event> 
collectorPendingQueue) {
+  public IoTDBDataRegionCollector(
+      PipeTaskMeta pipeTaskMeta, ListenableUnblockingPendingQueue<Event> 
collectorPendingQueue) {
     hasBeenStarted = new AtomicBoolean(false);
-    realtimeCollector = new 
PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
-    historicalCollector = new PipeHistoricalDataRegionTsFileCollector();
+    realtimeCollector =
+        new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta, 
collectorPendingQueue);
+    historicalCollector = new 
PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index ed0d37c6c14..9e796bddc77 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.pipe.core.collector.historical;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
@@ -37,10 +39,18 @@ import java.util.stream.Collectors;
 
 public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
 
+  private final PipeTaskMeta pipeTaskMeta;
+  private final ProgressIndex startIndex;
+
   private int dataRegionId;
 
   private Queue<PipeTsFileInsertionEvent> pendingQueue;
 
+  public PipeHistoricalDataRegionTsFileCollector(PipeTaskMeta pipeTaskMeta) {
+    this.pipeTaskMeta = pipeTaskMeta;
+    this.startIndex = pipeTaskMeta.getProgressIndex();
+  }
+
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
     validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
@@ -71,16 +81,18 @@ public class PipeHistoricalDataRegionTsFileCollector 
implements PipeCollector {
         pendingQueue = new ArrayDeque<>(tsFileManager.size(true) + 
tsFileManager.size(false));
         pendingQueue.addAll(
             tsFileManager.getTsFileList(true).stream()
-                .map(PipeTsFileInsertionEvent::new)
+                .filter(resource -> 
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose()))
+                .map(resource -> new PipeTsFileInsertionEvent(resource, 
pipeTaskMeta))
                 .collect(Collectors.toList()));
         pendingQueue.addAll(
             tsFileManager.getTsFileList(false).stream()
-                .map(PipeTsFileInsertionEvent::new)
+                .filter(resource -> 
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose()))
+                .map(resource -> new PipeTsFileInsertionEvent(resource, 
pipeTaskMeta))
                 .collect(Collectors.toList()));
         pendingQueue.forEach(
-            event ->
-                event.increaseReferenceCount(
-                    PipeHistoricalDataRegionTsFileCollector.class.getName()));
+            event -> {
+              
event.increaseReferenceCount(PipeHistoricalDataRegionTsFileCollector.class.getName());
+            });
       } finally {
         tsFileManager.readUnlock();
       }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
index 41e9f5d76fb..fd790b718cd 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.core.collector.realtime;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import 
org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
@@ -29,9 +30,15 @@ import 
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfig
 
 public abstract class PipeRealtimeDataRegionCollector implements PipeCollector 
{
 
+  protected final PipeTaskMeta pipeTaskMeta;
+
   protected String pattern;
   protected String dataRegionId;
 
+  public PipeRealtimeDataRegionCollector(PipeTaskMeta pipeTaskMeta) {
+    this.pipeTaskMeta = pipeTaskMeta;
+  }
+
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
     validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
@@ -64,6 +71,10 @@ public abstract class PipeRealtimeDataRegionCollector 
implements PipeCollector {
     return pattern;
   }
 
+  public final PipeTaskMeta getPipeTaskMeta() {
+    return pipeTaskMeta;
+  }
+
   @Override
   public String toString() {
     return "PipeRealtimeDataRegionCollector{"
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 898b93cb22d..4ec7ed92261 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.core.collector.realtime;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
@@ -44,7 +45,8 @@ public class PipeRealtimeDataRegionHybridCollector extends 
PipeRealtimeDataRegio
   private final ListenableUnblockingPendingQueue<Event> pendingQueue;
 
   public PipeRealtimeDataRegionHybridCollector(
-      ListenableUnblockingPendingQueue<Event> pendingQueue) {
+      PipeTaskMeta pipeTaskMeta, ListenableUnblockingPendingQueue<Event> 
pendingQueue) {
+    super(pipeTaskMeta);
     this.pendingQueue = pendingQueue;
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
index 2d805310f30..3e264ce02e9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
@@ -53,8 +53,11 @@ public class PipeDataRegionAssigner {
         .match(event)
         .forEach(
             collector -> {
-              
event.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
-              collector.collect(event);
+              final PipeRealtimeCollectEvent copiedEvent =
+                  event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+                      collector.getPipeTaskMeta());
+              
copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
+              collector.collect(copiedEvent);
             });
     event.gcSchemaInfo();
     event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
index fa9e765ca25..fc15200b603 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
@@ -19,20 +19,73 @@
 
 package org.apache.iotdb.db.pipe.core.event;
 
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * EnrichedEvent is an event that can be enriched with additional runtime 
information. The
  * additional information mainly includes the reference count of the event.
  */
-public interface EnrichedEvent {
+public abstract class EnrichedEvent implements Event {
+
+  private final AtomicInteger referenceCount;
+
+  private final PipeTaskMeta pipeTaskMeta;
+
+  public EnrichedEvent(PipeTaskMeta pipeTaskMeta) {
+    referenceCount = new AtomicInteger(0);
+    this.pipeTaskMeta = pipeTaskMeta;
+  }
+
+  /**
+   * increase the reference count of this event. when the reference count is 
positive, the data in
+   * the resource of this event should be safe to use.
+   *
+   * @param holderMessage the message of the invoker
+   * @return true if the reference count is increased successfully, false if 
the event is not
+   */
+  public final boolean increaseReferenceCount(String holderMessage) {
+    boolean isSuccessful = true;
+    synchronized (this) {
+      if (referenceCount.get() == 0) {
+        isSuccessful = increaseResourceReferenceCount(holderMessage);
+      }
+      referenceCount.incrementAndGet();
+    }
+    return isSuccessful;
+  }
 
   /**
-   * Increase the reference count of this event.
+   * Increase the reference count of the resource of this event.
    *
    * @param holderMessage the message of the invoker
    * @return true if the reference count is increased successfully, false if 
the event is not
    *     controlled by the invoker, which means the data stored in the event 
is not safe to use
    */
-  boolean increaseReferenceCount(String holderMessage);
+  public abstract boolean increaseResourceReferenceCount(String holderMessage);
+
+  /**
+   * Decrease the reference count of this event. If the reference count is 
decreased to 0, the event
+   * can be recycled and the data stored in the event is not safe to use, the 
processing progress of
+   * the event should be reported to the pipe task meta.
+   *
+   * @param holderMessage the message of the invoker
+   * @return true if the reference count is decreased successfully, false 
otherwise
+   */
+  public final boolean decreaseReferenceCount(String holderMessage) {
+    boolean isSuccessful = true;
+    synchronized (this) {
+      if (referenceCount.get() == 1) {
+        isSuccessful = decreaseResourceReferenceCount(holderMessage);
+        reportProgress();
+      }
+      referenceCount.decrementAndGet();
+    }
+    return isSuccessful;
+  }
 
   /**
    * Decrease the reference count of this event. If the reference count is 
decreased to 0, the event
@@ -41,14 +94,25 @@ public interface EnrichedEvent {
    * @param holderMessage the message of the invoker
    * @return true if the reference count is decreased successfully, false 
otherwise
    */
-  boolean decreaseReferenceCount(String holderMessage);
+  public abstract boolean decreaseResourceReferenceCount(String holderMessage);
+
+  private void reportProgress() {
+    if (pipeTaskMeta != null) {
+      pipeTaskMeta.updateProgressIndex(getProgressIndex());
+    }
+  }
+
+  public abstract ProgressIndex getProgressIndex();
 
   /**
    * Get the reference count of this event.
    *
    * @return the reference count
    */
-  int getReferenceCount();
+  public int getReferenceCount() {
+    return referenceCount.get();
+  }
 
-  // TODO: ConsensusIndex getConsensusIndex();
+  public abstract EnrichedEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+      PipeTaskMeta pipeTaskMeta);
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
index 99d9da055ee..fb9788d1a95 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.pipe.core.event.impl;
 
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
@@ -35,37 +37,32 @@ import org.slf4j.LoggerFactory;
 import java.util.Iterator;
 import java.util.function.BiConsumer;
 
-public class PipeTabletInsertionEvent implements TabletInsertionEvent, 
EnrichedEvent {
+public class PipeTabletInsertionEvent extends EnrichedEvent implements 
TabletInsertionEvent {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTabletInsertionEvent.class);
 
   private final WALEntryHandler walEntryHandler;
+  private final ProgressIndex progressIndex;
 
-  public PipeTabletInsertionEvent(WALEntryHandler walEntryHandler) {
+  public PipeTabletInsertionEvent(WALEntryHandler walEntryHandler, 
ProgressIndex progressIndex) {
+    this(walEntryHandler, progressIndex, null);
+  }
+
+  private PipeTabletInsertionEvent(
+      WALEntryHandler walEntryHandler, ProgressIndex progressIndex, 
PipeTaskMeta pipeTaskMeta) {
+    super(pipeTaskMeta);
     this.walEntryHandler = walEntryHandler;
+    this.progressIndex = progressIndex;
   }
 
   public InsertNode getInsertNode() throws WALPipeException {
     return walEntryHandler.getValue();
   }
 
-  @Override
-  public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> 
consumer) {
-    throw new UnsupportedOperationException("Not implemented yet");
-  }
-
-  @Override
-  public TabletInsertionEvent processByIterator(BiConsumer<Iterator<Row>, 
RowCollector> consumer) {
-    throw new UnsupportedOperationException("Not implemented yet");
-  }
-
-  @Override
-  public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> 
consumer) {
-    throw new UnsupportedOperationException("Not implemented yet");
-  }
+  /////////////////////////// EnrichedEvent ///////////////////////////
 
   @Override
-  public boolean increaseReferenceCount(String holderMessage) {
+  public boolean increaseResourceReferenceCount(String holderMessage) {
     try {
       PipeResourceManager.wal().pin(walEntryHandler.getMemTableId(), 
walEntryHandler);
       return true;
@@ -80,7 +77,7 @@ public class PipeTabletInsertionEvent implements 
TabletInsertionEvent, EnrichedE
   }
 
   @Override
-  public boolean decreaseReferenceCount(String holderMessage) {
+  public boolean decreaseResourceReferenceCount(String holderMessage) {
     try {
       PipeResourceManager.wal().unpin(walEntryHandler.getMemTableId());
       return true;
@@ -95,12 +92,42 @@ public class PipeTabletInsertionEvent implements 
TabletInsertionEvent, EnrichedE
   }
 
   @Override
-  public int getReferenceCount() {
-    return 
PipeResourceManager.wal().getReferenceCount(walEntryHandler.getMemTableId());
+  public ProgressIndex getProgressIndex() {
+    return progressIndex;
+  }
+
+  @Override
+  public PipeTabletInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+      PipeTaskMeta pipeTaskMeta) {
+    return new PipeTabletInsertionEvent(walEntryHandler, progressIndex, 
pipeTaskMeta);
+  }
+
+  /////////////////////////// TabletInsertionEvent ///////////////////////////
+
+  @Override
+  public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> 
consumer) {
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
+
+  @Override
+  public TabletInsertionEvent processByIterator(BiConsumer<Iterator<Row>, 
RowCollector> consumer) {
+    throw new UnsupportedOperationException("Not implemented yet");
   }
 
+  @Override
+  public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> 
consumer) {
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
+
+  /////////////////////////// Object ///////////////////////////
+
   @Override
   public String toString() {
-    return "PipeTabletInsertionEvent{" + "walEntryHandler=" + walEntryHandler 
+ '}';
+    return "PipeTabletInsertionEvent{"
+        + "walEntryHandler="
+        + walEntryHandler
+        + ", progressIndex="
+        + progressIndex
+        + '}';
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
index 65f78cebac2..4b346b0f915 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.db.pipe.core.event.impl;
 
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
@@ -32,14 +35,23 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class PipeTsFileInsertionEvent implements TsFileInsertionEvent, 
EnrichedEvent {
+public class PipeTsFileInsertionEvent extends EnrichedEvent implements 
TsFileInsertionEvent {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileInsertionEvent.class);
 
+  private final TsFileResource resource;
   private File tsFile;
+
   private final AtomicBoolean isClosed;
 
   public PipeTsFileInsertionEvent(TsFileResource resource) {
+    this(resource, null);
+  }
+
+  public PipeTsFileInsertionEvent(TsFileResource resource, PipeTaskMeta 
pipeTaskMeta) {
+    super(pipeTaskMeta);
+
+    this.resource = resource;
     tsFile = resource.getTsFile();
 
     isClosed = new AtomicBoolean(resource.isClosed());
@@ -72,20 +84,11 @@ public class PipeTsFileInsertionEvent implements 
TsFileInsertionEvent, EnrichedE
     return tsFile;
   }
 
-  @Override
-  public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
-    throw new UnsupportedOperationException("Not implemented yet");
-  }
-
-  @Override
-  public TsFileInsertionEvent 
toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable) {
-    throw new UnsupportedOperationException("Not implemented yet");
-  }
+  /////////////////////////// EnrichedEvent ///////////////////////////
 
   @Override
-  public boolean increaseReferenceCount(String holderMessage) {
+  public boolean increaseResourceReferenceCount(String holderMessage) {
     try {
-      // TODO: increase reference count for mods & resource files
       tsFile = PipeResourceManager.file().increaseFileReference(tsFile, true);
       return true;
     } catch (Exception e) {
@@ -99,7 +102,7 @@ public class PipeTsFileInsertionEvent implements 
TsFileInsertionEvent, EnrichedE
   }
 
   @Override
-  public boolean decreaseReferenceCount(String holderMessage) {
+  public boolean decreaseResourceReferenceCount(String holderMessage) {
     try {
       PipeResourceManager.file().decreaseFileReference(tsFile);
       return true;
@@ -114,12 +117,48 @@ public class PipeTsFileInsertionEvent implements 
TsFileInsertionEvent, EnrichedE
   }
 
   @Override
-  public int getReferenceCount() {
-    return PipeResourceManager.file().getFileReferenceCount(tsFile);
+  public ProgressIndex getProgressIndex() {
+    try {
+      waitForTsFileClose();
+      return resource.getMaxProgressIndexAfterClose();
+    } catch (InterruptedException e) {
+      LOGGER.warn(
+          String.format(
+              "Interrupted when waiting for closing TsFile %s.", 
resource.getTsFilePath()));
+      Thread.currentThread().interrupt();
+      return new MinimumProgressIndex();
+    }
+  }
+
+  @Override
+  public PipeTsFileInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+      PipeTaskMeta pipeTaskMeta) {
+    return new PipeTsFileInsertionEvent(resource, pipeTaskMeta);
+  }
+
+  /////////////////////////// TsFileInsertionEvent ///////////////////////////
+
+  @Override
+  public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
+
+  @Override
+  public TsFileInsertionEvent 
toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable) {
+    throw new UnsupportedOperationException("Not implemented yet");
   }
 
+  /////////////////////////// Object ///////////////////////////
+
   @Override
   public String toString() {
-    return "PipeTsFileInsertionEvent{" + "tsFile=" + tsFile + '}';
+    return "PipeTsFileInsertionEvent{"
+        + "resource="
+        + resource
+        + ", tsFile="
+        + tsFile
+        + ", isClosed="
+        + isClosed
+        + '}';
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
index 0c63131e6e9..e9887cc88bf 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
@@ -19,20 +19,31 @@
 
 package org.apache.iotdb.db.pipe.core.event.realtime;
 
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import java.util.Map;
 
-public class PipeRealtimeCollectEvent implements Event, EnrichedEvent {
+/**
+ * PipeRealtimeCollectEvent is an event that decorates the EnrichedEvent with 
the information of
+ * TsFileEpoch and schema info. It only exists in the realtime event collector.
+ */
+public class PipeRealtimeCollectEvent extends EnrichedEvent {
 
-  private final Event event;
+  private final EnrichedEvent event;
   private final TsFileEpoch tsFileEpoch;
 
   private Map<String, String[]> device2Measurements;
 
   public PipeRealtimeCollectEvent(
-      Event event, TsFileEpoch tsFileEpoch, Map<String, String[]> 
device2Measurements) {
+      EnrichedEvent event, TsFileEpoch tsFileEpoch, Map<String, String[]> 
device2Measurements) {
+    // pipeTaskMeta is used to report the progress of the event, the 
PipeRealtimeCollectEvent
+    // is only used in the realtime event collector, which does not need to 
report the progress
+    // of the event, so the pipeTaskMeta is always null.
+    super(null);
+
     this.event = event;
     this.tsFileEpoch = tsFileEpoch;
     this.device2Measurements = device2Measurements;
@@ -55,24 +66,38 @@ public class PipeRealtimeCollectEvent implements Event, 
EnrichedEvent {
   }
 
   @Override
-  public boolean increaseReferenceCount(String holderMessage) {
-    return !(event instanceof EnrichedEvent)
-        || ((EnrichedEvent) event).increaseReferenceCount(holderMessage);
+  public boolean increaseResourceReferenceCount(String holderMessage) {
+    return event.increaseResourceReferenceCount(holderMessage);
+  }
+
+  @Override
+  public boolean decreaseResourceReferenceCount(String holderMessage) {
+    return event.decreaseResourceReferenceCount(holderMessage);
   }
 
   @Override
-  public boolean decreaseReferenceCount(String holderMessage) {
-    return !(event instanceof EnrichedEvent)
-        || ((EnrichedEvent) event).decreaseReferenceCount(holderMessage);
+  public ProgressIndex getProgressIndex() {
+    return event.getProgressIndex();
   }
 
   @Override
-  public int getReferenceCount() {
-    return event instanceof EnrichedEvent ? ((EnrichedEvent) 
event).getReferenceCount() : 0;
+  public PipeRealtimeCollectEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+      PipeTaskMeta pipeTaskMeta) {
+    return new PipeRealtimeCollectEvent(
+        
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(pipeTaskMeta),
+        this.tsFileEpoch,
+        this.device2Measurements);
   }
 
   @Override
   public String toString() {
-    return "PipeRealtimeCollectEvent{" + "event=" + event + ", tsFileEpoch=" + 
tsFileEpoch + '}';
+    return "PipeRealtimeCollectEvent{"
+        + "event="
+        + event
+        + ", tsFileEpoch="
+        + tsFileEpoch
+        + ", device2Measurements="
+        + device2Measurements
+        + '}';
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
index a4b453e4322..937e3073a27 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -37,7 +37,9 @@ public class PipeRealtimeCollectEventFactory {
   public static PipeRealtimeCollectEvent createCollectEvent(
       WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource 
resource) {
     return TS_FILE_EPOCH_MANAGER.bindPipeTabletInsertionEvent(
-        new PipeTabletInsertionEvent(walEntryHandler), insertNode, resource);
+        new PipeTabletInsertionEvent(walEntryHandler, 
insertNode.getProgressIndex()),
+        insertNode,
+        resource);
   }
 
   private PipeRealtimeCollectEventFactory() {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
index 447ae5bdf09..481eb0e5ad7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
@@ -24,12 +24,15 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 
 import java.util.HashMap;
 import java.util.Map;
 
 public class PipeBuilder {
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
 
   private final PipeMeta pipeMeta;
 
@@ -49,16 +52,19 @@ public class PipeBuilder {
     final PipeRuntimeMeta pipeRuntimeMeta = pipeMeta.getRuntimeMeta();
     for (Map.Entry<TConsensusGroupId, PipeTaskMeta> 
consensusGroupIdToPipeTaskMeta :
         pipeRuntimeMeta.getConsensusGroupIdToTaskMetaMap().entrySet()) {
-      consensusGroupIdToPipeTaskMap.put(
-          consensusGroupIdToPipeTaskMeta.getKey(),
-          new PipeTaskBuilder(
-                  pipeName,
-                  
Integer.toString(consensusGroupIdToPipeTaskMeta.getKey().getId()),
-                  // TODO: 
consensusGroupIdToPipeTaskMeta.getValue().getProgressIndex() is not used
-                  collectorParameters,
-                  processorParameters,
-                  connectorParameters)
-              .build());
+      if (consensusGroupIdToPipeTaskMeta.getValue().getLeaderDataNodeId()
+          == CONFIG.getDataNodeId()) {
+        consensusGroupIdToPipeTaskMap.put(
+            consensusGroupIdToPipeTaskMeta.getKey(),
+            new PipeTaskBuilder(
+                    pipeName,
+                    consensusGroupIdToPipeTaskMeta.getKey(),
+                    consensusGroupIdToPipeTaskMeta.getValue(),
+                    collectorParameters,
+                    processorParameters,
+                    connectorParameters)
+                .build());
+      }
     }
 
     return consensusGroupIdToPipeTaskMap;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
index 26475c5c4b4..887b31a6e3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
@@ -19,12 +19,13 @@
 
 package org.apache.iotdb.db.pipe.task;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskStage;
 
 public class PipeTask {
 
   private final String pipeName;
-  private final String dataRegionId;
+  private final TConsensusGroupId dataRegionId;
 
   private final PipeTaskStage collectorStage;
   private final PipeTaskStage processorStage;
@@ -32,7 +33,7 @@ public class PipeTask {
 
   PipeTask(
       String pipeName,
-      String dataRegionId,
+      TConsensusGroupId dataRegionId,
       PipeTaskStage collectorStage,
       PipeTaskStage processorStage,
       PipeTaskStage connectorStage) {
@@ -68,7 +69,7 @@ public class PipeTask {
     connectorStage.stop();
   }
 
-  public String getDataRegionId() {
+  public TConsensusGroupId getDataRegionId() {
     return dataRegionId;
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 51e429b0455..d42d62e5f98 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.pipe.task;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskCollectorStage;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskProcessorStage;
@@ -28,28 +30,33 @@ import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 public class PipeTaskBuilder {
 
   private final String pipeName;
-  private final String dataRegionId;
+  private final TConsensusGroupId dataRegionId;
+  private final PipeTaskMeta pipeTaskMeta;
   private final PipeParameters pipeCollectorParameters;
   private final PipeParameters pipeProcessorParameters;
   private final PipeParameters pipeConnectorParameters;
 
   PipeTaskBuilder(
       String pipeName,
-      String dataRegionId,
+      TConsensusGroupId dataRegionId,
+      PipeTaskMeta pipeTaskMeta,
       PipeParameters pipeCollectorParameters,
       PipeParameters pipeProcessorParameters,
       PipeParameters pipeConnectorParameters) {
     this.pipeName = pipeName;
     this.dataRegionId = dataRegionId;
+    this.pipeTaskMeta = pipeTaskMeta;
     this.pipeCollectorParameters = pipeCollectorParameters;
     this.pipeProcessorParameters = pipeProcessorParameters;
     this.pipeConnectorParameters = pipeConnectorParameters;
   }
 
-  public PipeTaskBuilder(String dataRegionId, PipeStaticMeta pipeStaticMeta) {
+  public PipeTaskBuilder(
+      TConsensusGroupId dataRegionId, PipeTaskMeta pipeTaskMeta, 
PipeStaticMeta pipeStaticMeta) {
     this(
         pipeStaticMeta.getPipeName(),
         dataRegionId,
+        pipeTaskMeta,
         pipeStaticMeta.getCollectorParameters(),
         pipeStaticMeta.getProcessorParameters(),
         pipeStaticMeta.getConnectorParameters());
@@ -60,7 +67,7 @@ public class PipeTaskBuilder {
 
     // we first build the collector and connector, then build the processor.
     final PipeTaskCollectorStage collectorStage =
-        new PipeTaskCollectorStage(dataRegionId, pipeCollectorParameters);
+        new PipeTaskCollectorStage(dataRegionId, pipeTaskMeta, 
pipeCollectorParameters);
     final PipeTaskConnectorStage connectorStage =
         new PipeTaskConnectorStage(pipeConnectorParameters);
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index a4866e87e8e..3dd445c7489 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.collector.IoTDBDataRegionCollector;
@@ -53,7 +55,10 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
 
   private final PipeCollector pipeCollector;
 
-  public PipeTaskCollectorStage(String dataRegionId, PipeParameters 
collectorParameters) {
+  public PipeTaskCollectorStage(
+      TConsensusGroupId dataRegionId,
+      PipeTaskMeta pipeTaskMeta,
+      PipeParameters collectorParameters) {
     // TODO: avoid if-else, use reflection to create collector all the time
     if (collectorParameters
         .getStringOrDefault(
@@ -69,10 +74,10 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
       // collector
       this.collectorParameters
           .getAttribute()
-          .put(PipeCollectorConstant.DATA_REGION_KEY, dataRegionId);
+          .put(PipeCollectorConstant.DATA_REGION_KEY, 
String.valueOf(dataRegionId.getId()));
 
       collectorPendingQueue = new ListenableUnblockingPendingQueue<>();
-      this.pipeCollector = new IoTDBDataRegionCollector(collectorPendingQueue);
+      this.pipeCollector = new IoTDBDataRegionCollector(pipeTaskMeta, 
collectorPendingQueue);
     } else {
       this.collectorParameters = collectorParameters;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 6bbec4480c0..5542b943d59 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector;
@@ -60,7 +61,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
    */
   public PipeTaskProcessorStage(
       String pipeName,
-      String dataRegionId,
+      TConsensusGroupId dataRegionId,
       EventSupplier pipeCollectorInputEventSupplier,
       @Nullable ListenablePendingQueue<Event> pipeCollectorInputPendingQueue,
       PipeParameters pipeProcessorParameters,
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceProgressIndexTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceProgressIndexTest.java
new file mode 100644
index 00000000000..8a9957cf47a
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceProgressIndexTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+public class TsFileResourceProgressIndexTest {
+  private final File file =
+      new File(
+          
TsFileNameGenerator.generateNewTsFilePath(TestConstant.BASE_OUTPUT_PATH, 1, 1, 
1, 1));
+  private final TsFileResource tsFileResource = new TsFileResource(file);
+  private final Map<String, Integer> deviceToIndex = new HashMap<>();
+  private final long[] startTimes = new long[DEVICE_NUM];
+  private final long[] endTimes = new long[DEVICE_NUM];
+  private static final int DEVICE_NUM = 100;
+
+  private final List<ProgressIndex> indexList = new ArrayList<>();
+  private static final int INDEX_NUM = 1000;
+
+  @Before
+  public void setUp() {
+    IntStream.range(0, DEVICE_NUM).forEach(i -> deviceToIndex.put("root.sg.d" 
+ i, i));
+    DeviceTimeIndex deviceTimeIndex = new DeviceTimeIndex(deviceToIndex, 
startTimes, endTimes);
+    IntStream.range(0, DEVICE_NUM)
+        .forEach(
+            i -> {
+              deviceTimeIndex.updateStartTime("root.sg.d" + i, i);
+              deviceTimeIndex.updateEndTime("root.sg.d" + i, i + 1);
+            });
+    tsFileResource.setTimeIndex(deviceTimeIndex);
+    tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+
+    IntStream.range(0, INDEX_NUM).forEach(i -> indexList.add(new 
MockProgressIndex(i)));
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    // clean fake file
+    if (file.exists()) {
+      FileUtils.delete(file);
+    }
+    File resourceFile = new File(file.getName() + 
TsFileResource.RESOURCE_SUFFIX);
+    if (resourceFile.exists()) {
+      FileUtils.delete(resourceFile);
+    }
+  }
+
+  @Test
+  public void testProgressIndexRecorder() {
+    Assert.assertTrue(
+        new 
MockProgressIndex(0).isAfter(tsFileResource.getMaxProgressIndexAfterClose()));
+
+    indexList.forEach(tsFileResource::updateProgressIndex);
+
+    Assert.assertFalse(
+        new 
MockProgressIndex(-1).isAfter(tsFileResource.getMaxProgressIndexAfterClose()));
+    Assert.assertFalse(
+        new 
MockProgressIndex(0).isAfter(tsFileResource.getMaxProgressIndexAfterClose()));
+    Assert.assertFalse(
+        new 
MockProgressIndex(1).isAfter(tsFileResource.getMaxProgressIndexAfterClose()));
+    Assert.assertFalse(
+        new MockProgressIndex(INDEX_NUM - 1)
+            .isAfter(tsFileResource.getMaxProgressIndexAfterClose()));
+
+    Assert.assertTrue(
+        new 
MockProgressIndex(INDEX_NUM).isAfter(tsFileResource.getMaxProgressIndexAfterClose()));
+    Assert.assertTrue(
+        new MockProgressIndex(Integer.MAX_VALUE)
+            .isAfter(tsFileResource.getMaxProgressIndexAfterClose()));
+
+    Assert.assertFalse(
+        new MockProgressIndex(1, INDEX_NUM - 1)
+            .isAfter(tsFileResource.getMaxProgressIndexAfterClose()));
+  }
+
+  @Test
+  public void testProgressIndexRecorderSerialize() {
+    // TODO: wait for implements of ProgressIndex.deserializeFrom
+  }
+
+  public static class MockProgressIndex implements ProgressIndex {
+    private final int type;
+    private int val;
+
+    public MockProgressIndex(int val) {
+      this(0, val);
+    }
+
+    public MockProgressIndex(int type, int val) {
+      this.type = type;
+      this.val = val;
+    }
+
+    @Override
+    public void serialize(ByteBuffer byteBuffer) {
+      ReadWriteIOUtils.write(val, byteBuffer);
+    }
+
+    @Override
+    public void serialize(OutputStream stream) throws IOException {
+      ReadWriteIOUtils.write(val, stream);
+    }
+
+    @Override
+    public boolean isAfter(ProgressIndex progressIndex) {
+      if (!(progressIndex instanceof MockProgressIndex)) {
+        return true;
+      }
+
+      MockProgressIndex that = (MockProgressIndex) progressIndex;
+      return this.type == that.type && this.val > that.val;
+    }
+
+    @Override
+    public boolean equals(ProgressIndex progressIndex) {
+      if (!(progressIndex instanceof MockProgressIndex)) {
+        return false;
+      }
+
+      MockProgressIndex that = (MockProgressIndex) progressIndex;
+      return this.type == that.type && this.val == that.val;
+    }
+
+    @Override
+    public ProgressIndex updateToMinimumIsAfterProgressIndex(ProgressIndex 
progressIndex) {
+      if (!(progressIndex instanceof MockProgressIndex)) {
+        throw new IllegalStateException("Mock update error.");
+      }
+
+      MockProgressIndex that = (MockProgressIndex) progressIndex;
+      if (that.type == this.type) {
+        this.val = Math.max(this.val, that.val);
+      }
+      return this;
+    }
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
index d833e2761a0..198761eee3b 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.core.collector;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import 
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import 
org.apache.iotdb.db.pipe.core.collector.realtime.matcher.CachedSchemaPatternMatcher;
@@ -64,7 +65,8 @@ public class CachedSchemaPatternMatcherTest {
 
   @Test
   public void testCachedMatcher() throws ExecutionException, 
InterruptedException {
-    PipeRealtimeDataRegionCollector databaseCollector = new 
PipeRealtimeDataRegionFakeCollector();
+    PipeRealtimeDataRegionCollector databaseCollector =
+        new PipeRealtimeDataRegionFakeCollector(null);
     databaseCollector.customize(
         new PipeParameters(
             new HashMap<String, String>() {
@@ -79,7 +81,8 @@ public class CachedSchemaPatternMatcherTest {
     int deviceCollectorNum = 10;
     int seriesCollectorNum = 10;
     for (int i = 0; i < deviceCollectorNum; i++) {
-      PipeRealtimeDataRegionCollector deviceCollector = new 
PipeRealtimeDataRegionFakeCollector();
+      PipeRealtimeDataRegionCollector deviceCollector =
+          new PipeRealtimeDataRegionFakeCollector(null);
       int finalI1 = i;
       deviceCollector.customize(
           new PipeParameters(
@@ -92,7 +95,8 @@ public class CachedSchemaPatternMatcherTest {
           null);
       collectorList.add(deviceCollector);
       for (int j = 0; j < seriesCollectorNum; j++) {
-        PipeRealtimeDataRegionCollector seriesCollector = new 
PipeRealtimeDataRegionFakeCollector();
+        PipeRealtimeDataRegionCollector seriesCollector =
+            new PipeRealtimeDataRegionFakeCollector(null);
         int finalI = i;
         int finalJ = j;
         seriesCollector.customize(
@@ -149,6 +153,10 @@ public class CachedSchemaPatternMatcherTest {
 
   public static class PipeRealtimeDataRegionFakeCollector extends 
PipeRealtimeDataRegionCollector {
 
+    public PipeRealtimeDataRegionFakeCollector(PipeTaskMeta pipeTaskMeta) {
+      super(pipeTaskMeta);
+    }
+
     @Override
     public Event supply() {
       return null;
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index 45b0cf504b2..fbb399831eb 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -102,13 +102,17 @@ public class PipeRealtimeCollectTest {
     // set up realtime collector
 
     try (PipeRealtimeDataRegionHybridCollector collector1 =
-            new PipeRealtimeDataRegionHybridCollector(new 
ListenableUnblockingPendingQueue<>());
+            new PipeRealtimeDataRegionHybridCollector(
+                null, new ListenableUnblockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector2 =
-            new PipeRealtimeDataRegionHybridCollector(new 
ListenableUnblockingPendingQueue<>());
+            new PipeRealtimeDataRegionHybridCollector(
+                null, new ListenableUnblockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector3 =
-            new PipeRealtimeDataRegionHybridCollector(new 
ListenableUnblockingPendingQueue<>());
+            new PipeRealtimeDataRegionHybridCollector(
+                null, new ListenableUnblockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector4 =
-            new PipeRealtimeDataRegionHybridCollector(new 
ListenableUnblockingPendingQueue<>())) {
+            new PipeRealtimeDataRegionHybridCollector(
+                null, new ListenableUnblockingPendingQueue<>())) {
 
       collector1.customize(
           new PipeParameters(

Reply via email to