This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch pipe-memory-estimation-short-term
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 00962e625bd48503ee66453f396a9e8e8e93d7c9
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 29 10:44:51 2026 +0800

    Pipe: improve incremental memory estimation
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 282 ++++++++++++++++++---
 .../task/builder/PipeDataNodeTaskBuilder.java      |  35 ++-
 .../task/subtask/sink/PipeSinkSubtaskManager.java  |  96 ++++---
 .../commons/pipe/agent/task/PipeTaskAgent.java     |  26 +-
 4 files changed, 339 insertions(+), 100 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 3509e6b29ce..43b76f69176 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.agent.task;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
@@ -38,6 +39,7 @@ import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
@@ -46,6 +48,7 @@ import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -54,6 +57,7 @@ import org.apache.iotdb.db.i18n.DataNodePipeMessages;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.agent.task.builder.PipeDataNodeBuilder;
 import org.apache.iotdb.db.pipe.agent.task.builder.PipeDataNodeTaskBuilder;
+import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtaskManager;
 import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -192,6 +196,11 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
           // For internal source
           || needConstructDataRegionTask
           || needConstructSchemaRegionTask) {
+        calculateMemoryUsage(
+            pipeStaticMeta,
+            Collections.singletonList(new Pair<>(consensusGroupId, 
pipeTaskMeta)),
+            false);
+
         final PipeDataNodeTask pipeTask =
             new PipeDataNodeTaskBuilder(pipeStaticMeta, consensusGroupId, 
pipeTaskMeta).build();
         pipeTask.create();
@@ -779,25 +788,45 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   }
 
   @Override
-  protected void calculateMemoryUsage(
-      final PipeStaticMeta staticMeta,
-      final PipeParameters sourceParameters,
-      final PipeParameters processorParameters,
-      final PipeParameters sinkParameters) {
+  protected void calculateMemoryUsage(final PipeMeta pipeMetaFromCoordinator)
+      throws IllegalPathException {
+    final PipeStaticMeta staticMeta = pipeMetaFromCoordinator.getStaticMeta();
     if (!PipeConfig.getInstance().isPipeEnableMemoryCheck()
-        || !isInnerSource(sourceParameters)
+        || !isInnerSource(staticMeta.getSourceParameters())
         || !PipeType.USER.equals(staticMeta.getPipeType())) {
       return;
     }
 
-    calculateInsertNodeQueueMemory(sourceParameters);
+    final PipeMeta pipeMetaInAgent = 
pipeMetaKeeper.getPipeMeta(staticMeta.getPipeName());
+    final boolean ignoreRegisteredSinkSubtasks =
+        Objects.nonNull(pipeMetaInAgent)
+            && (!pipeMetaInAgent.getStaticMeta().equals(staticMeta)
+                || 
PipeStatus.DROPPED.equals(pipeMetaInAgent.getRuntimeMeta().getStatus().get()));
+    calculateMemoryUsage(
+        staticMeta,
+        collectPipeTasksToBeCreated(pipeMetaFromCoordinator),
+        ignoreRegisteredSinkSubtasks);
+  }
 
-    long needMemory = 0;
+  private void calculateMemoryUsage(
+      final PipeStaticMeta staticMeta,
+      final List<Pair<Integer, PipeTaskMeta>> pipeTasksToBeCreated,
+      final boolean ignoreRegisteredSinkSubtasks)
+      throws IllegalPathException {
+    if (!PipeConfig.getInstance().isPipeEnableMemoryCheck()
+        || !isInnerSource(staticMeta.getSourceParameters())
+        || !PipeType.USER.equals(staticMeta.getPipeType())
+        || pipeTasksToBeCreated.isEmpty()) {
+      return;
+    }
 
-    needMemory += calculateTsFileParserMemory(sourceParameters, 
sinkParameters);
-    needMemory += calculateSinkBatchMemory(sinkParameters);
-    needMemory += calculateSendTsFileReadBufferMemory(sourceParameters, 
sinkParameters);
-    needMemory += calculateAssignerMemory(sourceParameters);
+    final MemoryEstimation memoryEstimation =
+        calculateIncrementalMemoryUsage(
+            staticMeta, pipeTasksToBeCreated, ignoreRegisteredSinkSubtasks);
+    calculateInsertNodeQueueMemory(
+        staticMeta.getSourceParameters(), 
memoryEstimation.dataRegionTaskCount);
+
+    final long needMemory = memoryEstimation.nonFloatingMemoryInBytes;
 
     PipeMemoryManager pipeMemoryManager = PipeDataNodeResourceManager.memory();
     final long freeMemorySizeInBytes = 
pipeMemoryManager.getFreeMemorySizeInBytes();
@@ -819,6 +848,109 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     }
   }
 
+  private List<Pair<Integer, PipeTaskMeta>> collectPipeTasksToBeCreated(
+      final PipeMeta pipeMetaFromCoordinator) throws IllegalPathException {
+    final PipeStaticMeta pipeStaticMeta = 
pipeMetaFromCoordinator.getStaticMeta();
+    final PipeParameters sourceParameters = 
pipeStaticMeta.getSourceParameters();
+    final Set<DataRegionId> dataRegionIds =
+        new HashSet<>(StorageEngine.getInstance().getAllDataRegionIds());
+    final Set<SchemaRegionId> schemaRegionIds =
+        new HashSet<>(SchemaEngine.getInstance().getAllSchemaRegionIds());
+    final List<Pair<Integer, PipeTaskMeta>> pipeTasksToBeCreated = new 
ArrayList<>();
+
+    for (final Map.Entry<Integer, PipeTaskMeta> consensusGroupIdToPipeTaskMeta 
:
+        
pipeMetaFromCoordinator.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().entrySet())
 {
+      final int consensusGroupId = consensusGroupIdToPipeTaskMeta.getKey();
+      final PipeTaskMeta pipeTaskMeta = 
consensusGroupIdToPipeTaskMeta.getValue();
+      if (pipeTaskMeta.getLeaderNodeId() != CONFIG.getDataNodeId()) {
+        continue;
+      }
+
+      final boolean needConstructTask;
+      if (pipeStaticMeta.isSourceExternal()) {
+        needConstructTask = true;
+      } else {
+        final DataRegionId dataRegionId = new DataRegionId(consensusGroupId);
+        final boolean needConstructDataRegionTask =
+            dataRegionIds.contains(dataRegionId)
+                && DataRegionListeningFilter.shouldDataRegionBeListened(
+                    sourceParameters, dataRegionId);
+        final boolean needConstructSchemaRegionTask =
+            schemaRegionIds.contains(new SchemaRegionId(consensusGroupId))
+                && SchemaRegionListeningFilter.shouldSchemaRegionBeListened(
+                    consensusGroupId, sourceParameters);
+        needConstructTask = needConstructDataRegionTask || 
needConstructSchemaRegionTask;
+      }
+
+      if (needConstructTask) {
+        pipeTasksToBeCreated.add(new Pair<>(consensusGroupId, pipeTaskMeta));
+      }
+    }
+    return pipeTasksToBeCreated;
+  }
+
+  private MemoryEstimation calculateIncrementalMemoryUsage(
+      final PipeStaticMeta staticMeta,
+      final List<Pair<Integer, PipeTaskMeta>> pipeTasksToBeCreated,
+      final boolean ignoreRegisteredSinkSubtasks) {
+    long needMemory = 0;
+    int dataRegionTaskCount = 0;
+    final Set<String> sinkSubtasksToBeCreated = new HashSet<>();
+
+    for (final Pair<Integer, PipeTaskMeta> regionIdAndTaskMeta : 
pipeTasksToBeCreated) {
+      final int regionId = regionIdAndTaskMeta.getLeft();
+      final PipeTaskMeta pipeTaskMeta = regionIdAndTaskMeta.getRight();
+      final PipeParameters sourceParameters =
+          PipeDataNodeTaskBuilder.blendUserAndSystemParameters(
+              staticMeta.getSourceParameters(), pipeTaskMeta);
+      final PipeParameters sinkParameters =
+          PipeDataNodeTaskBuilder.blendUserAndSystemParameters(
+              staticMeta.getSinkParameters(), pipeTaskMeta);
+      PipeDataNodeTaskBuilder.preprocessParameters(sourceParameters, 
sinkParameters);
+
+      final boolean isDataRegionTask = isDataRegionTask(regionId);
+      if (isDataRegionTask) {
+        dataRegionTaskCount++;
+        needMemory += calculateTsFileParserMemory(sourceParameters, 
sinkParameters);
+      }
+
+      final String sinkSubtaskId =
+          PipeSinkSubtaskManager.generateAttributeSortedString(sinkParameters, 
regionId);
+      if (isDataRegionTask
+          && !sinkSubtasksToBeCreated.contains(sinkSubtaskId)
+          && (ignoreRegisteredSinkSubtasks
+              || !PipeSinkSubtaskManager.instance()
+                  .hasRegisteredSubtasks(sinkParameters, regionId))) {
+        sinkSubtasksToBeCreated.add(sinkSubtaskId);
+        final int sinkSubtaskNum =
+            PipeSinkSubtaskManager.calculateSinkSubtaskNum(sinkParameters, 
regionId);
+        needMemory += calculateSinkBatchMemory(sinkParameters) * 
sinkSubtaskNum;
+        needMemory +=
+            calculateSendTsFileReadBufferMemory(sourceParameters, 
sinkParameters) * sinkSubtaskNum;
+      }
+    }
+
+    if (dataRegionTaskCount > 0) {
+      needMemory += calculateAssignerMemory(staticMeta.getSourceParameters());
+    }
+    return new MemoryEstimation(needMemory, dataRegionTaskCount);
+  }
+
+  private boolean isDataRegionTask(final int regionId) {
+    return StorageEngine.getInstance().getAllDataRegionIds().contains(new 
DataRegionId(regionId))
+        || PipeRuntimeMeta.isSourceExternal(regionId);
+  }
+
+  private static class MemoryEstimation {
+    private final long nonFloatingMemoryInBytes;
+    private final int dataRegionTaskCount;
+
+    private MemoryEstimation(final long nonFloatingMemoryInBytes, final int 
dataRegionTaskCount) {
+      this.nonFloatingMemoryInBytes = nonFloatingMemoryInBytes;
+      this.dataRegionTaskCount = dataRegionTaskCount;
+    }
+  }
+
   private boolean isInnerSource(final PipeParameters sourceParameters) {
     final String pluginName =
         sourceParameters
@@ -831,7 +963,11 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
         || 
pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName());
   }
 
-  private void calculateInsertNodeQueueMemory(final PipeParameters 
sourceParameters) {
+  private void calculateInsertNodeQueueMemory(
+      final PipeParameters sourceParameters, final int dataRegionTaskCount) {
+    if (dataRegionTaskCount <= 0) {
+      return;
+    }
 
     // Realtime source is enabled by default, so we only need to check the 
source realtime
     if (!sourceParameters.getBooleanOrDefault(
@@ -850,17 +986,17 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       return;
     }
 
+    final long needFloatingMemory =
+        PipeConfig.getInstance().getPipeInsertNodeQueueMemory() * 
dataRegionTaskCount;
     final long allocatedMemorySizeInBytes = 
this.getAllFloatingMemoryUsageInByte();
     final long remainingMemory =
         
PipeDataNodeResourceManager.memory().getTotalFloatingMemorySizeInBytes()
             - allocatedMemorySizeInBytes;
-    if (remainingMemory < 
PipeConfig.getInstance().getPipeInsertNodeQueueMemory()) {
+    if (remainingMemory < needFloatingMemory) {
       final String message =
           String.format(
               "%s Need Floating memory: %d  bytes, free Floating memory: %d 
bytes",
-              PipeMessages.NOT_ENOUGH_MEMORY_FOR_PIPE,
-              PipeConfig.getInstance().getPipeInsertNodeQueueMemory(),
-              remainingMemory);
+              PipeMessages.NOT_ENOUGH_MEMORY_FOR_PIPE, needFloatingMemory, 
remainingMemory);
       LOGGER.warn(message);
       throw new PipeException(message);
     }
@@ -939,32 +1075,108 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   }
 
   private long calculateSinkBatchMemory(final PipeParameters sinkParameters) {
+    final String format =
+        sinkParameters.getStringOrDefault(
+            Arrays.asList(PipeSinkConstant.CONNECTOR_FORMAT_KEY, 
PipeSinkConstant.SINK_FORMAT_KEY),
+            PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE);
+    final boolean usingTsFileBatch = 
PipeSinkConstant.CONNECTOR_FORMAT_TS_FILE_VALUE.equals(format);
 
-    // If the sink format is tsfile , we need to use batch
-    boolean needUseBatch =
-        PipeSinkConstant.CONNECTOR_FORMAT_TS_FILE_VALUE.equals(
-            sinkParameters.getStringOrDefault(
+    // TsFile format always uses a batch. Other formats only use a batch when 
batch mode is enabled.
+    final boolean needUseBatch =
+        usingTsFileBatch
+            || sinkParameters.getBooleanOrDefault(
                 Arrays.asList(
-                    PipeSinkConstant.CONNECTOR_FORMAT_KEY, 
PipeSinkConstant.SINK_FORMAT_KEY),
-                PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE));
+                    PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY,
+                    PipeSinkConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY),
+                
PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE);
 
-    if (needUseBatch) {
-      return PipeConfig.getInstance().getSinkBatchMemoryTsFile();
+    if (!needUseBatch) {
+      return 0;
     }
 
-    // If the sink is batch mode, we need to use batch
-    needUseBatch =
-        sinkParameters.getBooleanOrDefault(
+    final long batchSizeInBytes =
+        sinkParameters.getLongOrDefault(
             Arrays.asList(
-                PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY,
-                PipeSinkConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY),
-            PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE);
+                PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY,
+                PipeSinkConstant.SINK_IOTDB_BATCH_SIZE_KEY),
+            usingTsFileBatch
+                ? 
PipeSinkConstant.CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE
+                : 
PipeSinkConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
 
-    if (!needUseBatch) {
-      return 0;
+    return batchSizeInBytes * calculateBatchShardCount(sinkParameters, 
usingTsFileBatch);
+  }
+
+  private long calculateBatchShardCount(
+      final PipeParameters sinkParameters, final boolean usingTsFileBatch) {
+    if (usingTsFileBatch
+        || !sinkParameters.getBooleanOrDefault(
+            Arrays.asList(
+                PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY,
+                PipeSinkConstant.SINK_LEADER_CACHE_ENABLE_KEY),
+            PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE)) {
+      return 1;
     }
 
-    return PipeConfig.getInstance().getSinkBatchMemoryInsertNode();
+    // Plain batches always allocate the default batch and may lazily allocate 
one batch per target
+    // endpoint when leader cache splits events by endpoint.
+    return 1L + calculateTargetEndPointCount(sinkParameters);
+  }
+
+  private int calculateTargetEndPointCount(final PipeParameters 
sinkParameters) {
+    final Set<TEndPoint> targetEndPoints = new HashSet<>();
+    try {
+      addTargetEndPoint(
+          targetEndPoints,
+          sinkParameters,
+          PipeSinkConstant.CONNECTOR_IOTDB_IP_KEY,
+          PipeSinkConstant.CONNECTOR_IOTDB_HOST_KEY,
+          PipeSinkConstant.CONNECTOR_IOTDB_PORT_KEY);
+      addTargetEndPoint(
+          targetEndPoints,
+          sinkParameters,
+          PipeSinkConstant.SINK_IOTDB_IP_KEY,
+          PipeSinkConstant.SINK_IOTDB_HOST_KEY,
+          PipeSinkConstant.SINK_IOTDB_PORT_KEY);
+      if 
(sinkParameters.hasAttribute(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY)) {
+        targetEndPoints.addAll(
+            NodeUrlUtils.parseTEndPointUrls(
+                Arrays.asList(
+                    sinkParameters
+                        
.getStringByKeys(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY)
+                        .replace(" ", "")
+                        .split(","))));
+      }
+      if 
(sinkParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_NODE_URLS_KEY)) {
+        targetEndPoints.addAll(
+            NodeUrlUtils.parseTEndPointUrls(
+                Arrays.asList(
+                    sinkParameters
+                        
.getStringByKeys(PipeSinkConstant.SINK_IOTDB_NODE_URLS_KEY)
+                        .replace(" ", "")
+                        .split(","))));
+      }
+    } catch (final Exception ignored) {
+      return 1;
+    }
+    return Math.max(1, targetEndPoints.size());
+  }
+
+  private void addTargetEndPoint(
+      final Set<TEndPoint> targetEndPoints,
+      final PipeParameters sinkParameters,
+      final String ipKey,
+      final String hostKey,
+      final String portKey) {
+    if (sinkParameters.hasAttribute(ipKey) && 
sinkParameters.hasAttribute(portKey)) {
+      targetEndPoints.add(
+          new TEndPoint(
+              sinkParameters.getStringByKeys(ipKey), 
sinkParameters.getIntByKeys(portKey)));
+    }
+    if (sinkParameters.hasAttribute(hostKey) && 
sinkParameters.hasAttribute(portKey)) {
+      targetEndPoints.add(
+          new TEndPoint(
+              sinkParameters.getStringByKeys(hostKey), 
sinkParameters.getIntByKeys(portKey)));
+    }
   }
 
   private long calculateSendTsFileReadBufferMemory(
@@ -993,7 +1205,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       return 0;
     }
 
-    return PipeConfig.getInstance().getSendTsFileReadBuffer();
+    return PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
   }
 
   private long calculateAssignerMemory(final PipeParameters sourceParameters) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 8747c12d076..878823d4696 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -67,14 +67,11 @@ public class PipeDataNodeTaskBuilder {
   private static final PipeProcessorSubtaskExecutor PROCESSOR_EXECUTOR =
       PipeSubtaskExecutorManager.getInstance().getProcessorExecutor();
 
-  protected final Map<String, String> systemParameters = new HashMap<>();
-
   public PipeDataNodeTaskBuilder(
       final PipeStaticMeta pipeStaticMeta, final int regionId, final 
PipeTaskMeta pipeTaskMeta) {
     this.pipeStaticMeta = pipeStaticMeta;
     this.regionId = regionId;
     this.pipeTaskMeta = pipeTaskMeta;
-    generateSystemParameters();
   }
 
   public PipeDataNodeTask build() {
@@ -82,11 +79,10 @@ public class PipeDataNodeTaskBuilder {
 
     // Analyzes the PipeParameters to identify potential conflicts.
     final PipeParameters sourceParameters =
-        blendUserAndSystemParameters(pipeStaticMeta.getSourceParameters());
+        blendUserAndSystemParameters(pipeStaticMeta.getSourceParameters(), 
pipeTaskMeta);
     final PipeParameters sinkParameters =
-        blendUserAndSystemParameters(pipeStaticMeta.getSinkParameters());
-    checkConflict(sourceParameters, sinkParameters);
-    injectParameters(sourceParameters, sinkParameters);
+        blendUserAndSystemParameters(pipeStaticMeta.getSinkParameters(), 
pipeTaskMeta);
+    preprocessParameters(sourceParameters, sinkParameters);
 
     // We first build the source and sink, then build the processor.
     final PipeTaskSourceStage sourceStage =
@@ -125,7 +121,7 @@ public class PipeDataNodeTaskBuilder {
         new PipeTaskProcessorStage(
             pipeStaticMeta.getPipeName(),
             pipeStaticMeta.getCreationTime(),
-            
blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters()),
+            
blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters(), 
pipeTaskMeta),
             regionId,
             sourceStage.getEventSupplier(),
             sinkStage.getPipeSinkPendingQueue(),
@@ -143,22 +139,25 @@ public class PipeDataNodeTaskBuilder {
         pipeStaticMeta.getPipeName(), regionId, sourceStage, processorStage, 
sinkStage);
   }
 
-  private void generateSystemParameters() {
+  public static PipeParameters blendUserAndSystemParameters(
+      final PipeParameters userParameters, final PipeTaskMeta pipeTaskMeta) {
+    // Deep copy the user parameters to avoid modification of the original 
parameters.
+    // If the original parameters are modified, progress index report will be 
affected.
+    final Map<String, String> blendedParameters = new 
HashMap<>(userParameters.getAttribute());
     if (!(pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex)
         || pipeTaskMeta.isNewlyAdded()) {
-      systemParameters.put(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, 
Boolean.TRUE.toString());
+      blendedParameters.put(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, 
Boolean.TRUE.toString());
     }
+    return new PipeParameters(blendedParameters);
   }
 
-  private PipeParameters blendUserAndSystemParameters(final PipeParameters 
userParameters) {
-    // Deep copy the user parameters to avoid modification of the original 
parameters.
-    // If the original parameters are modified, progress index report will be 
affected.
-    final Map<String, String> blendedParameters = new 
HashMap<>(userParameters.getAttribute());
-    blendedParameters.putAll(systemParameters);
-    return new PipeParameters(blendedParameters);
+  public static void preprocessParameters(
+      final PipeParameters sourceParameters, final PipeParameters 
sinkParameters) {
+    checkConflict(sourceParameters, sinkParameters);
+    injectParameters(sourceParameters, sinkParameters);
   }
 
-  private void checkConflict(
+  private static void checkConflict(
       final PipeParameters sourceParameters, final PipeParameters 
sinkParameters) {
     final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair;
     final boolean shouldTerminatePipeOnAllHistoricalEventsConsumed;
@@ -228,7 +227,7 @@ public class PipeDataNodeTaskBuilder {
     }
   }
 
-  private void injectParameters(
+  private static void injectParameters(
       final PipeParameters sourceParameters, final PipeParameters 
sinkParameters) {
     final boolean isSourceExternal =
         !BuiltinPipePlugin.BUILTIN_SOURCES.contains(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index cca6de44f0b..33c89aa8967 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -69,13 +69,7 @@ public class PipeSinkSubtaskManager {
       final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier,
       final PipeParameters pipeSinkParameters,
       final PipeTaskSinkRuntimeEnvironment environment) {
-    final String connectorName =
-        PipeSinkConstant.getConnectorOrSinkNameWithDefault(pipeSinkParameters);
-    final String connectorKey =
-        connectorName
-            // Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to lowercase
-            // for matching in `CONNECTOR_CONSTRUCTORS`
-            .toLowerCase();
+    final String connectorKey = getConnectorKey(pipeSinkParameters);
     PipeEventCommitManager.getInstance()
         .register(
             environment.getPipeName(),
@@ -83,48 +77,23 @@ public class PipeSinkSubtaskManager {
             environment.getRegionId(),
             connectorKey);
 
-    final boolean isDataRegionSink =
-        StorageEngine.getInstance()
-                .getAllDataRegionIds()
-                .contains(new DataRegionId(environment.getRegionId()))
-            || PipeRuntimeMeta.isSourceExternal(environment.getRegionId());
-
-    final int sinkNum;
+    final boolean isDataRegionSink = 
isDataRegionSink(environment.getRegionId());
+    final int sinkNum = calculateSinkSubtaskNum(pipeSinkParameters, 
isDataRegionSink, connectorKey);
     boolean realTimeFirst = false;
-    boolean serializeByRegion = false;
-    String attributeSortedString = 
generateAttributeSortedString(pipeSinkParameters);
+    final String attributeSortedString =
+        generateAttributeSortedString(pipeSinkParameters, 
environment.getRegionId());
     final String attributeDisplayString = 
generateAttributeDisplayString(pipeSinkParameters);
     if (isDataRegionSink) {
-      serializeByRegion = 
PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters);
-      sinkNum =
-          serializeByRegion
-              ? 1
-              : pipeSinkParameters.getIntOrDefault(
-                  Arrays.asList(
-                      PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
-                      PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
-                  
PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(connectorKey)
-                      ? 1
-                      : 
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
       realTimeFirst =
           pipeSinkParameters.getBooleanOrDefault(
               Arrays.asList(
                   PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
                   PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
               PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
-      attributeSortedString =
-          serializeByRegion
-              ? "data_region_" + environment.getRegionId() + "_" + 
attributeSortedString
-              : "data_" + attributeSortedString;
-    } else {
-      // Do not allow parallel tasks for schema region connectors
-      // to avoid the potential disorder of the schema region data transfer
-      sinkNum = 1;
-      attributeSortedString = "schema_" + attributeSortedString;
     }
     final String attributeDisplayStringWithPrefix =
         isDataRegionSink
-            ? serializeByRegion
+            ? PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters)
                 ? "data_region_" + environment.getRegionId() + "_" + 
attributeDisplayString
                 : "data_" + attributeDisplayString
             : "schema_" + attributeDisplayString;
@@ -285,6 +254,59 @@ public class PipeSinkSubtaskManager {
         .getPendingQueue();
   }
 
+  public synchronized boolean hasRegisteredSubtasks(
+      final PipeParameters pipeSinkParameters, final int regionId) {
+    return attributeSortedString2SubtaskLifeCycleMap.containsKey(
+        generateAttributeSortedString(pipeSinkParameters, regionId));
+  }
+
+  public static int calculateSinkSubtaskNum(
+      final PipeParameters pipeSinkParameters, final int regionId) {
+    final String connectorKey = getConnectorKey(pipeSinkParameters);
+    return calculateSinkSubtaskNum(pipeSinkParameters, 
isDataRegionSink(regionId), connectorKey);
+  }
+
+  public static String generateAttributeSortedString(
+      final PipeParameters pipeSinkParameters, final int regionId) {
+    final String attributeSortedString = 
generateAttributeSortedString(pipeSinkParameters);
+    if (isDataRegionSink(regionId)) {
+      return PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters)
+          ? "data_region_" + regionId + "_" + attributeSortedString
+          : "data_" + attributeSortedString;
+    }
+    return "schema_" + attributeSortedString;
+  }
+
+  private static String getConnectorKey(final PipeParameters 
pipeSinkParameters) {
+    return 
PipeSinkConstant.getConnectorOrSinkNameWithDefault(pipeSinkParameters).toLowerCase();
+  }
+
+  private static boolean isDataRegionSink(final int regionId) {
+    return StorageEngine.getInstance().getAllDataRegionIds().contains(new 
DataRegionId(regionId))
+        || PipeRuntimeMeta.isSourceExternal(regionId);
+  }
+
+  private static int calculateSinkSubtaskNum(
+      final PipeParameters pipeSinkParameters,
+      final boolean isDataRegionSink,
+      final String connectorKey) {
+    if (!isDataRegionSink) {
+      // Do not allow parallel tasks for schema region connectors to avoid the 
potential disorder of
+      // the schema region data transfer.
+      return 1;
+    }
+    if (PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters)) {
+      return 1;
+    }
+    return pipeSinkParameters.getIntOrDefault(
+        Arrays.asList(
+            PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
+            PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
+        PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(connectorKey)
+            ? 1
+            : PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
+  }
+
   private static String generateAttributeSortedString(
       final PipeParameters pipeConnectorParameters) {
     final TreeMap<String, String> sortedStringSourceMap =
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index c30a50ac495..48307d3b96f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -508,18 +508,15 @@ public abstract class PipeTaskAgent {
     final String pipeName = 
pipeMetaFromCoordinator.getStaticMeta().getPipeName();
     final long creationTime = 
pipeMetaFromCoordinator.getStaticMeta().getCreationTime();
 
-    calculateMemoryUsage(
-        pipeMetaFromCoordinator.getStaticMeta(),
-        pipeMetaFromCoordinator.getStaticMeta().getSourceParameters(),
-        pipeMetaFromCoordinator.getStaticMeta().getProcessorParameters(),
-        pipeMetaFromCoordinator.getStaticMeta().getSinkParameters());
-
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
-    if (existedPipeMeta != null) {
-      if (!checkBeforeCreatePipe(existedPipeMeta, pipeName, creationTime)) {
-        return false;
-      }
+    if (existedPipeMeta != null
+        && !checkBeforeCreatePipe(existedPipeMeta, pipeName, creationTime)) {
+      return false;
+    }
 
+    calculateMemoryUsage(pipeMetaFromCoordinator);
+
+    if (existedPipeMeta != null) {
       // Drop the pipe if
       // 1. The pipe with the same name but with different creation time has 
been created before
       // 2. The pipe with the same name and the same creation time has been 
dropped before, but the
@@ -552,6 +549,15 @@ public abstract class PipeTaskAgent {
     return needToStartPipe;
   }
 
+  protected void calculateMemoryUsage(final PipeMeta pipeMetaFromCoordinator)
+      throws IllegalPathException {
+    calculateMemoryUsage(
+        pipeMetaFromCoordinator.getStaticMeta(),
+        pipeMetaFromCoordinator.getStaticMeta().getSourceParameters(),
+        pipeMetaFromCoordinator.getStaticMeta().getProcessorParameters(),
+        pipeMetaFromCoordinator.getStaticMeta().getSinkParameters());
+  }
+
   protected void calculateMemoryUsage(
       final PipeStaticMeta staticMeta,
       final PipeParameters extractorParameters,

Reply via email to