This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new ccfa23e6539 Pipe: Added total floating memory usage for mem-ctrl of linked but deleted tsFile resource and insertNode memory (#15205) (#15268)
ccfa23e6539 is described below

commit ccfa23e6539e7be8203d594d9ab4d9b32fb8ceff
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 3 16:46:38 2025 +0800

    Pipe: Added total floating memory usage for mem-ctrl of linked but deleted tsFile resource and insertNode memory (#15205) (#15268)
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 29 +++++-------
 .../IoTDBDataNodeCacheLeaderClientManager.java     |  4 +-
 .../TsFileInsertionDataContainerProvider.java      |  2 +-
 .../PipeRealtimeDataRegionHybridExtractor.java     |  9 ++--
 .../pipe/metric/overview/PipeResourceMetrics.java  |  2 +-
 .../db/pipe/resource/memory/PipeMemoryManager.java | 51 ++++++++++++++--------
 .../resource/tsfile/PipeTsFileResourceManager.java |  2 +-
 .../broker/SubscriptionPrefetchingQueueStates.java |  2 +-
 .../event/cache/SubscriptionPollResponseCache.java |  4 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  9 ++++
 .../iotdb/commons/conf/CommonDescriptor.java       |  5 +++
 .../commons/pipe/agent/task/PipeTaskAgent.java     | 13 ++++++
 .../iotdb/commons/pipe/config/PipeConfig.java      |  5 +++
 13 files changed, 87 insertions(+), 50 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 6714ef5e065..dcafcd9d6b9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -51,6 +51,7 @@ import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTim
 import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSchemaQueueNode;
 import org.apache.iotdb.db.schemaengine.SchemaEngine;
@@ -612,19 +613,22 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     }
 
     final long totalLinkedButDeletedTsFileResourceRamSize =
-        
PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsfileResourceRamSize();
-    final long freeMemorySizeInBytes =
-        PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes();
-    if (3 * totalLinkedButDeletedTsFileResourceRamSize >= 2 * 
freeMemorySizeInBytes) {
+        
PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsFileResourceRamSize();
+    final long totalInsertNodeFloatingMemoryUsageInBytes = 
getAllFloatingMemoryUsageInByte();
+    final long totalFloatingMemorySizeInBytes =
+        PipeMemoryManager.getTotalFloatingMemorySizeInBytes();
+    if (totalInsertNodeFloatingMemoryUsageInBytes + 
totalLinkedButDeletedTsFileResourceRamSize
+        >= totalFloatingMemorySizeInBytes) {
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         stuckPipes.add(pipeMeta);
       }
       if (!stuckPipes.isEmpty()) {
         LOGGER.warn(
-            "All {} pipe(s) will be restarted because linked tsfiles' resource 
size {} exceeds limit {}.",
+            "All {} pipe(s) will be restarted because linked but deleted 
tsFiles' resource size {} and all insertNode's size {} exceeds limit {}.",
             stuckPipes.size(),
             totalLinkedButDeletedTsFileResourceRamSize,
-            freeMemorySizeInBytes * 2.0 / 3);
+            totalInsertNodeFloatingMemoryUsageInBytes,
+            totalFloatingMemorySizeInBytes);
       }
       return stuckPipes;
     }
@@ -668,19 +672,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
               mayMemTablePinnedCountReachDangerousThreshold(),
               mayWalSizeReachThrottleThreshold());
           stuckPipes.add(pipeMeta);
-        } else if (getFloatingMemoryUsageInByte(pipeName)
-            >= PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()
-                / pipeMetaKeeper.getPipeMetaCount()) {
-          // Extractors of this pipe may have too many insert nodes
-          LOGGER.warn(
-              "Pipe {} needs to restart because too many insertNodes are 
extracted. "
-                  + "Floating memory usage for this pipe: {}, free memory 
size: {}, allowed free memory size for floating memory usage: {}",
-              pipeMeta.getStaticMeta(),
-              getFloatingMemoryUsageInByte(pipeName),
-              PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
-              PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()
-                  / pipeMetaKeeper.getPipeMetaCount());
-          stuckPipes.add(pipeMeta);
         }
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
index e45954e7e4d..8b88a1c36ee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
@@ -51,10 +51,10 @@ public interface IoTDBDataNodeCacheLeaderClientManager {
 
     public LeaderCacheManager() {
       final long initMemorySizeInBytes =
-          PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() / 
10;
+          
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes() / 
10;
       final long maxMemorySizeInBytes =
           (long)
-              (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
+              
(PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes()
                   * CONFIG.getPipeLeaderCacheMemoryUsagePercentage());
 
       // properties required by pipe memory control framework
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
index a5af5aec4f1..21ce698141d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
@@ -79,7 +79,7 @@ public class TsFileInsertionDataContainerProvider {
 
     // Use scan container to save memory
     if ((double) 
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes()
-            / PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
+            / 
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes()
         > PipeTsFileResource.MEMORY_SUFFICIENT_THRESHOLD) {
       return new TsFileInsertionScanDataContainer(
           pipeName, creationTime, tsFile, pattern, startTime, endTime, 
pipeTaskMeta, sourceEvent);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index cecdf23642d..ffee6ac9499 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -329,17 +330,17 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
     final long floatingMemoryUsageInByte =
         PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName);
     final long pipeCount = PipeDataNodeAgent.task().getPipeCount();
-    final long freeMemorySizeInBytes =
-        PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes();
+    final long totalFloatingMemorySizeInBytes =
+        PipeMemoryManager.getTotalFloatingMemorySizeInBytes();
     final boolean mayInsertNodeMemoryReachDangerousThreshold =
-        3 * floatingMemoryUsageInByte * pipeCount >= 2 * freeMemorySizeInBytes;
+        3 * floatingMemoryUsageInByte * pipeCount >= 2 * 
totalFloatingMemorySizeInBytes;
     if (mayInsertNodeMemoryReachDangerousThreshold && 
event.mayExtractorUseTablets(this)) {
       LOGGER.info(
           "Pipe task {}@{} canNotUseTabletAnyMore7: The shallow memory usage 
of the insert node {} has reached the dangerous threshold {}",
           pipeName,
           dataRegionId,
           floatingMemoryUsageInByte * pipeCount,
-          2 * freeMemorySizeInBytes / 3.0d);
+          2 * totalFloatingMemorySizeInBytes / 3.0d);
     }
     return mayInsertNodeMemoryReachDangerousThreshold;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
index 7b72ea70a71..c9115575000 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
@@ -71,7 +71,7 @@ public class PipeResourceMetrics implements IMetricSet {
         Metric.PIPE_MEM.toString(),
         MetricLevel.IMPORTANT,
         PipeDataNodeResourceManager.memory(),
-        PipeMemoryManager::getTotalMemorySizeInBytes,
+        o -> PipeMemoryManager.getTotalNonFloatingMemorySizeInBytes(),
         Tag.NAME.toString(),
         PIPE_TOTAL_MEMORY);
     // resource reference count
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index c2ac5cb70c4..e8a5b457df7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -66,6 +66,9 @@ public class PipeMemoryManager {
       
PipeConfig.getInstance().getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold();
   private volatile long usedMemorySizeInBytesOfTsFiles;
 
+  private static final double FLOATING_MEMORY_RATIO =
+      PipeConfig.getInstance().getPipeTotalFloatingMemoryProportion();
+
   // Only non-zero memory blocks will be added to this set.
   private final Set<PipeMemoryBlock> allocatedBlocks = new HashSet<>();
 
@@ -89,17 +92,17 @@ public class PipeMemoryManager {
 
   private static double allowedMaxMemorySizeInBytesOfTabletsAndTsFiles() {
     return (TABLET_MEMORY_REJECT_THRESHOLD + TS_FILE_MEMORY_REJECT_THRESHOLD)
-        * TOTAL_MEMORY_SIZE_IN_BYTES;
+        * getTotalNonFloatingMemorySizeInBytes();
   }
 
   private static double allowedMaxMemorySizeInBytesOfTablets() {
     return (TABLET_MEMORY_REJECT_THRESHOLD + TS_FILE_MEMORY_REJECT_THRESHOLD / 
2)
-        * TOTAL_MEMORY_SIZE_IN_BYTES;
+        * getTotalNonFloatingMemorySizeInBytes();
   }
 
   private static double allowedMaxMemorySizeInBytesOfTsTiles() {
     return (TS_FILE_MEMORY_REJECT_THRESHOLD + TABLET_MEMORY_REJECT_THRESHOLD / 
2)
-        * TOTAL_MEMORY_SIZE_IN_BYTES;
+        * getTotalNonFloatingMemorySizeInBytes();
   }
 
   public boolean isEnough4TabletParsing() {
@@ -171,7 +174,9 @@ public class PipeMemoryManager {
           String.format(
               "forceAllocateForTablet: failed to allocate because there's too 
much memory for tablets, "
                   + "total memory size %d bytes, used memory for tablet size 
%d bytes, requested memory size %d bytes",
-              TOTAL_MEMORY_SIZE_IN_BYTES, usedMemorySizeInBytesOfTablets, 
tabletSizeInBytes));
+              getTotalNonFloatingMemorySizeInBytes(),
+              usedMemorySizeInBytesOfTablets,
+              tabletSizeInBytes));
     }
 
     synchronized (this) {
@@ -211,7 +216,9 @@ public class PipeMemoryManager {
           String.format(
               "forceAllocateForTsFile: failed to allocate because there's too 
much memory for tsfiles, "
                   + "total memory size %d bytes, used memory for tsfile size 
%d bytes, requested memory size %d bytes",
-              TOTAL_MEMORY_SIZE_IN_BYTES, usedMemorySizeInBytesOfTsFiles, 
tsFileSizeInBytes));
+              getTotalNonFloatingMemorySizeInBytes(),
+              usedMemorySizeInBytesOfTsFiles,
+              tsFileSizeInBytes));
     }
 
     synchronized (this) {
@@ -237,7 +244,7 @@ public class PipeMemoryManager {
     }
 
     for (int i = 1; i <= MEMORY_ALLOCATE_MAX_RETRIES; i++) {
-      if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes) {
+      if (getTotalNonFloatingMemorySizeInBytes() - usedMemorySizeInBytes >= 
sizeInBytes) {
         return registerMemoryBlock(sizeInBytes, type);
       }
 
@@ -256,7 +263,7 @@ public class PipeMemoryManager {
                 + "total memory size %d bytes, used memory size %d bytes, "
                 + "requested memory size %d bytes",
             MEMORY_ALLOCATE_MAX_RETRIES,
-            TOTAL_MEMORY_SIZE_IN_BYTES,
+            getTotalNonFloatingMemorySizeInBytes(),
             usedMemorySizeInBytes,
             sizeInBytes));
   }
@@ -299,7 +306,7 @@ public class PipeMemoryManager {
 
     long sizeInBytes = targetSize - oldSize;
     for (int i = 1; i <= MEMORY_ALLOCATE_MAX_RETRIES; i++) {
-      if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes) {
+      if (getTotalNonFloatingMemorySizeInBytes() - usedMemorySizeInBytes >= 
sizeInBytes) {
         usedMemorySizeInBytes += sizeInBytes;
         if (block instanceof PipeTabletMemoryBlock) {
           usedMemorySizeInBytesOfTablets += sizeInBytes;
@@ -326,7 +333,7 @@ public class PipeMemoryManager {
                 + "total memory size %d bytes, used memory size %d bytes, "
                 + "requested memory size %d bytes",
             MEMORY_ALLOCATE_MAX_RETRIES,
-            TOTAL_MEMORY_SIZE_IN_BYTES,
+            getTotalNonFloatingMemorySizeInBytes(),
             usedMemorySizeInBytes,
             sizeInBytes));
   }
@@ -355,7 +362,7 @@ public class PipeMemoryManager {
     }
 
     if ((float) (usedMemorySizeInBytes + sizeInBytes)
-        <= TOTAL_MEMORY_SIZE_IN_BYTES * usedThreshold) {
+        <= getTotalNonFloatingMemorySizeInBytes() * usedThreshold) {
       return forceAllocate(sizeInBytes);
     }
 
@@ -372,19 +379,20 @@ public class PipeMemoryManager {
       return new PipeMemoryBlock(sizeInBytes);
     }
 
-    if (sizeInBytes == 0 || TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes 
>= sizeInBytes) {
+    if (sizeInBytes == 0
+        || getTotalNonFloatingMemorySizeInBytes() - usedMemorySizeInBytes >= 
sizeInBytes) {
       return registerMemoryBlock(sizeInBytes);
     }
 
     long sizeToAllocateInBytes = sizeInBytes;
     while (sizeToAllocateInBytes > MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES) {
-      if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= 
sizeToAllocateInBytes) {
+      if (getTotalNonFloatingMemorySizeInBytes() - usedMemorySizeInBytes >= 
sizeToAllocateInBytes) {
         LOGGER.info(
             "tryAllocate: allocated memory, "
                 + "total memory size {} bytes, used memory size {} bytes, "
                 + "original requested memory size {} bytes, "
                 + "actual requested memory size {} bytes",
-            TOTAL_MEMORY_SIZE_IN_BYTES,
+            getTotalNonFloatingMemorySizeInBytes(),
             usedMemorySizeInBytes,
             sizeInBytes,
             sizeToAllocateInBytes);
@@ -403,7 +411,7 @@ public class PipeMemoryManager {
               + "total memory size {} bytes, used memory size {} bytes, "
               + "original requested memory size {} bytes, "
               + "actual requested memory size {} bytes",
-          TOTAL_MEMORY_SIZE_IN_BYTES,
+          getTotalNonFloatingMemorySizeInBytes(),
           usedMemorySizeInBytes,
           sizeInBytes,
           sizeToAllocateInBytes);
@@ -413,7 +421,7 @@ public class PipeMemoryManager {
           "tryAllocate: failed to allocate memory, "
               + "total memory size {} bytes, used memory size {} bytes, "
               + "requested memory size {} bytes",
-          TOTAL_MEMORY_SIZE_IN_BYTES,
+          getTotalNonFloatingMemorySizeInBytes(),
           usedMemorySizeInBytes,
           sizeInBytes);
       return registerMemoryBlock(0);
@@ -426,7 +434,8 @@ public class PipeMemoryManager {
       return false;
     }
 
-    if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= 
memoryInBytesNeededToBeAllocated) {
+    if (getTotalNonFloatingMemorySizeInBytes() - usedMemorySizeInBytes
+        >= memoryInBytesNeededToBeAllocated) {
       usedMemorySizeInBytes += memoryInBytesNeededToBeAllocated;
       if (block instanceof PipeTabletMemoryBlock) {
         usedMemorySizeInBytesOfTablets += memoryInBytesNeededToBeAllocated;
@@ -479,7 +488,7 @@ public class PipeMemoryManager {
       for (final PipeMemoryBlock block : shuffledBlocks) {
         if (block.shrink()) {
           hasAtLeastOneBlockShrinkable = true;
-          if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= 
sizeInBytes) {
+          if (getTotalNonFloatingMemorySizeInBytes() - usedMemorySizeInBytes 
>= sizeInBytes) {
             return true;
           }
         }
@@ -583,7 +592,11 @@ public class PipeMemoryManager {
     return TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes;
   }
 
-  public long getTotalMemorySizeInBytes() {
-    return TOTAL_MEMORY_SIZE_IN_BYTES;
+  public static long getTotalNonFloatingMemorySizeInBytes() {
+    return (long) (TOTAL_MEMORY_SIZE_IN_BYTES * (1 - FLOATING_MEMORY_RATIO));
+  }
+
+  public static long getTotalFloatingMemorySizeInBytes() {
+    return (long) (TOTAL_MEMORY_SIZE_IN_BYTES * FLOATING_MEMORY_RATIO);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index b90ed59b4a4..19de7a107be 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -376,7 +376,7 @@ public class PipeTsFileResourceManager {
     }
   }
 
-  public long getTotalLinkedButDeletedTsfileResourceRamSize() {
+  public long getTotalLinkedButDeletedTsFileResourceRamSize() {
     long totalLinkedButDeletedTsfileResourceRamSize = 0;
     try {
       for (final Map.Entry<String, PipeTsFileResource> resourceEntry :
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java
index 8b0eb256f42..70eacd2eaaa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java
@@ -133,7 +133,7 @@ public class SubscriptionPrefetchingQueueStates {
   }
 
   private boolean isMemoryEnough() {
-    return PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
+    return 
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes()
             * PREFETCH_MEMORY_THRESHOLD
         > PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
index 4a49042b9ee..55848c47c01 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
@@ -105,10 +105,10 @@ public class SubscriptionPollResponseCache {
 
   private SubscriptionPollResponseCache() {
     final long initMemorySizeInBytes =
-        PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() / 5;
+        
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes() / 5;
     final long maxMemorySizeInBytes =
         (long)
-            (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
+            
(PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes()
                 * 
SubscriptionConfig.getInstance().getSubscriptionCacheMemoryUsagePercentage());
 
     // properties required by pipe memory control framework
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 6a78fce213a..10e1954f39c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -217,6 +217,7 @@ public class CommonConfig {
   private int pipeDataStructureTabletSizeInBytes = 2097152;
   private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 
0.4;
   private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 
0.4;
+  private double pipeTotalFloatingMemoryProportion = 0.2;
 
   private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount = 
10_000;
   private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 
1000L;
@@ -783,6 +784,14 @@ public class CommonConfig {
         pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold;
   }
 
+  public double getPipeTotalFloatingMemoryProportion() {
+    return pipeTotalFloatingMemoryProportion;
+  }
+
+  public void setPipeTotalFloatingMemoryProportion(double 
pipeTotalFloatingMemoryProportion) {
+    this.pipeTotalFloatingMemoryProportion = pipeTotalFloatingMemoryProportion;
+  }
+
   public int getPipeExtractorAssignerDisruptorRingBufferSize() {
     return pipeExtractorAssignerDisruptorRingBufferSize;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 4634b61e74d..b31d619ae29 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -299,6 +299,11 @@ public class CommonDescriptor {
                 
"pipe_data_structure_ts_file_memory_block_allocation_reject_threshold",
                 String.valueOf(
                     
config.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold()))));
+    config.setPipeTotalFloatingMemoryProportion(
+        Double.parseDouble(
+            properties.getProperty(
+                "pipe_total_floating_memory_proportion",
+                
String.valueOf(config.getPipeTotalFloatingMemoryProportion()))));
 
     config.setPipeRealTimeQueuePollTsFileThreshold(
         Integer.parseInt(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 41cf6df907d..64ea06b1c61 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -52,6 +52,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -1069,6 +1070,18 @@ public abstract class PipeTaskAgent {
             .getCommitterKey(pipeName, creationTime, regionId, restartTime);
   }
 
+  public long getAllFloatingMemoryUsageInByte() {
+    final AtomicLong bytes = new AtomicLong(0);
+    pipeMetaKeeper
+        .getPipeMetaList()
+        .forEach(
+            pipeMeta ->
+                bytes.addAndGet(
+                    ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta())
+                        .getFloatingMemoryUsageInByte()));
+    return bytes.get();
+  }
+
   public long getFloatingMemoryUsageInByte(final String pipeName) {
     final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
     return pipeMeta == null
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 677ca016eee..590d9f864c0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -76,6 +76,10 @@ public class PipeConfig {
     return 
COMMON_CONFIG.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold();
   }
 
+  public double getPipeTotalFloatingMemoryProportion() {
+    return COMMON_CONFIG.getPipeTotalFloatingMemoryProportion();
+  }
+
   /////////////////////////////// Subtask Connector 
///////////////////////////////
 
   public int getPipeRealTimeQueuePollTsFileThreshold() {
@@ -399,6 +403,7 @@ public class PipeConfig {
     LOGGER.info(
         "PipeDataStructureTsFileMemoryBlockAllocationRejectThreshold: {}",
         getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold());
+    LOGGER.info("PipeTotalFloatingMemoryProportion: {}", 
getPipeTotalFloatingMemoryProportion());
 
     LOGGER.info(
         "PipeRealTimeQueuePollTsFileThreshold: {}", 
getPipeRealTimeQueuePollTsFileThreshold());

Reply via email to