This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 4f70660d067 Pipe: Introduce restart strategy to control resources' 
memory only used by pipe hardlinked files & Pipe: fix too many warn logs from 
findAllStuckPipes() (#14279) (#14287) (#14297)
4f70660d067 is described below

commit 4f70660d067fe8f0bfddd502ebce1161fe57b222
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Dec 4 10:33:14 2024 +0800

    Pipe: Introduce restart strategy to control resources' memory only used by 
pipe hardlinked files & Pipe: fix too many warn logs from findAllStuckPipes() 
(#14279) (#14287) (#14297)
    
    * Pipe: Introduce restart strategy to control resources' memory only used 
by pipe hardlinked files (#14279)
    
    (cherry picked from commit ba2646059ea8c6c7fa5fc561bbba00e066fb9b75)
    
    * Pipe: fix too many warn logs from findAllStuckPipes() (#14287)
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 23 +++++++++++++++++++---
 .../PipeRealtimeDataRegionHybridExtractor.java     |  4 +---
 .../db/pipe/resource/memory/PipeMemoryManager.java |  4 ++++
 .../pipe/resource/tsfile/PipeTsFileResource.java   |  4 ++++
 .../resource/tsfile/PipeTsFileResourceManager.java | 19 ++++++++++++++++++
 5 files changed, 48 insertions(+), 6 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 4686e302eb6..d730e22694e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -499,6 +499,24 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         stuckPipes.add(pipeMeta);
       }
+      if (!stuckPipes.isEmpty()) {
+        LOGGER.warn(
+            "All {} pipe(s) will be restarted because of forced restart 
policy.",
+            stuckPipes.size());
+      }
+      return stuckPipes;
+    }
+
+    if (3 * 
PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsfileResourceRamSize()
+        >= 2 * 
PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()) {
+      for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
+        stuckPipes.add(pipeMeta);
+      }
+      if (!stuckPipes.isEmpty()) {
+        LOGGER.warn(
+            "All {} pipe(s) will be restarted because linked tsfiles' resource 
size exceeds memory limit.",
+            stuckPipes.size());
+      }
       return stuckPipes;
     }
 
@@ -529,7 +547,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
         continue;
       }
 
-      // Only restart the stream mode pipes for releasing memTables.
+      // Try to restart the stream mode pipes for releasing memTables.
       if (extractors.get(0).isStreamMode()) {
         if 
(extractors.stream().anyMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles)
             && (mayMemTablePinnedCountReachDangerousThreshold()
@@ -540,8 +558,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
               pipeMeta.getStaticMeta());
           stuckPipes.add(pipeMeta);
         } else if (getFloatingMemoryUsageInByte(pipeName)
-            >= 
(PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
-                    - 
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes())
+            >= PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()
                 / pipeMetaKeeper.getPipeMetaCount()) {
           // Extractors of this pipe may have too many insert nodes
           LOGGER.warn(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 4db9249d73c..207a3f8766e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -250,9 +250,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
     return 3
             * PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName)
             * PipeDataNodeAgent.task().getPipeCount()
-        >= (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
-                - 
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes())
-            * 2;
+        >= 2 * PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 8ae6235099c..2bda0e32a10 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -499,6 +499,10 @@ public class PipeMemoryManager {
     return usedMemorySizeInBytes;
   }
 
+  public long getFreeMemorySizeInBytes() {
+    return TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes;
+  }
+
   public long getTotalMemorySizeInBytes() {
     return TOTAL_MEMORY_SIZE_IN_BYTES;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
index 7bb67e781f2..1c2a46e9b59 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
@@ -97,6 +97,10 @@ public class PipeTsFileResource implements AutoCloseable {
     return fileSize;
   }
 
+  public long getTsFileResourceSize() {
+    return Objects.nonNull(tsFileResource) ? tsFileResource.calculateRamSize() 
: 0;
+  }
+
   ///////////////////// Reference Count /////////////////////
 
   public int getReferenceCount() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 01bf9f8ca0e..d6e4fe29da1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -357,4 +357,23 @@ public class PipeTsFileResourceManager {
       return 0;
     }
   }
+
+  public long getTotalLinkedButDeletedTsfileResourceRamSize() {
+    long totalLinkedButDeletedTsfileResourceRamSize = 0;
+    try {
+      for (final Map.Entry<String, PipeTsFileResource> resourceEntry :
+          hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet()) {
+        final PipeTsFileResource pipeTsFileResource = resourceEntry.getValue();
+        // If the original TsFile is not deleted, the memory of the resource 
is not counted
+        // because the memory of the resource is controlled by 
TsFileResourceManager.
+        if (pipeTsFileResource.isOriginalTsFileDeleted()) {
+          totalLinkedButDeletedTsfileResourceRamSize += 
pipeTsFileResource.getTsFileResourceSize();
+        }
+      }
+      return totalLinkedButDeletedTsfileResourceRamSize;
+    } catch (final Exception e) {
+      LOGGER.warn("failed to get total size of linked but deleted TsFiles 
resource ram size: ", e);
+      return totalLinkedButDeletedTsfileResourceRamSize;
+    }
+  }
 }

Reply via email to