This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e3366ed7c4c Pipe: Enable flushing storage engine after last terminate
event reported (#15465)
e3366ed7c4c is described below
commit e3366ed7c4cddab12808fae16897fc94be5e6be1
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 8 23:13:39 2025 +0800
Pipe: Enable flushing storage engine after last terminate event reported
(#15465)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../agent/runtime/PipeDataNodeRuntimeAgent.java | 6 +++
.../event/common/terminate/PipeTerminateEvent.java | 45 ++++++++++++++++++++++
.../apache/iotdb/commons/conf/CommonConfig.java | 27 +++++++++++++
.../iotdb/commons/pipe/config/PipeConfig.java | 10 +++++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 10 +++++
5 files changed, 98 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
index 057000dd965..ba6ab4c66cd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import
org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningQueue;
import
org.apache.iotdb.db.pipe.resource.PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -89,6 +90,11 @@ public class PipeDataNodeRuntimeAgent implements IService {
"PipeTaskAgent#restartAllStuckPipes",
PipeDataNodeAgent.task()::restartAllStuckPipes,
PipeConfig.getInstance().getPipeStuckRestartIntervalSeconds());
+ registerPeriodicalJob(
+ "PipeTaskAgent#flushDataRegionIfNeeded",
+ PipeTerminateEvent::flushDataRegionIfNeeded,
+ PipeConfig.getInstance().getPipeFlushAfterLastTerminateSeconds());
+
pipePeriodicalJobExecutor.start();
if (PipeConfig.getInstance().getPipeEventReferenceTrackingEnabled()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index c67102030bb..d431b12a79c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -19,17 +19,24 @@
package org.apache.iotdb.db.pipe.event.common.terminate;
+import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
/**
* The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls
the termination of pipe,
@@ -38,6 +45,41 @@ import java.util.concurrent.CompletableFuture;
* be discarded.
*/
public class PipeTerminateEvent extends EnrichedEvent {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTerminateEvent.class);
+
+ private static final AtomicLong PROGRESS_REPORT_COUNT = new AtomicLong(0);
+ private static final AtomicLong LAST_PROGRESS_REPORT_TIME = new
AtomicLong(0);
+
+ public static void flushDataRegionIfNeeded() {
+ if (PROGRESS_REPORT_COUNT.get() > 0
+ && PROGRESS_REPORT_COUNT.get()
+ > PipeConfig.getInstance().getPipeFlushAfterTerminateCount()) {
+ flushDataRegion();
+ return;
+ }
+
+ if (LAST_PROGRESS_REPORT_TIME.get() > 0
+ && System.currentTimeMillis() - LAST_PROGRESS_REPORT_TIME.get()
+ > PipeConfig.getInstance().getPipeFlushAfterLastTerminateSeconds() * 1000L) {
+ flushDataRegion();
+ }
+ }
+
+ private static void flushDataRegion() {
+ try {
+ StorageEngine.getInstance().operateFlush(new TFlushReq());
+ PROGRESS_REPORT_COUNT.set(0);
+ LAST_PROGRESS_REPORT_TIME.set(0);
+ LOGGER.info("Force flush all data regions because of last progress report time.");
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to flush all data regions, please check the error message: {}",
+ e.getMessage(),
+ e);
+ }
+ }
+
private final int dataRegionId;
public PipeTerminateEvent(
@@ -106,6 +148,9 @@ public class PipeTerminateEvent extends EnrichedEvent {
@Override
public void reportProgress() {
+ PROGRESS_REPORT_COUNT.incrementAndGet();
+ LAST_PROGRESS_REPORT_TIME.set(System.currentTimeMillis());
+
// To avoid deadlock
CompletableFuture.runAsync(
() -> PipeDataNodeAgent.task().markCompleted(pipeName, dataRegionId));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 894a28487fa..a813cf216e1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -273,6 +273,8 @@ public class CommonConfig {
private long pipeStuckRestartIntervalSeconds = 120;
private long pipeStuckRestartMinIntervalMs = 5 * 60 * 1000L; // 5 minutes
private boolean pipeEpochKeepTsFileAfterStuckRestartEnabled = false;
+ private long pipeFlushAfterLastTerminateSeconds = 30;
+ private long pipeFlushAfterTerminateCount = 30;
private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE;
private int pipeMetaReportMaxLogNumPerRound = 10;
@@ -1478,6 +1480,31 @@ public class CommonConfig {
"pipeStorageEngineFlushTimeIntervalMs is set to {}",
pipeStorageEngineFlushTimeIntervalMs);
}
+ public long getPipeFlushAfterLastTerminateSeconds() {
+ return pipeFlushAfterLastTerminateSeconds;
+ }
+
+ public void setPipeFlushAfterLastTerminateSeconds(long
pipeFlushAfterLastTerminateSeconds) {
+ if (this.pipeFlushAfterLastTerminateSeconds ==
pipeFlushAfterLastTerminateSeconds) {
+ return;
+ }
+ this.pipeFlushAfterLastTerminateSeconds =
pipeFlushAfterLastTerminateSeconds;
+ logger.info(
+ "pipeFlushAfterLastTerminateSeconds is set to {}",
pipeFlushAfterLastTerminateSeconds);
+ }
+
+ public long getPipeFlushAfterTerminateCount() {
+ return pipeFlushAfterTerminateCount;
+ }
+
+ public void setPipeFlushAfterTerminateCount(long
pipeFlushAfterTerminateCount) {
+ if (this.pipeFlushAfterTerminateCount == pipeFlushAfterTerminateCount) {
+ return;
+ }
+ this.pipeFlushAfterTerminateCount = pipeFlushAfterTerminateCount;
+ logger.info("pipeFlushAfterTerminateCount is set to {}",
pipeFlushAfterTerminateCount);
+ }
+
public int getPipeMetaReportMaxLogNumPerRound() {
return pipeMetaReportMaxLogNumPerRound;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 0de34c0e08d..9b63b46626c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -303,6 +303,14 @@ public class PipeConfig {
return COMMON_CONFIG.isPipeEpochKeepTsFileAfterStuckRestartEnabled();
}
+ public long getPipeFlushAfterTerminateCount() {
+ return COMMON_CONFIG.getPipeFlushAfterTerminateCount();
+ }
+
+ public long getPipeFlushAfterLastTerminateSeconds() {
+ return COMMON_CONFIG.getPipeFlushAfterLastTerminateSeconds();
+ }
+
public long getPipeStorageEngineFlushTimeIntervalMs() {
return COMMON_CONFIG.getPipeStorageEngineFlushTimeIntervalMs();
}
@@ -530,6 +538,8 @@ public class PipeConfig {
LOGGER.info(
"PipeEpochKeepTsFileAfterStuckRestartEnabled: {}",
isPipeEpochKeepTsFileAfterStuckRestartEnabled());
+ LOGGER.info("PipeFlushAfterTerminateCount: {}",
getPipeFlushAfterTerminateCount());
+ LOGGER.info("PipeFlushAfterLastTerminateSeconds: {}",
getPipeFlushAfterLastTerminateSeconds());
LOGGER.info(
"PipeStorageEngineFlushTimeIntervalMs: {}",
getPipeStorageEngineFlushTimeIntervalMs());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index a48469e7742..919df926189 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -431,6 +431,16 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_stuck_restart_min_interval_ms",
String.valueOf(config.getPipeStuckRestartMinIntervalMs()))));
+ config.setPipeFlushAfterLastTerminateSeconds(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_flush_after_last_terminate_seconds",
+
String.valueOf(config.getPipeFlushAfterLastTerminateSeconds()))));
+ config.setPipeFlushAfterTerminateCount(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_flush_after_terminate_count",
+ String.valueOf(config.getPipeFlushAfterTerminateCount()))));
config.setPipeEpochKeepTsFileAfterStuckRestartEnabled(
Boolean.parseBoolean(
properties.getProperty(