This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pr/11973 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b2d747e843ae8496a1a81fc18534479e18990d2d Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Feb 19 17:49:04 2024 +0800 refactor --- .../db/pipe/agent/task/PipeTaskDataNodeAgent.java | 35 +++++++++++----------- .../iotdb/db/pipe/resource/log/PipeLogManager.java | 16 ++++++---- .../iotdb/db/pipe/resource/log/PipeLogStatus.java | 18 ++++++++--- .../resource/tsfile/PipeTsFileResourceManager.java | 23 +++++++------- .../pipe/resource/wal/PipeWALResourceManager.java | 23 ++++++++++---- .../apache/iotdb/commons/conf/CommonConfig.java | 18 +++++++++++ .../iotdb/commons/conf/CommonDescriptor.java | 10 +++++++ .../iotdb/commons/pipe/config/PipeConfig.java | 17 +++++++++++ 8 files changed, 115 insertions(+), 45 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java index 7895200a1dc..299814b57af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java @@ -60,6 +60,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -71,14 +72,6 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent { protected static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); - public PipeTaskDataNodeAgent() { - PipeResourceManager.log() - .register( - PipeTaskDataNodeAgent.class, - PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(), - PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds()); - } - ////////////////////////// Pipe Task Management Entry ////////////////////////// @Override @@ -284,15 +277,18 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent { final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>(); try { - boolean printThisRound = + final Optional<Logger> logger = PipeResourceManager.log() - .schedule(PipeTaskDataNodeAgent.class, pipeMetaKeeper.getPipeMetaCount()); + .schedule( + PipeTaskDataNodeAgent.class, + PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(), + PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(), + pipeMetaKeeper.getPipeMetaCount()); for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { pipeMetaBinaryList.add(pipeMeta.serialize()); - if (LOGGER.isInfoEnabled() && printThisRound) { - LOGGER.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage()); - } + logger.ifPresent(l -> l.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage())); } + LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size()); } catch (IOException e) { throw new TException(e); } @@ -319,15 +315,18 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent { final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>(); try { - boolean printThisRound = + final Optional<Logger> logger = PipeResourceManager.log() - .schedule(PipeTaskDataNodeAgent.class, pipeMetaKeeper.getPipeMetaCount()); + .schedule( + PipeTaskDataNodeAgent.class, + PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(), + PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(), + pipeMetaKeeper.getPipeMetaCount()); for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { pipeMetaBinaryList.add(pipeMeta.serialize()); - if (LOGGER.isInfoEnabled() && printThisRound) { - LOGGER.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage()); - } + logger.ifPresent(l -> l.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage())); } + LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size()); } catch (IOException e) { throw new TException(e); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java index a032ab1c65b..c7f6e445b6a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java @@ -19,18 +19,22 @@ package org.apache.iotdb.db.pipe.resource.log; +import org.slf4j.Logger; + +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; public class PipeLogManager { + private final ConcurrentMap<Class<?>, PipeLogStatus> logClass2LogStatusMap = new ConcurrentHashMap<>(); - public void register(Class<?> logClass, int maxAverageScale, int maxLogInterval) { - logClass2LogStatusMap.put(logClass, new PipeLogStatus(maxAverageScale, maxLogInterval)); - } - - public boolean schedule(Class<?> logClass, int scale) { - return logClass2LogStatusMap.get(logClass).schedule(scale); + public Optional<Logger> schedule( + Class<?> logClass, int maxAverageScale, int maxLogInterval, int scale) { + return logClass2LogStatusMap + .computeIfAbsent( + logClass, k -> new PipeLogStatus(logClass, maxAverageScale, maxLogInterval)) + .schedule(scale); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java index 53cd28a79ad..67355dcc01a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java @@ -19,24 +19,34 @@ package org.apache.iotdb.db.pipe.resource.log; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; class PipeLogStatus { + + private final Logger logger; + private final int maxAverageScale; private final int maxLogInterval; private final AtomicLong currentRounds = new AtomicLong(0); - PipeLogStatus(int maxAverageScale, int maxLogInterval) { + PipeLogStatus(Class<?> logClass, int maxAverageScale, int maxLogInterval) { + logger = LoggerFactory.getLogger(logClass); + this.maxAverageScale = maxAverageScale; this.maxLogInterval = maxLogInterval; } - boolean schedule(int scale) { + synchronized Optional<Logger> schedule(int scale) { if (currentRounds.incrementAndGet() >= Math.min((int) Math.ceil((double) scale / maxAverageScale), maxLogInterval)) { currentRounds.set(0); - return true; + return Optional.of(logger); } - return false; + + return Optional.empty(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java index d26ba1df007..00f5d1c964a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java @@ -38,6 +38,7 @@ import java.nio.file.Path; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @@ -56,11 +57,6 @@ public class PipeTsFileResourceManager { "PipeTsFileResourceManager#ttlCheck()", this::tryTtlCheck, Math.max(PipeTsFileResource.TSFILE_MIN_TIME_TO_LIVE_IN_MS / 1000, 1)); - PipeResourceManager.log() - .register( - PipeTsFileResourceManager.class, - PipeConfig.getInstance().getPipeTsFilePinMaxLogNumPerRound(), - PipeConfig.getInstance().getPipeTsFilePinMaxLogIntervalRounds()); } private void tryTtlCheck() { @@ -84,11 +80,14 @@ public class PipeTsFileResourceManager { private void ttlCheck() { final Iterator<Map.Entry<String, PipeTsFileResource>> iterator = hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet().iterator(); - boolean printThisRound = + final Optional<Logger> logger = PipeResourceManager.log() .schedule( PipeTsFileResourceManager.class, + PipeConfig.getInstance().getPipeTsFilePinMaxLogNumPerRound(), + PipeConfig.getInstance().getPipeTsFilePinMaxLogIntervalRounds(), hardlinkOrCopiedFileToPipeTsFileResourceMap.size()); + while (iterator.hasNext()) { final Map.Entry<String, PipeTsFileResource> entry = iterator.next(); @@ -96,12 +95,12 @@ public class PipeTsFileResourceManager { if (entry.getValue().closeIfOutOfTimeToLive()) { iterator.remove(); } else { - if (printThisRound) { - LOGGER.info( - "Pipe file (file name: {}) is still referenced {} times", - entry.getKey(), - entry.getValue().getReferenceCount()); - } + logger.ifPresent( + l -> + l.info( + "Pipe file (file name: {}) is still referenced {} times", + entry.getKey(), + entry.getValue().getReferenceCount())); } } catch (IOException e) { LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ", e); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java index aa8f4e71623..7e0adbacd32 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java @@ -19,7 +19,9 @@ package org.apache.iotdb.db.pipe.resource.wal; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.db.pipe.agent.PipeAgent; +import org.apache.iotdb.db.pipe.resource.PipeResourceManager; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler; import org.slf4j.Logger; @@ -30,6 +32,7 @@ import java.util.ConcurrentModificationException; import java.util.Iterator; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; @@ -61,6 +64,14 @@ public abstract class PipeWALResourceManager { private void ttlCheck() { final Iterator<Map.Entry<Long, PipeWALResource>> iterator = memtableIdToPipeWALResourceMap.entrySet().iterator(); + final Optional<Logger> logger = + PipeResourceManager.log() + .schedule( + PipeWALResourceManager.class, + PipeConfig.getInstance().getPipeWalPinMaxLogNumPerRound(), + PipeConfig.getInstance().getPipeWalPinMaxLogIntervalRounds(), + memtableIdToPipeWALResourceMap.size()); + try { while (iterator.hasNext()) { final Map.Entry<Long, PipeWALResource> entry = iterator.next(); @@ -71,11 +82,13 @@ public abstract class PipeWALResourceManager { try { if (entry.getValue().invalidateIfPossible()) { iterator.remove(); - } else if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "WAL (memtableId {}) is still referenced {} times", - entry.getKey(), - entry.getValue().getReferenceCount()); + } else { + logger.ifPresent( + l -> + l.info( + "WAL (memtableId {}) is still referenced {} times", + entry.getKey(), + entry.getValue().getReferenceCount())); } } finally { lock.unlock(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 10d331693f7..0a3d1381663 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -193,6 +193,8 @@ public class CommonConfig { private int pipeMetaReportMaxLogIntervalRounds = 36; private int pipeTsFilePinMaxLogNumPerRound = 10; private int pipeTsFilePinMaxLogIntervalRounds = 90; + private int pipeWalPinMaxLogNumPerRound = 10; + private int pipeWalPinMaxLogIntervalRounds = 90; private boolean pipeMemoryManagementEnabled = true; private long pipeMemoryAllocateRetryIntervalMs = 1000; @@ -824,6 +826,22 @@ public class CommonConfig { this.pipeTsFilePinMaxLogIntervalRounds = pipeTsFilePinMaxLogIntervalRounds; } + public int getPipeWalPinMaxLogNumPerRound() { + return pipeWalPinMaxLogNumPerRound; + } + + public void setPipeWalPinMaxLogNumPerRound(int pipeWalPinMaxLogNumPerRound) { + this.pipeWalPinMaxLogNumPerRound = pipeWalPinMaxLogNumPerRound; + } + + public int getPipeWalPinMaxLogIntervalRounds() { + return pipeWalPinMaxLogIntervalRounds; + } + + public void setPipeWalPinMaxLogIntervalRounds(int pipeWalPinMaxLogIntervalRounds) { + this.pipeWalPinMaxLogIntervalRounds = pipeWalPinMaxLogIntervalRounds; + } + public boolean getPipeMemoryManagementEnabled() { return pipeMemoryManagementEnabled; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index c145f98b882..5faf8723817 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -464,6 +464,16 @@ public class CommonDescriptor { properties.getProperty( "pipe_tsfile_pin_max_log_interval_rounds", String.valueOf(config.getPipeTsFilePinMaxLogIntervalRounds())))); + config.setPipeWalPinMaxLogNumPerRound( + Integer.parseInt( + properties.getProperty( + "pipe_wal_pin_max_log_num_per_round", + String.valueOf(config.getPipeWalPinMaxLogNumPerRound())))); + config.setPipeWalPinMaxLogIntervalRounds( + Integer.parseInt( + properties.getProperty( + "pipe_wal_pin_max_log_interval_rounds", + String.valueOf(config.getPipeWalPinMaxLogIntervalRounds())))); config.setPipeMemoryManagementEnabled( Boolean.parseBoolean( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 9d7928df310..80223a4439c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -185,6 +185,8 @@ public class PipeConfig { return COMMON_CONFIG.getPipeStuckRestartIntervalSeconds(); } + /////////////////////////////// Logger /////////////////////////////// + public int getPipeMetaReportMaxLogNumPerRound() { return COMMON_CONFIG.getPipeMetaReportMaxLogNumPerRound(); } @@ -201,6 +203,14 @@ public class PipeConfig { return COMMON_CONFIG.getPipeTsFilePinMaxLogIntervalRounds(); } + public int getPipeWalPinMaxLogNumPerRound() { + return COMMON_CONFIG.getPipeWalPinMaxLogNumPerRound(); + } + + public int getPipeWalPinMaxLogIntervalRounds() { + return COMMON_CONFIG.getPipeWalPinMaxLogIntervalRounds(); + } + /////////////////////////////// Memory /////////////////////////////// public boolean getPipeMemoryManagementEnabled() { @@ -300,6 +310,13 @@ public class PipeConfig { LOGGER.info("PipeMaxAllowedLinkedTsFileCount: {}", getPipeMaxAllowedLinkedTsFileCount()); LOGGER.info("PipeStuckRestartIntervalSeconds: {}", getPipeStuckRestartIntervalSeconds()); + LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", getPipeMetaReportMaxLogNumPerRound()); + LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", getPipeMetaReportMaxLogIntervalRounds()); + LOGGER.info("PipeTsFilePinMaxLogNumPerRound: {}", getPipeTsFilePinMaxLogNumPerRound()); + LOGGER.info("PipeTsFilePinMaxLogIntervalRounds: {}", getPipeTsFilePinMaxLogIntervalRounds()); + LOGGER.info("PipeWalPinMaxLogNumPerRound: {}", getPipeWalPinMaxLogNumPerRound()); + LOGGER.info("PipeWalPinMaxLogIntervalRounds: {}", getPipeWalPinMaxLogIntervalRounds()); + LOGGER.info("PipeMemoryManagementEnabled: {}", getPipeMemoryManagementEnabled()); LOGGER.info("PipeMemoryAllocateMaxRetries: {}", getPipeMemoryAllocateMaxRetries()); LOGGER.info(
