This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 1174ce74a3d Pipe: Enable flushing storage engine after last terminate
event reported (#15465) (#15481)
1174ce74a3d is described below
commit 1174ce74a3ddcf8afdd5fa4bcbd1cfe20de7dd58
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 9 17:03:43 2025 +0800
Pipe: Enable flushing storage engine after last terminate event reported
(#15465) (#15481)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../agent/runtime/PipeDataNodeRuntimeAgent.java | 6 +++
.../event/common/terminate/PipeTerminateEvent.java | 45 ++++++++++++++++++++++
.../apache/iotdb/commons/conf/CommonConfig.java | 27 +++++++++++++
.../iotdb/commons/pipe/config/PipeConfig.java | 10 +++++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 10 +++++
5 files changed, 98 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
index d4f11db4ad7..5a1f9d8b845 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import
org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningQueue;
import
org.apache.iotdb.db.pipe.resource.PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -89,6 +90,11 @@ public class PipeDataNodeRuntimeAgent implements IService {
"PipeTaskAgent#restartAllStuckPipes",
PipeDataNodeAgent.task()::restartAllStuckPipes,
PipeConfig.getInstance().getPipeStuckRestartIntervalSeconds());
+ registerPeriodicalJob(
+ "PipeTaskAgent#flushDataRegionIfNeeded",
+ PipeTerminateEvent::flushDataRegionIfNeeded,
+ PipeConfig.getInstance().getPipeFlushAfterLastTerminateSeconds());
+
pipePeriodicalJobExecutor.start();
if (PipeConfig.getInstance().getPipeEventReferenceTrackingEnabled()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index 1e3c9e4faf9..ca78dd7d50a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -19,16 +19,23 @@
package org.apache.iotdb.db.pipe.event.common.terminate;
+import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
/**
* The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls
the termination of pipe,
@@ -37,6 +44,41 @@ import java.util.concurrent.CompletableFuture;
* be discarded.
*/
public class PipeTerminateEvent extends EnrichedEvent {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTerminateEvent.class);
+
+ private static final AtomicLong PROGRESS_REPORT_COUNT = new AtomicLong(0); // Incremented in reportProgress(); set back to 0 by flushDataRegion().
+ private static final AtomicLong LAST_PROGRESS_REPORT_TIME = new
AtomicLong(0); // System.currentTimeMillis() of the latest reportProgress(); 0 initially and after a flush.
+
+ public static void flushDataRegionIfNeeded() { // Calls flushDataRegion() when the count trigger or the elapsed-time trigger below fires.
+ if (PROGRESS_REPORT_COUNT.get() > 0 // NOTE(review): this guard only matters if the configured count is negative.
+ && PROGRESS_REPORT_COUNT.get() // NOTE(review): second read of the counter; it may differ from the first under concurrent reports.
+ > PipeConfig.getInstance().getPipeFlushAfterTerminateCount()) { // Count trigger: strictly more reports than the configured limit.
+ flushDataRegion();
+ return; // flushDataRegion() was already invoked; skip the time-based check.
+ }
+
+ if (LAST_PROGRESS_REPORT_TIME.get() > 0 // Time trigger applies only once a report time has been recorded.
+ && System.currentTimeMillis() - LAST_PROGRESS_REPORT_TIME.get()
+ > PipeConfig.getInstance().getPipeFlushAfterLastTerminateSeconds()
* 1000L) { // Configured seconds are converted to milliseconds for the comparison.
+ flushDataRegion();
+ }
+ }
+
+ private static void flushDataRegion() { // Issues a flush via StorageEngine and, if it did not throw, resets both report counters.
+ try {
+ StorageEngine.getInstance().operateFlush(new TFlushReq()); // Default-constructed request; presumably targets all data regions (per the log text) - confirm.
+ PROGRESS_REPORT_COUNT.set(0); // Reached only when operateFlush returned normally.
+ LAST_PROGRESS_REPORT_TIME.set(0);
+ LOGGER.info("Force flush all data regions because of last progress
report time."); // NOTE(review): also logged when the count trigger fired, although the text names only the time trigger.
+ } catch (final Exception e) { // Failure is logged, not rethrown; counters keep their values if operateFlush threw.
+ LOGGER.warn(
+ "Failed to flush all data regions, please check the error message:
{}",
+ e.getMessage(),
+ e); // Trailing throwable argument makes the logger include the stack trace.
+ }
+ }
+
private final int dataRegionId;
public PipeTerminateEvent(
@@ -93,6 +135,9 @@ public class PipeTerminateEvent extends EnrichedEvent {
@Override
public void reportProgress() {
+ PROGRESS_REPORT_COUNT.incrementAndGet();
+ LAST_PROGRESS_REPORT_TIME.set(System.currentTimeMillis());
+
// To avoid deadlock
CompletableFuture.runAsync(
() -> PipeDataNodeAgent.task().markCompleted(pipeName, dataRegionId));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 894a28487fa..a813cf216e1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -273,6 +273,8 @@ public class CommonConfig {
private long pipeStuckRestartIntervalSeconds = 120;
private long pipeStuckRestartMinIntervalMs = 5 * 60 * 1000L; // 5 minutes
private boolean pipeEpochKeepTsFileAfterStuckRestartEnabled = false;
+ private long pipeFlushAfterLastTerminateSeconds = 30;
+ private long pipeFlushAfterTerminateCount = 30;
private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE;
private int pipeMetaReportMaxLogNumPerRound = 10;
@@ -1478,6 +1480,31 @@ public class CommonConfig {
"pipeStorageEngineFlushTimeIntervalMs is set to {}",
pipeStorageEngineFlushTimeIntervalMs);
}
+ public long getPipeFlushAfterLastTerminateSeconds() { // Returns the current pipeFlushAfterLastTerminateSeconds value (field initializer: 30).
+ return pipeFlushAfterLastTerminateSeconds;
+ }
+
+ public void setPipeFlushAfterLastTerminateSeconds(long
pipeFlushAfterLastTerminateSeconds) { // Stores the new value and logs it. NOTE(review): no range check; zero or negative values are accepted.
+ if (this.pipeFlushAfterLastTerminateSeconds ==
pipeFlushAfterLastTerminateSeconds) {
+ return; // Unchanged value: skip both the assignment and the log line.
+ }
+ this.pipeFlushAfterLastTerminateSeconds =
pipeFlushAfterLastTerminateSeconds;
+ logger.info(
+ "pipeFlushAfterLastTerminateSeconds is set to {}",
pipeFlushAfterLastTerminateSeconds);
+ }
+
+ public long getPipeFlushAfterTerminateCount() { // Returns the current pipeFlushAfterTerminateCount value (field initializer: 30).
+ return pipeFlushAfterTerminateCount;
+ }
+
+ public void setPipeFlushAfterTerminateCount(long
pipeFlushAfterTerminateCount) { // Stores the new value and logs it. NOTE(review): no range check; zero or negative values are accepted.
+ if (this.pipeFlushAfterTerminateCount == pipeFlushAfterTerminateCount) {
+ return; // Unchanged value: skip both the assignment and the log line.
+ }
+ this.pipeFlushAfterTerminateCount = pipeFlushAfterTerminateCount;
+ logger.info("pipeFlushAfterTerminateCount is set to {}",
pipeFlushAfterTerminateCount);
+ }
+
public int getPipeMetaReportMaxLogNumPerRound() {
return pipeMetaReportMaxLogNumPerRound;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 4ab0ba3213e..addf95cdb00 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -303,6 +303,14 @@ public class PipeConfig {
return COMMON_CONFIG.isPipeEpochKeepTsFileAfterStuckRestartEnabled();
}
+ public long getPipeFlushAfterTerminateCount() { // Pass-through accessor: delegates to COMMON_CONFIG.
+ return COMMON_CONFIG.getPipeFlushAfterTerminateCount();
+ }
+
+ public long getPipeFlushAfterLastTerminateSeconds() { // Pass-through accessor: delegates to COMMON_CONFIG.
+ return COMMON_CONFIG.getPipeFlushAfterLastTerminateSeconds();
+ }
+
public long getPipeStorageEngineFlushTimeIntervalMs() {
return COMMON_CONFIG.getPipeStorageEngineFlushTimeIntervalMs();
}
@@ -531,6 +539,8 @@ public class PipeConfig {
LOGGER.info(
"PipeEpochKeepTsFileAfterStuckRestartEnabled: {}",
isPipeEpochKeepTsFileAfterStuckRestartEnabled());
+ LOGGER.info("PipeFlushAfterTerminateCount: {}",
getPipeFlushAfterTerminateCount());
+ LOGGER.info("PipeFlushAfterLastTerminateSeconds: {}",
getPipeFlushAfterLastTerminateSeconds());
LOGGER.info(
"PipeStorageEngineFlushTimeIntervalMs: {}",
getPipeStorageEngineFlushTimeIntervalMs());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index a48469e7742..919df926189 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -431,6 +431,16 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_stuck_restart_min_interval_ms",
String.valueOf(config.getPipeStuckRestartMinIntervalMs()))));
+ config.setPipeFlushAfterLastTerminateSeconds(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_flush_after_last_terminate_seconds",
+
String.valueOf(config.getPipeFlushAfterLastTerminateSeconds()))));
+ config.setPipeFlushAfterTerminateCount(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_flush_after_terminate_count",
+ String.valueOf(config.getPipeFlushAfterTerminateCount()))));
config.setPipeEpochKeepTsFileAfterStuckRestartEnabled(
Boolean.parseBoolean(
properties.getProperty(