This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 4bdc55e784c Pipe: Introduce a minimum restart interval to optimize the
restart strategy to avoid frequent restarts & Stay tsfile extraction mode if
the task is currently restarted (#14374) (#14454)
4bdc55e784c is described below
commit 4bdc55e784cffbea9c6af4a8c8eb90b4bd8d595d
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Dec 17 00:40:11 2024 +0800
Pipe: Introduce a minimum restart interval to optimize the restart strategy
to avoid frequent restarts & Stay tsfile extraction mode if the task is
currently restarted (#14374) (#14454)
Co-authored-by: Steve Yurong Su <[email protected]>
(cherry picked from commit 6a28a0792c36265d94c769fa7e0dbcc4a68b77cf)
Co-authored-by: nanxiang xia <[email protected]>
---
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 37 ++++++++++++++++++++++
.../PipeRealtimeDataRegionHybridExtractor.java | 11 +++++--
.../apache/iotdb/commons/conf/CommonConfig.java | 9 ++++++
.../iotdb/commons/conf/CommonDescriptor.java | 5 +++
.../iotdb/commons/pipe/config/PipeConfig.java | 5 +++
5 files changed, 65 insertions(+), 2 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 836557c0231..6dc17896433 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -82,6 +82,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -94,6 +95,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
private static final AtomicLong LAST_FORCED_RESTART_TIME =
new AtomicLong(System.currentTimeMillis());
+ private static final Map<String, AtomicLong>
PIPE_NAME_TO_LAST_RESTART_TIME_MAP =
+ new ConcurrentHashMap<>();
////////////////////////// Pipe Task Management Entry
//////////////////////////
@@ -475,6 +478,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
///////////////////////// Restart Logic /////////////////////////
public void restartAllStuckPipes() {
+ removeOutdatedPipeInfoFromLastRestartTimeMap();
+
if (!tryWriteLockWithTimeOut(5)) {
return;
}
@@ -486,6 +491,16 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
releaseWriteLock();
}
+ // If the pipe has been restarted recently, skip it.
+ stuckPipes.removeIf(
+ pipeMeta -> {
+ final AtomicLong lastRestartTime =
+
PIPE_NAME_TO_LAST_RESTART_TIME_MAP.get(pipeMeta.getStaticMeta().getPipeName());
+ return lastRestartTime != null
+ && System.currentTimeMillis() - lastRestartTime.get()
+ <
PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs();
+ });
+
// Restart all stuck pipes.
// Note that parallelStream cannot be used here. The method
PipeTaskAgent#dropPipe also uses
// parallelStream. If parallelStream is used here, the subtasks generated
inside the dropPipe
@@ -495,6 +510,18 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
stuckPipes.forEach(this::restartStuckPipe);
}
+ private void removeOutdatedPipeInfoFromLastRestartTimeMap() {
+ PIPE_NAME_TO_LAST_RESTART_TIME_MAP
+ .entrySet()
+ .removeIf(
+ entry -> {
+ final AtomicLong lastRestartTime = entry.getValue();
+ return lastRestartTime == null
+ ||
PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()
+ <= System.currentTimeMillis() - lastRestartTime.get();
+ });
+ }
+
private Set<PipeMeta> findAllStuckPipes() {
final Set<PipeMeta> stuckPipes = new HashSet<>();
@@ -618,7 +645,13 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
final long startTime = System.currentTimeMillis();
final PipeMeta originalPipeMeta = pipeMeta.deepCopy4TaskAgent();
handleDropPipe(pipeMeta.getStaticMeta().getPipeName());
+
+ final long restartTime = System.currentTimeMillis();
+ PIPE_NAME_TO_LAST_RESTART_TIME_MAP
+ .computeIfAbsent(pipeMeta.getStaticMeta().getPipeName(), k -> new
AtomicLong(restartTime))
+ .set(restartTime);
handleSinglePipeMetaChanges(originalPipeMeta);
+
LOGGER.warn(
"Pipe {} was restarted because of stuck, time cost: {} ms.",
originalPipeMeta.getStaticMeta(),
@@ -630,6 +663,10 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
}
+ public boolean isPipeTaskCurrentlyRestarted(final String pipeName) {
+ return PIPE_NAME_TO_LAST_RESTART_TIME_MAP.containsKey(pipeName);
+ }
+
///////////////////////// Terminate Logic /////////////////////////
public void markCompleted(final String pipeName, final int regionId) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 207a3f8766e..57eca384974 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -202,15 +202,18 @@ public class PipeRealtimeDataRegionHybridExtractor
extends PipeRealtimeDataRegio
}
private boolean canNotUseTabletAnyMore() {
- // In the following 5 cases, we should not extract any more tablet events.
all the data
+ // In the following 7 cases, we should not extract any more tablet events.
all the data
// represented by the tablet events should be carried by the following
tsfile event:
+ // 0. If the pipe task is currently restarted.
// 1. If Wal size > maximum size of wal buffer,
// the write operation will be throttled, so we should not extract any
more tablet events.
// 2. The number of pinned memtables has reached the dangerous threshold.
// 3. The number of historical tsFile events to transfer has exceeded the
limit.
// 4. The number of realtime tsfile events to transfer has exceeded the
limit.
// 5. The number of linked tsfiles has reached the dangerous threshold.
- return mayWalSizeReachThrottleThreshold()
+ // 6. The shallow memory usage of the insert node has reached the
dangerous threshold.
+ return isPipeTaskCurrentlyRestarted()
+ || mayWalSizeReachThrottleThreshold()
|| mayMemTablePinnedCountReachDangerousThreshold()
|| isHistoricalTsFileEventCountExceededLimit()
|| isRealtimeTsFileEventCountExceededLimit()
@@ -218,6 +221,10 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
|| mayInsertNodeMemoryReachDangerousThreshold();
}
+ private boolean isPipeTaskCurrentlyRestarted() {
+ return PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName);
+ }
+
private boolean mayWalSizeReachThrottleThreshold() {
return 3 * WALManager.getInstance().getTotalDiskUsage()
> IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 713fa7ca68d..2ce7124917e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -254,6 +254,7 @@ public class CommonConfig {
private long pipeMaxAllowedLinkedTsFileCount = 100;
private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F;
private long pipeStuckRestartIntervalSeconds = 120;
+ private long pipeStuckRestartMinIntervalMs = 30 * 60 * 1000L; // 30 minutes
private int pipeMetaReportMaxLogNumPerRound = 10;
private int pipeMetaReportMaxLogIntervalRounds = 36;
@@ -1029,10 +1030,18 @@ public class CommonConfig {
return pipeStuckRestartIntervalSeconds;
}
+ public long getPipeStuckRestartMinIntervalMs() {
+ return pipeStuckRestartMinIntervalMs;
+ }
+
public void setPipeStuckRestartIntervalSeconds(long
pipeStuckRestartIntervalSeconds) {
this.pipeStuckRestartIntervalSeconds = pipeStuckRestartIntervalSeconds;
}
+ public void setPipeStuckRestartMinIntervalMs(long
pipeStuckRestartMinIntervalMs) {
+ this.pipeStuckRestartMinIntervalMs = pipeStuckRestartMinIntervalMs;
+ }
+
public int getPipeMetaReportMaxLogNumPerRound() {
return pipeMetaReportMaxLogNumPerRound;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index e6ee4d5c464..361f3031d4f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -505,6 +505,11 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_stuck_restart_interval_seconds",
String.valueOf(config.getPipeStuckRestartIntervalSeconds()))));
+ config.setPipeStuckRestartMinIntervalMs(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_stuck_restart_min_interval_ms",
+ String.valueOf(config.getPipeStuckRestartMinIntervalMs()))));
config.setPipeMetaReportMaxLogNumPerRound(
Integer.parseInt(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 59ac6db855c..4032bcc0af2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -250,6 +250,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeStuckRestartIntervalSeconds();
}
+ public long getPipeStuckRestartMinIntervalMs() {
+ return COMMON_CONFIG.getPipeStuckRestartMinIntervalMs();
+ }
+
/////////////////////////////// Logger ///////////////////////////////
public int getPipeMetaReportMaxLogNumPerRound() {
@@ -442,6 +446,7 @@ public class PipeConfig {
"PipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage: {}",
getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage());
LOGGER.info("PipeStuckRestartIntervalSeconds: {}",
getPipeStuckRestartIntervalSeconds());
+ LOGGER.info("PipeStuckRestartMinIntervalMs: {}",
getPipeStuckRestartMinIntervalMs());
LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}",
getPipeMetaReportMaxLogNumPerRound());
LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}",
getPipeMetaReportMaxLogIntervalRounds());