This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch pipe-memory-estimation-short-term in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 00962e625bd48503ee66453f396a9e8e8e93d7c9 Author: Caideyipi <[email protected]> AuthorDate: Mon Jun 29 10:44:51 2026 +0800 Pipe: improve incremental memory estimation --- .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 282 ++++++++++++++++++--- .../task/builder/PipeDataNodeTaskBuilder.java | 35 ++- .../task/subtask/sink/PipeSinkSubtaskManager.java | 96 ++++--- .../commons/pipe/agent/task/PipeTaskAgent.java | 26 +- 4 files changed, 339 insertions(+), 100 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 3509e6b29ce..43b76f69176 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.agent.task; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.concurrent.IoTThreadFactory; @@ -38,6 +39,7 @@ import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType; import org.apache.iotdb.commons.pipe.config.PipeConfig; @@ -46,6 +48,7 @@ import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -54,6 +57,7 @@ import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.agent.task.builder.PipeDataNodeBuilder; import org.apache.iotdb.db.pipe.agent.task.builder.PipeDataNodeTaskBuilder; +import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtaskManager; import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; @@ -192,6 +196,11 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { // For internal source || needConstructDataRegionTask || needConstructSchemaRegionTask) { + calculateMemoryUsage( + pipeStaticMeta, + Collections.singletonList(new Pair<>(consensusGroupId, pipeTaskMeta)), + false); + final PipeDataNodeTask pipeTask = new PipeDataNodeTaskBuilder(pipeStaticMeta, consensusGroupId, pipeTaskMeta).build(); pipeTask.create(); @@ -779,25 +788,45 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { } @Override - protected void calculateMemoryUsage( - final PipeStaticMeta staticMeta, - final PipeParameters sourceParameters, - final PipeParameters processorParameters, - final PipeParameters sinkParameters) { + protected void calculateMemoryUsage(final PipeMeta pipeMetaFromCoordinator) + throws IllegalPathException { + final PipeStaticMeta staticMeta = pipeMetaFromCoordinator.getStaticMeta(); if (!PipeConfig.getInstance().isPipeEnableMemoryCheck() - || !isInnerSource(sourceParameters) + || !isInnerSource(staticMeta.getSourceParameters()) || !PipeType.USER.equals(staticMeta.getPipeType())) { return; } - calculateInsertNodeQueueMemory(sourceParameters); + final PipeMeta pipeMetaInAgent = pipeMetaKeeper.getPipeMeta(staticMeta.getPipeName()); + final boolean ignoreRegisteredSinkSubtasks = + Objects.nonNull(pipeMetaInAgent) + && (!pipeMetaInAgent.getStaticMeta().equals(staticMeta) + || PipeStatus.DROPPED.equals(pipeMetaInAgent.getRuntimeMeta().getStatus().get())); + calculateMemoryUsage( + staticMeta, + collectPipeTasksToBeCreated(pipeMetaFromCoordinator), + ignoreRegisteredSinkSubtasks); + } - long needMemory = 0; + private void calculateMemoryUsage( + final PipeStaticMeta staticMeta, + final List<Pair<Integer, PipeTaskMeta>> pipeTasksToBeCreated, + final boolean ignoreRegisteredSinkSubtasks) + throws IllegalPathException { + if (!PipeConfig.getInstance().isPipeEnableMemoryCheck() + || !isInnerSource(staticMeta.getSourceParameters()) + || !PipeType.USER.equals(staticMeta.getPipeType()) + || pipeTasksToBeCreated.isEmpty()) { + return; + } - needMemory += calculateTsFileParserMemory(sourceParameters, sinkParameters); - needMemory += calculateSinkBatchMemory(sinkParameters); - needMemory += calculateSendTsFileReadBufferMemory(sourceParameters, sinkParameters); - needMemory += calculateAssignerMemory(sourceParameters); + final MemoryEstimation memoryEstimation = + calculateIncrementalMemoryUsage( + staticMeta, pipeTasksToBeCreated, ignoreRegisteredSinkSubtasks); + calculateInsertNodeQueueMemory( + staticMeta.getSourceParameters(), memoryEstimation.dataRegionTaskCount); + + final long needMemory = memoryEstimation.nonFloatingMemoryInBytes; PipeMemoryManager pipeMemoryManager = PipeDataNodeResourceManager.memory(); final long freeMemorySizeInBytes = pipeMemoryManager.getFreeMemorySizeInBytes(); @@ -819,6 +848,109 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { } } + private List<Pair<Integer, PipeTaskMeta>> collectPipeTasksToBeCreated( + final PipeMeta pipeMetaFromCoordinator) throws IllegalPathException { + final PipeStaticMeta pipeStaticMeta = pipeMetaFromCoordinator.getStaticMeta(); + final PipeParameters sourceParameters = pipeStaticMeta.getSourceParameters(); + final Set<DataRegionId> dataRegionIds = + new HashSet<>(StorageEngine.getInstance().getAllDataRegionIds()); + final Set<SchemaRegionId> schemaRegionIds = + new HashSet<>(SchemaEngine.getInstance().getAllSchemaRegionIds()); + final List<Pair<Integer, PipeTaskMeta>> pipeTasksToBeCreated = new ArrayList<>(); + + for (final Map.Entry<Integer, PipeTaskMeta> consensusGroupIdToPipeTaskMeta : + pipeMetaFromCoordinator.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().entrySet()) { + final int consensusGroupId = consensusGroupIdToPipeTaskMeta.getKey(); + final PipeTaskMeta pipeTaskMeta = consensusGroupIdToPipeTaskMeta.getValue(); + if (pipeTaskMeta.getLeaderNodeId() != CONFIG.getDataNodeId()) { + continue; + } + + final boolean needConstructTask; + if (pipeStaticMeta.isSourceExternal()) { + needConstructTask = true; + } else { + final DataRegionId dataRegionId = new DataRegionId(consensusGroupId); + final boolean needConstructDataRegionTask = + dataRegionIds.contains(dataRegionId) + && DataRegionListeningFilter.shouldDataRegionBeListened( + sourceParameters, dataRegionId); + final boolean needConstructSchemaRegionTask = + schemaRegionIds.contains(new SchemaRegionId(consensusGroupId)) + && SchemaRegionListeningFilter.shouldSchemaRegionBeListened( + consensusGroupId, sourceParameters); + needConstructTask = needConstructDataRegionTask || needConstructSchemaRegionTask; + } + + if (needConstructTask) { + pipeTasksToBeCreated.add(new Pair<>(consensusGroupId, pipeTaskMeta)); + } + } + return pipeTasksToBeCreated; + } + + private MemoryEstimation calculateIncrementalMemoryUsage( + final PipeStaticMeta staticMeta, + final List<Pair<Integer, PipeTaskMeta>> pipeTasksToBeCreated, + final boolean ignoreRegisteredSinkSubtasks) { + long needMemory = 0; + int dataRegionTaskCount = 0; + final Set<String> sinkSubtasksToBeCreated = new HashSet<>(); + + for (final Pair<Integer, PipeTaskMeta> regionIdAndTaskMeta : pipeTasksToBeCreated) { + final int regionId = regionIdAndTaskMeta.getLeft(); + final PipeTaskMeta pipeTaskMeta = regionIdAndTaskMeta.getRight(); + final PipeParameters sourceParameters = + PipeDataNodeTaskBuilder.blendUserAndSystemParameters( + staticMeta.getSourceParameters(), pipeTaskMeta); + final PipeParameters sinkParameters = + PipeDataNodeTaskBuilder.blendUserAndSystemParameters( + staticMeta.getSinkParameters(), pipeTaskMeta); + PipeDataNodeTaskBuilder.preprocessParameters(sourceParameters, sinkParameters); + + final boolean isDataRegionTask = isDataRegionTask(regionId); + if (isDataRegionTask) { + dataRegionTaskCount++; + needMemory += calculateTsFileParserMemory(sourceParameters, sinkParameters); + } + + final String sinkSubtaskId = + PipeSinkSubtaskManager.generateAttributeSortedString(sinkParameters, regionId); + if (isDataRegionTask + && !sinkSubtasksToBeCreated.contains(sinkSubtaskId) + && (ignoreRegisteredSinkSubtasks + || !PipeSinkSubtaskManager.instance() + .hasRegisteredSubtasks(sinkParameters, regionId))) { + sinkSubtasksToBeCreated.add(sinkSubtaskId); + final int sinkSubtaskNum = + PipeSinkSubtaskManager.calculateSinkSubtaskNum(sinkParameters, regionId); + needMemory += calculateSinkBatchMemory(sinkParameters) * sinkSubtaskNum; + needMemory += + calculateSendTsFileReadBufferMemory(sourceParameters, sinkParameters) * sinkSubtaskNum; + } + } + + if (dataRegionTaskCount > 0) { + needMemory += calculateAssignerMemory(staticMeta.getSourceParameters()); + } + return new MemoryEstimation(needMemory, dataRegionTaskCount); + } + + private boolean isDataRegionTask(final int regionId) { + return StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId)) + || PipeRuntimeMeta.isSourceExternal(regionId); + } + + private static class MemoryEstimation { + private final long nonFloatingMemoryInBytes; + private final int dataRegionTaskCount; + + private MemoryEstimation(final long nonFloatingMemoryInBytes, final int dataRegionTaskCount) { + this.nonFloatingMemoryInBytes = nonFloatingMemoryInBytes; + this.dataRegionTaskCount = dataRegionTaskCount; + } + } + private boolean isInnerSource(final PipeParameters sourceParameters) { final String pluginName = sourceParameters @@ -831,7 +963,11 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { || pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName()); } - private void calculateInsertNodeQueueMemory(final PipeParameters sourceParameters) { + private void calculateInsertNodeQueueMemory( + final PipeParameters sourceParameters, final int dataRegionTaskCount) { + if (dataRegionTaskCount <= 0) { + return; + } // Realtime source is enabled by default, so we only need to check the source realtime if (!sourceParameters.getBooleanOrDefault( @@ -850,17 +986,17 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { return; } + final long needFloatingMemory = + PipeConfig.getInstance().getPipeInsertNodeQueueMemory() * dataRegionTaskCount; final long allocatedMemorySizeInBytes = this.getAllFloatingMemoryUsageInByte(); final long remainingMemory = PipeDataNodeResourceManager.memory().getTotalFloatingMemorySizeInBytes() - allocatedMemorySizeInBytes; - if (remainingMemory < PipeConfig.getInstance().getPipeInsertNodeQueueMemory()) { + if (remainingMemory < needFloatingMemory) { final String message = String.format( "%s Need Floating memory: %d bytes, free Floating memory: %d bytes", - PipeMessages.NOT_ENOUGH_MEMORY_FOR_PIPE, - PipeConfig.getInstance().getPipeInsertNodeQueueMemory(), - remainingMemory); + PipeMessages.NOT_ENOUGH_MEMORY_FOR_PIPE, needFloatingMemory, remainingMemory); LOGGER.warn(message); throw new PipeException(message); } @@ -939,32 +1075,108 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { } private long calculateSinkBatchMemory(final PipeParameters sinkParameters) { + final String format = + sinkParameters.getStringOrDefault( + Arrays.asList(PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.SINK_FORMAT_KEY), + PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE); + final boolean usingTsFileBatch = PipeSinkConstant.CONNECTOR_FORMAT_TS_FILE_VALUE.equals(format); - // If the sink format is tsfile , we need to use batch - boolean needUseBatch = - PipeSinkConstant.CONNECTOR_FORMAT_TS_FILE_VALUE.equals( - sinkParameters.getStringOrDefault( + // TsFile format always uses a batch. Other formats only use a batch when batch mode is enabled. + final boolean needUseBatch = + usingTsFileBatch + || sinkParameters.getBooleanOrDefault( Arrays.asList( - PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.SINK_FORMAT_KEY), - PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE)); + PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, + PipeSinkConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY), + PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE); - if (needUseBatch) { - return PipeConfig.getInstance().getSinkBatchMemoryTsFile(); + if (!needUseBatch) { + return 0; } - // If the sink is batch mode, we need to use batch - needUseBatch = - sinkParameters.getBooleanOrDefault( + final long batchSizeInBytes = + sinkParameters.getLongOrDefault( Arrays.asList( - PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, - PipeSinkConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY), - PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE); + PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY, + PipeSinkConstant.SINK_IOTDB_BATCH_SIZE_KEY), + usingTsFileBatch + ? PipeSinkConstant.CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE + : PipeSinkConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE); - if (!needUseBatch) { - return 0; + return batchSizeInBytes * calculateBatchShardCount(sinkParameters, usingTsFileBatch); + } + + private long calculateBatchShardCount( + final PipeParameters sinkParameters, final boolean usingTsFileBatch) { + if (usingTsFileBatch + || !sinkParameters.getBooleanOrDefault( + Arrays.asList( + PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY, + PipeSinkConstant.SINK_LEADER_CACHE_ENABLE_KEY), + PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE)) { + return 1; } - return PipeConfig.getInstance().getSinkBatchMemoryInsertNode(); + // Plain batches always allocate the default batch and may lazily allocate one batch per target + // endpoint when leader cache splits events by endpoint. + return 1L + calculateTargetEndPointCount(sinkParameters); + } + + private int calculateTargetEndPointCount(final PipeParameters sinkParameters) { + final Set<TEndPoint> targetEndPoints = new HashSet<>(); + try { + addTargetEndPoint( + targetEndPoints, + sinkParameters, + PipeSinkConstant.CONNECTOR_IOTDB_IP_KEY, + PipeSinkConstant.CONNECTOR_IOTDB_HOST_KEY, + PipeSinkConstant.CONNECTOR_IOTDB_PORT_KEY); + addTargetEndPoint( + targetEndPoints, + sinkParameters, + PipeSinkConstant.SINK_IOTDB_IP_KEY, + PipeSinkConstant.SINK_IOTDB_HOST_KEY, + PipeSinkConstant.SINK_IOTDB_PORT_KEY); + if (sinkParameters.hasAttribute(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY)) { + targetEndPoints.addAll( + NodeUrlUtils.parseTEndPointUrls( + Arrays.asList( + sinkParameters + .getStringByKeys(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY) + .replace(" ", "") + .split(",")))); + } + if (sinkParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_NODE_URLS_KEY)) { + targetEndPoints.addAll( + NodeUrlUtils.parseTEndPointUrls( + Arrays.asList( + sinkParameters + .getStringByKeys(PipeSinkConstant.SINK_IOTDB_NODE_URLS_KEY) + .replace(" ", "") + .split(",")))); + } + } catch (final Exception ignored) { + return 1; + } + return Math.max(1, targetEndPoints.size()); + } + + private void addTargetEndPoint( + final Set<TEndPoint> targetEndPoints, + final PipeParameters sinkParameters, + final String ipKey, + final String hostKey, + final String portKey) { + if (sinkParameters.hasAttribute(ipKey) && sinkParameters.hasAttribute(portKey)) { + targetEndPoints.add( + new TEndPoint( + sinkParameters.getStringByKeys(ipKey), sinkParameters.getIntByKeys(portKey))); + } + if (sinkParameters.hasAttribute(hostKey) && sinkParameters.hasAttribute(portKey)) { + targetEndPoints.add( + new TEndPoint( + sinkParameters.getStringByKeys(hostKey), sinkParameters.getIntByKeys(portKey))); + } } private long calculateSendTsFileReadBufferMemory( @@ -993,7 +1205,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { return 0; } - return PipeConfig.getInstance().getSendTsFileReadBuffer(); + return PipeConfig.getInstance().getPipeSinkReadFileBufferSize(); } private long calculateAssignerMemory(final PipeParameters sourceParameters) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java index 8747c12d076..878823d4696 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java @@ -67,14 +67,11 @@ public class PipeDataNodeTaskBuilder { private static final PipeProcessorSubtaskExecutor PROCESSOR_EXECUTOR = PipeSubtaskExecutorManager.getInstance().getProcessorExecutor(); - protected final Map<String, String> systemParameters = new HashMap<>(); - public PipeDataNodeTaskBuilder( final PipeStaticMeta pipeStaticMeta, final int regionId, final PipeTaskMeta pipeTaskMeta) { this.pipeStaticMeta = pipeStaticMeta; this.regionId = regionId; this.pipeTaskMeta = pipeTaskMeta; - generateSystemParameters(); } public PipeDataNodeTask build() { @@ -82,11 +79,10 @@ public class PipeDataNodeTaskBuilder { // Analyzes the PipeParameters to identify potential conflicts. final PipeParameters sourceParameters = - blendUserAndSystemParameters(pipeStaticMeta.getSourceParameters()); + blendUserAndSystemParameters(pipeStaticMeta.getSourceParameters(), pipeTaskMeta); final PipeParameters sinkParameters = - blendUserAndSystemParameters(pipeStaticMeta.getSinkParameters()); - checkConflict(sourceParameters, sinkParameters); - injectParameters(sourceParameters, sinkParameters); + blendUserAndSystemParameters(pipeStaticMeta.getSinkParameters(), pipeTaskMeta); + preprocessParameters(sourceParameters, sinkParameters); // We first build the source and sink, then build the processor. final PipeTaskSourceStage sourceStage = @@ -125,7 +121,7 @@ public class PipeDataNodeTaskBuilder { new PipeTaskProcessorStage( pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime(), - blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters()), + blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters(), pipeTaskMeta), regionId, sourceStage.getEventSupplier(), sinkStage.getPipeSinkPendingQueue(), @@ -143,22 +139,25 @@ public class PipeDataNodeTaskBuilder { pipeStaticMeta.getPipeName(), regionId, sourceStage, processorStage, sinkStage); } - private void generateSystemParameters() { + public static PipeParameters blendUserAndSystemParameters( + final PipeParameters userParameters, final PipeTaskMeta pipeTaskMeta) { + // Deep copy the user parameters to avoid modification of the original parameters. + // If the original parameters are modified, progress index report will be affected. + final Map<String, String> blendedParameters = new HashMap<>(userParameters.getAttribute()); if (!(pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex) || pipeTaskMeta.isNewlyAdded()) { - systemParameters.put(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, Boolean.TRUE.toString()); + blendedParameters.put(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, Boolean.TRUE.toString()); } + return new PipeParameters(blendedParameters); } - private PipeParameters blendUserAndSystemParameters(final PipeParameters userParameters) { - // Deep copy the user parameters to avoid modification of the original parameters. - // If the original parameters are modified, progress index report will be affected. - final Map<String, String> blendedParameters = new HashMap<>(userParameters.getAttribute()); - blendedParameters.putAll(systemParameters); - return new PipeParameters(blendedParameters); + public static void preprocessParameters( + final PipeParameters sourceParameters, final PipeParameters sinkParameters) { + checkConflict(sourceParameters, sinkParameters); + injectParameters(sourceParameters, sinkParameters); } - private void checkConflict( + private static void checkConflict( final PipeParameters sourceParameters, final PipeParameters sinkParameters) { final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair; final boolean shouldTerminatePipeOnAllHistoricalEventsConsumed; @@ -228,7 +227,7 @@ public class PipeDataNodeTaskBuilder { } } - private void injectParameters( + private static void injectParameters( final PipeParameters sourceParameters, final PipeParameters sinkParameters) { final boolean isSourceExternal = !BuiltinPipePlugin.BUILTIN_SOURCES.contains( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java index cca6de44f0b..33c89aa8967 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java @@ -69,13 +69,7 @@ public class PipeSinkSubtaskManager { final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier, final PipeParameters pipeSinkParameters, final PipeTaskSinkRuntimeEnvironment environment) { - final String connectorName = - PipeSinkConstant.getConnectorOrSinkNameWithDefault(pipeSinkParameters); - final String connectorKey = - connectorName - // Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to lowercase - // for matching in `CONNECTOR_CONSTRUCTORS` - .toLowerCase(); + final String connectorKey = getConnectorKey(pipeSinkParameters); PipeEventCommitManager.getInstance() .register( environment.getPipeName(), @@ -83,48 +77,23 @@ public class PipeSinkSubtaskManager { environment.getRegionId(), connectorKey); - final boolean isDataRegionSink = - StorageEngine.getInstance() - .getAllDataRegionIds() - .contains(new DataRegionId(environment.getRegionId())) - || PipeRuntimeMeta.isSourceExternal(environment.getRegionId()); - - final int sinkNum; + final boolean isDataRegionSink = isDataRegionSink(environment.getRegionId()); + final int sinkNum = calculateSinkSubtaskNum(pipeSinkParameters, isDataRegionSink, connectorKey); boolean realTimeFirst = false; - boolean serializeByRegion = false; - String attributeSortedString = generateAttributeSortedString(pipeSinkParameters); + final String attributeSortedString = + generateAttributeSortedString(pipeSinkParameters, environment.getRegionId()); final String attributeDisplayString = generateAttributeDisplayString(pipeSinkParameters); if (isDataRegionSink) { - serializeByRegion = PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters); - sinkNum = - serializeByRegion - ? 1 - : pipeSinkParameters.getIntOrDefault( - Arrays.asList( - PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, - PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY), - PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(connectorKey) - ? 1 - : PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE); realTimeFirst = pipeSinkParameters.getBooleanOrDefault( Arrays.asList( PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, PipeSinkConstant.SINK_REALTIME_FIRST_KEY), PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE); - attributeSortedString = - serializeByRegion - ? "data_region_" + environment.getRegionId() + "_" + attributeSortedString - : "data_" + attributeSortedString; - } else { - // Do not allow parallel tasks for schema region connectors - // to avoid the potential disorder of the schema region data transfer - sinkNum = 1; - attributeSortedString = "schema_" + attributeSortedString; } final String attributeDisplayStringWithPrefix = isDataRegionSink - ? serializeByRegion + ? PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters) ? "data_region_" + environment.getRegionId() + "_" + attributeDisplayString : "data_" + attributeDisplayString : "schema_" + attributeDisplayString; @@ -285,6 +254,59 @@ public class PipeSinkSubtaskManager { .getPendingQueue(); } + public synchronized boolean hasRegisteredSubtasks( + final PipeParameters pipeSinkParameters, final int regionId) { + return attributeSortedString2SubtaskLifeCycleMap.containsKey( + generateAttributeSortedString(pipeSinkParameters, regionId)); + } + + public static int calculateSinkSubtaskNum( + final PipeParameters pipeSinkParameters, final int regionId) { + final String connectorKey = getConnectorKey(pipeSinkParameters); + return calculateSinkSubtaskNum(pipeSinkParameters, isDataRegionSink(regionId), connectorKey); + } + + public static String generateAttributeSortedString( + final PipeParameters pipeSinkParameters, final int regionId) { + final String attributeSortedString = generateAttributeSortedString(pipeSinkParameters); + if (isDataRegionSink(regionId)) { + return PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters) + ? "data_region_" + regionId + "_" + attributeSortedString + : "data_" + attributeSortedString; + } + return "schema_" + attributeSortedString; + } + + private static String getConnectorKey(final PipeParameters pipeSinkParameters) { + return PipeSinkConstant.getConnectorOrSinkNameWithDefault(pipeSinkParameters).toLowerCase(); + } + + private static boolean isDataRegionSink(final int regionId) { + return StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId)) + || PipeRuntimeMeta.isSourceExternal(regionId); + } + + private static int calculateSinkSubtaskNum( + final PipeParameters pipeSinkParameters, + final boolean isDataRegionSink, + final String connectorKey) { + if (!isDataRegionSink) { + // Do not allow parallel tasks for schema region connectors to avoid the potential disorder of + // the schema region data transfer. + return 1; + } + if (PipeSinkConstant.isSerializeByRegionEnabled(pipeSinkParameters)) { + return 1; + } + return pipeSinkParameters.getIntOrDefault( + Arrays.asList( + PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, + PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY), + PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(connectorKey) + ? 1 + : PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE); + } + private static String generateAttributeSortedString( final PipeParameters pipeConnectorParameters) { final TreeMap<String, String> sortedStringSourceMap = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index c30a50ac495..48307d3b96f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -508,18 +508,15 @@ public abstract class PipeTaskAgent { final String pipeName = pipeMetaFromCoordinator.getStaticMeta().getPipeName(); final long creationTime = pipeMetaFromCoordinator.getStaticMeta().getCreationTime(); - calculateMemoryUsage( - pipeMetaFromCoordinator.getStaticMeta(), - pipeMetaFromCoordinator.getStaticMeta().getSourceParameters(), - pipeMetaFromCoordinator.getStaticMeta().getProcessorParameters(), - pipeMetaFromCoordinator.getStaticMeta().getSinkParameters()); - final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); - if (existedPipeMeta != null) { - if (!checkBeforeCreatePipe(existedPipeMeta, pipeName, creationTime)) { - return false; - } + if (existedPipeMeta != null + && !checkBeforeCreatePipe(existedPipeMeta, pipeName, creationTime)) { + return false; + } + calculateMemoryUsage(pipeMetaFromCoordinator); + + if (existedPipeMeta != null) { // Drop the pipe if // 1. The pipe with the same name but with different creation time has been created before // 2. The pipe with the same name and the same creation time has been dropped before, but the @@ -552,6 +549,15 @@ public abstract class PipeTaskAgent { return needToStartPipe; } + protected void calculateMemoryUsage(final PipeMeta pipeMetaFromCoordinator) + throws IllegalPathException { + calculateMemoryUsage( + pipeMetaFromCoordinator.getStaticMeta(), + pipeMetaFromCoordinator.getStaticMeta().getSourceParameters(), + pipeMetaFromCoordinator.getStaticMeta().getProcessorParameters(), + pipeMetaFromCoordinator.getStaticMeta().getSinkParameters()); + } + protected void calculateMemoryUsage( final PipeStaticMeta staticMeta, final PipeParameters extractorParameters,
