This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch flush-hook-after-watchdog-1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bd2e12d1b2906b4cc815d50fc7b2688f2e495e98 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Feb 17 18:56:11 2025 +0800 Pipe: Introduce timely flush options & Execute flush after pipe watchdog restarts & Log degrade reasons for debugging (#14865) (cherry picked from commit 3cc339bd47a7da18c5fb1243479cd728320921ec) --- .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 59 +++++++++--- .../PipeRealtimeDataRegionHybridExtractor.java | 102 +++++++++++++++++---- .../iotdb/db/storageengine/StorageEngine.java | 8 ++ .../apache/iotdb/commons/conf/CommonConfig.java | 15 ++- .../iotdb/commons/conf/CommonDescriptor.java | 5 + .../iotdb/commons/pipe/config/PipeConfig.java | 6 ++ 6 files changed, 163 insertions(+), 32 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 6dc17896433..c95d4fc5a38 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -478,7 +478,21 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { ///////////////////////// Restart Logic ///////////////////////// public void restartAllStuckPipes() { - removeOutdatedPipeInfoFromLastRestartTimeMap(); + final List<String> removedPipeName = removeOutdatedPipeInfoFromLastRestartTimeMap(); + if (!removedPipeName.isEmpty()) { + final long currentTime = System.currentTimeMillis(); + LOGGER.info( + "Pipes {} now can dynamically adjust their extraction strategies. " + + "Start to flush storage engine to trigger the adjustment.", + removedPipeName); + StorageEngine.getInstance().syncCloseAllProcessor(); + WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes(); + LOGGER.info( + "Finished flushing storage engine, time cost: {} ms.", + System.currentTimeMillis() - currentTime); + LOGGER.info("Skipping restarting pipes this round because of the dynamic flushing."); + return; + } if (!tryWriteLockWithTimeOut(5)) { return; @@ -510,16 +524,23 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { stuckPipes.forEach(this::restartStuckPipe); } - private void removeOutdatedPipeInfoFromLastRestartTimeMap() { + private List<String> removeOutdatedPipeInfoFromLastRestartTimeMap() { + final List<String> removedPipeName = new ArrayList<>(); PIPE_NAME_TO_LAST_RESTART_TIME_MAP .entrySet() .removeIf( entry -> { final AtomicLong lastRestartTime = entry.getValue(); - return lastRestartTime == null - || PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs() - <= System.currentTimeMillis() - lastRestartTime.get(); + final boolean shouldRemove = + lastRestartTime == null + || PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs() + <= System.currentTimeMillis() - lastRestartTime.get(); + if (shouldRemove) { + removedPipeName.add(entry.getKey()); + } + return shouldRemove; }); + return removedPipeName; } private Set<PipeMeta> findAllStuckPipes() { @@ -539,15 +560,20 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { return stuckPipes; } - if (3 * PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsfileResourceRamSize() - >= 2 * PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()) { + final long totalLinkedButDeletedTsFileResourceRamSize = + PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsfileResourceRamSize(); + final long freeMemorySizeInBytes = + PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(); + if (3 * totalLinkedButDeletedTsFileResourceRamSize >= 2 * freeMemorySizeInBytes) { for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { stuckPipes.add(pipeMeta); } if (!stuckPipes.isEmpty()) { LOGGER.warn( - "All {} pipe(s) will be restarted because linked tsfiles' resource size exceeds memory limit.", - stuckPipes.size()); + "All {} pipe(s) will be restarted because linked tsfiles' resource size {} exceeds limit {}.", + stuckPipes.size(), + totalLinkedButDeletedTsFileResourceRamSize, + freeMemorySizeInBytes * 2.0 / 3); } return stuckPipes; } @@ -586,16 +612,23 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { || mayWalSizeReachThrottleThreshold())) { // Extractors of this pipe may be stuck and is pinning too many MemTables. LOGGER.warn( - "Pipe {} needs to restart because too many memTables are pinned.", - pipeMeta.getStaticMeta()); + "Pipe {} needs to restart because too many memtables are pinned. mayMemTablePinnedCountReachDangerousThreshold: {}, mayWalSizeReachThrottleThreshold: {}", + pipeMeta.getStaticMeta(), + mayMemTablePinnedCountReachDangerousThreshold(), + mayWalSizeReachThrottleThreshold()); stuckPipes.add(pipeMeta); } else if (getFloatingMemoryUsageInByte(pipeName) >= PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes() / pipeMetaKeeper.getPipeMetaCount()) { // Extractors of this pipe may have too many insert nodes LOGGER.warn( - "Pipe {} needs to restart because too many insertNodes are extracted.", - pipeMeta.getStaticMeta()); + "Pipe {} needs to restart because too many insertNodes are extracted. " + + "Floating memory usage for this pipe: {}, free memory size: {}, allowed free memory size for floating memory usage: {}", + pipeMeta.getStaticMeta(), + getFloatingMemoryUsageInByte(pipeName), + PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(), + PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes() + / pipeMetaKeeper.getPipeMetaCount()); stuckPipes.add(pipeMeta); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java index 57eca384974..822991bdd5e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -222,42 +222,112 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio } private boolean isPipeTaskCurrentlyRestarted() { - return PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName); + final boolean isPipeTaskCurrentlyRestarted = + PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName); + if (isPipeTaskCurrentlyRestarted) { + LOGGER.info( + "Pipe task {}@{} canNotUseTabletAnyMore1: Pipe task is currently restarted", + pipeName, + dataRegionId); + } + return isPipeTaskCurrentlyRestarted; } private boolean mayWalSizeReachThrottleThreshold() { - return 3 * WALManager.getInstance().getTotalDiskUsage() - > IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold(); + final boolean mayWalSizeReachThrottleThreshold = + 3 * WALManager.getInstance().getTotalDiskUsage() + > IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold(); + if (mayWalSizeReachThrottleThreshold) { + LOGGER.info( + "Pipe task {}@{} canNotUseTabletAnyMore2: Wal size {} has reached throttle threshold {}", + pipeName, + dataRegionId, + WALManager.getInstance().getTotalDiskUsage(), + IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold() / 3.0d); + } + return mayWalSizeReachThrottleThreshold; } private boolean mayMemTablePinnedCountReachDangerousThreshold() { - return PipeDataNodeResourceManager.wal().getPinnedWalCount() - >= PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount(); + final boolean mayMemTablePinnedCountReachDangerousThreshold = + PipeDataNodeResourceManager.wal().getPinnedWalCount() + >= PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount(); + if (mayMemTablePinnedCountReachDangerousThreshold) { + LOGGER.info( + "Pipe task {}@{} canNotUseTabletAnyMore3: The number of pinned memtables {} has reached the dangerous threshold {}", + pipeName, + dataRegionId, + PipeDataNodeResourceManager.wal().getPinnedWalCount(), + PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount()); + } + return mayMemTablePinnedCountReachDangerousThreshold; } private boolean isHistoricalTsFileEventCountExceededLimit() { final IoTDBDataRegionExtractor extractor = PipeDataRegionExtractorMetrics.getInstance().getExtractorMap().get(getTaskID()); - return Objects.nonNull(extractor) - && extractor.getHistoricalTsFileInsertionEventCount() - >= PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion(); + final boolean isHistoricalTsFileEventCountExceededLimit = + Objects.nonNull(extractor) + && extractor.getHistoricalTsFileInsertionEventCount() + >= PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion(); + if (isHistoricalTsFileEventCountExceededLimit) { + LOGGER.info( + "Pipe task {}@{} canNotUseTabletAnyMore4: The number of historical tsFile events {} has exceeded the limit {}", + pipeName, + dataRegionId, + extractor.getHistoricalTsFileInsertionEventCount(), + PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion()); + } + return isHistoricalTsFileEventCountExceededLimit; } private boolean isRealtimeTsFileEventCountExceededLimit() { - return pendingQueue.getTsFileInsertionEventCount() - >= PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion(); + final boolean isRealtimeTsFileEventCountExceededLimit = + pendingQueue.getTsFileInsertionEventCount() + >= PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion(); + if (isRealtimeTsFileEventCountExceededLimit) { + LOGGER.info( + "Pipe task {}@{} canNotUseTabletAnyMore5: The number of realtime tsFile events {} has exceeded the limit {}", + pipeName, + dataRegionId, + pendingQueue.getTsFileInsertionEventCount(), + PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion()); + } + return isRealtimeTsFileEventCountExceededLimit; } private boolean mayTsFileLinkedCountReachDangerousThreshold() { - return PipeDataNodeResourceManager.tsfile().getLinkedTsfileCount() - >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount(); + final boolean mayTsFileLinkedCountReachDangerousThreshold = + PipeDataNodeResourceManager.tsfile().getLinkedTsfileCount() + >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount(); + if (mayTsFileLinkedCountReachDangerousThreshold) { + LOGGER.info( + "Pipe task {}@{} canNotUseTabletAnyMore6: The number of linked tsfiles {} has reached the dangerous threshold {}", + pipeName, + dataRegionId, + PipeDataNodeResourceManager.tsfile().getLinkedTsfileCount(), + PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount()); + } + return mayTsFileLinkedCountReachDangerousThreshold; } private boolean mayInsertNodeMemoryReachDangerousThreshold() { - return 3 - * PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName) - * PipeDataNodeAgent.task().getPipeCount() - >= 2 * PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(); + final long floatingMemoryUsageInByte = + PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName); + final long pipeCount = PipeDataNodeAgent.task().getPipeCount(); + final long freeMemorySizeInBytes = + PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(); + final boolean mayInsertNodeMemoryReachDangerousThreshold = + 3 * floatingMemoryUsageInByte * pipeCount >= 2 * freeMemorySizeInBytes; + if (mayInsertNodeMemoryReachDangerousThreshold) { + LOGGER.info( + "Pipe task {}@{} canNotUseTabletAnyMore7: The shallow memory usage of the insert node {} has reached the dangerous threshold {}", + pipeName, + dataRegionId, + floatingMemoryUsageInByte * pipeCount, + 2 * freeMemorySizeInBytes / 3.0d); + } + return mayInsertNodeMemoryReachDangerousThreshold; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java index d88703b5b0c..0c0a9c25eec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java @@ -37,6 +37,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.ShutdownException; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.file.SystemFileFactory; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.schema.ttl.TTLCache; import org.apache.iotdb.commons.service.IService; import org.apache.iotdb.commons.service.ServiceType; @@ -52,6 +53,7 @@ import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.exception.WriteProcessRejectException; import org.apache.iotdb.db.exception.load.LoadReadOnlyException; import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException; +import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode; import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler; @@ -216,6 +218,12 @@ public class StorageEngine implements IService { LOGGER.info( "Storage Engine recover cost: {}s.", (System.currentTimeMillis() - startRecoverTime) / 1000); + + PipeDataNodeAgent.runtime() + .registerPeriodicalJob( + "StorageEngine#operateFlush", + () -> operateFlush(new TFlushReq()), + PipeConfig.getInstance().getPipeStorageEngineFlushTimeIntervalMs() / 1000); }, ThreadName.STORAGE_ENGINE_RECOVER_TRIGGER.getName()); recoverEndTrigger.start(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 825c4169efa..f0212486f52 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -256,12 +256,13 @@ public class CommonConfig { private long pipeReceiverLoginPeriodicVerificationIntervalMs = 300000; private int pipeMaxAllowedHistoricalTsFilePerDataRegion = 100; - private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 2; - private int pipeMaxAllowedPinnedMemTableCount = 50; - private long pipeMaxAllowedLinkedTsFileCount = 100; + private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 10; + private int pipeMaxAllowedPinnedMemTableCount = 1000; + private long pipeMaxAllowedLinkedTsFileCount = 300; private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F; private long pipeStuckRestartIntervalSeconds = 120; private long pipeStuckRestartMinIntervalMs = 5 * 60 * 1000L; // 5 minutes + private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE; private int pipeMetaReportMaxLogNumPerRound = 10; private int pipeMetaReportMaxLogIntervalRounds = 36; @@ -1061,6 +1062,10 @@ public class CommonConfig { return pipeStuckRestartMinIntervalMs; } + public long getPipeStorageEngineFlushTimeIntervalMs() { + return pipeStorageEngineFlushTimeIntervalMs; + } + public void setPipeStuckRestartIntervalSeconds(long pipeStuckRestartIntervalSeconds) { this.pipeStuckRestartIntervalSeconds = pipeStuckRestartIntervalSeconds; } @@ -1069,6 +1074,10 @@ public class CommonConfig { this.pipeStuckRestartMinIntervalMs = pipeStuckRestartMinIntervalMs; } + public void setPipeStorageEngineFlushTimeIntervalMs(long pipeStorageEngineFlushTimeIntervalMs) { + this.pipeStorageEngineFlushTimeIntervalMs = pipeStorageEngineFlushTimeIntervalMs; + } + public int getPipeMetaReportMaxLogNumPerRound() { return pipeMetaReportMaxLogNumPerRound; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 3661feefa6e..bea57a45f6c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -524,6 +524,11 @@ public class CommonDescriptor { properties.getProperty( "pipe_stuck_restart_min_interval_ms", String.valueOf(config.getPipeStuckRestartMinIntervalMs())))); + config.setPipeStorageEngineFlushTimeIntervalMs( + Long.parseLong( + properties.getProperty( + "pipe_storage_engine_flush_time_interval_ms", + String.valueOf(config.getPipeStorageEngineFlushTimeIntervalMs())))); config.setPipeMetaReportMaxLogNumPerRound( Integer.parseInt( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index e652bff98f9..bb2b3fd7ac0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -260,6 +260,10 @@ public class PipeConfig { return COMMON_CONFIG.getPipeStuckRestartMinIntervalMs(); } + public long getPipeStorageEngineFlushTimeIntervalMs() { + return COMMON_CONFIG.getPipeStorageEngineFlushTimeIntervalMs(); + } + /////////////////////////////// Logger /////////////////////////////// public int getPipeMetaReportMaxLogNumPerRound() { @@ -457,6 +461,8 @@ public class PipeConfig { getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage()); LOGGER.info("PipeStuckRestartIntervalSeconds: {}", getPipeStuckRestartIntervalSeconds()); LOGGER.info("PipeStuckRestartMinIntervalMs: {}", getPipeStuckRestartMinIntervalMs()); + LOGGER.info( + "PipeStorageEngineFlushTimeIntervalMs: {}", getPipeStorageEngineFlushTimeIntervalMs()); LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", getPipeMetaReportMaxLogNumPerRound()); LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", getPipeMetaReportMaxLogIntervalRounds());
