This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 656b4fc1a85 Pipe: Fixed the semantics for new regions & realtime / 
history only pipes (#16622)
656b4fc1a85 is described below

commit 656b4fc1a85e929871e2d3382302228c6c9bc703
Author: Caideyipi <[email protected]>
AuthorDate: Thu Oct 23 12:22:49 2025 +0800

    Pipe: Fixed the semantics for new regions & realtime / history only pipes 
(#16622)
    
    * fix
    
    * complexityu
    
    * comp
---
 .../manager/partition/PartitionManager.java        |  8 +-
 .../confignode/persistence/pipe/PipeTaskInfo.java  | 14 +++-
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  | 21 +++++
 .../impl/pipe/task/AlterPipeProcedureV2.java       | 45 ++++++++--
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 30 +------
 .../task/builder/PipeDataNodeTaskBuilder.java      | 98 +++++++++-------------
 .../task/subtask/sink/PipeSinkSubtaskManager.java  |  2 +-
 .../source/dataregion/IoTDBDataRegionSource.java   | 71 ++++++++--------
 ...istoricalDataRegionTsFileAndDeletionSource.java | 25 +-----
 .../subtask/SubscriptionSinkSubtaskManager.java    |  2 +-
 .../commons/pipe/agent/task/PipeTaskAgent.java     | 48 +++++++++++
 .../commons/pipe/agent/task/meta/PipeTaskMeta.java | 39 +++++++--
 .../pipe/config/constant/SystemConstant.java       |  5 +-
 13 files changed, 234 insertions(+), 174 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 9488b3a8b47..576d805c786 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -1003,14 +1003,10 @@ public class PartitionManager {
   }
 
   public Optional<TConsensusGroupId> generateTConsensusGroupIdByRegionId(final 
int regionId) {
-    if (configManager
-        .getPartitionManager()
-        .isRegionGroupExists(new 
TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId))) {
+    if (isRegionGroupExists(new 
TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId))) {
       return Optional.of(new 
TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId));
     }
-    if (configManager
-        .getPartitionManager()
-        .isRegionGroupExists(new 
TConsensusGroupId(TConsensusGroupType.DataRegion, regionId))) {
+    if (isRegionGroupExists(new 
TConsensusGroupId(TConsensusGroupType.DataRegion, regionId))) {
       return Optional.of(new TConsensusGroupId(TConsensusGroupType.DataRegion, 
regionId));
     }
     String msg =
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 7b2fd6ad9b4..0b3acb4058e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -19,11 +19,13 @@
 
 package org.apache.iotdb.confignode.persistence.pipe;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMetaKeeper;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
@@ -621,13 +623,21 @@ public class PipeTaskInfo implements SnapshotProcessor {
                             } else {
                               
consensusGroupIdToTaskMetaMap.remove(consensusGroupId.getId());
                             }
-                          } else {
+                          } else if (!PipeTaskAgent.isHistoryOnlyPipe(
+                                  
pipeMeta.getStaticMeta().getSourceParameters())
+                              || !consensusGroupId
+                                  .getType()
+                                  .equals(TConsensusGroupType.DataRegion)) {
                             // If CN does not contain the region group, it 
means the data
                             // region group is newly added.
+                            // We do not handle history only pipes for new 
data regions
+
+                            // Newly added leader
                             if (newLeader != -1) {
                               consensusGroupIdToTaskMetaMap.put(
                                   consensusGroupId.getId(),
-                                  new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, newLeader));
+                                  new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, newLeader)
+                                      .markAsNewlyAdded());
                             }
                             // else:
                             // "The pipe task meta does not contain the data 
region group {} or
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 25466d33983..dfdfe00f310 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.procedure.impl.pipe;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import 
org.apache.iotdb.confignode.manager.pipe.metric.overview.PipeProcedureMetrics;
 import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -34,6 +35,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
 import org.apache.iotdb.confignode.service.ConfigNode;
 import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -46,6 +48,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -532,6 +535,24 @@ public abstract class AbstractOperatePipeProcedureV2
             .serialize());
   }
 
+  protected Map<Integer, TPushPipeMetaResp> 
pushSinglePipeMetaToDataNodes4Realtime(
+      String pipeName, ConfigNodeProcedureEnv env) throws IOException {
+    final PipeMeta pipeMeta = 
pipeTaskInfo.get().getPipeMetaByPipeName(pipeName);
+    // Note that although the altered pipe has progress in it,
+    // if we alter it to realtime we should ignore the previous data
+    if (!pipeMeta.getStaticMeta().isSourceExternal()) {
+      pipeMeta
+          .getStaticMeta()
+          .getSourceParameters()
+          .addOrReplaceEquivalentAttributes(
+              new PipeParameters(
+                  Collections.singletonMap(
+                      SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, 
Boolean.FALSE.toString())));
+    }
+    return env.pushSinglePipeMetaToDataNodes(
+        copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize());
+  }
+
   /**
    * Drop a pipe on all the dataNodes.
    *
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 79e772569cf..f6b84cb1f6e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
@@ -164,11 +166,11 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
         new ConcurrentHashMap<>();
     if (currentPipeStaticMeta.isSourceExternal()) {
       currentConsensusGroupId2PipeTaskMeta.forEach(
-          (taskId, pipeTaskMeta) -> {
-            updatedConsensusGroupIdToTaskMetaMap.put(
-                taskId,
-                new PipeTaskMeta(pipeTaskMeta.getProgressIndex(), 
pipeTaskMeta.getLeaderNodeId()));
-          });
+          (taskId, pipeTaskMeta) ->
+              updatedConsensusGroupIdToTaskMetaMap.put(
+                  taskId,
+                  new PipeTaskMeta(
+                      pipeTaskMeta.getProgressIndex(), 
pipeTaskMeta.getLeaderNodeId())));
     } else {
       // data regions & schema regions
       env.getConfigManager()
@@ -185,12 +187,32 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
                     && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE 
+ ".")
                     && !databaseName.equals(SchemaConstant.AUDIT_DATABASE)
                     && !databaseName.startsWith(SchemaConstant.AUDIT_DATABASE 
+ ".")
-                    && currentPipeTaskMeta != null
-                    && currentPipeTaskMeta.getLeaderNodeId() == 
regionLeaderNodeId) {
+                    && !(PipeTaskAgent.isHistoryOnlyPipe(
+                            currentPipeStaticMeta.getSourceParameters())
+                        && PipeTaskAgent.isHistoryOnlyPipe(
+                            updatedPipeStaticMeta.getSourceParameters())
+                        && regionGroupId.getType() == 
TConsensusGroupType.DataRegion
+                        && currentPipeTaskMeta.isNewlyAdded())) {
                   // Pipe only collect user's data, filter metric database 
here.
+                  // If it is altered to "pure historical", then the regionIds 
are always new here,
+                  // then it will extract all existing data now, not existing 
data since the
+                  // original pipe was created
+                  // Similar for "pure realtime"
                   updatedConsensusGroupIdToTaskMetaMap.put(
                       regionGroupId.getId(),
-                      new PipeTaskMeta(currentPipeTaskMeta.getProgressIndex(), 
regionLeaderNodeId));
+                      new PipeTaskMeta(
+                          currentPipeTaskMeta.getProgressIndex(),
+                          
PipeTaskMeta.isNewlyAdded(currentPipeTaskMeta.getLeaderNodeId())
+                                  && !(!PipeTaskAgent.isHistoryOnlyPipe(
+                                          
currentPipeStaticMeta.getSourceParameters())
+                                      && PipeTaskAgent.isHistoryOnlyPipe(
+                                          
updatedPipeStaticMeta.getSourceParameters()))
+                                  && !(!PipeTaskAgent.isRealtimeOnlyPipe(
+                                          
currentPipeStaticMeta.getSourceParameters())
+                                      && PipeTaskAgent.isRealtimeOnlyPipe(
+                                          
updatedPipeStaticMeta.getSourceParameters()))
+                              ? 
PipeTaskMeta.getRevertedLeader(regionLeaderNodeId)
+                              : regionLeaderNodeId));
                 }
               });
 
@@ -248,7 +270,12 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     LOGGER.info("AlterPipeProcedureV2: executeFromOperateOnDataNodes({})", 
pipeName);
 
     final String exceptionMessage =
-        parsePushPipeMetaExceptionForPipe(pipeName, 
pushSinglePipeMetaToDataNodes(pipeName, env));
+        parsePushPipeMetaExceptionForPipe(
+            pipeName,
+            
!PipeTaskAgent.isRealtimeOnlyPipe(currentPipeStaticMeta.getSourceParameters())
+                    && 
PipeTaskAgent.isRealtimeOnlyPipe(updatedPipeStaticMeta.getSourceParameters())
+                ? pushSinglePipeMetaToDataNodes4Realtime(pipeName, env)
+                : pushSinglePipeMetaToDataNodes(pipeName, env));
     if (!exceptionMessage.isEmpty()) {
       LOGGER.warn(
           "Failed to alter pipe {}, details: {}, metadata will be synchronized 
later.",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index c51d88185ba..87e744985f8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -109,12 +109,6 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.E
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
@@ -124,8 +118,6 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.S
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_SNAPSHOT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY;
@@ -607,24 +599,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     return isHistoryEnable && isRealtimeEnable;
   }
 
-  private boolean isSnapshotMode(final PipeParameters parameters) {
-    final boolean isSnapshotMode;
-    if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, 
SOURCE_MODE_SNAPSHOT_KEY)) {
-      isSnapshotMode =
-          parameters.getBooleanOrDefault(
-              Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY, 
SOURCE_MODE_SNAPSHOT_KEY),
-              EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE);
-    } else {
-      final String sourceModeValue =
-          parameters.getStringOrDefault(
-              Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), 
EXTRACTOR_MODE_DEFAULT_VALUE);
-      isSnapshotMode =
-          sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
-              || sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
-    }
-    return isSnapshotMode;
-  }
-
   @Override
   public void runPipeTasks(
       final Collection<PipeTask> pipeTasks, final Consumer<PipeTask> 
runSingle) {
@@ -804,7 +778,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     // If the source is not history, we do not need to allocate memory
     boolean isExtractorHistory =
         sourceParameters.getBooleanOrDefault(
-                SystemConstant.RESTART_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
+                SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
             || sourceParameters.getBooleanOrDefault(
                 Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
                 EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
@@ -888,7 +862,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     // If the source is history enable, we need to transfer tsfile
     boolean needTransferTsFile =
         sourceParameters.getBooleanOrDefault(
-                SystemConstant.RESTART_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
+                SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
             || sourceParameters.getBooleanOrDefault(
                 Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
                 EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 5ca08e840fb..038420527fd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.agent.task.builder;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
@@ -50,16 +51,8 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_TABLET_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_FORMAT_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_SNAPSHOT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY;
 
 public class PipeDataNodeTaskBuilder {
@@ -84,57 +77,57 @@ public class PipeDataNodeTaskBuilder {
   }
 
   public PipeDataNodeTask build() {
-    // Event flow: extractor -> processor -> connector
+    // Event flow: source -> processor -> sink
 
     // Analyzes the PipeParameters to identify potential conflicts.
-    final PipeParameters extractorParameters =
+    final PipeParameters sourceParameters =
         blendUserAndSystemParameters(pipeStaticMeta.getSourceParameters());
-    final PipeParameters connectorParameters =
+    final PipeParameters sinkParameters =
         blendUserAndSystemParameters(pipeStaticMeta.getSinkParameters());
-    checkConflict(extractorParameters, connectorParameters);
-    injectParameters(extractorParameters, connectorParameters);
+    checkConflict(sourceParameters, sinkParameters);
+    injectParameters(sourceParameters, sinkParameters);
 
-    // We first build the extractor and connector, then build the processor.
-    final PipeTaskSourceStage extractorStage =
+    // We first build the source and sink, then build the processor.
+    final PipeTaskSourceStage sourceStage =
         new PipeTaskSourceStage(
             pipeStaticMeta.getPipeName(),
             pipeStaticMeta.getCreationTime(),
-            extractorParameters,
+            sourceParameters,
             regionId,
             pipeTaskMeta);
 
-    final PipeTaskSinkStage connectorStage;
+    final PipeTaskSinkStage sinkStage;
     final PipeType pipeType = pipeStaticMeta.getPipeType();
 
     if (PipeType.SUBSCRIPTION.equals(pipeType)) {
-      connectorStage =
+      sinkStage =
           new SubscriptionTaskSinkStage(
               pipeStaticMeta.getPipeName(),
               pipeStaticMeta.getCreationTime(),
-              connectorParameters,
+              sinkParameters,
               regionId,
               
PipeSubtaskExecutorManager.getInstance().getSubscriptionExecutor());
     } else { // user pipe or consensus pipe
-      connectorStage =
+      sinkStage =
           new PipeTaskSinkStage(
               pipeStaticMeta.getPipeName(),
               pipeStaticMeta.getCreationTime(),
-              connectorParameters,
+              sinkParameters,
               regionId,
               pipeType.equals(PipeType.USER)
                   ? 
PipeSubtaskExecutorManager.getInstance().getConnectorExecutorSupplier()
                   : 
PipeSubtaskExecutorManager.getInstance()::getConsensusExecutor);
     }
 
-    // The processor connects the extractor and connector.
+    // The processor connects the source and sink.
     final PipeTaskProcessorStage processorStage =
         new PipeTaskProcessorStage(
             pipeStaticMeta.getPipeName(),
             pipeStaticMeta.getCreationTime(),
             
blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters()),
             regionId,
-            extractorStage.getEventSupplier(),
-            connectorStage.getPipeConnectorPendingQueue(),
+            sourceStage.getEventSupplier(),
+            sinkStage.getPipeConnectorPendingQueue(),
             PROCESSOR_EXECUTOR,
             pipeTaskMeta,
             pipeStaticMeta
@@ -146,12 +139,13 @@ public class PipeDataNodeTaskBuilder {
             PipeType.SUBSCRIPTION.equals(pipeType));
 
     return new PipeDataNodeTask(
-        pipeStaticMeta.getPipeName(), regionId, extractorStage, 
processorStage, connectorStage);
+        pipeStaticMeta.getPipeName(), regionId, sourceStage, processorStage, 
sinkStage);
   }
 
   private void generateSystemParameters() {
-    if (!(pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex)) {
-      systemParameters.put(SystemConstant.RESTART_KEY, 
Boolean.TRUE.toString());
+    if (!(pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex)
+        || pipeTaskMeta.isNewlyAdded()) {
+      systemParameters.put(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, 
Boolean.TRUE.toString());
     }
   }
 
@@ -164,29 +158,15 @@ public class PipeDataNodeTaskBuilder {
   }
 
   private void checkConflict(
-      final PipeParameters extractorParameters, final PipeParameters 
connectorParameters) {
+      final PipeParameters sourceParameters, final PipeParameters 
sinkParameters) {
     final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair;
     final boolean shouldTerminatePipeOnAllHistoricalEventsConsumed;
 
     try {
       insertionDeletionListeningOptionPair =
-          
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(extractorParameters);
-
-      if (extractorParameters.hasAnyAttributes(
-          EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY)) {
-        shouldTerminatePipeOnAllHistoricalEventsConsumed =
-            extractorParameters.getBooleanOrDefault(
-                Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY, 
SOURCE_MODE_SNAPSHOT_KEY),
-                EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE);
-      } else {
-        final String extractorModeValue =
-            extractorParameters.getStringOrDefault(
-                Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), 
EXTRACTOR_MODE_DEFAULT_VALUE);
-        shouldTerminatePipeOnAllHistoricalEventsConsumed =
-            extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
-                || 
extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
-      }
-
+          
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(sourceParameters);
+      shouldTerminatePipeOnAllHistoricalEventsConsumed =
+          PipeTaskAgent.isSnapshotMode(sourceParameters);
     } catch (final IllegalPathException e) {
       LOGGER.warn(
           "PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion' 
parameters: {}",
@@ -198,17 +178,17 @@ public class PipeDataNodeTaskBuilder {
     if (insertionDeletionListeningOptionPair.right
         || shouldTerminatePipeOnAllHistoricalEventsConsumed) {
       final Boolean isRealtime =
-          connectorParameters.getBooleanByKeys(
+          sinkParameters.getBooleanByKeys(
               PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
               PipeSinkConstant.SINK_REALTIME_FIRST_KEY);
       if (isRealtime == null) {
-        
connectorParameters.addAttribute(PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, 
"false");
+        
sinkParameters.addAttribute(PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, 
"false");
         if (insertionDeletionListeningOptionPair.right) {
           LOGGER.info(
               "PipeDataNodeTaskBuilder: When 'inclusion' contains 
'data.delete', 'realtime-first' is defaulted to 'false' to prevent sync issues 
after deletion.");
         } else {
           LOGGER.info(
-              "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 
'realtime-first' is defaulted to 'false' to prevent premature halt before 
transfer completion.");
+              "PipeDataNodeTaskBuilder: When source uses snapshot model, 
'realtime-first' is defaulted to 'false' to prevent premature halt before 
transfer completion.");
         }
       } else if (isRealtime) {
         if (insertionDeletionListeningOptionPair.right) {
@@ -216,24 +196,24 @@ public class PipeDataNodeTaskBuilder {
               "PipeDataNodeTaskBuilder: When 'inclusion' includes 
'data.delete', 'realtime-first' set to 'true' may result in data 
synchronization issues after deletion.");
         } else {
           LOGGER.warn(
-              "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 
'realtime-first' set to 'true' may cause prevent premature halt before transfer 
completion.");
+              "PipeDataNodeTaskBuilder: When source uses snapshot model, 
'realtime-first' set to 'true' may cause prevent premature halt before transfer 
completion.");
         }
       }
     }
 
     final boolean isRealtimeEnabled =
-        extractorParameters.getBooleanOrDefault(
+        sourceParameters.getBooleanOrDefault(
             Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
             EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
 
     if (isRealtimeEnabled && 
!shouldTerminatePipeOnAllHistoricalEventsConsumed) {
       final Boolean enableSendTsFileLimit =
-          connectorParameters.getBooleanByKeys(
+          sinkParameters.getBooleanByKeys(
               PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
               PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT);
 
       if (enableSendTsFileLimit == null) {
-        
connectorParameters.addAttribute(PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
 "true");
+        
sinkParameters.addAttribute(PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, 
"true");
         LOGGER.info(
             "PipeDataNodeTaskBuilder: When the realtime sync is enabled, we 
enable rate limiter in sending tsfile by default to reserve disk and network IO 
for realtime sending.");
       } else if (!enableSendTsFileLimit) {
@@ -244,27 +224,27 @@ public class PipeDataNodeTaskBuilder {
   }
 
   private void injectParameters(
-      final PipeParameters extractorParameters, final PipeParameters 
connectorParameters) {
+      final PipeParameters sourceParameters, final PipeParameters 
sinkParameters) {
     final boolean isSourceExternal =
         !BuiltinPipePlugin.BUILTIN_SOURCES.contains(
-            extractorParameters
+            sourceParameters
                 .getStringOrDefault(
                     Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY, 
PipeSourceConstant.SOURCE_KEY),
                     BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
                 .toLowerCase());
 
-    final String connectorPluginName =
-        connectorParameters
+    final String sinkPluginName =
+        sinkParameters
             .getStringOrDefault(
                 Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
                 BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
             .toLowerCase();
     final boolean isWriteBackSink =
-        
BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName().equals(connectorPluginName)
-            || 
BuiltinPipePlugin.WRITE_BACK_SINK.getPipePluginName().equals(connectorPluginName);
+        
BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName().equals(sinkPluginName)
+            || 
BuiltinPipePlugin.WRITE_BACK_SINK.getPipePluginName().equals(sinkPluginName);
 
     if (isSourceExternal && isWriteBackSink) {
-      connectorParameters.addAttribute(
+      sinkParameters.addAttribute(
           PipeSinkConstant.CONNECTOR_USE_EVENT_USER_NAME_KEY, 
Boolean.TRUE.toString());
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index a5e217d8919..b249ed7b1e9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -260,7 +260,7 @@ public class PipeSinkSubtaskManager {
   private String generateAttributeSortedString(final PipeParameters 
pipeConnectorParameters) {
     final TreeMap<String, String> sortedStringSourceMap =
         new TreeMap<>(pipeConnectorParameters.getAttribute());
-    sortedStringSourceMap.remove(SystemConstant.RESTART_KEY);
+    sortedStringSourceMap.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
     return sortedStringSourceMap.toString();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
index 9d2b629a67d..6931726157e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.source.dataregion;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
@@ -66,12 +67,8 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.E
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STREAMING_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STREAMING_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STRICT_KEY;
@@ -222,7 +219,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
           "The pipe cannot transfer data when data region is using ratis 
consensus.");
     }
 
-    // Validate extractor.pattern.format is within valid range
+    // Validate source.pattern.format is within valid range
     validator
         .validateAttributeValueRange(
             EXTRACTOR_PATTERN_FORMAT_KEY,
@@ -238,7 +235,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
     // Validate tree pattern and table pattern
     
validatePattern(TreePattern.parsePipePatternFromSourceParameters(validator.getParameters()));
 
-    // Validate extractor.history.enable and extractor.realtime.enable
+    // Validate source.history.enable and source.realtime.enable
     validator
         .validateAttributeValueRange(
             EXTRACTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
@@ -262,7 +259,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
                     Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
                     EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE));
 
-    // Validate extractor.realtime.mode
+    // Validate source.realtime.mode
     if (validator
             .getParameters()
             .getBooleanOrDefault(
@@ -388,17 +385,17 @@ public class IoTDBDataRegionSource extends IoTDBSource {
         Arrays.asList(_EXTRACTOR_WATERMARK_INTERVAL_KEY, 
_SOURCE_WATERMARK_INTERVAL_KEY),
         false);
 
-    // Check if specifying mode.snapshot or mode.streaming when disable 
realtime extractor
+    // Check if specifying mode.snapshot or mode.streaming when disable 
realtime source
     if (!parameters.getBooleanOrDefault(
         Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
         EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
-      if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, 
SOURCE_MODE_SNAPSHOT_KEY)) {
+      if (parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY)) {
         LOGGER.warn(
             "When '{}' ('{}') is set to false, specifying {} and {} is 
invalid.",
             EXTRACTOR_REALTIME_ENABLE_KEY,
             SOURCE_REALTIME_ENABLE_KEY,
-            EXTRACTOR_MODE_SNAPSHOT_KEY,
-            SOURCE_MODE_SNAPSHOT_KEY);
+            EXTRACTOR_REALTIME_MODE_KEY,
+            SOURCE_REALTIME_MODE_KEY);
       }
       if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, 
SOURCE_MODE_STREAMING_KEY)) {
         LOGGER.warn(
@@ -408,6 +405,21 @@ public class IoTDBDataRegionSource extends IoTDBSource {
             EXTRACTOR_MODE_STREAMING_KEY,
             SOURCE_MODE_STREAMING_KEY);
       }
+    } else {
+      if (parameters.hasAnyAttributes(
+          EXTRACTOR_MODE_SNAPSHOT_KEY,
+          SOURCE_MODE_SNAPSHOT_KEY,
+          EXTRACTOR_MODE_KEY,
+          SOURCE_MODE_KEY)) {
+        LOGGER.warn(
+            "When '{}' ('{}', '{}', '{}') is set to true, specifying {} and {} 
is invalid.",
+            EXTRACTOR_MODE_SNAPSHOT_KEY,
+            SOURCE_MODE_SNAPSHOT_KEY,
+            EXTRACTOR_MODE_KEY,
+            SOURCE_MODE_KEY,
+            EXTRACTOR_REALTIME_ENABLE_KEY,
+            SOURCE_REALTIME_ENABLE_KEY);
+      }
     }
   }
 
@@ -416,37 +428,22 @@ public class IoTDBDataRegionSource extends IoTDBSource {
   }
 
   private void constructRealtimeExtractor(final PipeParameters parameters) {
-    // Use heartbeat only extractor if disable realtime extractor
+    // Use heartbeat only source if disable realtime source
     if (!parameters.getBooleanOrDefault(
         Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
         EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
       realtimeExtractor = new PipeRealtimeDataRegionHeartbeatSource();
       LOGGER.info(
-          "Pipe: '{}' ('{}') is set to false, use heartbeat realtime 
extractor.",
+          "Pipe: '{}' ('{}') is set to false, use heartbeat realtime source.",
           EXTRACTOR_REALTIME_ENABLE_KEY,
           SOURCE_REALTIME_ENABLE_KEY);
       return;
     }
 
-    final boolean isSnapshotMode;
-    if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, 
SOURCE_MODE_SNAPSHOT_KEY)) {
-      isSnapshotMode =
-          parameters.getBooleanOrDefault(
-              Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY, 
SOURCE_MODE_SNAPSHOT_KEY),
-              EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE);
-    } else {
-      final String extractorModeValue =
-          parameters.getStringOrDefault(
-              Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), 
EXTRACTOR_MODE_DEFAULT_VALUE);
-      isSnapshotMode =
-          extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
-              || 
extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
-    }
-
-    // Use heartbeat only extractor if enable snapshot mode
-    if (isSnapshotMode) {
+    // Use heartbeat only source if enable snapshot mode
+    if (PipeTaskAgent.isSnapshotMode(parameters)) {
       realtimeExtractor = new PipeRealtimeDataRegionHeartbeatSource();
-      LOGGER.info("Pipe: snapshot mode is enabled, use heartbeat realtime 
extractor.");
+      LOGGER.info("Pipe: snapshot mode is enabled, use heartbeat realtime 
source.");
       return;
     }
 
@@ -493,7 +490,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
         realtimeExtractor = new PipeRealtimeDataRegionHybridSource();
         if (LOGGER.isWarnEnabled()) {
           LOGGER.warn(
-              "Pipe: Unsupported extractor realtime mode: {}, create a hybrid 
extractor.",
+              "Pipe: Unsupported source realtime mode: {}, create a hybrid 
source.",
               parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY));
         }
     }
@@ -550,7 +547,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
 
     final long startTime = System.currentTimeMillis();
     LOGGER.info(
-        "Pipe {}@{}: Starting historical extractor {} and realtime extractor 
{}.",
+        "Pipe {}@{}: Starting historical source {} and realtime source {}.",
         pipeName,
         regionId,
         historicalExtractor.getClass().getSimpleName(),
@@ -561,7 +558,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
     final AtomicReference<Exception> exceptionHolder = new 
AtomicReference<>(null);
     final DataRegionId dataRegionIdObject = new DataRegionId(this.regionId);
     while (true) {
-      // try to start extractors in the data region ...
+      // try to start sources in the data region ...
       // first try to run if data region exists, then try to run if data 
region does not exist.
       // both conditions fail is not common, which means the data region is 
created during the
       // runIfPresent and runIfAbsent operations. in this case, we need to 
retry.
@@ -584,7 +581,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
         rethrowExceptionIfAny(exceptionHolder);
 
         LOGGER.info(
-            "Pipe {}@{}: Started historical extractor {} and realtime 
extractor {} successfully within {} ms.",
+            "Pipe {}@{}: Started historical source {} and realtime source {} 
successfully within {} ms.",
             pipeName,
             regionId,
             historicalExtractor.getClass().getSimpleName(),
@@ -609,7 +606,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
     } catch (final Exception e) {
       exceptionHolder.set(e);
       LOGGER.warn(
-          "Pipe {}@{}: Start historical extractor {} and realtime extractor {} 
error.",
+          "Pipe {}@{}: Start historical source {} and realtime source {} 
error.",
           pipeName,
           regionId,
           historicalExtractor.getClass().getSimpleName(),
@@ -620,7 +617,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
 
   private void rethrowExceptionIfAny(final AtomicReference<Exception> 
exceptionHolder) {
     if (exceptionHolder.get() != null) {
-      throw new PipeException("failed to start extractors.", 
exceptionHolder.get());
+      throw new PipeException("failed to start sources.", 
exceptionHolder.get());
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index 09fcabaa62b..c5d1734a630 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
 import 
org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
@@ -91,12 +92,6 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.E
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STRICT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STRICT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODS_DEFAULT_VALUE;
@@ -109,8 +104,6 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.S
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_SNAPSHOT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_STRICT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_KEY;
@@ -263,7 +256,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
     // enabling the historical data extraction, which may affect the realtime 
data extraction.
     isHistoricalSourceEnabled =
         parameters.getBooleanOrDefault(
-                SystemConstant.RESTART_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
+                SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
             || parameters.getBooleanOrDefault(
                 Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
                 EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
@@ -359,19 +352,7 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSource
                   listeningOptionPair.getRight());
     }
 
-    if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, 
SOURCE_MODE_SNAPSHOT_KEY)) {
-      shouldTerminatePipeOnAllHistoricalEventsConsumed =
-          parameters.getBooleanOrDefault(
-              Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY, 
SOURCE_MODE_SNAPSHOT_KEY),
-              EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE);
-    } else {
-      final String extractorModeValue =
-          parameters.getStringOrDefault(
-              Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), 
EXTRACTOR_MODE_DEFAULT_VALUE);
-      shouldTerminatePipeOnAllHistoricalEventsConsumed =
-          extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
-              || 
extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
-    }
+    shouldTerminatePipeOnAllHistoricalEventsConsumed = 
PipeTaskAgent.isSnapshotMode(parameters);
 
     userId =
         parameters.getStringOrDefault(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
index 6fb2809c053..edff8257c65 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
@@ -210,7 +210,7 @@ public class SubscriptionSinkSubtaskManager {
   private String generateAttributeSortedString(final PipeParameters 
pipeConnectorParameters) {
     final TreeMap<String, String> sortedStringSourceMap =
         new TreeMap<>(pipeConnectorParameters.getAttribute());
-    sortedStringSourceMap.remove(SystemConstant.RESTART_KEY);
+    sortedStringSourceMap.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
     return sortedStringSourceMap.toString();
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index dffb0c47de1..73b543592c1 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -49,6 +49,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -65,6 +66,21 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_SNAPSHOT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY;
+
 /**
  * State transition diagram of a pipe task:
  *
@@ -1108,6 +1124,38 @@ public abstract class PipeTaskAgent {
   public abstract void runPipeTasks(
       final Collection<PipeTask> pipeTasks, final Consumer<PipeTask> 
runSingle);
 
+  public static boolean isHistoryOnlyPipe(final PipeParameters parameters) {
+    return isSnapshotMode(parameters)
+        || !parameters.getBooleanOrDefault(
+            Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
+            EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
+  }
+
+  public static boolean isRealtimeOnlyPipe(final PipeParameters parameters) {
+    return !isSnapshotMode(parameters)
+        && !parameters.getBooleanOrDefault(
+            Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
+            EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
+  }
+
+  public static boolean isSnapshotMode(final PipeParameters parameters) {
+    final boolean isSnapshotMode;
+    if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, 
SOURCE_MODE_SNAPSHOT_KEY)) {
+      isSnapshotMode =
+          parameters.getBooleanOrDefault(
+              Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY, 
SOURCE_MODE_SNAPSHOT_KEY),
+              EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE);
+    } else {
+      final String sourceModeValue =
+          parameters.getStringOrDefault(
+              Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), 
EXTRACTOR_MODE_DEFAULT_VALUE);
+      isSnapshotMode =
+          sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
+              || sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
+    }
+    return isSnapshotMode;
+  }
+
   ///////////////////////// Maintain meta info /////////////////////////
 
   public long getPipeCreationTime(final String pipeName) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
index bf9a1869246..9584ca8cbab 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
@@ -44,6 +44,8 @@ import java.util.concurrent.atomic.AtomicReference;
 public class PipeTaskMeta {
 
   private final AtomicReference<ProgressIndex> progressIndex = new 
AtomicReference<>();
+
+  // This is -2 - leaderNodeId iff it's a newly added region with internal 
source.
   private final AtomicInteger leaderNodeId = new AtomicInteger(0);
 
   /**
@@ -63,21 +65,44 @@ public class PipeTaskMeta {
     this.leaderNodeId.set(leaderNodeId);
   }
 
-  public ProgressIndex getProgressIndex() {
-    return progressIndex.get();
+  ///////////////////////// Region old & new test /////////////////////////
+
+  public PipeTaskMeta markAsNewlyAdded() {
+    leaderNodeId.getAndUpdate(PipeTaskMeta::getRevertedLeader);
+    return this;
   }
 
-  public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
-    return progressIndex.updateAndGet(
-        index -> 
index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
+  public boolean isNewlyAdded() {
+    return isNewlyAdded(leaderNodeId.get());
   }
 
   public int getLeaderNodeId() {
-    return leaderNodeId.get();
+    final int result = leaderNodeId.get();
+    return isNewlyAdded(result) ? getRevertedLeader(result) : result;
   }
 
   public void setLeaderNodeId(final int leaderNodeId) {
-    this.leaderNodeId.set(leaderNodeId);
+    this.leaderNodeId.updateAndGet(
+        leaderId -> isNewlyAdded(leaderId) ? getRevertedLeader(leaderNodeId) : 
leaderNodeId);
+  }
+
+  public static int getRevertedLeader(final int leaderNodeId) {
+    return -2 - leaderNodeId;
+  }
+
+  public static boolean isNewlyAdded(final int leaderNodeId) {
+    return leaderNodeId < -1;
+  }
+
+  ///////////////////////// Normal /////////////////////////
+
+  public ProgressIndex getProgressIndex() {
+    return progressIndex.get();
+  }
+
+  public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
+    return progressIndex.updateAndGet(
+        index -> 
index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
   }
 
   public synchronized Iterable<PipeRuntimeException> getExceptionMessages() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
index b2383bfdcc8..de0d6050167 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
@@ -34,7 +34,8 @@ public class SystemConstant {
   public static final String AUDIT_DATABASE = "root.__audit";
   public static final String AUDIT_PREFIX_KEY = "__audit";
 
-  public static final String RESTART_KEY = "__system.restart";
+  // This can be arbitrarily changed since it's only a memory key and not 
stored
+  public static final String RESTART_OR_NEWLY_ADDED_KEY = 
"__system.restart_or_newly_added";
   public static final boolean RESTART_DEFAULT_VALUE = false;
 
   public static final String SQL_DIALECT_KEY = "__system.sql-dialect";
@@ -46,7 +47,7 @@ public class SystemConstant {
   public static final Set<String> SYSTEM_KEYS = new HashSet<>();
 
   static {
-    SYSTEM_KEYS.add(RESTART_KEY);
+    SYSTEM_KEYS.add(RESTART_OR_NEWLY_ADDED_KEY);
     SYSTEM_KEYS.add(SQL_DIALECT_KEY);
   }
 

Reply via email to