This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch ring-buffer-queue
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ring-buffer-queue by this push:
new 3f51569a46a Pipe: Disabled the restart logic by default (#15663)
3f51569a46a is described below
commit 3f51569a46a49868c6d041131e7214eaaeb47ec4
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 6 16:13:44 2025 +0800
Pipe: Disabled the restart logic by default (#15663)
* Added switch
* Refactor
---
.../iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java | 8 ++++++--
.../java/org/apache/iotdb/commons/conf/CommonConfig.java | 13 +++++++++++++
.../org/apache/iotdb/commons/pipe/config/PipeConfig.java | 5 +++++
.../apache/iotdb/commons/pipe/config/PipeDescriptor.java | 4 ++++
4 files changed, 28 insertions(+), 2 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 8fd099baa06..dee6b76410d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -140,8 +140,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
private void restartPipeToReloadResourceIfNeeded(final PipeMeta pipeMeta) {
- if (System.currentTimeMillis() - pipeMeta.getStaticMeta().getCreationTime()
- < PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()) {
+ if (!PipeConfig.getInstance().isPipeStuckRestartEnabled()
+ || System.currentTimeMillis() -
pipeMeta.getStaticMeta().getCreationTime()
+ < PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()) {
return;
}
@@ -562,6 +563,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
///////////////////////// Restart Logic /////////////////////////
public void restartAllStuckPipes() {
+ if (!PipeConfig.getInstance().isPipeStuckRestartEnabled()) {
+ return;
+ }
final List<String> removedPipeName =
removeOutdatedPipeInfoFromLastRestartTimeMap();
if (!removedPipeName.isEmpty()) {
final long currentTime = System.currentTimeMillis();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 3004ace2e30..4172a7a23d9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -272,6 +272,7 @@ public class CommonConfig {
private int pipeMaxAllowedPinnedMemTableCount = Integer.MAX_VALUE; // per
data region
private long pipeMaxAllowedLinkedTsFileCount = Long.MAX_VALUE; // Deprecated
private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F;
+ private boolean pipeStuckRestartEnabled = false;
private long pipeStuckRestartIntervalSeconds = 120;
private long pipeStuckRestartMinIntervalMs = 5 * 60 * 1000L; // 5 minutes
private boolean pipeEpochKeepTsFileAfterStuckRestartEnabled = false;
@@ -1465,6 +1466,18 @@ public class CommonConfig {
pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage);
}
+ public boolean isPipeStuckRestartEnabled() {
+ return pipeStuckRestartEnabled;
+ }
+
+ public void setPipeStuckRestartEnabled(boolean pipeStuckRestartEnabled) {
+ if (this.pipeStuckRestartEnabled == pipeStuckRestartEnabled) {
+ return;
+ }
+ this.pipeStuckRestartEnabled = pipeStuckRestartEnabled;
+ logger.info("pipeStuckRestartEnabled is set to {}",
pipeStuckRestartEnabled);
+ }
+
public long getPipeStuckRestartIntervalSeconds() {
return pipeStuckRestartIntervalSeconds;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index b099e713dcc..6ded1994dce 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -323,6 +323,10 @@ public class PipeConfig {
return
COMMON_CONFIG.getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage();
}
+ public boolean isPipeStuckRestartEnabled() {
+ return COMMON_CONFIG.isPipeStuckRestartEnabled();
+ }
+
public long getPipeStuckRestartIntervalSeconds() {
return COMMON_CONFIG.getPipeStuckRestartIntervalSeconds();
}
@@ -592,6 +596,7 @@ public class PipeConfig {
LOGGER.info(
"PipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage: {}",
getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage());
+ LOGGER.info("PipeStuckRestartEnabled: {}", isPipeStuckRestartEnabled());
LOGGER.info("PipeStuckRestartIntervalSeconds: {}",
getPipeStuckRestartIntervalSeconds());
LOGGER.info("PipeStuckRestartMinIntervalMs: {}",
getPipeStuckRestartMinIntervalMs());
LOGGER.info(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index cc9850e90d3..c8920aa3e6d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -438,6 +438,10 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_max_allowed_linked_deleted_tsfile_disk_usage_percentage",
String.valueOf(config.getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage()))));
+ config.setPipeStuckRestartEnabled(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "pipe_stuck_restart_enabled",
String.valueOf(config.isPipeStuckRestartEnabled()))));
config.setPipeStuckRestartIntervalSeconds(
Long.parseLong(
properties.getProperty(