This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch cp14374 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5eaab6f0d54c16fbc105a43a3f9bb13dc3dce917 Author: nanxiang xia <[email protected]> AuthorDate: Mon Dec 16 23:01:12 2024 +0800 Pipe: Introduce a minimum restart interval to optimize the restart strategy to avoid frequent restarts & Stay tsfile extraction mode if the task is currently restarted (#14374) Co-authored-by: Steve Yurong Su <[email protected]> (cherry picked from commit 6a28a0792c36265d94c769fa7e0dbcc4a68b77cf) --- .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 37 ++++++++++++++++++++++ .../PipeRealtimeDataRegionHybridExtractor.java | 11 +++++-- .../apache/iotdb/commons/conf/CommonConfig.java | 9 ++++++ .../iotdb/commons/conf/CommonDescriptor.java | 5 +++ .../iotdb/commons/pipe/config/PipeConfig.java | 5 +++ 5 files changed, 65 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 836557c0231..6dc17896433 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -82,6 +82,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -94,6 +95,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { private static final AtomicLong LAST_FORCED_RESTART_TIME = new AtomicLong(System.currentTimeMillis()); + private static final Map<String, AtomicLong> PIPE_NAME_TO_LAST_RESTART_TIME_MAP = + new ConcurrentHashMap<>(); ////////////////////////// Pipe Task Management Entry ////////////////////////// @@ -475,6 +478,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { ///////////////////////// Restart Logic ///////////////////////// public void restartAllStuckPipes() { + removeOutdatedPipeInfoFromLastRestartTimeMap(); + if (!tryWriteLockWithTimeOut(5)) { return; } @@ -486,6 +491,16 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { releaseWriteLock(); } + // If the pipe has been restarted recently, skip it. + stuckPipes.removeIf( + pipeMeta -> { + final AtomicLong lastRestartTime = + PIPE_NAME_TO_LAST_RESTART_TIME_MAP.get(pipeMeta.getStaticMeta().getPipeName()); + return lastRestartTime != null + && System.currentTimeMillis() - lastRestartTime.get() + < PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs(); + }); + // Restart all stuck pipes. // Note that parallelStream cannot be used here. The method PipeTaskAgent#dropPipe also uses // parallelStream. If parallelStream is used here, the subtasks generated inside the dropPipe @@ -495,6 +510,18 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { stuckPipes.forEach(this::restartStuckPipe); } + private void removeOutdatedPipeInfoFromLastRestartTimeMap() { + PIPE_NAME_TO_LAST_RESTART_TIME_MAP + .entrySet() + .removeIf( + entry -> { + final AtomicLong lastRestartTime = entry.getValue(); + return lastRestartTime == null + || PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs() + <= System.currentTimeMillis() - lastRestartTime.get(); + }); + } + private Set<PipeMeta> findAllStuckPipes() { final Set<PipeMeta> stuckPipes = new HashSet<>(); @@ -618,7 +645,13 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { final long startTime = System.currentTimeMillis(); final PipeMeta originalPipeMeta = pipeMeta.deepCopy4TaskAgent(); handleDropPipe(pipeMeta.getStaticMeta().getPipeName()); + + final long restartTime = System.currentTimeMillis(); + PIPE_NAME_TO_LAST_RESTART_TIME_MAP + .computeIfAbsent(pipeMeta.getStaticMeta().getPipeName(), k -> new AtomicLong(restartTime)) + .set(restartTime); handleSinglePipeMetaChanges(originalPipeMeta); + LOGGER.warn( "Pipe {} was restarted because of stuck, time cost: {} ms.", originalPipeMeta.getStaticMeta(), @@ -630,6 +663,10 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { } } + public boolean isPipeTaskCurrentlyRestarted(final String pipeName) { + return PIPE_NAME_TO_LAST_RESTART_TIME_MAP.containsKey(pipeName); + } + ///////////////////////// Terminate Logic ///////////////////////// public void markCompleted(final String pipeName, final int regionId) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java index 207a3f8766e..57eca384974 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -202,15 +202,18 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio } private boolean canNotUseTabletAnyMore() { - // In the following 5 cases, we should not extract any more tablet events. all the data + // In the following 7 cases, we should not extract any more tablet events. all the data // represented by the tablet events should be carried by the following tsfile event: + // 0. If the pipe task is currently restarted. // 1. If Wal size > maximum size of wal buffer, // the write operation will be throttled, so we should not extract any more tablet events. // 2. The number of pinned memtables has reached the dangerous threshold. // 3. The number of historical tsFile events to transfer has exceeded the limit. // 4. The number of realtime tsfile events to transfer has exceeded the limit. // 5. The number of linked tsfiles has reached the dangerous threshold. - return mayWalSizeReachThrottleThreshold() + // 6. The shallow memory usage of the insert node has reached the dangerous threshold. + return isPipeTaskCurrentlyRestarted() + || mayWalSizeReachThrottleThreshold() || mayMemTablePinnedCountReachDangerousThreshold() || isHistoricalTsFileEventCountExceededLimit() || isRealtimeTsFileEventCountExceededLimit() @@ -218,6 +221,10 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio || mayInsertNodeMemoryReachDangerousThreshold(); } + private boolean isPipeTaskCurrentlyRestarted() { + return PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName); + } + private boolean mayWalSizeReachThrottleThreshold() { return 3 * WALManager.getInstance().getTotalDiskUsage() > IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 713fa7ca68d..2ce7124917e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -254,6 +254,7 @@ public class CommonConfig { private long pipeMaxAllowedLinkedTsFileCount = 100; private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F; private long pipeStuckRestartIntervalSeconds = 120; + private long pipeStuckRestartMinIntervalMs = 30 * 60 * 1000L; // 30 minutes private int pipeMetaReportMaxLogNumPerRound = 10; private int pipeMetaReportMaxLogIntervalRounds = 36; @@ -1029,10 +1030,18 @@ public class CommonConfig { return pipeStuckRestartIntervalSeconds; } + public long getPipeStuckRestartMinIntervalMs() { + return pipeStuckRestartMinIntervalMs; + } + public void setPipeStuckRestartIntervalSeconds(long pipeStuckRestartIntervalSeconds) { this.pipeStuckRestartIntervalSeconds = pipeStuckRestartIntervalSeconds; } + public void setPipeStuckRestartMinIntervalMs(long pipeStuckRestartMinIntervalMs) { + this.pipeStuckRestartMinIntervalMs = pipeStuckRestartMinIntervalMs; + } + public int getPipeMetaReportMaxLogNumPerRound() { return pipeMetaReportMaxLogNumPerRound; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index e6ee4d5c464..361f3031d4f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -505,6 +505,11 @@ public class CommonDescriptor { properties.getProperty( "pipe_stuck_restart_interval_seconds", String.valueOf(config.getPipeStuckRestartIntervalSeconds())))); + config.setPipeStuckRestartMinIntervalMs( + Long.parseLong( + properties.getProperty( + "pipe_stuck_restart_min_interval_ms", + String.valueOf(config.getPipeStuckRestartMinIntervalMs())))); config.setPipeMetaReportMaxLogNumPerRound( Integer.parseInt( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 59ac6db855c..4032bcc0af2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -250,6 +250,10 @@ public class PipeConfig { return COMMON_CONFIG.getPipeStuckRestartIntervalSeconds(); } + public long getPipeStuckRestartMinIntervalMs() { + return COMMON_CONFIG.getPipeStuckRestartMinIntervalMs(); + } + /////////////////////////////// Logger /////////////////////////////// public int getPipeMetaReportMaxLogNumPerRound() { @@ -442,6 +446,7 @@ public class PipeConfig { "PipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage: {}", getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage()); LOGGER.info("PipeStuckRestartIntervalSeconds: {}", getPipeStuckRestartIntervalSeconds()); + LOGGER.info("PipeStuckRestartMinIntervalMs: {}", getPipeStuckRestartMinIntervalMs()); LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", getPipeMetaReportMaxLogNumPerRound()); LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", getPipeMetaReportMaxLogIntervalRounds());
