This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new ccfa23e6539 Pipe: Added total floating memory usage for mem-ctrl of
linked but deleted tsFile resource and insertNode memory (#15205) (#15268)
ccfa23e6539 is described below
commit ccfa23e6539e7be8203d594d9ab4d9b32fb8ceff
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 3 16:46:38 2025 +0800
Pipe: Added total floating memory usage for mem-ctrl of linked but deleted
tsFile resource and insertNode memory (#15205) (#15268)
---
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 29 +++++-------
.../IoTDBDataNodeCacheLeaderClientManager.java | 4 +-
.../TsFileInsertionDataContainerProvider.java | 2 +-
.../PipeRealtimeDataRegionHybridExtractor.java | 9 ++--
.../pipe/metric/overview/PipeResourceMetrics.java | 2 +-
.../db/pipe/resource/memory/PipeMemoryManager.java | 51 ++++++++++++++--------
.../resource/tsfile/PipeTsFileResourceManager.java | 2 +-
.../broker/SubscriptionPrefetchingQueueStates.java | 2 +-
.../event/cache/SubscriptionPollResponseCache.java | 4 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 9 ++++
.../iotdb/commons/conf/CommonDescriptor.java | 5 +++
.../commons/pipe/agent/task/PipeTaskAgent.java | 13 ++++++
.../iotdb/commons/pipe/config/PipeConfig.java | 5 +++
13 files changed, 87 insertions(+), 50 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 6714ef5e065..dcafcd9d6b9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -51,6 +51,7 @@ import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTim
import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSchemaQueueNode;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
@@ -612,19 +613,22 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
final long totalLinkedButDeletedTsFileResourceRamSize =
-
PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsfileResourceRamSize();
- final long freeMemorySizeInBytes =
- PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes();
- if (3 * totalLinkedButDeletedTsFileResourceRamSize >= 2 *
freeMemorySizeInBytes) {
+
PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsFileResourceRamSize();
+ final long totalInsertNodeFloatingMemoryUsageInBytes =
getAllFloatingMemoryUsageInByte();
+ final long totalFloatingMemorySizeInBytes =
+ PipeMemoryManager.getTotalFloatingMemorySizeInBytes();
+ if (totalInsertNodeFloatingMemoryUsageInBytes +
totalLinkedButDeletedTsFileResourceRamSize
+ >= totalFloatingMemorySizeInBytes) {
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
stuckPipes.add(pipeMeta);
}
if (!stuckPipes.isEmpty()) {
LOGGER.warn(
- "All {} pipe(s) will be restarted because linked tsfiles' resource
size {} exceeds limit {}.",
+ "All {} pipe(s) will be restarted because linked but deleted
tsFiles' resource size {} and all insertNode's size {} exceeds limit {}.",
stuckPipes.size(),
totalLinkedButDeletedTsFileResourceRamSize,
- freeMemorySizeInBytes * 2.0 / 3);
+ totalInsertNodeFloatingMemoryUsageInBytes,
+ totalFloatingMemorySizeInBytes);
}
return stuckPipes;
}
@@ -668,19 +672,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
mayMemTablePinnedCountReachDangerousThreshold(),
mayWalSizeReachThrottleThreshold());
stuckPipes.add(pipeMeta);
- } else if (getFloatingMemoryUsageInByte(pipeName)
- >= PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()
- / pipeMetaKeeper.getPipeMetaCount()) {
- // Extractors of this pipe may have too many insert nodes
- LOGGER.warn(
- "Pipe {} needs to restart because too many insertNodes are
extracted. "
- + "Floating memory usage for this pipe: {}, free memory
size: {}, allowed free memory size for floating memory usage: {}",
- pipeMeta.getStaticMeta(),
- getFloatingMemoryUsageInByte(pipeName),
- PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
- PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()
- / pipeMetaKeeper.getPipeMetaCount());
- stuckPipes.add(pipeMeta);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
index e45954e7e4d..8b88a1c36ee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
@@ -51,10 +51,10 @@ public interface IoTDBDataNodeCacheLeaderClientManager {
public LeaderCacheManager() {
final long initMemorySizeInBytes =
- PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() /
10;
+
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes() /
10;
final long maxMemorySizeInBytes =
(long)
- (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
+
(PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes()
* CONFIG.getPipeLeaderCacheMemoryUsagePercentage());
// properties required by pipe memory control framework
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
index a5af5aec4f1..21ce698141d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
@@ -79,7 +79,7 @@ public class TsFileInsertionDataContainerProvider {
// Use scan container to save memory
if ((double)
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes()
- / PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
+ /
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes()
> PipeTsFileResource.MEMORY_SUFFICIENT_THRESHOLD) {
return new TsFileInsertionScanDataContainer(
pipeName, creationTime, tsFile, pattern, startTime, endTime,
pipeTaskMeta, sourceEvent);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index cecdf23642d..ffee6ac9499 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -32,6 +32,7 @@ import
org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.pipe.api.event.Event;
@@ -329,17 +330,17 @@ public class PipeRealtimeDataRegionHybridExtractor
extends PipeRealtimeDataRegio
final long floatingMemoryUsageInByte =
PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName);
final long pipeCount = PipeDataNodeAgent.task().getPipeCount();
- final long freeMemorySizeInBytes =
- PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes();
+ final long totalFloatingMemorySizeInBytes =
+ PipeMemoryManager.getTotalFloatingMemorySizeInBytes();
final boolean mayInsertNodeMemoryReachDangerousThreshold =
- 3 * floatingMemoryUsageInByte * pipeCount >= 2 * freeMemorySizeInBytes;
+ 3 * floatingMemoryUsageInByte * pipeCount >= 2 *
totalFloatingMemorySizeInBytes;
if (mayInsertNodeMemoryReachDangerousThreshold &&
event.mayExtractorUseTablets(this)) {
LOGGER.info(
"Pipe task {}@{} canNotUseTabletAnyMore7: The shallow memory usage
of the insert node {} has reached the dangerous threshold {}",
pipeName,
dataRegionId,
floatingMemoryUsageInByte * pipeCount,
- 2 * freeMemorySizeInBytes / 3.0d);
+ 2 * totalFloatingMemorySizeInBytes / 3.0d);
}
return mayInsertNodeMemoryReachDangerousThreshold;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
index 7b72ea70a71..c9115575000 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
@@ -71,7 +71,7 @@ public class PipeResourceMetrics implements IMetricSet {
Metric.PIPE_MEM.toString(),
MetricLevel.IMPORTANT,
PipeDataNodeResourceManager.memory(),
- PipeMemoryManager::getTotalMemorySizeInBytes,
+ o -> PipeMemoryManager.getTotalNonFloatingMemorySizeInBytes(),
Tag.NAME.toString(),
PIPE_TOTAL_MEMORY);
// resource reference count
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index c2ac5cb70c4..e8a5b457df7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -66,6 +66,9 @@ public class PipeMemoryManager {
PipeConfig.getInstance().getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold();
private volatile long usedMemorySizeInBytesOfTsFiles;
+ private static final double FLOATING_MEMORY_RATIO =
+ PipeConfig.getInstance().getPipeTotalFloatingMemoryProportion();
+
// Only non-zero memory blocks will be added to this set.
private final Set<PipeMemoryBlock> allocatedBlocks = new HashSet<>();
@@ -89,17 +92,17 @@ public class PipeMemoryManager {
private static double allowedMaxMemorySizeInBytesOfTabletsAndTsFiles() {
return (TABLET_MEMORY_REJECT_THRESHOLD + TS_FILE_MEMORY_REJECT_THRESHOLD)
- * TOTAL_MEMORY_SIZE_IN_BYTES;
+ * getTotalNonFloatingMemorySizeInBytes();
}
private static double allowedMaxMemorySizeInBytesOfTablets() {
return (TABLET_MEMORY_REJECT_THRESHOLD + TS_FILE_MEMORY_REJECT_THRESHOLD /
2)
- * TOTAL_MEMORY_SIZE_IN_BYTES;
+ * getTotalNonFloatingMemorySizeInBytes();
}
private static double allowedMaxMemorySizeInBytesOfTsTiles() {
return (TS_FILE_MEMORY_REJECT_THRESHOLD + TABLET_MEMORY_REJECT_THRESHOLD /
2)
- * TOTAL_MEMORY_SIZE_IN_BYTES;
+ * getTotalNonFloatingMemorySizeInBytes();
}
public boolean isEnough4TabletParsing() {
@@ -171,7 +174,9 @@ public class PipeMemoryManager {
String.format(
"forceAllocateForTablet: failed to allocate because there's too
much memory for tablets, "
+ "total memory size %d bytes, used memory for tablet size
%d bytes, requested memory size %d bytes",
- TOTAL_MEMORY_SIZE_IN_BYTES, usedMemorySizeInBytesOfTablets,
tabletSizeInBytes));
+ getTotalNonFloatingMemorySizeInBytes(),
+ usedMemorySizeInBytesOfTablets,
+ tabletSizeInBytes));
}
synchronized (this) {
@@ -211,7 +216,9 @@ public class PipeMemoryManager {
String.format(
"forceAllocateForTsFile: failed to allocate because there's too
much memory for tsfiles, "
+ "total memory size %d bytes, used memory for tsfile size
%d bytes, requested memory size %d bytes",
- TOTAL_MEMORY_SIZE_IN_BYTES, usedMemorySizeInBytesOfTsFiles,
tsFileSizeInBytes));
+ getTotalNonFloatingMemorySizeInBytes(),
+ usedMemorySizeInBytesOfTsFiles,
+ tsFileSizeInBytes));
}
synchronized (this) {
@@ -237,7 +244,7 @@ public class PipeMemoryManager {
}
for (int i = 1; i <= MEMORY_ALLOCATE_MAX_RETRIES; i++) {
- if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes) {
+ if (getTotalNonFloatingMemorySizeInBytes() - usedMemorySizeInBytes >=
sizeInBytes) {
return registerMemoryBlock(sizeInBytes, type);
}
@@ -256,7 +263,7 @@ public class PipeMemoryManager {
+ "total memory size %d bytes, used memory size %d bytes, "
+ "requested memory size %d bytes",
MEMORY_ALLOCATE_MAX_RETRIES,
- TOTAL_MEMORY_SIZE_IN_BYTES,
+ getTotalNonFloatingMemorySizeInBytes(),
usedMemorySizeInBytes,
sizeInBytes));
}
@@ -299,7 +306,7 @@ public class PipeMemoryManager {
long sizeInBytes = targetSize - oldSize;
for (int i = 1; i <= MEMORY_ALLOCATE_MAX_RETRIES; i++) {
- if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes) {
+ if (getTotalNonFloatingMemorySizeInBytes() - usedMemorySizeInBytes >=
sizeInBytes) {
usedMemorySizeInBytes += sizeInBytes;
if (block instanceof PipeTabletMemoryBlock) {
usedMemorySizeInBytesOfTablets += sizeInBytes;
@@ -326,7 +333,7 @@ public class PipeMemoryManager {
+ "total memory size %d bytes, used memory size %d bytes, "
+ "requested memory size %d bytes",
MEMORY_ALLOCATE_MAX_RETRIES,
- TOTAL_MEMORY_SIZE_IN_BYTES,
+ getTotalNonFloatingMemorySizeInBytes(),
usedMemorySizeInBytes,
sizeInBytes));
}
@@ -355,7 +362,7 @@ public class PipeMemoryManager {
}
if ((float) (usedMemorySizeInBytes + sizeInBytes)
- <= TOTAL_MEMORY_SIZE_IN_BYTES * usedThreshold) {
+ <= getTotalNonFloatingMemorySizeInBytes() * usedThreshold) {
return forceAllocate(sizeInBytes);
}
@@ -372,19 +379,20 @@ public class PipeMemoryManager {
return new PipeMemoryBlock(sizeInBytes);
}
- if (sizeInBytes == 0 || TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes
>= sizeInBytes) {
+ if (sizeInBytes == 0
+ || getTotalNonFloatingMemorySizeInBytes() - usedMemorySizeInBytes >=
sizeInBytes) {
return registerMemoryBlock(sizeInBytes);
}
long sizeToAllocateInBytes = sizeInBytes;
while (sizeToAllocateInBytes > MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES) {
- if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >=
sizeToAllocateInBytes) {
+ if (getTotalNonFloatingMemorySizeInBytes() - usedMemorySizeInBytes >=
sizeToAllocateInBytes) {
LOGGER.info(
"tryAllocate: allocated memory, "
+ "total memory size {} bytes, used memory size {} bytes, "
+ "original requested memory size {} bytes, "
+ "actual requested memory size {} bytes",
- TOTAL_MEMORY_SIZE_IN_BYTES,
+ getTotalNonFloatingMemorySizeInBytes(),
usedMemorySizeInBytes,
sizeInBytes,
sizeToAllocateInBytes);
@@ -403,7 +411,7 @@ public class PipeMemoryManager {
+ "total memory size {} bytes, used memory size {} bytes, "
+ "original requested memory size {} bytes, "
+ "actual requested memory size {} bytes",
- TOTAL_MEMORY_SIZE_IN_BYTES,
+ getTotalNonFloatingMemorySizeInBytes(),
usedMemorySizeInBytes,
sizeInBytes,
sizeToAllocateInBytes);
@@ -413,7 +421,7 @@ public class PipeMemoryManager {
"tryAllocate: failed to allocate memory, "
+ "total memory size {} bytes, used memory size {} bytes, "
+ "requested memory size {} bytes",
- TOTAL_MEMORY_SIZE_IN_BYTES,
+ getTotalNonFloatingMemorySizeInBytes(),
usedMemorySizeInBytes,
sizeInBytes);
return registerMemoryBlock(0);
@@ -426,7 +434,8 @@ public class PipeMemoryManager {
return false;
}
- if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >=
memoryInBytesNeededToBeAllocated) {
+ if (getTotalNonFloatingMemorySizeInBytes() - usedMemorySizeInBytes
+ >= memoryInBytesNeededToBeAllocated) {
usedMemorySizeInBytes += memoryInBytesNeededToBeAllocated;
if (block instanceof PipeTabletMemoryBlock) {
usedMemorySizeInBytesOfTablets += memoryInBytesNeededToBeAllocated;
@@ -479,7 +488,7 @@ public class PipeMemoryManager {
for (final PipeMemoryBlock block : shuffledBlocks) {
if (block.shrink()) {
hasAtLeastOneBlockShrinkable = true;
- if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >=
sizeInBytes) {
+ if (getTotalNonFloatingMemorySizeInBytes() - usedMemorySizeInBytes
>= sizeInBytes) {
return true;
}
}
@@ -583,7 +592,11 @@ public class PipeMemoryManager {
return TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes;
}
- public long getTotalMemorySizeInBytes() {
- return TOTAL_MEMORY_SIZE_IN_BYTES;
+ public static long getTotalNonFloatingMemorySizeInBytes() {
+ return (long) (TOTAL_MEMORY_SIZE_IN_BYTES * (1 - FLOATING_MEMORY_RATIO));
+ }
+
+ public static long getTotalFloatingMemorySizeInBytes() {
+ return (long) (TOTAL_MEMORY_SIZE_IN_BYTES * FLOATING_MEMORY_RATIO);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index b90ed59b4a4..19de7a107be 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -376,7 +376,7 @@ public class PipeTsFileResourceManager {
}
}
- public long getTotalLinkedButDeletedTsfileResourceRamSize() {
+ public long getTotalLinkedButDeletedTsFileResourceRamSize() {
long totalLinkedButDeletedTsfileResourceRamSize = 0;
try {
for (final Map.Entry<String, PipeTsFileResource> resourceEntry :
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java
index 8b0eb256f42..70eacd2eaaa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java
@@ -133,7 +133,7 @@ public class SubscriptionPrefetchingQueueStates {
}
private boolean isMemoryEnough() {
- return PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
+ return
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes()
* PREFETCH_MEMORY_THRESHOLD
> PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
index 4a49042b9ee..55848c47c01 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
@@ -105,10 +105,10 @@ public class SubscriptionPollResponseCache {
private SubscriptionPollResponseCache() {
final long initMemorySizeInBytes =
- PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() / 5;
+
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes() / 5;
final long maxMemorySizeInBytes =
(long)
- (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
+
(PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes()
*
SubscriptionConfig.getInstance().getSubscriptionCacheMemoryUsagePercentage());
// properties required by pipe memory control framework
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 6a78fce213a..10e1954f39c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -217,6 +217,7 @@ public class CommonConfig {
private int pipeDataStructureTabletSizeInBytes = 2097152;
private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold =
0.4;
private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold =
0.4;
+ private double pipeTotalFloatingMemoryProportion = 0.2;
private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount =
10_000;
private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 *
1000L;
@@ -783,6 +784,14 @@ public class CommonConfig {
pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold;
}
+ public double getPipeTotalFloatingMemoryProportion() {
+ return pipeTotalFloatingMemoryProportion;
+ }
+
+ public void setPipeTotalFloatingMemoryProportion(double
pipeTotalFloatingMemoryProportion) {
+ this.pipeTotalFloatingMemoryProportion = pipeTotalFloatingMemoryProportion;
+ }
+
public int getPipeExtractorAssignerDisruptorRingBufferSize() {
return pipeExtractorAssignerDisruptorRingBufferSize;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 4634b61e74d..b31d619ae29 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -299,6 +299,11 @@ public class CommonDescriptor {
"pipe_data_structure_ts_file_memory_block_allocation_reject_threshold",
String.valueOf(
config.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold()))));
+ config.setPipeTotalFloatingMemoryProportion(
+ Double.parseDouble(
+ properties.getProperty(
+ "pipe_total_floating_memory_proportion",
+
String.valueOf(config.getPipeTotalFloatingMemoryProportion()))));
config.setPipeRealTimeQueuePollTsFileThreshold(
Integer.parseInt(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 41cf6df907d..64ea06b1c61 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -52,6 +52,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -1069,6 +1070,18 @@ public abstract class PipeTaskAgent {
.getCommitterKey(pipeName, creationTime, regionId, restartTime);
}
+ public long getAllFloatingMemoryUsageInByte() {
+ final AtomicLong bytes = new AtomicLong(0);
+ pipeMetaKeeper
+ .getPipeMetaList()
+ .forEach(
+ pipeMeta ->
+ bytes.addAndGet(
+ ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta())
+ .getFloatingMemoryUsageInByte()));
+ return bytes.get();
+ }
+
public long getFloatingMemoryUsageInByte(final String pipeName) {
final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
return pipeMeta == null
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 677ca016eee..590d9f864c0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -76,6 +76,10 @@ public class PipeConfig {
return
COMMON_CONFIG.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold();
}
+ public double getPipeTotalFloatingMemoryProportion() {
+ return COMMON_CONFIG.getPipeTotalFloatingMemoryProportion();
+ }
+
/////////////////////////////// Subtask Connector
///////////////////////////////
public int getPipeRealTimeQueuePollTsFileThreshold() {
@@ -399,6 +403,7 @@ public class PipeConfig {
LOGGER.info(
"PipeDataStructureTsFileMemoryBlockAllocationRejectThreshold: {}",
getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold());
+ LOGGER.info("PipeTotalFloatingMemoryProportion: {}",
getPipeTotalFloatingMemoryProportion());
LOGGER.info(
"PipeRealTimeQueuePollTsFileThreshold: {}",
getPipeRealTimeQueuePollTsFileThreshold());