This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e3366ed7c4c Pipe: Enable flushing storage engine after last terminate 
event reported (#15465)
e3366ed7c4c is described below

commit e3366ed7c4cddab12808fae16897fc94be5e6be1
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 8 23:13:39 2025 +0800

    Pipe: Enable flushing storage engine after last terminate event reported 
(#15465)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../agent/runtime/PipeDataNodeRuntimeAgent.java    |  6 +++
 .../event/common/terminate/PipeTerminateEvent.java | 45 ++++++++++++++++++++++
 .../apache/iotdb/commons/conf/CommonConfig.java    | 27 +++++++++++++
 .../iotdb/commons/pipe/config/PipeConfig.java      | 10 +++++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  | 10 +++++
 5 files changed, 98 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
index 057000dd965..ba6ab4c66cd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import 
org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningQueue;
 import 
org.apache.iotdb.db.pipe.resource.PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -89,6 +90,11 @@ public class PipeDataNodeRuntimeAgent implements IService {
         "PipeTaskAgent#restartAllStuckPipes",
         PipeDataNodeAgent.task()::restartAllStuckPipes,
         PipeConfig.getInstance().getPipeStuckRestartIntervalSeconds());
+    registerPeriodicalJob(
+        "PipeTaskAgent#flushDataRegionIfNeeded",
+        PipeTerminateEvent::flushDataRegionIfNeeded,
+        PipeConfig.getInstance().getPipeFlushAfterLastTerminateSeconds());
+
     pipePeriodicalJobExecutor.start();
 
     if (PipeConfig.getInstance().getPipeEventReferenceTrackingEnabled()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index c67102030bb..d431b12a79c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -19,17 +19,24 @@
 
 package org.apache.iotdb.db.pipe.event.common.terminate;
 
+import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls 
the termination of pipe,
@@ -38,6 +45,41 @@ import java.util.concurrent.CompletableFuture;
  * be discarded.
  */
 public class PipeTerminateEvent extends EnrichedEvent {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTerminateEvent.class);
+
+  private static final AtomicLong PROGRESS_REPORT_COUNT = new AtomicLong(0);
+  private static final AtomicLong LAST_PROGRESS_REPORT_TIME = new 
AtomicLong(0);
+
+  public static void flushDataRegionIfNeeded() {
+    if (PROGRESS_REPORT_COUNT.get() > 0
+        && PROGRESS_REPORT_COUNT.get()
+            > PipeConfig.getInstance().getPipeFlushAfterTerminateCount()) {
+      flushDataRegion();
+      return;
+    }
+
+    if (LAST_PROGRESS_REPORT_TIME.get() > 0
+        && System.currentTimeMillis() - LAST_PROGRESS_REPORT_TIME.get()
+            > PipeConfig.getInstance().getPipeFlushAfterLastTerminateSeconds() 
* 1000L) {
+      flushDataRegion();
+    }
+  }
+
+  private static void flushDataRegion() {
+    try {
+      StorageEngine.getInstance().operateFlush(new TFlushReq());
+      PROGRESS_REPORT_COUNT.set(0);
+      LAST_PROGRESS_REPORT_TIME.set(0);
+      LOGGER.info("Force flush all data regions because of last progress 
report time.");
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Failed to flush all data regions, please check the error message: 
{}",
+          e.getMessage(),
+          e);
+    }
+  }
+
   private final int dataRegionId;
 
   public PipeTerminateEvent(
@@ -106,6 +148,9 @@ public class PipeTerminateEvent extends EnrichedEvent {
 
   @Override
   public void reportProgress() {
+    PROGRESS_REPORT_COUNT.incrementAndGet();
+    LAST_PROGRESS_REPORT_TIME.set(System.currentTimeMillis());
+
     // To avoid deadlock
     CompletableFuture.runAsync(
         () -> PipeDataNodeAgent.task().markCompleted(pipeName, dataRegionId));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 894a28487fa..a813cf216e1 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -273,6 +273,8 @@ public class CommonConfig {
   private long pipeStuckRestartIntervalSeconds = 120;
   private long pipeStuckRestartMinIntervalMs = 5 * 60 * 1000L; // 5 minutes
   private boolean pipeEpochKeepTsFileAfterStuckRestartEnabled = false;
+  private long pipeFlushAfterLastTerminateSeconds = 30;
+  private long pipeFlushAfterTerminateCount = 30;
   private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE;
 
   private int pipeMetaReportMaxLogNumPerRound = 10;
@@ -1478,6 +1480,31 @@ public class CommonConfig {
         "pipeStorageEngineFlushTimeIntervalMs is set to {}", 
pipeStorageEngineFlushTimeIntervalMs);
   }
 
+  public long getPipeFlushAfterLastTerminateSeconds() {
+    return pipeFlushAfterLastTerminateSeconds;
+  }
+
+  public void setPipeFlushAfterLastTerminateSeconds(long 
pipeFlushAfterLastTerminateSeconds) {
+    if (this.pipeFlushAfterLastTerminateSeconds == 
pipeFlushAfterLastTerminateSeconds) {
+      return;
+    }
+    this.pipeFlushAfterLastTerminateSeconds = 
pipeFlushAfterLastTerminateSeconds;
+    logger.info(
+        "pipeFlushAfterLastTerminateSeconds is set to {}", 
pipeFlushAfterLastTerminateSeconds);
+  }
+
+  public long getPipeFlushAfterTerminateCount() {
+    return pipeFlushAfterTerminateCount;
+  }
+
+  public void setPipeFlushAfterTerminateCount(long 
pipeFlushAfterTerminateCount) {
+    if (this.pipeFlushAfterTerminateCount == pipeFlushAfterTerminateCount) {
+      return;
+    }
+    this.pipeFlushAfterTerminateCount = pipeFlushAfterTerminateCount;
+    logger.info("pipeFlushAfterTerminateCount is set to {}", 
pipeFlushAfterTerminateCount);
+  }
+
   public int getPipeMetaReportMaxLogNumPerRound() {
     return pipeMetaReportMaxLogNumPerRound;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 0de34c0e08d..9b63b46626c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -303,6 +303,14 @@ public class PipeConfig {
     return COMMON_CONFIG.isPipeEpochKeepTsFileAfterStuckRestartEnabled();
   }
 
+  public long getPipeFlushAfterTerminateCount() {
+    return COMMON_CONFIG.getPipeFlushAfterTerminateCount();
+  }
+
+  public long getPipeFlushAfterLastTerminateSeconds() {
+    return COMMON_CONFIG.getPipeFlushAfterLastTerminateSeconds();
+  }
+
   public long getPipeStorageEngineFlushTimeIntervalMs() {
     return COMMON_CONFIG.getPipeStorageEngineFlushTimeIntervalMs();
   }
@@ -530,6 +538,8 @@ public class PipeConfig {
     LOGGER.info(
         "PipeEpochKeepTsFileAfterStuckRestartEnabled: {}",
         isPipeEpochKeepTsFileAfterStuckRestartEnabled());
+    LOGGER.info("PipeFlushAfterTerminateCount: {}", 
getPipeFlushAfterTerminateCount());
+    LOGGER.info("PipeFlushAfterLastTerminateSeconds: {}", 
getPipeFlushAfterLastTerminateSeconds());
     LOGGER.info(
         "PipeStorageEngineFlushTimeIntervalMs: {}", 
getPipeStorageEngineFlushTimeIntervalMs());
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index a48469e7742..919df926189 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -431,6 +431,16 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_stuck_restart_min_interval_ms",
                 String.valueOf(config.getPipeStuckRestartMinIntervalMs()))));
+    config.setPipeFlushAfterLastTerminateSeconds(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_flush_after_last_terminate_seconds",
+                
String.valueOf(config.getPipeFlushAfterLastTerminateSeconds()))));
+    config.setPipeFlushAfterTerminateCount(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_flush_after_terminate_count",
+                String.valueOf(config.getPipeFlushAfterTerminateCount()))));
     config.setPipeEpochKeepTsFileAfterStuckRestartEnabled(
         Boolean.parseBoolean(
             properties.getProperty(

Reply via email to