This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8150f7bde1d Pipe: Adjust the reloading logic to restart all the pipes
when deleted tsfiles are linked too many (#12566)
8150f7bde1d is described below
commit 8150f7bde1dc0e21649258bc11694e8494234282
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 22 12:49:15 2024 +0800
Pipe: Adjust the reloading logic to restart all the pipes when deleted
tsfiles are linked too many (#12566)
---
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 39 ++++++++++++----------
1 file changed, 21 insertions(+), 18 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 2671de28f19..366024d2906 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -355,26 +355,29 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
continue;
}
- if (!extractors.get(0).isStreamMode()
- || extractors.stream()
- .noneMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles)) {
- // Extractors of this pipe might not pin too much MemTables,
- // still need to check if linked-and-deleted TsFile count exceeds limit.
- if ((CONFIG.isEnableSeqSpaceCompaction()
- || CONFIG.isEnableUnseqSpaceCompaction()
- || CONFIG.isEnableCrossSpaceCompaction())
- && mayDeletedTsFileSizeReachDangerousThreshold()) {
- LOGGER.warn(
- "Pipe {} needs to restart because too many TsFiles are out-of-date.",
- pipeMeta.getStaticMeta());
- stuckPipes.add(pipeMeta);
- }
+ // Extractors of this pipe might not pin too much MemTables,
+ // still need to check if linked-and-deleted TsFile count exceeds limit.
+ // Typically, if deleted tsFiles are too abundant all pipes may need to restart.
+ if ((CONFIG.isEnableSeqSpaceCompaction()
+ || CONFIG.isEnableUnseqSpaceCompaction()
+ || CONFIG.isEnableCrossSpaceCompaction())
+ && mayDeletedTsFileSizeReachDangerousThreshold()) {
+ LOGGER.warn(
+ "Pipe {} needs to restart because too many TsFiles are out-of-date.",
+ pipeMeta.getStaticMeta());
+ stuckPipes.add(pipeMeta);
continue;
}
- if (mayMemTablePinnedCountReachDangerousThreshold() || mayWalSizeReachThrottleThreshold()) {
- // Extractors of this pipe may be stuck and pinning too much MemTables.
- LOGGER.warn("Pipe {} may be stuck.", pipeMeta.getStaticMeta());
+ // Only restart the stream mode pipes for releasing memTables.
+ if (extractors.get(0).isStreamMode()
+ && extractors.stream().anyMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles)
+ && (mayMemTablePinnedCountReachDangerousThreshold()
+ || mayWalSizeReachThrottleThreshold())) {
+ // Extractors of this pipe may be stuck and is pinning too many MemTables.
+ LOGGER.warn(
+ "Pipe {} needs to restart because too many memTables are pinned.",
+ pipeMeta.getStaticMeta());
stuckPipes.add(pipeMeta);
}
}
@@ -402,7 +405,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
&& linkedButDeletedTsFileSize
> PipeConfig.getInstance().getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage()
* totalDisk;
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn("Failed to judge if deleted TsFile size reaches dangerous threshold.", e);
return false;
}