This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cc339bd47a Pipe: Introduce timely flush options & Execute flush after pipe watchdog restarts & Log degrade reasons for debugging (#14865)
3cc339bd47a is described below

commit 3cc339bd47a7da18c5fb1243479cd728320921ec
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Feb 17 18:56:11 2025 +0800

    Pipe: Introduce timely flush options & Execute flush after pipe watchdog restarts & Log degrade reasons for debugging (#14865)
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  59 +++++++++---
 .../PipeRealtimeDataRegionHybridExtractor.java     | 102 +++++++++++++++++----
 .../iotdb/db/storageengine/StorageEngine.java      |   8 ++
 .../apache/iotdb/commons/conf/CommonConfig.java    |  15 ++-
 .../iotdb/commons/conf/CommonDescriptor.java       |   5 +
 .../iotdb/commons/pipe/config/PipeConfig.java      |   6 ++
 6 files changed, 163 insertions(+), 32 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 5ae9155cb66..39c20ecade0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -476,7 +476,21 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   ///////////////////////// Restart Logic /////////////////////////
 
   public void restartAllStuckPipes() {
-    removeOutdatedPipeInfoFromLastRestartTimeMap();
+    final List<String> removedPipeName = removeOutdatedPipeInfoFromLastRestartTimeMap();
+    if (!removedPipeName.isEmpty()) {
+      final long currentTime = System.currentTimeMillis();
+      LOGGER.info(
+          "Pipes {} now can dynamically adjust their extraction strategies. "
+              + "Start to flush storage engine to trigger the adjustment.",
+          removedPipeName);
+      StorageEngine.getInstance().syncCloseAllProcessor();
+      WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
+      LOGGER.info(
+          "Finished flushing storage engine, time cost: {} ms.",
+          System.currentTimeMillis() - currentTime);
+      LOGGER.info("Skipping restarting pipes this round because of the dynamic flushing.");
+      return;
+    }
 
     if (!tryWriteLockWithTimeOut(5)) {
       return;
@@ -508,16 +522,23 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     stuckPipes.forEach(this::restartStuckPipe);
   }
 
-  private void removeOutdatedPipeInfoFromLastRestartTimeMap() {
+  private List<String> removeOutdatedPipeInfoFromLastRestartTimeMap() {
+    final List<String> removedPipeName = new ArrayList<>();
     PIPE_NAME_TO_LAST_RESTART_TIME_MAP
         .entrySet()
         .removeIf(
             entry -> {
               final AtomicLong lastRestartTime = entry.getValue();
-              return lastRestartTime == null
-                  || PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()
-                      <= System.currentTimeMillis() - lastRestartTime.get();
+              final boolean shouldRemove =
+                  lastRestartTime == null
+                      || PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()
+                          <= System.currentTimeMillis() - lastRestartTime.get();
+              if (shouldRemove) {
+                removedPipeName.add(entry.getKey());
+              }
+              return shouldRemove;
             });
+    return removedPipeName;
   }
 
   private Set<PipeMeta> findAllStuckPipes() {
@@ -537,15 +558,20 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       return stuckPipes;
     }
 
-    if (3 * PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsfileResourceRamSize()
-        >= 2 * PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()) {
+    final long totalLinkedButDeletedTsFileResourceRamSize =
+        PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsfileResourceRamSize();
+    final long freeMemorySizeInBytes =
+        PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes();
+    if (3 * totalLinkedButDeletedTsFileResourceRamSize >= 2 * freeMemorySizeInBytes) {
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         stuckPipes.add(pipeMeta);
       }
       if (!stuckPipes.isEmpty()) {
         LOGGER.warn(
-            "All {} pipe(s) will be restarted because linked tsfiles' resource size exceeds memory limit.",
-            stuckPipes.size());
+            "All {} pipe(s) will be restarted because linked tsfiles' resource size {} exceeds limit {}.",
+            stuckPipes.size(),
+            totalLinkedButDeletedTsFileResourceRamSize,
+            freeMemorySizeInBytes * 2.0 / 3);
       }
       return stuckPipes;
     }
@@ -584,16 +610,23 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                 || mayWalSizeReachThrottleThreshold())) {
           // Extractors of this pipe may be stuck and is pinning too many MemTables.
           LOGGER.warn(
-              "Pipe {} needs to restart because too many memTables are pinned.",
-              pipeMeta.getStaticMeta());
+              "Pipe {} needs to restart because too many memtables are pinned. mayMemTablePinnedCountReachDangerousThreshold: {}, mayWalSizeReachThrottleThreshold: {}",
+              pipeMeta.getStaticMeta(),
+              mayMemTablePinnedCountReachDangerousThreshold(),
+              mayWalSizeReachThrottleThreshold());
           stuckPipes.add(pipeMeta);
         } else if (getFloatingMemoryUsageInByte(pipeName)
             >= PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()
                 / pipeMetaKeeper.getPipeMetaCount()) {
           // Extractors of this pipe may have too many insert nodes
           LOGGER.warn(
-              "Pipe {} needs to restart because too many insertNodes are extracted.",
-              pipeMeta.getStaticMeta());
+              "Pipe {} needs to restart because too many insertNodes are extracted. "
+                  + "Floating memory usage for this pipe: {}, free memory size: {}, allowed free memory size for floating memory usage: {}",
+              pipeMeta.getStaticMeta(),
+              getFloatingMemoryUsageInByte(pipeName),
+              PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
+              PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()
+                  / pipeMetaKeeper.getPipeMetaCount());
           stuckPipes.add(pipeMeta);
         }
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 0c79b740f3e..4df39838cbe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -222,42 +222,112 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
   }
 
   private boolean isPipeTaskCurrentlyRestarted() {
-    return PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName);
+    final boolean isPipeTaskCurrentlyRestarted =
+        PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName);
+    if (isPipeTaskCurrentlyRestarted) {
+      LOGGER.info(
+          "Pipe task {}@{} canNotUseTabletAnyMore1: Pipe task is currently restarted",
+          pipeName,
+          dataRegionId);
+    }
+    return isPipeTaskCurrentlyRestarted;
   }
 
   private boolean mayWalSizeReachThrottleThreshold() {
-    return 3 * WALManager.getInstance().getTotalDiskUsage()
-        > IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
+    final boolean mayWalSizeReachThrottleThreshold =
+        3 * WALManager.getInstance().getTotalDiskUsage()
+            > IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
+    if (mayWalSizeReachThrottleThreshold) {
+      LOGGER.info(
+          "Pipe task {}@{} canNotUseTabletAnyMore2: Wal size {} has reached throttle threshold {}",
+          pipeName,
+          dataRegionId,
+          WALManager.getInstance().getTotalDiskUsage(),
+          IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold() / 3.0d);
+    }
+    return mayWalSizeReachThrottleThreshold;
   }
 
   private boolean mayMemTablePinnedCountReachDangerousThreshold() {
-    return PipeDataNodeResourceManager.wal().getPinnedWalCount()
-        >= PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
+    final boolean mayMemTablePinnedCountReachDangerousThreshold =
+        PipeDataNodeResourceManager.wal().getPinnedWalCount()
+            >= PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
+    if (mayMemTablePinnedCountReachDangerousThreshold) {
+      LOGGER.info(
+          "Pipe task {}@{} canNotUseTabletAnyMore3: The number of pinned memtables {} has reached the dangerous threshold {}",
+          pipeName,
+          dataRegionId,
+          PipeDataNodeResourceManager.wal().getPinnedWalCount(),
+          PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount());
+    }
+    return mayMemTablePinnedCountReachDangerousThreshold;
   }
 
   private boolean isHistoricalTsFileEventCountExceededLimit() {
     final IoTDBDataRegionExtractor extractor =
         PipeDataRegionExtractorMetrics.getInstance().getExtractorMap().get(getTaskID());
-    return Objects.nonNull(extractor)
-        && extractor.getHistoricalTsFileInsertionEventCount()
-            >= PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion();
+    final boolean isHistoricalTsFileEventCountExceededLimit =
+        Objects.nonNull(extractor)
+            && extractor.getHistoricalTsFileInsertionEventCount()
+                >= PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion();
+    if (isHistoricalTsFileEventCountExceededLimit) {
+      LOGGER.info(
+          "Pipe task {}@{} canNotUseTabletAnyMore4: The number of historical tsFile events {} has exceeded the limit {}",
+          pipeName,
+          dataRegionId,
+          extractor.getHistoricalTsFileInsertionEventCount(),
+          PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion());
+    }
+    return isHistoricalTsFileEventCountExceededLimit;
   }
 
   private boolean isRealtimeTsFileEventCountExceededLimit() {
-    return pendingQueue.getTsFileInsertionEventCount()
-        >= PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
+    final boolean isRealtimeTsFileEventCountExceededLimit =
+        pendingQueue.getTsFileInsertionEventCount()
+            >= PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
+    if (isRealtimeTsFileEventCountExceededLimit) {
+      LOGGER.info(
+          "Pipe task {}@{} canNotUseTabletAnyMore5: The number of realtime tsFile events {} has exceeded the limit {}",
+          pipeName,
+          dataRegionId,
+          pendingQueue.getTsFileInsertionEventCount(),
+          PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion());
+    }
+    return isRealtimeTsFileEventCountExceededLimit;
   }
 
   private boolean mayTsFileLinkedCountReachDangerousThreshold() {
-    return PipeDataNodeResourceManager.tsfile().getLinkedTsfileCount()
-        >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
+    final boolean mayTsFileLinkedCountReachDangerousThreshold =
+        PipeDataNodeResourceManager.tsfile().getLinkedTsfileCount()
+            >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
+    if (mayTsFileLinkedCountReachDangerousThreshold) {
+      LOGGER.info(
+          "Pipe task {}@{} canNotUseTabletAnyMore6: The number of linked tsfiles {} has reached the dangerous threshold {}",
+          pipeName,
+          dataRegionId,
+          PipeDataNodeResourceManager.tsfile().getLinkedTsfileCount(),
+          PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount());
+    }
+    return mayTsFileLinkedCountReachDangerousThreshold;
   }
 
   private boolean mayInsertNodeMemoryReachDangerousThreshold() {
-    return 3
-            * PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName)
-            * PipeDataNodeAgent.task().getPipeCount()
-        >= 2 * PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes();
+    final long floatingMemoryUsageInByte =
+        PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName);
+    final long pipeCount = PipeDataNodeAgent.task().getPipeCount();
+    final long freeMemorySizeInBytes =
+        PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes();
+    final boolean mayInsertNodeMemoryReachDangerousThreshold =
+        3 * floatingMemoryUsageInByte * pipeCount >= 2 * freeMemorySizeInBytes;
+    if (mayInsertNodeMemoryReachDangerousThreshold) {
+      LOGGER.info(
+          "Pipe task {}@{} canNotUseTabletAnyMore7: The shallow memory usage of the insert node {} has reached the dangerous threshold {}",
+          pipeName,
+          dataRegionId,
+          floatingMemoryUsageInByte * pipeCount,
+          2 * freeMemorySizeInBytes / 3.0d);
+    }
+    return mayInsertNodeMemoryReachDangerousThreshold;
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index f958c4c5def..2056a10c564 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.ShutdownException;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.schema.ttl.TTLCache;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
@@ -52,6 +53,7 @@ import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.load.LoadReadOnlyException;
 import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
+import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
@@ -216,6 +218,12 @@ public class StorageEngine implements IService {
               LOGGER.info(
                   "Storage Engine recover cost: {}s.",
                   (System.currentTimeMillis() - startRecoverTime) / 1000);
+
+              PipeDataNodeAgent.runtime()
+                  .registerPeriodicalJob(
+                      "StorageEngine#operateFlush",
+                      () -> operateFlush(new TFlushReq()),
+                      PipeConfig.getInstance().getPipeStorageEngineFlushTimeIntervalMs() / 1000);
             },
             ThreadName.STORAGE_ENGINE_RECOVER_TRIGGER.getName());
     recoverEndTrigger.start();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 825c4169efa..f0212486f52 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -256,12 +256,13 @@ public class CommonConfig {
   private long pipeReceiverLoginPeriodicVerificationIntervalMs = 300000;
 
   private int pipeMaxAllowedHistoricalTsFilePerDataRegion = 100;
-  private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 2;
-  private int pipeMaxAllowedPinnedMemTableCount = 50;
-  private long pipeMaxAllowedLinkedTsFileCount = 100;
+  private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 10;
+  private int pipeMaxAllowedPinnedMemTableCount = 1000;
+  private long pipeMaxAllowedLinkedTsFileCount = 300;
   private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F;
   private long pipeStuckRestartIntervalSeconds = 120;
   private long pipeStuckRestartMinIntervalMs = 5 * 60 * 1000L; // 5 minutes
+  private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE;
 
   private int pipeMetaReportMaxLogNumPerRound = 10;
   private int pipeMetaReportMaxLogIntervalRounds = 36;
@@ -1061,6 +1062,10 @@ public class CommonConfig {
     return pipeStuckRestartMinIntervalMs;
   }
 
+  public long getPipeStorageEngineFlushTimeIntervalMs() {
+    return pipeStorageEngineFlushTimeIntervalMs;
+  }
+
   public void setPipeStuckRestartIntervalSeconds(long pipeStuckRestartIntervalSeconds) {
     this.pipeStuckRestartIntervalSeconds = pipeStuckRestartIntervalSeconds;
   }
@@ -1069,6 +1074,10 @@ public class CommonConfig {
     this.pipeStuckRestartMinIntervalMs = pipeStuckRestartMinIntervalMs;
   }
 
+  public void setPipeStorageEngineFlushTimeIntervalMs(long pipeStorageEngineFlushTimeIntervalMs) {
+    this.pipeStorageEngineFlushTimeIntervalMs = pipeStorageEngineFlushTimeIntervalMs;
+  }
+
   public int getPipeMetaReportMaxLogNumPerRound() {
     return pipeMetaReportMaxLogNumPerRound;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 3661feefa6e..bea57a45f6c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -524,6 +524,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_stuck_restart_min_interval_ms",
                 String.valueOf(config.getPipeStuckRestartMinIntervalMs()))));
+    config.setPipeStorageEngineFlushTimeIntervalMs(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_storage_engine_flush_time_interval_ms",
+                String.valueOf(config.getPipeStorageEngineFlushTimeIntervalMs()))));
 
     config.setPipeMetaReportMaxLogNumPerRound(
         Integer.parseInt(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index e652bff98f9..bb2b3fd7ac0 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -260,6 +260,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeStuckRestartMinIntervalMs();
   }
 
+  public long getPipeStorageEngineFlushTimeIntervalMs() {
+    return COMMON_CONFIG.getPipeStorageEngineFlushTimeIntervalMs();
+  }
+
   /////////////////////////////// Logger ///////////////////////////////
 
   public int getPipeMetaReportMaxLogNumPerRound() {
@@ -457,6 +461,8 @@ public class PipeConfig {
         getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage());
     LOGGER.info("PipeStuckRestartIntervalSeconds: {}", getPipeStuckRestartIntervalSeconds());
     LOGGER.info("PipeStuckRestartMinIntervalMs: {}", getPipeStuckRestartMinIntervalMs());
+    LOGGER.info(
+        "PipeStorageEngineFlushTimeIntervalMs: {}", getPipeStorageEngineFlushTimeIntervalMs());
 
     LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", getPipeMetaReportMaxLogNumPerRound());
     LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", getPipeMetaReportMaxLogIntervalRounds());

Reply via email to