This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 4bdc55e784c Pipe: Introduce a minimum restart interval to optimize the 
restart strategy to avoid frequent restarts & Stay tsfile extraction mode if 
the task is currently restarted (#14374) (#14454)
4bdc55e784c is described below

commit 4bdc55e784cffbea9c6af4a8c8eb90b4bd8d595d
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Dec 17 00:40:11 2024 +0800

    Pipe: Introduce a minimum restart interval to optimize the restart strategy 
to avoid frequent restarts & Stay tsfile extraction mode if the task is 
currently restarted (#14374) (#14454)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
    (cherry picked from commit 6a28a0792c36265d94c769fa7e0dbcc4a68b77cf)
    
    Co-authored-by: nanxiang xia <[email protected]>
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 37 ++++++++++++++++++++++
 .../PipeRealtimeDataRegionHybridExtractor.java     | 11 +++++--
 .../apache/iotdb/commons/conf/CommonConfig.java    |  9 ++++++
 .../iotdb/commons/conf/CommonDescriptor.java       |  5 +++
 .../iotdb/commons/pipe/config/PipeConfig.java      |  5 +++
 5 files changed, 65 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 836557c0231..6dc17896433 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -82,6 +82,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -94,6 +95,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
   private static final AtomicLong LAST_FORCED_RESTART_TIME =
       new AtomicLong(System.currentTimeMillis());
+  private static final Map<String, AtomicLong> 
PIPE_NAME_TO_LAST_RESTART_TIME_MAP =
+      new ConcurrentHashMap<>();
 
   ////////////////////////// Pipe Task Management Entry 
//////////////////////////
 
@@ -475,6 +478,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   ///////////////////////// Restart Logic /////////////////////////
 
   public void restartAllStuckPipes() {
+    removeOutdatedPipeInfoFromLastRestartTimeMap();
+
     if (!tryWriteLockWithTimeOut(5)) {
       return;
     }
@@ -486,6 +491,16 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       releaseWriteLock();
     }
 
+    // If the pipe has been restarted recently, skip it.
+    stuckPipes.removeIf(
+        pipeMeta -> {
+          final AtomicLong lastRestartTime =
+              
PIPE_NAME_TO_LAST_RESTART_TIME_MAP.get(pipeMeta.getStaticMeta().getPipeName());
+          return lastRestartTime != null
+              && System.currentTimeMillis() - lastRestartTime.get()
+                  < 
PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs();
+        });
+
     // Restart all stuck pipes.
     // Note that parallelStream cannot be used here. The method 
PipeTaskAgent#dropPipe also uses
     // parallelStream. If parallelStream is used here, the subtasks generated 
inside the dropPipe
@@ -495,6 +510,18 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     stuckPipes.forEach(this::restartStuckPipe);
   }
 
+  private void removeOutdatedPipeInfoFromLastRestartTimeMap() {
+    PIPE_NAME_TO_LAST_RESTART_TIME_MAP
+        .entrySet()
+        .removeIf(
+            entry -> {
+              final AtomicLong lastRestartTime = entry.getValue();
+              return lastRestartTime == null
+                  || 
PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()
+                      <= System.currentTimeMillis() - lastRestartTime.get();
+            });
+  }
+
   private Set<PipeMeta> findAllStuckPipes() {
     final Set<PipeMeta> stuckPipes = new HashSet<>();
 
@@ -618,7 +645,13 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       final long startTime = System.currentTimeMillis();
       final PipeMeta originalPipeMeta = pipeMeta.deepCopy4TaskAgent();
       handleDropPipe(pipeMeta.getStaticMeta().getPipeName());
+
+      final long restartTime = System.currentTimeMillis();
+      PIPE_NAME_TO_LAST_RESTART_TIME_MAP
+          .computeIfAbsent(pipeMeta.getStaticMeta().getPipeName(), k -> new 
AtomicLong(restartTime))
+          .set(restartTime);
       handleSinglePipeMetaChanges(originalPipeMeta);
+
       LOGGER.warn(
           "Pipe {} was restarted because of stuck, time cost: {} ms.",
           originalPipeMeta.getStaticMeta(),
@@ -630,6 +663,10 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     }
   }
 
+  public boolean isPipeTaskCurrentlyRestarted(final String pipeName) {
+    return PIPE_NAME_TO_LAST_RESTART_TIME_MAP.containsKey(pipeName);
+  }
+
   ///////////////////////// Terminate Logic /////////////////////////
 
   public void markCompleted(final String pipeName, final int regionId) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 207a3f8766e..57eca384974 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -202,15 +202,18 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
   }
 
   private boolean canNotUseTabletAnyMore() {
-    // In the following 5 cases, we should not extract any more tablet events. 
all the data
+    // In the following 7 cases, we should not extract any more tablet events. 
all the data
     // represented by the tablet events should be carried by the following 
tsfile event:
+    //  0. If the pipe task is currently restarted.
     //  1. If Wal size > maximum size of wal buffer,
     //  the write operation will be throttled, so we should not extract any 
more tablet events.
     //  2. The number of pinned memtables has reached the dangerous threshold.
     //  3. The number of historical tsFile events to transfer has exceeded the 
limit.
     //  4. The number of realtime tsfile events to transfer has exceeded the 
limit.
     //  5. The number of linked tsfiles has reached the dangerous threshold.
-    return mayWalSizeReachThrottleThreshold()
+    //  6. The shallow memory usage of the insert node has reached the 
dangerous threshold.
+    return isPipeTaskCurrentlyRestarted()
+        || mayWalSizeReachThrottleThreshold()
         || mayMemTablePinnedCountReachDangerousThreshold()
         || isHistoricalTsFileEventCountExceededLimit()
         || isRealtimeTsFileEventCountExceededLimit()
@@ -218,6 +221,10 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
         || mayInsertNodeMemoryReachDangerousThreshold();
   }
 
+  private boolean isPipeTaskCurrentlyRestarted() {
+    return PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName);
+  }
+
   private boolean mayWalSizeReachThrottleThreshold() {
     return 3 * WALManager.getInstance().getTotalDiskUsage()
         > IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 713fa7ca68d..2ce7124917e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -254,6 +254,7 @@ public class CommonConfig {
   private long pipeMaxAllowedLinkedTsFileCount = 100;
   private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F;
   private long pipeStuckRestartIntervalSeconds = 120;
+  private long pipeStuckRestartMinIntervalMs = 30 * 60 * 1000L; // 30 minutes
 
   private int pipeMetaReportMaxLogNumPerRound = 10;
   private int pipeMetaReportMaxLogIntervalRounds = 36;
@@ -1029,10 +1030,18 @@ public class CommonConfig {
     return pipeStuckRestartIntervalSeconds;
   }
 
+  public long getPipeStuckRestartMinIntervalMs() {
+    return pipeStuckRestartMinIntervalMs;
+  }
+
   public void setPipeStuckRestartIntervalSeconds(long 
pipeStuckRestartIntervalSeconds) {
     this.pipeStuckRestartIntervalSeconds = pipeStuckRestartIntervalSeconds;
   }
 
+  public void setPipeStuckRestartMinIntervalMs(long 
pipeStuckRestartMinIntervalMs) {
+    this.pipeStuckRestartMinIntervalMs = pipeStuckRestartMinIntervalMs;
+  }
+
   public int getPipeMetaReportMaxLogNumPerRound() {
     return pipeMetaReportMaxLogNumPerRound;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index e6ee4d5c464..361f3031d4f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -505,6 +505,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_stuck_restart_interval_seconds",
                 String.valueOf(config.getPipeStuckRestartIntervalSeconds()))));
+    config.setPipeStuckRestartMinIntervalMs(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_stuck_restart_min_interval_ms",
+                String.valueOf(config.getPipeStuckRestartMinIntervalMs()))));
 
     config.setPipeMetaReportMaxLogNumPerRound(
         Integer.parseInt(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 59ac6db855c..4032bcc0af2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -250,6 +250,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeStuckRestartIntervalSeconds();
   }
 
+  public long getPipeStuckRestartMinIntervalMs() {
+    return COMMON_CONFIG.getPipeStuckRestartMinIntervalMs();
+  }
+
   /////////////////////////////// Logger ///////////////////////////////
 
   public int getPipeMetaReportMaxLogNumPerRound() {
@@ -442,6 +446,7 @@ public class PipeConfig {
         "PipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage: {}",
         getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage());
     LOGGER.info("PipeStuckRestartIntervalSeconds: {}", 
getPipeStuckRestartIntervalSeconds());
+    LOGGER.info("PipeStuckRestartMinIntervalMs: {}", 
getPipeStuckRestartMinIntervalMs());
 
     LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", 
getPipeMetaReportMaxLogNumPerRound());
     LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", 
getPipeMetaReportMaxLogIntervalRounds());

Reply via email to