This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 81ec29cf409 [To dev/1.3] Pipe: Fixed the semantics for new regions & 
realtime / history only pipes (#16641)
81ec29cf409 is described below

commit 81ec29cf409099315663621649f323d55ee184a7
Author: Caideyipi <[email protected]>
AuthorDate: Thu Oct 23 20:41:23 2025 +0800

    [To dev/1.3] Pipe: Fixed the semantics for new regions & realtime / history 
only pipes (#16641)
---
 .../manager/partition/PartitionManager.java        |  8 +--
 .../confignode/persistence/pipe/PipeTaskInfo.java  | 14 ++++-
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  | 19 ++++++
 .../impl/pipe/task/AlterPipeProcedureV2.java       | 39 ++++++++++--
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 12 +---
 .../task/builder/PipeDataNodeTaskBuilder.java      | 69 ++++++++++------------
 .../task/subtask/sink/PipeSinkSubtaskManager.java  |  2 +-
 .../source/dataregion/IoTDBDataRegionSource.java   | 44 ++++++--------
 .../PipeHistoricalDataRegionTsFileSource.java      | 15 +----
 .../subtask/SubscriptionSinkSubtaskManager.java    |  2 +-
 .../commons/pipe/agent/task/PipeTaskAgent.java     | 38 ++++++++++++
 .../commons/pipe/agent/task/meta/PipeTaskMeta.java | 39 +++++++++---
 .../pipe/config/constant/SystemConstant.java       |  3 +-
 13 files changed, 194 insertions(+), 110 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 632a5b59419..e186be44354 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -955,14 +955,10 @@ public class PartitionManager {
   }
 
   public Optional<TConsensusGroupId> generateTConsensusGroupIdByRegionId(final 
int regionId) {
-    if (configManager
-        .getPartitionManager()
-        .isRegionGroupExists(new 
TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId))) {
+    if (isRegionGroupExists(new 
TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId))) {
       return Optional.of(new 
TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId));
     }
-    if (configManager
-        .getPartitionManager()
-        .isRegionGroupExists(new 
TConsensusGroupId(TConsensusGroupType.DataRegion, regionId))) {
+    if (isRegionGroupExists(new 
TConsensusGroupId(TConsensusGroupType.DataRegion, regionId))) {
       return Optional.of(new TConsensusGroupId(TConsensusGroupType.DataRegion, 
regionId));
     }
     String msg =
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 3df50fbf3b7..08f96a25a61 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -19,11 +19,13 @@
 
 package org.apache.iotdb.confignode.persistence.pipe;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMetaKeeper;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
@@ -610,13 +612,21 @@ public class PipeTaskInfo implements SnapshotProcessor {
                             } else {
                               
consensusGroupIdToTaskMetaMap.remove(consensusGroupId.getId());
                             }
-                          } else {
+                          } else if (!PipeTaskAgent.isHistoryOnlyPipe(
+                                  
pipeMeta.getStaticMeta().getExtractorParameters())
+                              || !consensusGroupId
+                                  .getType()
+                                  .equals(TConsensusGroupType.DataRegion)) {
                             // If CN does not contain the region group, it 
means the data
                             // region group is newly added.
+                            // We do not handle history only pipes for new 
data regions
+
+                            // Newly added leader
                             if (newLeader != -1) {
                               consensusGroupIdToTaskMetaMap.put(
                                   consensusGroupId.getId(),
-                                  new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, newLeader));
+                                  new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, newLeader)
+                                      .markAsNewlyAdded());
                             }
                             // else:
                             // "The pipe task meta does not contain the data 
region group {} or
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index b860d7fcb82..857d5733f6a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.procedure.impl.pipe;
 
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import 
org.apache.iotdb.confignode.manager.pipe.metric.overview.PipeProcedureMetrics;
 import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -29,6 +30,7 @@ import 
org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProce
 import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
 import 
org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -41,6 +43,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -526,6 +529,22 @@ public abstract class AbstractOperatePipeProcedureV2
         pipeTaskInfo.get().getPipeMetaByPipeName(pipeName).serialize());
   }
 
+  protected Map<Integer, TPushPipeMetaResp> 
pushSinglePipeMetaToDataNodes4Realtime(
+      String pipeName, ConfigNodeProcedureEnv env) throws IOException {
+    final PipeMeta pipeMeta = 
pipeTaskInfo.get().getPipeMetaByPipeName(pipeName);
+    // Note that although the altered pipe has progress in it,
+    // if we alter it to realtime we should ignore the previous data
+    pipeMeta
+        .getStaticMeta()
+        .getExtractorParameters()
+        .addOrReplaceEquivalentAttributes(
+            new PipeParameters(
+                Collections.singletonMap(
+                    SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, 
Boolean.FALSE.toString())));
+    return env.pushSinglePipeMetaToDataNodes(
+        pipeTaskInfo.get().getPipeMetaByPipeName(pipeName).serialize());
+  }
+
   /**
    * Drop a pipe on all the dataNodes.
    *
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index b11c74408a6..5a7b9b8a5b2 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
@@ -137,6 +139,7 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
 
     final ConcurrentMap<Integer, PipeTaskMeta> 
updatedConsensusGroupIdToTaskMetaMap =
         new ConcurrentHashMap<>();
+
     // data regions & schema regions
     env.getConfigManager()
         .getLoadManager()
@@ -151,11 +154,33 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
                   && !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
                   && !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE + 
".")
                   && currentPipeTaskMeta != null
-                  && currentPipeTaskMeta.getLeaderNodeId() == 
regionLeaderNodeId) {
+                  && currentPipeTaskMeta.getLeaderNodeId() == 
regionLeaderNodeId
+                  && !(PipeTaskAgent.isHistoryOnlyPipe(
+                          currentPipeStaticMeta.getExtractorParameters())
+                      && PipeTaskAgent.isHistoryOnlyPipe(
+                          updatedPipeStaticMeta.getExtractorParameters())
+                      && regionGroupId.getType() == 
TConsensusGroupType.DataRegion
+                      && currentPipeTaskMeta.isNewlyAdded())) {
                 // Pipe only collect user's data, filter metric database here.
+                // If it is altered to "pure historical", then the regionIds 
are always new here,
+                // then it will extract all existing data now, not existing 
data since the
+                // original pipe was created
+                // Similar for "pure realtime"
                 updatedConsensusGroupIdToTaskMetaMap.put(
                     regionGroupId.getId(),
-                    new PipeTaskMeta(currentPipeTaskMeta.getProgressIndex(), 
regionLeaderNodeId));
+                    new PipeTaskMeta(
+                        currentPipeTaskMeta.getProgressIndex(),
+                        
PipeTaskMeta.isNewlyAdded(currentPipeTaskMeta.getLeaderNodeId())
+                                && !(!PipeTaskAgent.isHistoryOnlyPipe(
+                                        
currentPipeStaticMeta.getExtractorParameters())
+                                    && PipeTaskAgent.isHistoryOnlyPipe(
+                                        
updatedPipeStaticMeta.getExtractorParameters()))
+                                && !(!PipeTaskAgent.isRealtimeOnlyPipe(
+                                        
currentPipeStaticMeta.getExtractorParameters())
+                                    && PipeTaskAgent.isRealtimeOnlyPipe(
+                                        
updatedPipeStaticMeta.getExtractorParameters()))
+                            ? 
PipeTaskMeta.getRevertedLeader(regionLeaderNodeId)
+                            : regionLeaderNodeId));
               }
             });
 
@@ -209,8 +234,14 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     final String pipeName = alterPipeRequest.getPipeName();
     LOGGER.info("AlterPipeProcedureV2: executeFromOperateOnDataNodes({})", 
pipeName);
 
-    String exceptionMessage =
-        parsePushPipeMetaExceptionForPipe(pipeName, 
pushSinglePipeMetaToDataNodes(pipeName, env));
+    final String exceptionMessage =
+        parsePushPipeMetaExceptionForPipe(
+            pipeName,
+            
!PipeTaskAgent.isRealtimeOnlyPipe(currentPipeStaticMeta.getExtractorParameters())
+                    && PipeTaskAgent.isRealtimeOnlyPipe(
+                        updatedPipeStaticMeta.getExtractorParameters())
+                ? pushSinglePipeMetaToDataNodes4Realtime(pipeName, env)
+                : pushSinglePipeMetaToDataNodes(pipeName, env));
     if (!exceptionMessage.isEmpty()) {
       LOGGER.warn(
           "Failed to alter pipe {}, details: {}, metadata will be synchronized 
later.",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index f916ac067c8..7070d57d3b3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -603,14 +603,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     return isHistoryEnable && isRealtimeEnable;
   }
 
-  private boolean isSnapshotMode(final PipeParameters parameters) {
-    final String sourceModeValue =
-        parameters.getStringOrDefault(
-            Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), 
EXTRACTOR_MODE_DEFAULT_VALUE);
-    return sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
-        || sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
-  }
-
   @Override
   public void runPipeTasks(
       final Collection<PipeTask> pipeTasks, final Consumer<PipeTask> 
runSingle) {
@@ -789,7 +781,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     // If the source is not history, we do not need to allocate memory
     boolean isExtractorHistory =
         sourceParameters.getBooleanOrDefault(
-                SystemConstant.RESTART_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
+                SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
             || sourceParameters.getBooleanOrDefault(
                 Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
                 EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
@@ -873,7 +865,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     // If the source is history enable, we need to transfer tsfile
     boolean needTransferTsFile =
         sourceParameters.getBooleanOrDefault(
-                SystemConstant.RESTART_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
+                SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
             || sourceParameters.getBooleanOrDefault(
                 Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
                 EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 21f344f173d..2edc5b943f6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.agent.task.builder;
 
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
@@ -48,13 +49,8 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_TABLET_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_FORMAT_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY;
 
 public class PipeDataNodeTaskBuilder {
@@ -79,56 +75,56 @@ public class PipeDataNodeTaskBuilder {
   }
 
   public PipeDataNodeTask build() {
-    // Event flow: extractor -> processor -> connector
+    // Event flow: source -> processor -> sink
 
     // Analyzes the PipeParameters to identify potential conflicts.
-    final PipeParameters extractorParameters =
+    final PipeParameters sourceParameters =
         blendUserAndSystemParameters(pipeStaticMeta.getExtractorParameters());
-    final PipeParameters connectorParameters =
+    final PipeParameters sinkParameters =
         blendUserAndSystemParameters(pipeStaticMeta.getConnectorParameters());
-    checkConflict(extractorParameters, connectorParameters);
+    checkConflict(sourceParameters, sinkParameters);
 
-    // We first build the extractor and connector, then build the processor.
-    final PipeTaskSourceStage extractorStage =
+    // We first build the source and sink, then build the processor.
+    final PipeTaskSourceStage sourceStage =
         new PipeTaskSourceStage(
             pipeStaticMeta.getPipeName(),
             pipeStaticMeta.getCreationTime(),
-            extractorParameters,
+            sourceParameters,
             regionId,
             pipeTaskMeta);
 
-    final PipeTaskSinkStage connectorStage;
+    final PipeTaskSinkStage sinkStage;
     final PipeType pipeType = pipeStaticMeta.getPipeType();
 
     if (PipeType.SUBSCRIPTION.equals(pipeType)) {
-      connectorStage =
+      sinkStage =
           new SubscriptionTaskSinkStage(
               pipeStaticMeta.getPipeName(),
               pipeStaticMeta.getCreationTime(),
-              connectorParameters,
+              sinkParameters,
               regionId,
               
PipeSubtaskExecutorManager.getInstance().getSubscriptionExecutor());
     } else { // user pipe or consensus pipe
-      connectorStage =
+      sinkStage =
           new PipeTaskSinkStage(
               pipeStaticMeta.getPipeName(),
               pipeStaticMeta.getCreationTime(),
-              connectorParameters,
+              sinkParameters,
               regionId,
               pipeType.equals(PipeType.USER)
                   ? 
PipeSubtaskExecutorManager.getInstance().getConnectorExecutorSupplier()
                   : 
PipeSubtaskExecutorManager.getInstance().getConsensusExecutorSupplier());
     }
 
-    // The processor connects the extractor and connector.
+    // The processor connects the source and sink.
     final PipeTaskProcessorStage processorStage =
         new PipeTaskProcessorStage(
             pipeStaticMeta.getPipeName(),
             pipeStaticMeta.getCreationTime(),
             
blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters()),
             regionId,
-            extractorStage.getEventSupplier(),
-            connectorStage.getPipeConnectorPendingQueue(),
+            sourceStage.getEventSupplier(),
+            sinkStage.getPipeConnectorPendingQueue(),
             PROCESSOR_EXECUTOR,
             pipeTaskMeta,
             pipeStaticMeta
@@ -140,12 +136,13 @@ public class PipeDataNodeTaskBuilder {
             PipeType.SUBSCRIPTION.equals(pipeType));
 
     return new PipeDataNodeTask(
-        pipeStaticMeta.getPipeName(), regionId, extractorStage, 
processorStage, connectorStage);
+        pipeStaticMeta.getPipeName(), regionId, sourceStage, processorStage, 
sinkStage);
   }
 
   private void generateSystemParameters() {
-    if (!(pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex)) {
-      systemParameters.put(SystemConstant.RESTART_KEY, 
Boolean.TRUE.toString());
+    if (!(pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex)
+        || pipeTaskMeta.isNewlyAdded()) {
+      systemParameters.put(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, 
Boolean.TRUE.toString());
     }
   }
 
@@ -158,21 +155,15 @@ public class PipeDataNodeTaskBuilder {
   }
 
   private void checkConflict(
-      final PipeParameters extractorParameters, final PipeParameters 
connectorParameters) {
+      final PipeParameters sourceParameters, final PipeParameters 
sinkParameters) {
     final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair;
     final boolean shouldTerminatePipeOnAllHistoricalEventsConsumed;
 
     try {
       insertionDeletionListeningOptionPair =
-          
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(extractorParameters);
-
-      final String extractorModeValue =
-          extractorParameters.getStringOrDefault(
-              Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), 
EXTRACTOR_MODE_DEFAULT_VALUE);
+          
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(sourceParameters);
       shouldTerminatePipeOnAllHistoricalEventsConsumed =
-          extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
-              || 
extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
-
+          PipeTaskAgent.isSnapshotMode(sourceParameters);
     } catch (final IllegalPathException e) {
       LOGGER.warn(
           "PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion' 
parameters: {}",
@@ -184,17 +175,17 @@ public class PipeDataNodeTaskBuilder {
     if (insertionDeletionListeningOptionPair.right
         || shouldTerminatePipeOnAllHistoricalEventsConsumed) {
       final Boolean isRealtime =
-          connectorParameters.getBooleanByKeys(
+          sinkParameters.getBooleanByKeys(
               PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
               PipeSinkConstant.SINK_REALTIME_FIRST_KEY);
       if (isRealtime == null) {
-        
connectorParameters.addAttribute(PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, 
"false");
+        
sinkParameters.addAttribute(PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, 
"false");
         if (insertionDeletionListeningOptionPair.right) {
           LOGGER.info(
               "PipeDataNodeTaskBuilder: When 'inclusion' contains 
'data.delete', 'realtime-first' is defaulted to 'false' to prevent sync issues 
after deletion.");
         } else {
           LOGGER.info(
-              "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 
'realtime-first' is defaulted to 'false' to prevent premature halt before 
transfer completion.");
+              "PipeDataNodeTaskBuilder: When source uses snapshot model, 
'realtime-first' is defaulted to 'false' to prevent premature halt before 
transfer completion.");
         }
       } else if (isRealtime) {
         if (insertionDeletionListeningOptionPair.right) {
@@ -202,24 +193,24 @@ public class PipeDataNodeTaskBuilder {
               "PipeDataNodeTaskBuilder: When 'inclusion' includes 
'data.delete', 'realtime-first' set to 'true' may result in data 
synchronization issues after deletion.");
         } else {
           LOGGER.warn(
-              "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 
'realtime-first' set to 'true' may cause prevent premature halt before transfer 
completion.");
+              "PipeDataNodeTaskBuilder: When source uses snapshot model, 
'realtime-first' set to 'true' may cause prevent premature halt before transfer 
completion.");
         }
       }
     }
 
     final boolean isRealtimeEnabled =
-        extractorParameters.getBooleanOrDefault(
+        sourceParameters.getBooleanOrDefault(
             Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
             EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
 
     if (isRealtimeEnabled && 
!shouldTerminatePipeOnAllHistoricalEventsConsumed) {
       final Boolean enableSendTsFileLimit =
-          connectorParameters.getBooleanByKeys(
+          sinkParameters.getBooleanByKeys(
               PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
               PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT);
 
       if (enableSendTsFileLimit == null) {
-        
connectorParameters.addAttribute(PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
 "true");
+        
sinkParameters.addAttribute(PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, 
"true");
         LOGGER.info(
             "PipeDataNodeTaskBuilder: When the realtime sync is enabled, we 
enable rate limiter in sending tsfile by default to reserve disk and network IO 
for realtime sending.");
       } else if (!enableSendTsFileLimit) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index e2743147705..97859938828 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -257,7 +257,7 @@ public class PipeSinkSubtaskManager {
   private String generateAttributeSortedString(final PipeParameters 
pipeConnectorParameters) {
     final TreeMap<String, String> sortedStringSourceMap =
         new TreeMap<>(pipeConnectorParameters.getAttribute());
-    sortedStringSourceMap.remove(SystemConstant.RESTART_KEY);
+    sortedStringSourceMap.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
     return sortedStringSourceMap.toString();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
index e7ff984358a..ec99ed2abaf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.source.dataregion;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import 
org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
 import org.apache.iotdb.commons.pipe.source.IoTDBSource;
@@ -58,10 +59,6 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.E
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE;
@@ -81,7 +78,6 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.S
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_FORMAT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_MODE_KEY;
@@ -123,7 +119,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
           "The pipe cannot transfer data when data region is using ratis 
consensus.");
     }
 
-    // Validate extractor.pattern.format is within valid range
+    // Validate source.pattern.format is within valid range
     validator
         .validateAttributeValueRange(
             EXTRACTOR_PATTERN_FORMAT_KEY,
@@ -143,7 +139,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
     // Check whether the pattern is legal
     validatePattern(pattern);
 
-    // Validate extractor.history.enable and extractor.realtime.enable
+    // Validate source.history.enable and source.realtime.enable
     validator
         .validateAttributeValueRange(
             EXTRACTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), 
Boolean.FALSE.toString())
@@ -167,7 +163,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
                     Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
                     EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE));
 
-    // Validate extractor.realtime.mode
+    // Validate source.realtime.mode
     if (validator
             .getParameters()
             .getBooleanOrDefault(
@@ -245,28 +241,22 @@ public class IoTDBDataRegionSource extends IoTDBSource {
   }
 
   private void constructRealtimeExtractor(final PipeParameters parameters) {
-    // Use heartbeat only extractor if disable realtime extractor
+    // Use heartbeat only source if disable realtime source
     if (!parameters.getBooleanOrDefault(
         Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
         EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
       realtimeExtractor = new PipeRealtimeDataRegionHeartbeatSource();
       LOGGER.info(
-          "Pipe: '{}' is set to false, use heartbeat realtime extractor.",
-          EXTRACTOR_REALTIME_ENABLE_KEY);
+          "Pipe: '{}' ('{}') is set to false, use heartbeat realtime source.",
+          EXTRACTOR_REALTIME_ENABLE_KEY,
+          SOURCE_REALTIME_ENABLE_KEY);
       return;
     }
 
-    // Use heartbeat only extractor if enable snapshot mode
-    final String extractorModeValue =
-        parameters.getStringOrDefault(
-            Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), 
EXTRACTOR_MODE_DEFAULT_VALUE);
-    if (extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE)
-        || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)) 
{
+    // Use heartbeat only source if enable snapshot mode
+    if (PipeTaskAgent.isSnapshotMode(parameters)) {
       realtimeExtractor = new PipeRealtimeDataRegionHeartbeatSource();
-      LOGGER.info(
-          "Pipe: '{}' is set to {}, use heartbeat realtime extractor.",
-          EXTRACTOR_MODE_KEY,
-          EXTRACTOR_MODE_SNAPSHOT_VALUE);
+      LOGGER.info("Pipe: snapshot mode is enabled, use heartbeat realtime 
source.");
       return;
     }
 
@@ -295,7 +285,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
         realtimeExtractor = new PipeRealtimeDataRegionHybridSource();
         if (LOGGER.isWarnEnabled()) {
           LOGGER.warn(
-              "Pipe: Unsupported extractor realtime mode: {}, create a hybrid 
extractor.",
+              "Pipe: Unsupported source realtime mode: {}, create a hybrid 
source.",
               parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY));
         }
     }
@@ -345,7 +335,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
 
     final long startTime = System.currentTimeMillis();
     LOGGER.info(
-        "Pipe {}@{}: Starting historical extractor {} and realtime extractor 
{}.",
+        "Pipe {}@{}: Starting historical source {} and realtime source {}.",
         pipeName,
         regionId,
         historicalExtractor.getClass().getSimpleName(),
@@ -356,7 +346,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
     final AtomicReference<Exception> exceptionHolder = new 
AtomicReference<>(null);
     final DataRegionId dataRegionIdObject = new DataRegionId(this.regionId);
     while (true) {
-      // try to start extractors in the data region ...
+      // try to start sources in the data region ...
       // first try to run if data region exists, then try to run if data 
region does not exist.
       // both conditions fail is not common, which means the data region is 
created during the
       // runIfPresent and runIfAbsent operations. in this case, we need to 
retry.
@@ -379,7 +369,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
         rethrowExceptionIfAny(exceptionHolder);
 
         LOGGER.info(
-            "Pipe {}@{}: Started historical extractor {} and realtime 
extractor {} successfully within {} ms.",
+            "Pipe {}@{}: Started historical source {} and realtime source {} 
successfully within {} ms.",
             pipeName,
             regionId,
             historicalExtractor.getClass().getSimpleName(),
@@ -404,7 +394,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
     } catch (final Exception e) {
       exceptionHolder.set(e);
       LOGGER.warn(
-          "Pipe {}@{}: Start historical extractor {} and realtime extractor {} 
error.",
+          "Pipe {}@{}: Start historical source {} and realtime source {} 
error.",
           pipeName,
           regionId,
           historicalExtractor.getClass().getSimpleName(),
@@ -415,7 +405,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
 
   private void rethrowExceptionIfAny(final AtomicReference<Exception> 
exceptionHolder) {
     if (exceptionHolder.get() != null) {
-      throw new PipeException("failed to start extractors.", 
exceptionHolder.get());
+      throw new PipeException("failed to start sources.", 
exceptionHolder.get());
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
index 0ed563d52b5..91f5be62f11 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
 import 
org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
@@ -78,10 +79,6 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.E
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODS_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_START_TIME_KEY;
@@ -91,7 +88,6 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.S
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_START_TIME_KEY;
 
@@ -216,7 +212,7 @@ public class PipeHistoricalDataRegionTsFileSource 
implements PipeHistoricalDataR
     // enabling the historical data extraction, which may affect the realtime 
data extraction.
     isHistoricalSourceEnabled =
         parameters.getBooleanOrDefault(
-                SystemConstant.RESTART_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
+                SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
             || parameters.getBooleanOrDefault(
                 Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
                 EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
@@ -289,12 +285,7 @@ public class PipeHistoricalDataRegionTsFileSource 
implements PipeHistoricalDataR
                 || // Should extract deletion
                 listeningOptionPair.getRight());
 
-    final String extractorModeValue =
-        parameters.getStringOrDefault(
-            Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), 
EXTRACTOR_MODE_DEFAULT_VALUE);
-    shouldTerminatePipeOnAllHistoricalEventsConsumed =
-        extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
-            || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
+    shouldTerminatePipeOnAllHistoricalEventsConsumed = 
PipeTaskAgent.isSnapshotMode(parameters);
 
     isForwardingPipeRequests =
         parameters.getBooleanOrDefault(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
index 7e19a267dc5..959870898a2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
@@ -210,7 +210,7 @@ public class SubscriptionSinkSubtaskManager {
   private String generateAttributeSortedString(final PipeParameters 
pipeConnectorParameters) {
     final TreeMap<String, String> sortedStringSourceMap =
         new TreeMap<>(pipeConnectorParameters.getAttribute());
-    sortedStringSourceMap.remove(SystemConstant.RESTART_KEY);
+    sortedStringSourceMap.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
     return sortedStringSourceMap.toString();
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index b695b8f6ad3..726bc1b6a1f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -47,6 +47,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -63,6 +64,18 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY;
+
 /**
  * State transition diagram of a pipe task:
  *
@@ -1095,6 +1108,31 @@ public abstract class PipeTaskAgent {
   public abstract void runPipeTasks(
       final Collection<PipeTask> pipeTasks, final Consumer<PipeTask> 
runSingle);
 
+  public static boolean isHistoryOnlyPipe(final PipeParameters parameters) {
+    return isSnapshotMode(parameters)
+        || !parameters.getBooleanOrDefault(
+            Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
+            EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
+  }
+
+  public static boolean isRealtimeOnlyPipe(final PipeParameters parameters) {
+    return !isSnapshotMode(parameters)
+        && !parameters.getBooleanOrDefault(
+            Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
+            EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
+  }
+
+  public static boolean isSnapshotMode(final PipeParameters parameters) {
+    final boolean isSnapshotMode;
+    final String sourceModeValue =
+        parameters.getStringOrDefault(
+            Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), 
EXTRACTOR_MODE_DEFAULT_VALUE);
+    isSnapshotMode =
+        sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
+            || sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
+    return isSnapshotMode;
+  }
+
   ///////////////////////// Maintain meta info /////////////////////////
 
   public long getPipeCreationTime(final String pipeName) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
index bf9a1869246..9584ca8cbab 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
@@ -44,6 +44,8 @@ import java.util.concurrent.atomic.AtomicReference;
 public class PipeTaskMeta {
 
   private final AtomicReference<ProgressIndex> progressIndex = new 
AtomicReference<>();
+
+  // This is -2 - leaderNodeId iff it's a newly added region with internal 
source.
   private final AtomicInteger leaderNodeId = new AtomicInteger(0);
 
   /**
@@ -63,21 +65,44 @@ public class PipeTaskMeta {
     this.leaderNodeId.set(leaderNodeId);
   }
 
-  public ProgressIndex getProgressIndex() {
-    return progressIndex.get();
+  ///////////////////////// Region old & new test /////////////////////////
+
+  public PipeTaskMeta markAsNewlyAdded() {
+    leaderNodeId.getAndUpdate(PipeTaskMeta::getRevertedLeader);
+    return this;
   }
 
-  public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
-    return progressIndex.updateAndGet(
-        index -> 
index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
+  public boolean isNewlyAdded() {
+    return isNewlyAdded(leaderNodeId.get());
   }
 
   public int getLeaderNodeId() {
-    return leaderNodeId.get();
+    final int result = leaderNodeId.get();
+    return isNewlyAdded(result) ? getRevertedLeader(result) : result;
   }
 
   public void setLeaderNodeId(final int leaderNodeId) {
-    this.leaderNodeId.set(leaderNodeId);
+    this.leaderNodeId.updateAndGet(
+        leaderId -> isNewlyAdded(leaderId) ? getRevertedLeader(leaderNodeId) : 
leaderNodeId);
+  }
+
+  public static int getRevertedLeader(final int leaderNodeId) {
+    return -2 - leaderNodeId;
+  }
+
+  public static boolean isNewlyAdded(final int leaderNodeId) {
+    return leaderNodeId < -1;
+  }
+
+  ///////////////////////// Normal /////////////////////////
+
+  public ProgressIndex getProgressIndex() {
+    return progressIndex.get();
+  }
+
+  public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
+    return progressIndex.updateAndGet(
+        index -> 
index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
   }
 
   public synchronized Iterable<PipeRuntimeException> getExceptionMessages() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
index c77c56fb9c0..9c1bc1b521c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
@@ -21,7 +21,8 @@ package org.apache.iotdb.commons.pipe.config.constant;
 
 public class SystemConstant {
 
-  public static final String RESTART_KEY = "__system.restart";
+  // This can be arbitrarily changed since it's only a memory key and not 
stored
+  public static final String RESTART_OR_NEWLY_ADDED_KEY = 
"__system.restart_or_newly_added";
   public static final boolean RESTART_DEFAULT_VALUE = false;
 
   private SystemConstant() {


Reply via email to