This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-tsfile-resource-control in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9bb06cdb3def27c553369933493cd9ed7704014d Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Dec 2 17:38:26 2024 +0800 Pipe: Introduce restart strategy to control resources' memory only used by pipe hardlinked files --- .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 18 +++++++++++++++--- .../PipeRealtimeDataRegionHybridExtractor.java | 4 +--- .../db/pipe/resource/memory/PipeMemoryManager.java | 4 ++++ .../db/pipe/resource/tsfile/PipeTsFileResource.java | 4 ++++ .../resource/tsfile/PipeTsFileResourceManager.java | 19 +++++++++++++++++++ 5 files changed, 43 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index a25029f6a58..518b4b20424 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -497,6 +497,19 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { stuckPipes.add(pipeMeta); } + LOGGER.warn( + "All {} pipe(s) will be restarted because of forced restart policy.", stuckPipes.size()); + return stuckPipes; + } + + if (3 * PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsfileResourceRamSize() + >= 2 * PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()) { + for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { + stuckPipes.add(pipeMeta); + } + LOGGER.warn( + "All {} pipe(s) will be restarted because linked tsfiles' resource size exceeds memory limit.", + stuckPipes.size()); return stuckPipes; } @@ -527,7 +540,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { continue; } - // Only restart the stream mode pipes for releasing memTables. + // Try to restart the stream mode pipes for releasing memTables. if (extractors.get(0).isStreamMode()) { if (extractors.stream().anyMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles) && (mayMemTablePinnedCountReachDangerousThreshold() @@ -538,8 +551,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { pipeMeta.getStaticMeta()); stuckPipes.add(pipeMeta); } else if (getFloatingMemoryUsageInByte(pipeName) - >= (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() - - PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes()) + >= PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes() / pipeMetaKeeper.getPipeMetaCount()) { // Extractors of this pipe may have too many insert nodes LOGGER.warn( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java index 95437f162b0..f3e88b03dd3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -250,9 +250,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio return 3 * PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName) * PipeDataNodeAgent.task().getPipeCount() - >= (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() - - PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes()) - * 2; + >= 2 * PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java index 8ae6235099c..2bda0e32a10 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java @@ -499,6 +499,10 @@ public class PipeMemoryManager { return usedMemorySizeInBytes; } + public long getFreeMemorySizeInBytes() { + return TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes; + } + public long getTotalMemorySizeInBytes() { return TOTAL_MEMORY_SIZE_IN_BYTES; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java index 7bb67e781f2..96f453f9af9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java @@ -97,6 +97,10 @@ public class PipeTsFileResource implements AutoCloseable { return fileSize; } + public long getTsFileResourceSize() { + return tsFileResource.calculateRamSize(); + } + ///////////////////// Reference Count ///////////////////// public int getReferenceCount() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java index e9f58753a04..9607c41a94b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java @@ -355,4 +355,23 @@ public class PipeTsFileResourceManager { return 0; } } + + public long getTotalLinkedButDeletedTsfileResourceRamSize() { + long totalLinkedButDeletedTsfileResourceRamSize = 0; + try { + for (final Map.Entry<String, PipeTsFileResource> resourceEntry : + hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet()) { + final PipeTsFileResource pipeTsFileResource = resourceEntry.getValue(); + // If the original TsFile is not deleted, the memory of the resource is not counted + // because the memory of the resource is controlled by TsFileResourceManager. + if (pipeTsFileResource.isOriginalTsFileDeleted()) { + totalLinkedButDeletedTsfileResourceRamSize += pipeTsFileResource.getTsFileResourceSize(); + } + } + return totalLinkedButDeletedTsfileResourceRamSize; + } catch (final Exception e) { + LOGGER.warn("failed to get total size of linked but deleted TsFiles resource ram size: ", e); + return totalLinkedButDeletedTsfileResourceRamSize; + } + } }
