This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 4f70660d067 Pipe: Introduce restart strategy to control resources'
memory only used by pipe hardlinked files & Pipe: fix too many warn logs from
findAllStuckPipes() (#14279) (#14287) (#14297)
4f70660d067 is described below
commit 4f70660d067fe8f0bfddd502ebce1161fe57b222
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Dec 4 10:33:14 2024 +0800
Pipe: Introduce restart strategy to control resources' memory only used by
pipe hardlinked files & Pipe: fix too many warn logs from findAllStuckPipes()
(#14279) (#14287) (#14297)
* Pipe: Introduce restart strategy to control resources' memory only used
by pipe hardlinked files (#14279)
(cherry picked from commit ba2646059ea8c6c7fa5fc561bbba00e066fb9b75)
* Pipe: fix too many warn logs from findAllStuckPipes() (#14287)
---
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 23 +++++++++++++++++++---
.../PipeRealtimeDataRegionHybridExtractor.java | 4 +---
.../db/pipe/resource/memory/PipeMemoryManager.java | 4 ++++
.../pipe/resource/tsfile/PipeTsFileResource.java | 4 ++++
.../resource/tsfile/PipeTsFileResourceManager.java | 19 ++++++++++++++++++
5 files changed, 48 insertions(+), 6 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 4686e302eb6..d730e22694e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -499,6 +499,24 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
stuckPipes.add(pipeMeta);
}
+ if (!stuckPipes.isEmpty()) {
+ LOGGER.warn(
+ "All {} pipe(s) will be restarted because of forced restart
policy.",
+ stuckPipes.size());
+ }
+ return stuckPipes;
+ }
+
+ if (3 *
PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsfileResourceRamSize()
+ >= 2 *
PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()) {
+ for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
+ stuckPipes.add(pipeMeta);
+ }
+ if (!stuckPipes.isEmpty()) {
+ LOGGER.warn(
+ "All {} pipe(s) will be restarted because linked tsfiles' resource
size exceeds memory limit.",
+ stuckPipes.size());
+ }
return stuckPipes;
}
@@ -529,7 +547,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
continue;
}
- // Only restart the stream mode pipes for releasing memTables.
+ // Try to restart the stream mode pipes for releasing memTables.
if (extractors.get(0).isStreamMode()) {
if
(extractors.stream().anyMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles)
&& (mayMemTablePinnedCountReachDangerousThreshold()
@@ -540,8 +558,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
pipeMeta.getStaticMeta());
stuckPipes.add(pipeMeta);
} else if (getFloatingMemoryUsageInByte(pipeName)
- >=
(PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
- -
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes())
+ >= PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()
/ pipeMetaKeeper.getPipeMetaCount()) {
// Extractors of this pipe may have too many insert nodes
LOGGER.warn(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 4db9249d73c..207a3f8766e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -250,9 +250,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
return 3
* PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName)
* PipeDataNodeAgent.task().getPipeCount()
- >= (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
- -
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes())
- * 2;
+ >= 2 * PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes();
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 8ae6235099c..2bda0e32a10 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -499,6 +499,10 @@ public class PipeMemoryManager {
return usedMemorySizeInBytes;
}
+ public long getFreeMemorySizeInBytes() {
+ return TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes;
+ }
+
public long getTotalMemorySizeInBytes() {
return TOTAL_MEMORY_SIZE_IN_BYTES;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
index 7bb67e781f2..1c2a46e9b59 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
@@ -97,6 +97,10 @@ public class PipeTsFileResource implements AutoCloseable {
return fileSize;
}
+ public long getTsFileResourceSize() {
+ return Objects.nonNull(tsFileResource) ? tsFileResource.calculateRamSize()
: 0;
+ }
+
///////////////////// Reference Count /////////////////////
public int getReferenceCount() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 01bf9f8ca0e..d6e4fe29da1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -357,4 +357,23 @@ public class PipeTsFileResourceManager {
return 0;
}
}
+
+ public long getTotalLinkedButDeletedTsfileResourceRamSize() {
+ long totalLinkedButDeletedTsfileResourceRamSize = 0;
+ try {
+ for (final Map.Entry<String, PipeTsFileResource> resourceEntry :
+ hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet()) {
+ final PipeTsFileResource pipeTsFileResource = resourceEntry.getValue();
+ // If the original TsFile is not deleted, the memory of the resource
is not counted
+ // because the memory of the resource is controlled by
TsFileResourceManager.
+ if (pipeTsFileResource.isOriginalTsFileDeleted()) {
+ totalLinkedButDeletedTsfileResourceRamSize +=
pipeTsFileResource.getTsFileResourceSize();
+ }
+ }
+ return totalLinkedButDeletedTsfileResourceRamSize;
+ } catch (final Exception e) {
+ LOGGER.warn("failed to get total size of linked but deleted TsFiles
resource ram size: ", e);
+ return totalLinkedButDeletedTsfileResourceRamSize;
+ }
+ }
}