This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-restart-stuck in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b75100175804b553897475415d448a46711c876b Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Jan 23 13:01:29 2024 +0800 refactor --- .../db/pipe/agent/runtime/PipeAgentLauncher.java | 6 - .../db/pipe/agent/runtime/PipeRuntimeAgent.java | 5 + .../db/pipe/agent/task/PipeTaskDataNodeAgent.java | 133 ++++++++++++++------- .../pipe/extractor/IoTDBDataRegionExtractor.java | 11 ++ .../PipeRealtimeDataRegionHybridExtractor.java | 9 +- .../subtask/connector/PipeConnectorSubtask.java | 16 +-- .../apache/iotdb/commons/conf/CommonConfig.java | 43 +++---- .../iotdb/commons/conf/CommonDescriptor.java | 25 ++-- .../iotdb/commons/pipe/config/PipeConfig.java | 27 ++--- 9 files changed, 158 insertions(+), 117 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java index b3b2995c20b..b6b6c83763c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.agent.runtime; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.exception.StartupException; -import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta; import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager; import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager; @@ -172,11 +171,6 @@ class PipeAgentLauncher { return pipeMeta; }) .collect(Collectors.toList())); - PipeAgent.runtime() - .registerPeriodicalJob( - "PipeTaskAgent#RestartAllStuckPipes", - PipeAgent.task()::restartAllStuckPipes, - PipeConfig.getInstance().getPipeStuckRestartIntervalSeconds()); } catch (Exception e) { LOGGER.info( "Failed to get pipe task meta from config node. Ignore the exception, " diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java index 2414613ad96..75997e483e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java @@ -73,6 +73,11 @@ public class PipeRuntimeAgent implements IService { public synchronized void start() throws StartupException { PipeConfig.getInstance().printAllConfigs(); PipeAgentLauncher.launchPipeTaskAgent(); + + registerPeriodicalJob( + "PipeTaskAgent#restartAllStuckPipes", + PipeAgent.task()::restartAllStuckPipes, + PipeConfig.getInstance().getPipeStuckRestartIntervalSeconds()); pipePeriodicalJobExecutor.start(); isShutdown.set(false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java index 3884588de2c..4935872e507 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java @@ -39,6 +39,7 @@ import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNod import org.apache.iotdb.db.pipe.metric.PipeConnectorMetrics; import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics; import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics; +import org.apache.iotdb.db.pipe.resource.PipeResourceManager; import org.apache.iotdb.db.pipe.task.PipeDataNodeTask; import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeBuilder; import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskDataRegionBuilder; @@ -46,6 +47,7 @@ import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskSchemaRegionBuilder import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask; import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskManager; import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask; +import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; @@ -324,59 +326,106 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent { ///////////////////////// Restart Logic ///////////////////////// public void restartAllStuckPipes() { - if (!tryWriteLockWithTimeOut(0)) { + if (!tryWriteLockWithTimeOut(5)) { return; } try { - Map<String, IoTDBDataRegionExtractor> pipeName2ExtractorMap = - PipeExtractorMetrics.getInstance().getPipeName2ExtractorMap(); - Map<String, PipeProcessorSubtask> pipeName2ProcessorSubtaskMap = - PipeProcessorMetrics.getInstance().getPipeName2ProcessorSubtaskMap(); - Map<String, PipeConnectorSubtask> attributeSortedStr2PipeConnectorSubtaskMap = - PipeConnectorMetrics.getInstance().getAttributeSortedStr2PipeConnectorSubtaskMap(); - - Set<PipeMeta> restartPipes = new HashSet<>(); - - for (PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { - if (!pipeMeta.getRuntimeMeta().getStatus().get().equals(PipeStatus.RUNNING)) { - continue; - } - String pipeName = pipeMeta.getStaticMeta().getPipeName(); - IoTDBDataRegionExtractor extractor = pipeName2ExtractorMap.get(pipeName); - PipeProcessorSubtask processorSubtask = pipeName2ProcessorSubtaskMap.get(pipeName); - PipeConnectorSubtask connectorSubtask = - attributeSortedStr2PipeConnectorSubtaskMap.get( - PipeConnectorSubtaskManager.getAttributeSortedString( - pipeMeta.getStaticMeta().getConnectorParameters())); - - if (Objects.isNull(extractor) - || Objects.isNull(processorSubtask) - || Objects.isNull(connectorSubtask)) { - continue; - } + restartAllStuckPipesInternal(); + } finally { + releaseWriteLock(); + } + } - boolean tsFileCountExceeded = - extractor.getRealtimeTsFileInsertionEventCount() - + processorSubtask.getTsFileInsertionEventCount() - + connectorSubtask.getTsFileInsertionEventCount() - > PipeConfig.getInstance().getPipeMaxAllowedTsFileCount(); - boolean connectorStuck = - System.currentTimeMillis() - connectorSubtask.getLastSendExecutionTime() - > PipeConfig.getInstance().getPipeMaxAllowedConnectorStuckTime(); - if (tsFileCountExceeded || connectorStuck) { - restartPipes.add(pipeMeta); - } + private void restartAllStuckPipesInternal() { + final Map<String, IoTDBDataRegionExtractor> pipeName2ExtractorMap = + PipeExtractorMetrics.getInstance().getPipeName2ExtractorMap(); + final Map<String, PipeProcessorSubtask> pipeName2ProcessorSubtaskMap = + PipeProcessorMetrics.getInstance().getPipeName2ProcessorSubtaskMap(); + final Map<String, PipeConnectorSubtask> attributeSortedStr2PipeConnectorSubtaskMap = + PipeConnectorMetrics.getInstance().getAttributeSortedStr2PipeConnectorSubtaskMap(); + + final Set<PipeMeta> stuckPipes = new HashSet<>(); + for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { + if (PipeStatus.RUNNING.equals(pipeMeta.getRuntimeMeta().getStatus().get())) { + continue; } - restartPipes.parallelStream().forEach(this::restartPipeInternal); + final String pipeName = pipeMeta.getStaticMeta().getPipeName(); + final IoTDBDataRegionExtractor extractor = pipeName2ExtractorMap.get(pipeName); + if (Objects.isNull(extractor) + || !extractor.isStreamMode() + || !extractor.hasConsumedAllHistoricalTsFiles()) { + continue; + } + final PipeProcessorSubtask processorSubtask = pipeName2ProcessorSubtaskMap.get(pipeName); + if (Objects.isNull(processorSubtask)) { + continue; + } + final PipeConnectorSubtask connectorSubtask = + attributeSortedStr2PipeConnectorSubtaskMap.get( + PipeConnectorSubtaskManager.getAttributeSortedString( + pipeMeta.getStaticMeta().getConnectorParameters())); + if (Objects.isNull(connectorSubtask)) { + continue; + } - } finally { - releaseWriteLock(); + if (isPipeLinkedTsFileEventCountExceededLimit(extractor, processorSubtask, connectorSubtask) + || mayLinkedTsFileCountReachDangerousThreshold() + || mayMemTablePinnedCountReachDangerousThreshold() + || mayWalSizeReachThrottleThreshold()) { + LOGGER.warn("Pipe {} may be stuck.", pipeMeta.getStaticMeta()); + stuckPipes.add(pipeMeta); + } } + + // Restart all stuck pipes + stuckPipes.parallelStream().forEach(this::restartStuckPipe); + } + + private boolean isPipeLinkedTsFileEventCountExceededLimit( + IoTDBDataRegionExtractor extractor, + PipeProcessorSubtask processorSubtask, + PipeConnectorSubtask connectorSubtask) { + final int tabletInsertionEventCount = + extractor.getTabletInsertionEventCount() + + processorSubtask.getTabletInsertionEventCount() + + connectorSubtask.getTabletInsertionEventCount(); + final int tsFileInsertionEventCount = + extractor.getRealtimeTsFileInsertionEventCount() + + processorSubtask.getTsFileInsertionEventCount() + + connectorSubtask.getTsFileInsertionEventCount(); + return tabletInsertionEventCount > 0 + && tsFileInsertionEventCount + > PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount(); + } + + private boolean mayLinkedTsFileCountReachDangerousThreshold() { + return PipeResourceManager.tsfile().getLinkedTsfileCount() + >= 2 * PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount(); + } + + private boolean mayMemTablePinnedCountReachDangerousThreshold() { + return PipeResourceManager.wal().getPinnedWalCount() + >= 10 * PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount(); + } + + private boolean mayWalSizeReachThrottleThreshold() { + return 3 * WALManager.getInstance().getTotalDiskUsage() + > 2 * IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold(); } - private void restartPipeInternal(PipeMeta pipeMeta) { + private void restartStuckPipe(PipeMeta pipeMeta) { + LOGGER.warn( + "Pipe {} (creation time = {}) will be restarted because of stuck.", + pipeMeta.getStaticMeta().getPipeName(), + DateTimeUtils.convertLongToDate(pipeMeta.getStaticMeta().getCreationTime(), "ms")); + final long startTime = System.currentTimeMillis(); handleDropPipeInternal(pipeMeta.getStaticMeta().getPipeName()); handleSinglePipeMetaChangesInternal(pipeMeta); + LOGGER.warn( + "Pipe {} (creation time = {}) was restarted because of stuck, time cost: {} ms.", + pipeMeta.getStaticMeta().getPipeName(), + DateTimeUtils.convertLongToDate(pipeMeta.getStaticMeta().getCreationTime(), "ms"), + System.currentTimeMillis() - startTime); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java index 58d274b82b1..c13f4071a67 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java @@ -372,6 +372,17 @@ public class IoTDBDataRegionExtractor implements PipeExtractor { } } + //////////////////////////// APIs provided for detecting stuck //////////////////////////// + + public boolean isStreamMode() { + return realtimeExtractor instanceof PipeRealtimeDataRegionHybridExtractor + || realtimeExtractor instanceof PipeRealtimeDataRegionLogExtractor; + } + + public boolean hasConsumedAllHistoricalTsFiles() { + return historicalExtractor.hasConsumedAll(); + } + //////////////////////////// APIs provided for metric framework //////////////////////////// public String getTaskID() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java index 91b786bfffa..568c96b5263 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -224,10 +224,12 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio // the write operation will be throttled, so we should not extract any more tablet events. // 3. The number of pinned memtables has reached the dangerous threshold. // 4. The number of tsfile events in the pending queue has exceeded the limit. + // 5. The number of linked tsfiles has reached the dangerous threshold. return !isStartedToSupply || mayWalSizeReachThrottleThreshold() || mayMemTablePinnedCountReachDangerousThreshold() - || isTsFileEventCountInQueueExceededLimit(); + || isTsFileEventCountInQueueExceededLimit() + || mayTsFileLinkedCountReachDangerousThreshold(); } private boolean mayWalSizeReachThrottleThreshold() { @@ -247,6 +249,11 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio >= PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion(); } + private boolean mayTsFileLinkedCountReachDangerousThreshold() { + return PipeResourceManager.tsfile().getLinkedTsfileCount() + >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount(); + } + public void informProcessorEventCollectorQueueTsFileSize(int queueSize) { processorEventCollectorQueueTsFileSize.set(queueSize); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java index e0d77389ec1..967ca2d6b86 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java @@ -77,7 +77,6 @@ public class PipeConnectorSubtask extends PipeDataNodeSubtask { private static final long CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_SECONDS = PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds(); private long lastHeartbeatEventInjectTime = System.currentTimeMillis(); - private long lastSendExecutionTime = System.currentTimeMillis(); public PipeConnectorSubtask( String taskID, @@ -124,7 +123,6 @@ public class PipeConnectorSubtask extends PipeDataNodeSubtask { final Event event = lastEvent != null ? lastEvent : inputPendingQueue.waitedPoll(); // Record this event for retrying on connection failure or other exceptions setLastEvent(event); - lastSendExecutionTime = System.currentTimeMillis(); try { if (event == null) { @@ -357,27 +355,21 @@ public class PipeConnectorSubtask extends PipeDataNodeSubtask { return connectorIndex; } - public Integer getTsFileInsertionEventCount() { + public int getTsFileInsertionEventCount() { return inputPendingQueue.getTsFileInsertionEventCount(); } - public Integer getTabletInsertionEventCount() { + public int getTabletInsertionEventCount() { return inputPendingQueue.getTabletInsertionEventCount(); } - public Integer getPipeHeartbeatEventCount() { + public int getPipeHeartbeatEventCount() { return inputPendingQueue.getPipeHeartbeatEventCount(); } - public Integer getAsyncConnectorRetryEventQueueSize() { + public int getAsyncConnectorRetryEventQueueSize() { return outputPipeConnector instanceof IoTDBThriftAsyncConnector ? ((IoTDBThriftAsyncConnector) outputPipeConnector).getRetryEventQueueSize() : 0; } - - //////////////////////////// APIs provided for restart metric //////////////////////////// - - public long getLastSendExecutionTime() { - return lastSendExecutionTime; - } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 754411cbd11..002183372c3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -185,6 +185,7 @@ public class CommonConfig { private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 2; private int pipeMaxAllowedPinnedMemTableCount = 50; + private long pipeMaxAllowedLinkedTsFileCount = 100; private boolean pipeMemoryManagementEnabled = true; private long pipeMemoryAllocateRetryIntervalMs = 1000; @@ -195,8 +196,6 @@ public class CommonConfig { private float pipeLeaderCacheMemoryUsagePercentage = 0.1F; private long pipeStuckRestartIntervalSeconds = 60; - private long pipeMaxAllowedTsFileCount = 1000; - private long pipeMaxAllowedConnectorStuckTime = (long) 15 * 60 * 1000; /** Whether to use persistent schema mode. */ private String schemaEngineMode = "Memory"; @@ -758,6 +757,22 @@ public class CommonConfig { this.pipeMaxAllowedPinnedMemTableCount = pipeMaxAllowedPinnedMemTableCount; } + public long getPipeMaxAllowedLinkedTsFileCount() { + return pipeMaxAllowedLinkedTsFileCount; + } + + public void setPipeMaxAllowedLinkedTsFileCount(long pipeMaxAllowedLinkedTsFileCount) { + this.pipeMaxAllowedLinkedTsFileCount = pipeMaxAllowedLinkedTsFileCount; + } + + public long getPipeStuckRestartIntervalSeconds() { + return pipeStuckRestartIntervalSeconds; + } + + public void setPipeStuckRestartIntervalSeconds(long pipeStuckRestartIntervalSeconds) { + this.pipeStuckRestartIntervalSeconds = pipeStuckRestartIntervalSeconds; + } + public boolean getPipeMemoryManagementEnabled() { return pipeMemoryManagementEnabled; } @@ -816,30 +831,6 @@ public class CommonConfig { this.pipeLeaderCacheMemoryUsagePercentage = pipeLeaderCacheMemoryUsagePercentage; } - public long getPipeStuckRestartIntervalSeconds() { - return pipeStuckRestartIntervalSeconds; - } - - public void setPipeStuckRestartIntervalSeconds(long pipeStuckRestartIntervalSeconds) { - this.pipeStuckRestartIntervalSeconds = pipeStuckRestartIntervalSeconds; - } - - public long getPipeMaxAllowedTsFileCount() { - return pipeMaxAllowedTsFileCount; - } - - public void setPipeMaxAllowedTsFileCount(long pipeMaxAllowedTsFileCount) { - this.pipeMaxAllowedTsFileCount = pipeMaxAllowedTsFileCount; - } - - public long getPipeMaxAllowedConnectorStuckTime() { - return pipeMaxAllowedConnectorStuckTime; - } - - public void setPipeMaxAllowedConnectorStuckTime(long pipeMaxAllowedConnectorStuckTime) { - this.pipeMaxAllowedConnectorStuckTime = pipeMaxAllowedConnectorStuckTime; - } - public String getSchemaEngineMode() { return schemaEngineMode; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index b3c5a425de4..43ec8f911b4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -427,6 +427,16 @@ public class CommonDescriptor { properties.getProperty( "pipe_max_allowed_pinned_memtable_count", String.valueOf(config.getPipeMaxAllowedPinnedMemTableCount())))); + config.setPipeMaxAllowedLinkedTsFileCount( + Long.parseLong( + properties.getProperty( + "pipe_max_allowed_linked_tsfile_count", + String.valueOf(config.getPipeMaxAllowedLinkedTsFileCount())))); + config.setPipeStuckRestartIntervalSeconds( + Long.parseLong( + properties.getProperty( + "pipe_stuck_restart_interval_seconds", + String.valueOf(config.getPipeStuckRestartIntervalSeconds())))); config.setPipeMemoryManagementEnabled( Boolean.parseBoolean( @@ -463,21 +473,6 @@ public class CommonDescriptor { properties.getProperty( "pipe_leader_cache_memory_usage_percentage", String.valueOf(config.getPipeLeaderCacheMemoryUsagePercentage())))); - config.setPipeStuckRestartIntervalSeconds( - Long.parseLong( - properties.getProperty( - "pipe_stuck_restart_interval_seconds", - String.valueOf(config.getPipeStuckRestartIntervalSeconds())))); - config.setPipeMaxAllowedTsFileCount( - Long.parseLong( - properties.getProperty( - "pipe_max_allowed_tsfile_count", - String.valueOf(config.getPipeMaxAllowedTsFileCount())))); - config.setPipeMaxAllowedConnectorStuckTime( - Long.parseLong( - properties.getProperty( - "pipe_max_allowed_connector_stuck_time", - String.valueOf(config.getPipeMaxAllowedConnectorStuckTime())))); } public void loadGlobalConfig(TGlobalConfig globalConfig) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 2691f88ecbe..74a6711a96b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -127,18 +127,6 @@ public class PipeConfig { return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage(); } - public long getPipeStuckRestartIntervalSeconds() { - return COMMON_CONFIG.getPipeStuckRestartIntervalSeconds(); - } - - public long getPipeMaxAllowedTsFileCount() { - return COMMON_CONFIG.getPipeMaxAllowedTsFileCount(); - } - - public long getPipeMaxAllowedConnectorStuckTime() { - return COMMON_CONFIG.getPipeMaxAllowedConnectorStuckTime(); - } - /////////////////////////////// Meta Consistency /////////////////////////////// public boolean isSeperatedPipeHeartbeatEnabled() { @@ -185,6 +173,14 @@ public class PipeConfig { return COMMON_CONFIG.getPipeMaxAllowedPinnedMemTableCount(); } + public long getPipeMaxAllowedLinkedTsFileCount() { + return COMMON_CONFIG.getPipeMaxAllowedLinkedTsFileCount(); + } + + public long getPipeStuckRestartIntervalSeconds() { + return COMMON_CONFIG.getPipeStuckRestartIntervalSeconds(); + } + /////////////////////////////// Memory /////////////////////////////// public boolean getPipeMemoryManagementEnabled() { @@ -253,6 +249,8 @@ public class PipeConfig { LOGGER.info( "PipeConnectorRPCThriftCompressionEnabled: {}", isPipeConnectorRPCThriftCompressionEnabled()); + LOGGER.info( + "PipeLeaderCacheMemoryUsagePercentage: {}", getPipeLeaderCacheMemoryUsagePercentage()); LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", getPipeAsyncConnectorSelectorNumber()); LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", getPipeAsyncConnectorMaxClientNumber()); @@ -276,6 +274,8 @@ public class PipeConfig { "PipeMaxAllowedPendingTsFileEpochPerDataRegion: {}", getPipeMaxAllowedPendingTsFileEpochPerDataRegion()); LOGGER.info("PipeMaxAllowedPinnedMemTableCount: {}", getPipeMaxAllowedPinnedMemTableCount()); + LOGGER.info("PipeMaxAllowedLinkedTsFileCount: {}", getPipeMaxAllowedLinkedTsFileCount()); + LOGGER.info("PipeStuckRestartIntervalSeconds: {}", getPipeStuckRestartIntervalSeconds()); LOGGER.info("PipeMemoryManagementEnabled: {}", getPipeMemoryManagementEnabled()); LOGGER.info("PipeMemoryAllocateMaxRetries: {}", getPipeMemoryAllocateMaxRetries()); @@ -285,9 +285,6 @@ public class PipeConfig { LOGGER.info( "PipeMemoryAllocateForTsFileSequenceReaderInBytes: {}", getPipeMemoryAllocateForTsFileSequenceReaderInBytes()); - - LOGGER.info( - "PipeLeaderCacheMemoryUsagePercentage: {}", getPipeLeaderCacheMemoryUsagePercentage()); } /////////////////////////////// Singleton ///////////////////////////////
