This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3cc339bd47a Pipe: Introduce timely flush options & Execute flush after
pipe watchdog restarts & Log degrade reasons for debugging (#14865)
3cc339bd47a is described below
commit 3cc339bd47a7da18c5fb1243479cd728320921ec
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Feb 17 18:56:11 2025 +0800
Pipe: Introduce timely flush options & Execute flush after pipe watchdog
restarts & Log degrade reasons for debugging (#14865)
---
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 59 +++++++++---
.../PipeRealtimeDataRegionHybridExtractor.java | 102 +++++++++++++++++----
.../iotdb/db/storageengine/StorageEngine.java | 8 ++
.../apache/iotdb/commons/conf/CommonConfig.java | 15 ++-
.../iotdb/commons/conf/CommonDescriptor.java | 5 +
.../iotdb/commons/pipe/config/PipeConfig.java | 6 ++
6 files changed, 163 insertions(+), 32 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 5ae9155cb66..39c20ecade0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -476,7 +476,21 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
///////////////////////// Restart Logic /////////////////////////
public void restartAllStuckPipes() {
- removeOutdatedPipeInfoFromLastRestartTimeMap();
+ final List<String> removedPipeName =
removeOutdatedPipeInfoFromLastRestartTimeMap();
+ if (!removedPipeName.isEmpty()) {
+ final long currentTime = System.currentTimeMillis();
+ LOGGER.info(
+ "Pipes {} now can dynamically adjust their extraction strategies. "
+ + "Start to flush storage engine to trigger the adjustment.",
+ removedPipeName);
+ StorageEngine.getInstance().syncCloseAllProcessor();
+ WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
+ LOGGER.info(
+ "Finished flushing storage engine, time cost: {} ms.",
+ System.currentTimeMillis() - currentTime);
+ LOGGER.info("Skipping restarting pipes this round because of the dynamic
flushing.");
+ return;
+ }
if (!tryWriteLockWithTimeOut(5)) {
return;
@@ -508,16 +522,23 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
stuckPipes.forEach(this::restartStuckPipe);
}
- private void removeOutdatedPipeInfoFromLastRestartTimeMap() {
+ private List<String> removeOutdatedPipeInfoFromLastRestartTimeMap() {
+ final List<String> removedPipeName = new ArrayList<>();
PIPE_NAME_TO_LAST_RESTART_TIME_MAP
.entrySet()
.removeIf(
entry -> {
final AtomicLong lastRestartTime = entry.getValue();
- return lastRestartTime == null
- ||
PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()
- <= System.currentTimeMillis() - lastRestartTime.get();
+ final boolean shouldRemove =
+ lastRestartTime == null
+ ||
PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()
+ <= System.currentTimeMillis() -
lastRestartTime.get();
+ if (shouldRemove) {
+ removedPipeName.add(entry.getKey());
+ }
+ return shouldRemove;
});
+ return removedPipeName;
}
private Set<PipeMeta> findAllStuckPipes() {
@@ -537,15 +558,20 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
return stuckPipes;
}
- if (3 *
PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsfileResourceRamSize()
- >= 2 *
PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()) {
+ final long totalLinkedButDeletedTsFileResourceRamSize =
+
PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsfileResourceRamSize();
+ final long freeMemorySizeInBytes =
+ PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes();
+ if (3 * totalLinkedButDeletedTsFileResourceRamSize >= 2 *
freeMemorySizeInBytes) {
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
stuckPipes.add(pipeMeta);
}
if (!stuckPipes.isEmpty()) {
LOGGER.warn(
- "All {} pipe(s) will be restarted because linked tsfiles' resource
size exceeds memory limit.",
- stuckPipes.size());
+ "All {} pipe(s) will be restarted because linked tsfiles' resource
size {} exceeds limit {}.",
+ stuckPipes.size(),
+ totalLinkedButDeletedTsFileResourceRamSize,
+ freeMemorySizeInBytes * 2.0 / 3);
}
return stuckPipes;
}
@@ -584,16 +610,23 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
|| mayWalSizeReachThrottleThreshold())) {
// Extractors of this pipe may be stuck and is pinning too many
MemTables.
LOGGER.warn(
- "Pipe {} needs to restart because too many memTables are
pinned.",
- pipeMeta.getStaticMeta());
+ "Pipe {} needs to restart because too many memtables are pinned.
mayMemTablePinnedCountReachDangerousThreshold: {},
mayWalSizeReachThrottleThreshold: {}",
+ pipeMeta.getStaticMeta(),
+ mayMemTablePinnedCountReachDangerousThreshold(),
+ mayWalSizeReachThrottleThreshold());
stuckPipes.add(pipeMeta);
} else if (getFloatingMemoryUsageInByte(pipeName)
>= PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()
/ pipeMetaKeeper.getPipeMetaCount()) {
// Extractors of this pipe may have too many insert nodes
LOGGER.warn(
- "Pipe {} needs to restart because too many insertNodes are
extracted.",
- pipeMeta.getStaticMeta());
+ "Pipe {} needs to restart because too many insertNodes are
extracted. "
+ + "Floating memory usage for this pipe: {}, free memory
size: {}, allowed free memory size for floating memory usage: {}",
+ pipeMeta.getStaticMeta(),
+ getFloatingMemoryUsageInByte(pipeName),
+ PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
+ PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()
+ / pipeMetaKeeper.getPipeMetaCount());
stuckPipes.add(pipeMeta);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 0c79b740f3e..4df39838cbe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -222,42 +222,112 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
}
private boolean isPipeTaskCurrentlyRestarted() {
- return PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName);
+ final boolean isPipeTaskCurrentlyRestarted =
+ PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName);
+ if (isPipeTaskCurrentlyRestarted) {
+ LOGGER.info(
+ "Pipe task {}@{} canNotUseTabletAnyMore1: Pipe task is currently
restarted",
+ pipeName,
+ dataRegionId);
+ }
+ return isPipeTaskCurrentlyRestarted;
}
private boolean mayWalSizeReachThrottleThreshold() {
- return 3 * WALManager.getInstance().getTotalDiskUsage()
- > IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
+ final boolean mayWalSizeReachThrottleThreshold =
+ 3 * WALManager.getInstance().getTotalDiskUsage()
+ > IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
+ if (mayWalSizeReachThrottleThreshold) {
+ LOGGER.info(
+ "Pipe task {}@{} canNotUseTabletAnyMore2: Wal size {} has reached
throttle threshold {}",
+ pipeName,
+ dataRegionId,
+ WALManager.getInstance().getTotalDiskUsage(),
+ IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold() /
3.0d);
+ }
+ return mayWalSizeReachThrottleThreshold;
}
private boolean mayMemTablePinnedCountReachDangerousThreshold() {
- return PipeDataNodeResourceManager.wal().getPinnedWalCount()
- >= PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
+ final boolean mayMemTablePinnedCountReachDangerousThreshold =
+ PipeDataNodeResourceManager.wal().getPinnedWalCount()
+ >= PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
+ if (mayMemTablePinnedCountReachDangerousThreshold) {
+ LOGGER.info(
+ "Pipe task {}@{} canNotUseTabletAnyMore3: The number of pinned
memtables {} has reached the dangerous threshold {}",
+ pipeName,
+ dataRegionId,
+ PipeDataNodeResourceManager.wal().getPinnedWalCount(),
+ PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount());
+ }
+ return mayMemTablePinnedCountReachDangerousThreshold;
}
private boolean isHistoricalTsFileEventCountExceededLimit() {
final IoTDBDataRegionExtractor extractor =
PipeDataRegionExtractorMetrics.getInstance().getExtractorMap().get(getTaskID());
- return Objects.nonNull(extractor)
- && extractor.getHistoricalTsFileInsertionEventCount()
- >=
PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion();
+ final boolean isHistoricalTsFileEventCountExceededLimit =
+ Objects.nonNull(extractor)
+ && extractor.getHistoricalTsFileInsertionEventCount()
+ >=
PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion();
+ if (isHistoricalTsFileEventCountExceededLimit) {
+ LOGGER.info(
+ "Pipe task {}@{} canNotUseTabletAnyMore4: The number of historical
tsFile events {} has exceeded the limit {}",
+ pipeName,
+ dataRegionId,
+ extractor.getHistoricalTsFileInsertionEventCount(),
+
PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion());
+ }
+ return isHistoricalTsFileEventCountExceededLimit;
}
private boolean isRealtimeTsFileEventCountExceededLimit() {
- return pendingQueue.getTsFileInsertionEventCount()
- >=
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
+ final boolean isRealtimeTsFileEventCountExceededLimit =
+ pendingQueue.getTsFileInsertionEventCount()
+ >=
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
+ if (isRealtimeTsFileEventCountExceededLimit) {
+ LOGGER.info(
+ "Pipe task {}@{} canNotUseTabletAnyMore5: The number of realtime
tsFile events {} has exceeded the limit {}",
+ pipeName,
+ dataRegionId,
+ pendingQueue.getTsFileInsertionEventCount(),
+
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion());
+ }
+ return isRealtimeTsFileEventCountExceededLimit;
}
private boolean mayTsFileLinkedCountReachDangerousThreshold() {
- return PipeDataNodeResourceManager.tsfile().getLinkedTsfileCount()
- >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
+ final boolean mayTsFileLinkedCountReachDangerousThreshold =
+ PipeDataNodeResourceManager.tsfile().getLinkedTsfileCount()
+ >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
+ if (mayTsFileLinkedCountReachDangerousThreshold) {
+ LOGGER.info(
+ "Pipe task {}@{} canNotUseTabletAnyMore6: The number of linked
tsfiles {} has reached the dangerous threshold {}",
+ pipeName,
+ dataRegionId,
+ PipeDataNodeResourceManager.tsfile().getLinkedTsfileCount(),
+ PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount());
+ }
+ return mayTsFileLinkedCountReachDangerousThreshold;
}
private boolean mayInsertNodeMemoryReachDangerousThreshold() {
- return 3
- * PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName)
- * PipeDataNodeAgent.task().getPipeCount()
- >= 2 * PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes();
+ final long floatingMemoryUsageInByte =
+ PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName);
+ final long pipeCount = PipeDataNodeAgent.task().getPipeCount();
+ final long freeMemorySizeInBytes =
+ PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes();
+ final boolean mayInsertNodeMemoryReachDangerousThreshold =
+ 3 * floatingMemoryUsageInByte * pipeCount >= 2 * freeMemorySizeInBytes;
+ if (mayInsertNodeMemoryReachDangerousThreshold) {
+ LOGGER.info(
+ "Pipe task {}@{} canNotUseTabletAnyMore7: The shallow memory usage
of the insert node {} has reached the dangerous threshold {}",
+ pipeName,
+ dataRegionId,
+ floatingMemoryUsageInByte * pipeCount,
+ 2 * freeMemorySizeInBytes / 3.0d);
+ }
+ return mayInsertNodeMemoryReachDangerousThreshold;
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index f958c4c5def..2056a10c564 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.ShutdownException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.schema.ttl.TTLCache;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
@@ -52,6 +53,7 @@ import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.load.LoadReadOnlyException;
import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
+import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
@@ -216,6 +218,12 @@ public class StorageEngine implements IService {
LOGGER.info(
"Storage Engine recover cost: {}s.",
(System.currentTimeMillis() - startRecoverTime) / 1000);
+
+ PipeDataNodeAgent.runtime()
+ .registerPeriodicalJob(
+ "StorageEngine#operateFlush",
+ () -> operateFlush(new TFlushReq()),
+
PipeConfig.getInstance().getPipeStorageEngineFlushTimeIntervalMs() / 1000);
},
ThreadName.STORAGE_ENGINE_RECOVER_TRIGGER.getName());
recoverEndTrigger.start();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 825c4169efa..f0212486f52 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -256,12 +256,13 @@ public class CommonConfig {
private long pipeReceiverLoginPeriodicVerificationIntervalMs = 300000;
private int pipeMaxAllowedHistoricalTsFilePerDataRegion = 100;
- private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 2;
- private int pipeMaxAllowedPinnedMemTableCount = 50;
- private long pipeMaxAllowedLinkedTsFileCount = 100;
+ private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 10;
+ private int pipeMaxAllowedPinnedMemTableCount = 1000;
+ private long pipeMaxAllowedLinkedTsFileCount = 300;
private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F;
private long pipeStuckRestartIntervalSeconds = 120;
private long pipeStuckRestartMinIntervalMs = 5 * 60 * 1000L; // 5 minutes
+ private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE;
private int pipeMetaReportMaxLogNumPerRound = 10;
private int pipeMetaReportMaxLogIntervalRounds = 36;
@@ -1061,6 +1062,10 @@ public class CommonConfig {
return pipeStuckRestartMinIntervalMs;
}
+ public long getPipeStorageEngineFlushTimeIntervalMs() {
+ return pipeStorageEngineFlushTimeIntervalMs;
+ }
+
public void setPipeStuckRestartIntervalSeconds(long
pipeStuckRestartIntervalSeconds) {
this.pipeStuckRestartIntervalSeconds = pipeStuckRestartIntervalSeconds;
}
@@ -1069,6 +1074,10 @@ public class CommonConfig {
this.pipeStuckRestartMinIntervalMs = pipeStuckRestartMinIntervalMs;
}
+ public void setPipeStorageEngineFlushTimeIntervalMs(long
pipeStorageEngineFlushTimeIntervalMs) {
+ this.pipeStorageEngineFlushTimeIntervalMs =
pipeStorageEngineFlushTimeIntervalMs;
+ }
+
public int getPipeMetaReportMaxLogNumPerRound() {
return pipeMetaReportMaxLogNumPerRound;
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 3661feefa6e..bea57a45f6c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -524,6 +524,11 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_stuck_restart_min_interval_ms",
String.valueOf(config.getPipeStuckRestartMinIntervalMs()))));
+ config.setPipeStorageEngineFlushTimeIntervalMs(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_storage_engine_flush_time_interval_ms",
+
String.valueOf(config.getPipeStorageEngineFlushTimeIntervalMs()))));
config.setPipeMetaReportMaxLogNumPerRound(
Integer.parseInt(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index e652bff98f9..bb2b3fd7ac0 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -260,6 +260,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeStuckRestartMinIntervalMs();
}
+ public long getPipeStorageEngineFlushTimeIntervalMs() {
+ return COMMON_CONFIG.getPipeStorageEngineFlushTimeIntervalMs();
+ }
+
/////////////////////////////// Logger ///////////////////////////////
public int getPipeMetaReportMaxLogNumPerRound() {
@@ -457,6 +461,8 @@ public class PipeConfig {
getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage());
LOGGER.info("PipeStuckRestartIntervalSeconds: {}",
getPipeStuckRestartIntervalSeconds());
LOGGER.info("PipeStuckRestartMinIntervalMs: {}",
getPipeStuckRestartMinIntervalMs());
+ LOGGER.info(
+ "PipeStorageEngineFlushTimeIntervalMs: {}",
getPipeStorageEngineFlushTimeIntervalMs());
LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}",
getPipeMetaReportMaxLogNumPerRound());
LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}",
getPipeMetaReportMaxLogIntervalRounds());