This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new eb1e786fa15 Revert Pipe/IoTV2: Try to persist progressIndex in local 
for resend event more efficiently (#15924)
eb1e786fa15 is described below

commit eb1e786fa159132ad2daec4fe960a81595573d27
Author: Caideyipi <[email protected]>
AuthorDate: Sat Jul 12 16:27:32 2025 +0800

    Revert Pipe/IoTV2: Try to persist progressIndex in local for resend event 
more efficiently (#15924)
    
    * Revert
    
    * Continue revert
---
 .../runtime/heartbeat/PipeHeartbeatParser.java     |   2 -
 .../confignode/persistence/pipe/PipeTaskInfo.java  |   7 +-
 .../impl/pipe/runtime/PipeMetaSyncProcedure.java   |   4 +-
 .../impl/pipe/task/AlterPipeProcedureV2.java       |  16 +--
 .../impl/pipe/task/CreatePipeProcedureV2.java      |  18 +--
 .../confignode/service/ConfigNodeShutdownHook.java |   3 -
 .../request/ConfigPhysicalPlanSerDeTest.java       |  12 +-
 .../consensus/response/pipe/PipeTableRespTest.java |   6 +-
 .../agent/PipeConfigNodeSubtaskExecutorTest.java   |   3 +-
 .../iotdb/confignode/persistence/PipeInfoTest.java |   4 +-
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  27 ----
 ...oricalDataRegionTsFileAndDeletionExtractor.java |   9 +-
 .../iotdb/db/service/DataNodeShutdownHook.java     |   3 -
 .../iotdb/commons/concurrent/ThreadName.java       |   5 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  55 --------
 .../agent/runtime/PipePeriodicalJobExecutor.java   |  33 -----
 .../commons/pipe/agent/task/PipeTaskAgent.java     |   3 -
 .../commons/pipe/agent/task/meta/PipeMeta.java     |   6 +-
 .../pipe/agent/task/meta/PipeRuntimeMeta.java      |  51 ++------
 .../commons/pipe/agent/task/meta/PipeTaskMeta.java | 142 +--------------------
 .../iotdb/commons/pipe/config/PipeConfig.java      |  21 ---
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  18 ---
 .../iotdb/commons/pipe/task/PipeMetaDeSerTest.java |  22 +---
 23 files changed, 52 insertions(+), 418 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 6f6c0b2d443..e2303fecdea 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -244,7 +244,6 @@ public class PipeHeartbeatParser {
                 .equals(PipeStatus.STOPPED)) {
               PipeRuntimeMeta runtimeMeta = 
pipeMetaFromCoordinator.getRuntimeMeta();
               runtimeMeta.getStatus().set(PipeStatus.STOPPED);
-              runtimeMeta.onSetPipeDroppedOrStopped();
               runtimeMeta.setIsStoppedByRuntimeException(true);
 
               needWriteConsensusOnConfigNodes.set(true);
@@ -274,7 +273,6 @@ public class PipeHeartbeatParser {
                               exceptionMap.put(nodeId, exception);
                             }
                             runtimeMeta.getStatus().set(PipeStatus.STOPPED);
-                            runtimeMeta.onSetPipeDroppedOrStopped();
                             runtimeMeta.setIsStoppedByRuntimeException(true);
 
                             needWriteConsensusOnConfigNodes.set(true);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index b014013bee8..30333c66768 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -627,11 +627,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
                             if (newLeader != -1) {
                               consensusGroupIdToTaskMetaMap.put(
                                   consensusGroupId.getId(),
-                                  new PipeTaskMeta(
-                                      MinimumProgressIndex.INSTANCE,
-                                      newLeader,
-                                      consensusGroupId.getId(),
-                                      false));
+                                  new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, newLeader));
                             }
                             // else:
                             // "The pipe task meta does not contain the data 
region group {} or
@@ -805,7 +801,6 @@ public class PipeTaskInfo implements SnapshotProcessor {
 
                   // Mark the status of the pipe with exception as stopped
                   runtimeMeta.getStatus().set(PipeStatus.STOPPED);
-                  runtimeMeta.onSetPipeDroppedOrStopped();
                   runtimeMeta.setIsStoppedByRuntimeException(true);
 
                   final Map<Integer, PipeRuntimeException> exceptionMap =
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 07ce978c16b..43b1660681e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -158,9 +158,7 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
                       
consensusGroupIdToTaskMetaMap.get(taskIndex).setLeaderNodeId(newLeader);
                     } else {
                       consensusGroupIdToTaskMetaMap.put(
-                          taskIndex,
-                          new PipeTaskMeta(
-                              MinimumProgressIndex.INSTANCE, newLeader, 
taskIndex, false));
+                          taskIndex, new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, newLeader));
                     }
                   });
               final Set<Integer> taskIdToRemove =
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 259b865e59f..a1305e99d28 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -167,11 +167,7 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
           (taskId, pipeTaskMeta) -> {
             updatedConsensusGroupIdToTaskMetaMap.put(
                 taskId,
-                new PipeTaskMeta(
-                    pipeTaskMeta.getProgressIndex(),
-                    pipeTaskMeta.getLeaderNodeId(),
-                    taskId,
-                    false));
+                new PipeTaskMeta(pipeTaskMeta.getProgressIndex(), 
pipeTaskMeta.getLeaderNodeId()));
           });
     } else {
       // data regions & schema regions
@@ -192,11 +188,7 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
                   // Pipe only collect user's data, filter metric database 
here.
                   updatedConsensusGroupIdToTaskMetaMap.put(
                       regionGroupId.getId(),
-                      new PipeTaskMeta(
-                          currentPipeTaskMeta.getProgressIndex(),
-                          regionLeaderNodeId,
-                          regionGroupId.getId(),
-                          false));
+                      new PipeTaskMeta(currentPipeTaskMeta.getProgressIndex(), 
regionLeaderNodeId));
                 }
               });
 
@@ -212,9 +204,7 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
             new PipeTaskMeta(
                 configRegionTaskMeta.getProgressIndex(),
                 // The leader of the config region is the config node itself
-                ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
-                configRegionTaskMeta.getProgressIndex().hashCode(),
-                false));
+                
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()));
       }
     }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index b557f4c7aef..130d964f240 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -272,9 +272,7 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
           groupId.getId(),
           new PipeTaskMeta(
               new RecoverProgressIndex(senderDataNodeId, new 
SimpleProgressIndex(0, 0)),
-              senderDataNodeId,
-              groupId.getId(),
-              false));
+              senderDataNodeId));
     } else if (pipeStaticMeta.isSourceExternal()) {
       // external source
       final PipeExternalSourceLoadBalancer loadBalancer =
@@ -298,9 +296,7 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
           .forEach(
               (taskIndex, leaderNodeId) -> {
                 consensusGroupIdToTaskMetaMap.put(
-                    taskIndex,
-                    new PipeTaskMeta(
-                        MinimumProgressIndex.INSTANCE, leaderNodeId, 
taskIndex, false));
+                    taskIndex, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 
leaderNodeId));
               });
     } else {
       // data regions & schema regions
@@ -317,11 +313,7 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
                   // Pipe only collect user's data, filter out metric database 
here.
                   consensusGroupIdToTaskMetaMap.put(
                       regionGroupId.getId(),
-                      new PipeTaskMeta(
-                          MinimumProgressIndex.INSTANCE,
-                          regionLeaderNodeId,
-                          regionGroupId.getId(),
-                          false));
+                      new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 
regionLeaderNodeId));
                 }
               });
 
@@ -333,9 +325,7 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
           new PipeTaskMeta(
               MinimumProgressIndex.INSTANCE,
               // The leader of the config region is the config node itself
-              ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
-              Integer.MIN_VALUE,
-              false));
+              ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()));
     }
 
     pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
index bd12adbd804..5c3ec5af063 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
 import org.apache.iotdb.confignode.client.CnToCnNodeRequestType;
 import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
@@ -88,8 +87,6 @@ public class ConfigNodeShutdownHook extends Thread {
             "Reporting ConfigNode shutdown failed. The cluster will still take 
the current ConfigNode as Running for a few seconds.");
       }
     }
-    // Shutdown pipe progressIndex background service
-    PipePeriodicalJobExecutor.shutdownBackgroundService();
 
     if (LOGGER.isInfoEnabled()) {
       LOGGER.info(
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index a4403a88ec8..1505640ff90 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -886,7 +886,7 @@ public class ConfigPhysicalPlanSerDeTest {
     extractorAttributes.put("extractor", 
"org.apache.iotdb.pipe.extractor.DefaultExtractor");
     processorAttributes.put("processor", 
"org.apache.iotdb.pipe.processor.SDTFilterProcessor");
     connectorAttributes.put("connector", 
"org.apache.iotdb.pipe.protocol.ThriftTransporter");
-    final PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+    final PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
     ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
     pipeTasks.put(1, pipeTaskMeta);
     final PipeStaticMeta pipeStaticMeta =
@@ -911,7 +911,7 @@ public class ConfigPhysicalPlanSerDeTest {
     extractorAttributes.put("pattern", "root.db");
     processorAttributes.put("processor", "do-nothing-processor");
     connectorAttributes.put("batch.enable", "false");
-    final PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+    final PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
     final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new 
ConcurrentHashMap<>();
     pipeTasks.put(1, pipeTaskMeta);
     final PipeStaticMeta pipeStaticMeta =
@@ -949,7 +949,7 @@ public class ConfigPhysicalPlanSerDeTest {
 
   @Test
   public void OperateMultiplePipesPlanV2Test() throws IOException {
-    final PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+    final PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
     final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new 
ConcurrentHashMap<>();
     pipeTasks.put(1, pipeTaskMeta);
     final PipeStaticMeta pipeStaticMeta =
@@ -962,7 +962,7 @@ public class ConfigPhysicalPlanSerDeTest {
     final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
     final CreatePipePlanV2 createPipePlanV2 = new 
CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta);
 
-    final PipeTaskMeta pipeTaskMeta1 = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 2, 2, false);
+    final PipeTaskMeta pipeTaskMeta1 = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 2);
     final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks1 = new 
ConcurrentHashMap<>();
     pipeTasks.put(2, pipeTaskMeta1);
     final PipeStaticMeta pipeStaticMeta1 =
@@ -1061,8 +1061,8 @@ public class ConfigPhysicalPlanSerDeTest {
         new PipeRuntimeMeta(
             new ConcurrentHashMap<Integer, PipeTaskMeta>() {
               {
-                put(456, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 987, 1, 
false));
-                put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 789, 
1, false));
+                put(456, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 987));
+                put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 789));
               }
             });
     pipeMetaList.add(new PipeMeta(pipeStaticMeta, pipeRuntimeMeta));
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
index 04dea675015..94189a19d99 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
@@ -54,7 +54,7 @@ public class PipeTableRespTest {
     connectorAttributes.put("host", "127.0.0.1");
     connectorAttributes.put("port", "6667");
 
-    PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+    PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
     ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
     pipeTasks.put(1, pipeTaskMeta);
     PipeStaticMeta pipeStaticMeta =
@@ -74,7 +74,7 @@ public class PipeTableRespTest {
     connectorAttributes1.put("host", "127.0.0.1");
     connectorAttributes1.put("port", "6667");
 
-    PipeTaskMeta pipeTaskMeta1 = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+    PipeTaskMeta pipeTaskMeta1 = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
     ConcurrentMap<Integer, PipeTaskMeta> pipeTasks1 = new 
ConcurrentHashMap<>();
     pipeTasks1.put(1, pipeTaskMeta1);
     PipeStaticMeta pipeStaticMeta1 =
@@ -94,7 +94,7 @@ public class PipeTableRespTest {
     connectorAttributes2.put("host", "172.30.30.30");
     connectorAttributes2.put("port", "6667");
 
-    PipeTaskMeta pipeTaskMeta2 = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+    PipeTaskMeta pipeTaskMeta2 = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
     ConcurrentMap<Integer, PipeTaskMeta> pipeTasks2 = new 
ConcurrentHashMap<>();
     pipeTasks2.put(1, pipeTaskMeta2);
     PipeStaticMeta pipeStaticMeta2 =
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java
index a43a87b120e..f2fa5b0205a 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java
@@ -62,8 +62,7 @@ public class PipeConfigNodeSubtaskExecutorTest {
                         
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName());
                   }
                 },
-                new PipeTaskMeta(
-                    MinimumProgressIndex.INSTANCE, Integer.MIN_VALUE, 
Integer.MIN_VALUE, false)));
+                new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 
Integer.MIN_VALUE)));
   }
 
   @After
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
index 815d5c1757e..c3e7916108f 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
@@ -86,7 +86,7 @@ public class PipeInfoTest {
     connectorAttributes.put("host", "127.0.0.1");
     connectorAttributes.put("port", "6667");
 
-    PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+    PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
     ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
     pipeTasks.put(1, pipeTaskMeta);
     PipeStaticMeta pipeStaticMeta =
@@ -121,7 +121,7 @@ public class PipeInfoTest {
     extractorAttributes.put("extractor", 
"org.apache.iotdb.pipe.extractor.DefaultExtractor");
     processorAttributes.put("processor", 
"org.apache.iotdb.pipe.processor.SDTFilterProcessor");
     connectorAttributes.put("connector", 
"org.apache.iotdb.pipe.protocol.ThriftTransporter");
-    PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+    PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
     ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
     pipeTasks.put(1, pipeTaskMeta);
     PipeStaticMeta pipeStaticMeta =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index adee9d1774b..ab78200f688 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -848,11 +848,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   ///////////////////////// Shutdown Logic /////////////////////////
 
   public void persistAllProgressIndex() {
-    persistAllProgressIndexLocally();
-    persistAllProgressIndex2ConfigNode();
-  }
-
-  private void persistAllProgressIndex2ConfigNode() {
     try (final ConfigNodeClient configNodeClient =
         
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
       // Send request to some API server
@@ -871,28 +866,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     }
   }
 
-  private void persistAllProgressIndexLocally() {
-    if (!PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
-      LOGGER.info(
-          "Pipe progress index persist disabled. Skipping persist all progress 
index locally.");
-      return;
-    }
-    if (!tryReadLockWithTimeOut(10)) {
-      LOGGER.info("Failed to persist all progress index locally because of 
timeout.");
-      return;
-    }
-    try {
-      for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
-        pipeMeta.getRuntimeMeta().persistProgressIndex();
-      }
-      LOGGER.info("Persist all progress index locally successfully.");
-    } catch (final Exception e) {
-      LOGGER.warn("Failed to record all progress index locally, because {}.", 
e.getMessage(), e);
-    } finally {
-      releaseReadLock();
-    }
-  }
-
   ///////////////////////// Pipe Consensus /////////////////////////
 
   public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final 
int consensusGroupId) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
index 707c0fef63d..e18c2907f52 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
@@ -310,12 +310,11 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
     pipeName = environment.getPipeName();
     creationTime = environment.getCreationTime();
     pipeTaskMeta = environment.getPipeTaskMeta();
-
-    // progressIndex is immutable in 
`updateToMinimumEqualOrIsAfterProgressIndex`, so data
-    // consistency in `environment.getPipeTaskMeta().getProgressIndex()` is 
ensured.
-    startIndex = environment.getPipeTaskMeta().restoreProgressIndex();
     if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
-      startIndex = tryToExtractLocalProgressIndexForIoTV2(startIndex);
+      startIndex =
+          
tryToExtractLocalProgressIndexForIoTV2(environment.getPipeTaskMeta().getProgressIndex());
+    } else {
+      startIndex = environment.getPipeTaskMeta().getProgressIndex();
     }
 
     dataRegionId = environment.getRegionId();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 11faf429732..9c31923b7bc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -24,7 +24,6 @@ import 
org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.exception.ConsensusException;
@@ -156,8 +155,6 @@ public class DataNodeShutdownHook extends Thread {
     PipeDataNodeAgent.task().persistAllProgressIndex();
     // Shutdown all consensus pipe's receiver
     PipeDataNodeAgent.receiver().pipeConsensus().closeReceiverExecutor();
-    // Shutdown pipe progressIndex background service
-    PipePeriodicalJobExecutor.shutdownBackgroundService();
 
     // Actually stop all services started by the DataNode.
     // If we don't call this, services like the RestService are not stopped 
and I can't re-start
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index cd54414af6d..95a880be3ff 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -149,7 +149,6 @@ public enum ThreadName {
   PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
   PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
   PIPE_AIR_GAP_RECEIVER("Pipe-Air-Gap-Receiver"),
-  
PIPE_PROGRESS_INDEX_BACKGROUND_SERVICE("Pipe-Progress-Index-Background-Service"),
   LOAD_DATATYPE_CONVERT_POOL("Load-Datatype-Convert-Pool"),
   SUBSCRIPTION_EXECUTOR_POOL("Subscription-Executor-Pool"),
   SUBSCRIPTION_RUNTIME_META_SYNCER("Subscription-Runtime-Meta-Syncer"),
@@ -271,8 +270,7 @@ public enum ThreadName {
               PIPE_CONSENSUS_RPC_PROCESSOR,
               ASYNC_DATANODE_PIPE_CONSENSUS_CLIENT_POOL,
               PIPE_CONSENSUS_DELETION_SERIALIZE,
-              PIPE_CONSENSUS_TSFILE_WRITER_CHECKER,
-              PIPE_CONSENSUS_BACKGROUND_TASK_EXECUTOR));
+              PIPE_CONSENSUS_TSFILE_WRITER_CHECKER));
 
   private static final Set<ThreadName> ratisThreadNames =
       new HashSet<>(
@@ -309,7 +307,6 @@ public enum ThreadName {
               PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
               PIPE_RECEIVER_AIR_GAP_AGENT,
               PIPE_AIR_GAP_RECEIVER,
-              PIPE_PROGRESS_INDEX_BACKGROUND_SERVICE,
               SUBSCRIPTION_EXECUTOR_POOL,
               SUBSCRIPTION_RUNTIME_META_SYNCER,
               WINDOW_EVALUATION_SERVICE,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 42970e1895c..b7bbac6a28c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -195,8 +195,6 @@ public class CommonConfig {
 
   private String pipeHardlinkTsFileDirName = "tsfile";
 
-  private String pipeProgressIndexPersistDirName = "progress";
-
   private String pipeHardlinkWALDirName = "wal";
 
   private boolean pipeHardLinkWALEnabled = false;
@@ -265,9 +263,6 @@ public class CommonConfig {
   private long pipeMetaSyncerSyncIntervalMinutes = 3;
   private long pipeMetaSyncerAutoRestartPipeCheckIntervalRound = 1;
   private boolean pipeAutoRestartEnabled = true;
-  private boolean pipeProgressIndexPersistEnabled = true;
-  private long pipeProgressIndexPersistCheckPointGap = 20;
-  private long pipeProgressIndexFlushIntervalMs = 20 * 1000L;
 
   private boolean pipeAirGapReceiverEnabled = false;
   private int pipeAirGapReceiverPort = 9780;
@@ -757,18 +752,6 @@ public class CommonConfig {
     logger.info("pipeHardlinkTsFileDirName is set to {}.", pipeTsFileDirName);
   }
 
-  public String getPipeProgressIndexPersistDirName() {
-    return pipeProgressIndexPersistDirName;
-  }
-
-  public void setPipeProgressIndexPersistDirName(String 
pipeProgressIndexPersistDirName) {
-    if (Objects.equals(this.pipeProgressIndexPersistDirName, 
pipeProgressIndexPersistDirName)) {
-      return;
-    }
-    this.pipeProgressIndexPersistDirName = pipeProgressIndexPersistDirName;
-    logger.info("pipeProgressIndexPersistDir is set to {}.", 
pipeProgressIndexPersistDirName);
-  }
-
   public String getPipeHardlinkWALDirName() {
     return pipeHardlinkWALDirName;
   }
@@ -1227,44 +1210,6 @@ public class CommonConfig {
     logger.info("pipeAutoRestartEnabled is set to {}.", 
pipeAutoRestartEnabled);
   }
 
-  public boolean isPipeProgressIndexPersistEnabled() {
-    return pipeProgressIndexPersistEnabled;
-  }
-
-  public void setPipeProgressIndexPersistEnabled(boolean 
pipeProgressIndexPersistEnabled) {
-    if (this.pipeProgressIndexPersistEnabled == 
pipeProgressIndexPersistEnabled) {
-      return;
-    }
-    this.pipeProgressIndexPersistEnabled = pipeProgressIndexPersistEnabled;
-    logger.info("pipeProgressIndexPersistEnabled is set to {}.", 
pipeProgressIndexPersistEnabled);
-  }
-
-  public long getPipeProgressIndexPersistCheckPointGap() {
-    return pipeProgressIndexPersistCheckPointGap;
-  }
-
-  public void setPipeProgressIndexPersistCheckPointGap(long 
pipeProgressIndexPersistCheckPointGap) {
-    if (this.pipeProgressIndexPersistCheckPointGap == 
pipeProgressIndexPersistCheckPointGap) {
-      return;
-    }
-    this.pipeProgressIndexPersistCheckPointGap = 
pipeProgressIndexPersistCheckPointGap;
-    logger.info(
-        "pipeProgressIndexPersistCheckPointGap is set to {}.",
-        pipeProgressIndexPersistCheckPointGap);
-  }
-
-  public long getPipeProgressIndexFlushIntervalMs() {
-    return pipeProgressIndexFlushIntervalMs;
-  }
-
-  public void setPipeProgressIndexFlushIntervalMs(long 
pipeProgressIndexFlushIntervalMs) {
-    if (this.pipeProgressIndexFlushIntervalMs == 
pipeProgressIndexFlushIntervalMs) {
-      return;
-    }
-    this.pipeProgressIndexFlushIntervalMs = pipeProgressIndexFlushIntervalMs;
-    logger.info("pipeProgressIndexFlushIntervalMs is set to {}.", 
pipeProgressIndexFlushIntervalMs);
-  }
-
   public long getPipeConnectorRetryIntervalMs() {
     return pipeConnectorRetryIntervalMs;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
index 33ac03c5c96..3226b3947f0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
@@ -21,16 +21,8 @@ package org.apache.iotdb.commons.pipe.agent.runtime;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 /**
  * The shortest scheduling cycle for these jobs is {@link
  * PipeConfig#getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()}, 
suitable for jobs that are
@@ -38,31 +30,6 @@ import java.util.concurrent.TimeUnit;
  */
 public class PipePeriodicalJobExecutor extends AbstractPipePeriodicalJobExecutor {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipePeriodicalJobExecutor.class);
-  // This background service is used to execute jobs that need to be cancelled 
and released.
-  private static final ScheduledExecutorService backgroundService =
-      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
-          ThreadName.PIPE_PROGRESS_INDEX_BACKGROUND_SERVICE.getName());
-
-  public static Future<?> submitBackgroundJob(
-      Runnable job, long initialDelayInMs, long periodInMs) {
-    return ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-        backgroundService, job, initialDelayInMs, periodInMs, 
TimeUnit.MILLISECONDS);
-  }
-
-  public static void shutdownBackgroundService() {
-    backgroundService.shutdownNow();
-    try {
-      if (!backgroundService.awaitTermination(30, TimeUnit.SECONDS)) {
-        LOGGER.warn("Pipe progressIndex background service did not terminate 
within {}s", 30);
-      }
-    } catch (InterruptedException e) {
-      LOGGER.warn(
-          "Pipe progressIndex background service is interrupted while waiting 
for termination");
-      Thread.currentThread().interrupt();
-    }
-  }
-
   public PipePeriodicalJobExecutor() {
     super(
         IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index c61f4dc95fd..cf5bd4f0eb9 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -522,7 +522,6 @@ public abstract class PipeTaskAgent {
     // but the pipe task meta has not been cleaned up (in case of failure when 
executing
     // dropPipeTaskByConsensusGroup).
     existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
-    existedPipeMeta.getRuntimeMeta().onSetPipeDroppedOrStopped();
 
     // Drop pipe tasks
     final Map<Integer, PipeTask> pipeTasks =
@@ -564,7 +563,6 @@ public abstract class PipeTaskAgent {
     // but the pipe task meta has not been cleaned up (in case of failure when 
executing
     // dropPipeTaskByConsensusGroup).
     existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
-    existedPipeMeta.getRuntimeMeta().onSetPipeDroppedOrStopped();
 
     // Drop pipe tasks
     final Map<Integer, PipeTask> pipeTasks =
@@ -657,7 +655,6 @@ public abstract class PipeTaskAgent {
 
     // Set pipe meta status to STOPPED
     existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
-    existedPipeMeta.getRuntimeMeta().onSetPipeDroppedOrStopped();
   }
 
   ////////////////////////// Checker //////////////////////////
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
index c71156a234b..997278010e9 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.commons.pipe.agent.task.meta;
 
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-
 import org.apache.tsfile.utils.PublicBAOS;
 
 import java.io.DataOutputStream;
@@ -83,9 +81,7 @@ public class PipeMeta {
 
   public static PipeMeta deserialize4TaskAgent(final ByteBuffer byteBuffer) {
     final PipeStaticMeta staticMeta = PipeStaticMeta.deserialize(byteBuffer);
-    final PipeRuntimeMeta runtimeMeta =
-        PipeRuntimeMeta.deserialize(
-            byteBuffer, 
PipeConfig.getInstance().isPipeProgressIndexPersistEnabled());
+    final PipeRuntimeMeta runtimeMeta = PipeRuntimeMeta.deserialize(byteBuffer);
     return new PipeMeta(
         staticMeta,
         runtimeMeta,
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
index ff77564bd2f..e4beaf20bbf 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
@@ -110,10 +110,6 @@ public class PipeRuntimeMeta {
     return status;
   }
 
-  public void onSetPipeDroppedOrStopped() {
-    consensusGroupId2TaskMetaMap.values().forEach(PipeTaskMeta::cancelPersistProgressIndexFuture);
-  }
-
   public ConcurrentMap<Integer, PipeTaskMeta> 
getConsensusGroupId2TaskMetaMap() {
     return consensusGroupId2TaskMetaMap;
   }
@@ -140,15 +136,6 @@ public class PipeRuntimeMeta {
     this.isStoppedByRuntimeException.set(isStoppedByRuntimeException);
   }
 
-  public void persistProgressIndex() {
-    // Iterate through all the task metas and persist their progress index
-    for (final PipeTaskMeta taskMeta : consensusGroupId2TaskMetaMap.values()) {
-      if (taskMeta.getProgressIndex() != null) {
-        taskMeta.persistProgressIndex();
-      }
-    }
-  }
-
   /**
    * We use negative regionId to identify the external pipe source, which is 
not a consensus group
    * id. Then we can reuse the regionId to schedule the external pipe source 
and store the progress
@@ -217,11 +204,9 @@ public class PipeRuntimeMeta {
 
     final int size = ReadWriteIOUtils.readInt(inputStream);
     for (int i = 0; i < size; ++i) {
-      final int taskIndex = ReadWriteIOUtils.readInt(inputStream);
       pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
-          taskIndex,
-          PipeTaskMeta.deserialize(
-              PipeRuntimeMetaVersion.VERSION_1, inputStream, taskIndex, 
false));
+          ReadWriteIOUtils.readInt(inputStream),
+          PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_1, 
inputStream));
     }
 
     return pipeRuntimeMeta;
@@ -234,11 +219,9 @@ public class PipeRuntimeMeta {
 
     int size = ReadWriteIOUtils.readInt(inputStream);
     for (int i = 0; i < size; ++i) {
-      final int taskIndex = ReadWriteIOUtils.readInt(inputStream);
       pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
-          taskIndex,
-          PipeTaskMeta.deserialize(
-              PipeRuntimeMetaVersion.VERSION_2, inputStream, taskIndex, 
false));
+          ReadWriteIOUtils.readInt(inputStream),
+          PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_2, 
inputStream));
     }
 
     size = ReadWriteIOUtils.readInt(inputStream);
@@ -255,19 +238,14 @@ public class PipeRuntimeMeta {
   }
 
   public static PipeRuntimeMeta deserialize(ByteBuffer byteBuffer) {
-    return deserialize(byteBuffer, false);
-  }
-
-  public static PipeRuntimeMeta deserialize(
-      final ByteBuffer byteBuffer, final boolean needPersist) {
     final byte pipeRuntimeVersionByte = ReadWriteIOUtils.readByte(byteBuffer);
     final PipeRuntimeMetaVersion pipeRuntimeMetaVersion =
         PipeRuntimeMetaVersion.deserialize(pipeRuntimeVersionByte);
     switch (pipeRuntimeMetaVersion) {
       case VERSION_1:
-        return deserializeVersion1(byteBuffer, pipeRuntimeVersionByte, 
needPersist);
+        return deserializeVersion1(byteBuffer, pipeRuntimeVersionByte);
       case VERSION_2:
-        return deserializeVersion2(byteBuffer, needPersist);
+        return deserializeVersion2(byteBuffer);
       default:
         throw new UnsupportedOperationException(
             "Unknown pipe runtime meta version: " + 
pipeRuntimeMetaVersion.getVersion());
@@ -275,36 +253,31 @@ public class PipeRuntimeMeta {
   }
 
   private static PipeRuntimeMeta deserializeVersion1(
-      ByteBuffer byteBuffer, byte pipeRuntimeVersionByte, final boolean 
needPersist) {
+      ByteBuffer byteBuffer, byte pipeRuntimeVersionByte) {
     final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
 
     
pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(pipeRuntimeVersionByte));
 
     final int size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
-      final int taskIndex = ReadWriteIOUtils.readInt(byteBuffer);
       pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
-          taskIndex,
-          PipeTaskMeta.deserialize(
-              PipeRuntimeMetaVersion.VERSION_1, byteBuffer, taskIndex, 
needPersist));
+          ReadWriteIOUtils.readInt(byteBuffer),
+          PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_1, 
byteBuffer));
     }
 
     return pipeRuntimeMeta;
   }
 
-  public static PipeRuntimeMeta deserializeVersion2(
-      ByteBuffer byteBuffer, final boolean needPersist) {
+  public static PipeRuntimeMeta deserializeVersion2(ByteBuffer byteBuffer) {
     final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
 
     
pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(byteBuffer)));
 
     int size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
-      final int taskIndex = ReadWriteIOUtils.readInt(byteBuffer);
       pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
-          taskIndex,
-          PipeTaskMeta.deserialize(
-              PipeRuntimeMetaVersion.VERSION_2, byteBuffer, taskIndex, 
needPersist));
+          ReadWriteIOUtils.readInt(byteBuffer),
+          PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_2, 
byteBuffer));
     }
 
     size = ReadWriteIOUtils.readInt(byteBuffer);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
index 4a753c0e5bf..627ae1fbf9a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.commons.pipe.agent.task.meta;
 
-import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
@@ -27,51 +26,26 @@ import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeExceptionType;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
-import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeTaskMeta {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskMeta.class);
-  private static final String PREFIX = "__progressIndex_";
-
   private final AtomicReference<ProgressIndex> progressIndex = new 
AtomicReference<>();
   private final AtomicInteger leaderNodeId = new AtomicInteger(0);
 
-  private final AtomicLong updateCount = new AtomicLong(0);
-  private final AtomicLong lastPersistCount = new AtomicLong(0);
-  private final long checkPointGap =
-      PipeConfig.getInstance().getPipeProgressIndexPersistCheckPointGap();
-  private File progressIndexPersistFile;
-  private final AtomicBoolean isRegisterPersistTask = new AtomicBoolean(false);
-  private Future<?> persistProgressIndexFuture;
-
   /**
    * Stores the exceptions encountered during run time of each pipe task.
    *
@@ -84,26 +58,9 @@ public class PipeTaskMeta {
   private final Set<PipeRuntimeException> exceptionMessages =
       Collections.newSetFromMap(new ConcurrentHashMap<>());
 
-  public PipeTaskMeta(
-      /* @NotNull */ final ProgressIndex progressIndex,
-      final int leaderNodeId,
-      final int taskIndex,
-      final boolean needPersistProgressIndex) {
+  public PipeTaskMeta(/* @NotNull */ final ProgressIndex progressIndex, final int leaderNodeId) {
     this.progressIndex.set(progressIndex);
     this.leaderNodeId.set(leaderNodeId);
-    // PipeTaskMeta created in configNode doesn't need to persist progress 
index.
-    if (needPersistProgressIndex) {
-      this.progressIndexPersistFile =
-          new File(
-              IoTDBConstant.DN_DEFAULT_DATA_DIR
-                  + File.separator
-                  + IoTDBConstant.SYSTEM_FOLDER_NAME
-                  + File.separator
-                  + PipeConfig.getInstance().getPipeHardlinkBaseDirName()
-                  + File.separator
-                  + 
PipeConfig.getInstance().getPipeProgressIndexPersistDirName(),
-              PREFIX + taskIndex);
-    }
   }
 
   public ProgressIndex getProgressIndex() {
@@ -111,86 +68,8 @@ public class PipeTaskMeta {
   }
 
   public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
-    // only pipeTaskMeta that need to updateProgressIndex will persist 
progress index
-    // isRegisterPersistTask is used to avoid multiple threads registering 
persist task concurrently
-    if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()
-        && this.persistProgressIndexFuture == null
-        && !isRegisterPersistTask.getAndSet(true)) {
-      this.persistProgressIndexFuture =
-          PipePeriodicalJobExecutor.submitBackgroundJob(
-              this::persistProgressIndex,
-              0,
-              PipeConfig.getInstance().getPipeProgressIndexFlushIntervalMs());
-    }
-
-    progressIndex.updateAndGet(
+    return progressIndex.updateAndGet(
         index -> 
index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
-
-    if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()
-        && updateCount.incrementAndGet() - lastPersistCount.get() > 
checkPointGap) {
-      persistProgressIndex();
-    }
-
-    return progressIndex.get();
-  }
-
-  public synchronized void persistProgressIndex() {
-    if (Objects.isNull(progressIndexPersistFile)
-        // in case of multiple threads calling updateProgressIndex at the same 
time
-        || lastPersistCount.get() == updateCount.get()) {
-      return;
-    }
-
-    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
-        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
-      progressIndex.get().serialize(outputStream);
-      // append is false by default.
-      FileUtils.writeByteArrayToFile(
-          progressIndexPersistFile,
-          byteArrayOutputStream.getBuf(),
-          0,
-          byteArrayOutputStream.size());
-      lastPersistCount.set(updateCount.get());
-    } catch (IOException e) {
-      LOGGER.warn("Failed to persist progress index {} for {}", 
progressIndex.get(), this, e);
-    }
-  }
-
-  public ProgressIndex restoreProgressIndex() {
-    if (!progressIndexPersistFile.exists() || 
progressIndexPersistFile.length() == 0) {
-      return progressIndex.get();
-    }
-
-    try {
-      final byte[] fileData = 
Files.readAllBytes(progressIndexPersistFile.toPath());
-
-      try (final ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(fileData);
-          final DataInputStream inputStream = new 
DataInputStream(byteArrayInputStream)) {
-        final ProgressIndex restoredIndex = 
ProgressIndexType.deserializeFrom(inputStream);
-        
progressIndex.get().updateToMinimumEqualOrIsAfterProgressIndex(restoredIndex);
-        LOGGER.info(
-            "{} successfully restored progress index from [{}], current index: 
{}",
-            this,
-            progressIndexPersistFile.getAbsolutePath(),
-            progressIndex.get());
-      }
-    } catch (final IOException e) {
-      LOGGER.warn(
-          "{} failed to restore progress index from [{}].",
-          this,
-          progressIndexPersistFile.getAbsolutePath(),
-          e);
-    }
-    return progressIndex.get();
-  }
-
-  public void cancelPersistProgressIndexFuture() {
-    if (Objects.nonNull(progressIndexPersistFile)
-        && isRegisterPersistTask.getAndSet(false)
-        && persistProgressIndexFuture != null) {
-      persistProgressIndexFuture.cancel(false);
-      persistProgressIndexFuture = null;
-    }
   }
 
   public int getLeaderNodeId() {
@@ -242,16 +121,12 @@ public class PipeTaskMeta {
   }
 
   public static PipeTaskMeta deserialize(
-      final PipeRuntimeMetaVersion version,
-      final ByteBuffer byteBuffer,
-      final int taskIndex,
-      final boolean needPersist) {
+      final PipeRuntimeMetaVersion version, final ByteBuffer byteBuffer) {
     final ProgressIndex progressIndex = 
ProgressIndexType.deserializeFrom(byteBuffer);
 
     final int leaderNodeId = ReadWriteIOUtils.readInt(byteBuffer);
 
-    final PipeTaskMeta pipeTaskMeta =
-        new PipeTaskMeta(progressIndex, leaderNodeId, taskIndex, needPersist);
+    final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(progressIndex, 
leaderNodeId);
     final int size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
       final PipeRuntimeException pipeRuntimeException =
@@ -262,17 +137,12 @@ public class PipeTaskMeta {
   }
 
   public static PipeTaskMeta deserialize(
-      final PipeRuntimeMetaVersion version,
-      final InputStream inputStream,
-      final int taskIndex,
-      final boolean needPersist)
-      throws IOException {
+      final PipeRuntimeMetaVersion version, final InputStream inputStream) throws IOException {
     final ProgressIndex progressIndex = 
ProgressIndexType.deserializeFrom(inputStream);
 
     final int leaderNodeId = ReadWriteIOUtils.readInt(inputStream);
 
-    final PipeTaskMeta pipeTaskMeta =
-        new PipeTaskMeta(progressIndex, leaderNodeId, taskIndex, needPersist);
+    final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(progressIndex, 
leaderNodeId);
     final int size = ReadWriteIOUtils.readInt(inputStream);
     for (int i = 0; i < size; ++i) {
       final PipeRuntimeException pipeRuntimeException =
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index af239c22861..537bb6fbb7c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -46,22 +46,6 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeHardlinkTsFileDirName();
   }
 
-  public String getPipeProgressIndexPersistDirName() {
-    return COMMON_CONFIG.getPipeProgressIndexPersistDirName();
-  }
-
-  public boolean isPipeProgressIndexPersistEnabled() {
-    return COMMON_CONFIG.isPipeProgressIndexPersistEnabled();
-  }
-
-  public long getPipeProgressIndexPersistCheckPointGap() {
-    return COMMON_CONFIG.getPipeProgressIndexPersistCheckPointGap();
-  }
-
-  public long getPipeProgressIndexFlushIntervalMs() {
-    return COMMON_CONFIG.getPipeProgressIndexFlushIntervalMs();
-  }
-
   public String getPipeHardlinkWALDirName() {
     return COMMON_CONFIG.getPipeHardlinkWALDirName();
   }
@@ -486,11 +470,6 @@ public class PipeConfig {
 
     LOGGER.info("PipeHardlinkBaseDirName: {}", getPipeHardlinkBaseDirName());
     LOGGER.info("PipeHardlinkTsFileDirName: {}", 
getPipeHardlinkTsFileDirName());
-    LOGGER.info("PipeProgressIndexPersistDirName: {}", 
getPipeProgressIndexPersistDirName());
-    LOGGER.info("PipeProgressIndexPersistEnabled: {}", 
isPipeProgressIndexPersistEnabled());
-    LOGGER.info(
-        "PipeProgressIndexPersistCheckPointGap: {}", 
getPipeProgressIndexPersistCheckPointGap());
-    LOGGER.info("PipeProgressIndexFlushIntervalMs: {}", 
getPipeProgressIndexFlushIntervalMs());
     LOGGER.info("PipeHardlinkWALDirName: {}", getPipeHardlinkWALDirName());
     LOGGER.info("PipeHardLinkWALEnabled: {}", getPipeHardLinkWALEnabled());
     LOGGER.info("PipeFileReceiverFsyncEnabled: {}", 
getPipeFileReceiverFsyncEnabled());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 8bfe59f30bb..5f81ed15270 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -46,9 +46,6 @@ public class PipeDescriptor {
     config.setPipeHardlinkTsFileDirName(
         properties.getProperty(
             "pipe_hardlink_tsfile_dir_name", 
config.getPipeHardlinkTsFileDirName()));
-    config.setPipeProgressIndexPersistDirName(
-        properties.getProperty(
-            "pipe_progress_index_persist_dir_name", 
config.getPipeProgressIndexPersistDirName()));
     config.setPipeHardlinkWALDirName(
         properties.getProperty("pipe_hardlink_wal_dir_name", 
config.getPipeHardlinkWALDirName()));
     config.setPipeHardLinkWALEnabled(
@@ -100,21 +97,6 @@ public class PipeDescriptor {
         Boolean.parseBoolean(
             properties.getProperty(
                 "pipe_auto_restart_enabled", 
String.valueOf(config.getPipeAutoRestartEnabled()))));
-    config.setPipeProgressIndexPersistEnabled(
-        Boolean.parseBoolean(
-            properties.getProperty(
-                "pipe_progress_index_persist_enabled",
-                String.valueOf(config.isPipeProgressIndexPersistEnabled()))));
-    config.setPipeProgressIndexPersistCheckPointGap(
-        Long.parseLong(
-            properties.getProperty(
-                "pipe_progress_index_persist_check_point_gap",
-                
String.valueOf(config.getPipeProgressIndexPersistCheckPointGap()))));
-    config.setPipeProgressIndexFlushIntervalMs(
-        Long.parseLong(
-            properties.getProperty(
-                "pipe_progress_index_flush_interval_ms",
-                
String.valueOf(config.getPipeProgressIndexFlushIntervalMs()))));
 
     config.setPipeAirGapReceiverEnabled(
         Boolean.parseBoolean(
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
index d4e6008547c..8e04baf7cc0 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
@@ -97,27 +97,20 @@ public class PipeMetaDeSerTest {
         new PipeRuntimeMeta(
             new ConcurrentHashMap<Integer, PipeTaskMeta>() {
               {
-                put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 987, 
123, false));
-                put(234, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 789, 
234, false));
-                put(345, new PipeTaskMeta(new SimpleProgressIndex(3, 4), 789, 
345, false));
-                put(456, new PipeTaskMeta(finalHybridProgressIndex, 789, 456, 
false));
+                put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 987));
+                put(234, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 789));
+                put(345, new PipeTaskMeta(new SimpleProgressIndex(3, 4), 789));
+                put(456, new PipeTaskMeta(finalHybridProgressIndex, 789));
                 put(
                     567,
                     new PipeTaskMeta(
-                        new RecoverProgressIndex(1, new SimpleProgressIndex(1, 
9)),
-                        123,
-                        567,
-                        false));
+                        new RecoverProgressIndex(1, new SimpleProgressIndex(1, 
9)), 123));
                 put(
                     678,
                     new PipeTaskMeta(
                         new 
TimeWindowStateProgressIndex(timeSeries2TimestampWindowBufferPairMap),
-                        789,
-                        678,
-                        false));
-                put(
-                    Integer.MIN_VALUE,
-                    new PipeTaskMeta(new MetaProgressIndex(987), 0, 
Integer.MIN_VALUE, false));
+                        789));
+                put(Integer.MIN_VALUE, new PipeTaskMeta(new MetaProgressIndex(987), 0));
               }
             });
     ByteBuffer runtimeByteBuffer = pipeRuntimeMeta.serialize();
@@ -136,7 +129,6 @@ public class PipeMetaDeSerTest {
     Assert.assertEquals(pipeRuntimeMeta, pipeRuntimeMeta1);
 
     pipeRuntimeMeta.getStatus().set(PipeStatus.DROPPED);
-    pipeRuntimeMeta.onSetPipeDroppedOrStopped();
     pipeRuntimeMeta.setIsStoppedByRuntimeException(true);
     pipeRuntimeMeta.setExceptionsClearTime(0);
     pipeRuntimeMeta

Reply via email to