This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch metric in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b85e02a323ac04a2b7ea9b03f5a08d7c02cbdedc Author: HTHou <[email protected]> AuthorDate: Sun Dec 17 22:42:09 2023 +0800 finish --- .../iotdb/db/service/metrics/WritingMetrics.java | 82 +++++++++++++++------- .../iotdb/db/storageengine/StorageEngine.java | 1 - .../db/storageengine/dataregion/DataRegion.java | 22 +++++- .../dataregion/memtable/TsFileProcessor.java | 6 ++ .../storageengine/dataregion/wal/node/WALNode.java | 1 + 5 files changed, 83 insertions(+), 29 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java index 725070112f1..cbe693e272b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java @@ -357,7 +357,7 @@ public class WritingMetrics implements IMetricSet { allDataRegionIds.forEach(this::createTimedFlushMemTableCounterMetrics); allDataRegionIds.forEach(this::createWalFlushMemTableCounterMetrics); allDataRegionIds.forEach(this::createActiveMemtableCounterMetrics); - allDataRegionIds.forEach(this::createActiveTimePartitionCounterMetrics); + createActiveTimePartitionCounterMetrics(); flushThreholdHistogram = MetricService.getInstance() @@ -386,8 +386,8 @@ public class WritingMetrics implements IMetricSet { removeTimedFlushMemTableCounterMetrics(dataRegionId); removeWalFlushMemTableCounterMetrics(dataRegionId); removeActiveMemtableCounterMetrics(dataRegionId); - removeActiveTimePartitionCounterMetrics(dataRegionId); }); + removeActiveTimePartitionCounterMetrics(); MetricService.getInstance().remove(MetricType.HISTOGRAM, Metric.FLUSH_THRESHOLD.toString()); MetricService.getInstance().remove(MetricType.HISTOGRAM, Metric.REJECT_THRESHOLD.toString()); MetricService.getInstance() @@ -474,7 +474,6 @@ public class WritingMetrics implements IMetricSet { .getOrCreateCounter( Metric.WAL_FLUSH_MEMTABLE_COUNT.toString(), MetricLevel.IMPORTANT, - Tag.NAME.toString(), Tag.REGION.toString(), dataRegionId.toString()); } @@ -484,7 +483,6 @@ public class WritingMetrics implements IMetricSet { .getOrCreateCounter( Metric.TIMED_FLUSH_MEMTABLE_COUNT.toString(), MetricLevel.IMPORTANT, - Tag.NAME.toString(), Tag.REGION.toString(), dataRegionId.toString()); } @@ -494,7 +492,6 @@ public class WritingMetrics implements IMetricSet { .getOrCreateCounter( Metric.SERIES_FULL_FLUSH_MEMTABLE.toString(), MetricLevel.IMPORTANT, - Tag.NAME.toString(), Tag.REGION.toString(), dataRegionId.toString()); } @@ -504,19 +501,13 @@ public class WritingMetrics implements IMetricSet { .getOrCreateCounter( Metric.ACTIVE_MEMTABLE_COUNT.toString(), MetricLevel.IMPORTANT, - Tag.NAME.toString(), Tag.REGION.toString(), dataRegionId.toString()); } - public void createActiveTimePartitionCounterMetrics(DataRegionId dataRegionId) { + public void createActiveTimePartitionCounterMetrics() { MetricService.getInstance() - .getOrCreateCounter( - Metric.ACTIVE_MEMTABLE_COUNT.toString(), - MetricLevel.IMPORTANT, - Tag.NAME.toString(), - Tag.REGION.toString(), - dataRegionId.toString()); + .getOrCreateCounter(Metric.ACTIVE_MEMTABLE_COUNT.toString(), MetricLevel.IMPORTANT); } public void removeSeriesFullFlushMemTableCounterMetrics(DataRegionId dataRegionId) { @@ -524,7 +515,6 @@ public class WritingMetrics implements IMetricSet { .remove( MetricType.COUNTER, Metric.SERIES_FULL_FLUSH_MEMTABLE.toString(), - Tag.NAME.toString(), Tag.REGION.toString(), dataRegionId.toString()); } @@ -534,7 +524,6 @@ public class WritingMetrics implements IMetricSet { .remove( MetricType.COUNTER, Metric.TIMED_FLUSH_MEMTABLE_COUNT.toString(), - Tag.NAME.toString(), Tag.REGION.toString(), dataRegionId.toString()); } @@ -544,7 +533,6 @@ public class WritingMetrics implements IMetricSet { .remove( MetricType.COUNTER, Metric.WAL_FLUSH_MEMTABLE_COUNT.toString(), - Tag.NAME.toString(), Tag.REGION.toString(), dataRegionId.toString()); } @@ -554,19 +542,13 @@ public class WritingMetrics implements IMetricSet { .remove( MetricType.COUNTER, Metric.ACTIVE_MEMTABLE_COUNT.toString(), - Tag.NAME.toString(), Tag.REGION.toString(), dataRegionId.toString()); } - public void removeActiveTimePartitionCounterMetrics(DataRegionId dataRegionId) { + public void removeActiveTimePartitionCounterMetrics() { MetricService.getInstance() - .remove( - MetricType.COUNTER, - Metric.ACTIVE_TIME_PARTITION_COUNT.toString(), - Tag.NAME.toString(), - Tag.REGION.toString(), - dataRegionId.toString()); + .remove(MetricType.COUNTER, Metric.ACTIVE_TIME_PARTITION_COUNT.toString()); } public void removeFlushingMemTableStatusMetrics(DataRegionId dataRegionId) { @@ -800,7 +782,57 @@ public class WritingMetrics implements IMetricSet { memtableLiveTimer.updateMillis(durationMillis); } - public void recordTimedFlushMemTableCount(DataRegionId dataRegionId, int number) {} + public void recordTimedFlushMemTableCount(String dataRegionId, int number) { + MetricService.getInstance() + .count( + number, + Metric.TIMED_FLUSH_MEMTABLE_COUNT.toString(), + MetricLevel.CORE, + Tag.NAME.toString(), + Tag.NAME.toString(), + Tag.REGION.toString(), + dataRegionId); + } + + public void recordWalFlushMemTableCount(String dataRegionId, int number) { + MetricService.getInstance() + .count( + number, + Metric.WAL_FLUSH_MEMTABLE_COUNT.toString(), + MetricLevel.CORE, + Tag.NAME.toString(), + Tag.NAME.toString(), + Tag.REGION.toString(), + dataRegionId); + } + + public void recordSeriesFullFlushMemTableCount(String dataRegionId, int number) { + MetricService.getInstance() + .count( + number, + Metric.SERIES_FULL_FLUSH_MEMTABLE.toString(), + MetricLevel.CORE, + Tag.NAME.toString(), + Tag.NAME.toString(), + Tag.REGION.toString(), + dataRegionId); + } + + public void recordActiveMemTableCount(String dataRegionId, int number) { + MetricService.getInstance() + .count( + number, + Metric.ACTIVE_MEMTABLE_COUNT.toString(), + MetricLevel.CORE, + Tag.NAME.toString(), + Tag.REGION.toString(), + dataRegionId); + } + + public void recordActiveTimePartitionCount(int number) { + MetricService.getInstance() + .count(number, Metric.ACTIVE_TIME_PARTITION_COUNT.toString(), MetricLevel.CORE); + } // endregion diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java index bab14db6a66..de4f2a2200b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java @@ -648,7 +648,6 @@ public class StorageEngine implements IService { WRITING_METRICS.removeWalFlushMemTableCounterMetrics(regionId); WRITING_METRICS.removeTimedFlushMemTableCounterMetrics(regionId); WRITING_METRICS.removeSeriesFullFlushMemTableCounterMetrics(regionId); - WRITING_METRICS.removeActiveTimePartitionCounterMetrics(regionId); try { region.abortCompaction(); region.syncDeleteDataFiles(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 8b6c1d1289d..b74ef96d6f0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -62,6 +62,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTablet import org.apache.iotdb.db.service.SettleService; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.db.service.metrics.FileMetrics; +import org.apache.iotdb.db.service.metrics.WritingMetrics; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache; import org.apache.iotdb.db.storageengine.buffer.ChunkCache; @@ -769,6 +770,10 @@ public class DataRegion implements IDataRegionForQuery { isSeq ? this::sequenceFlushCallback : this::unsequenceFlushCallback, isSeq, writer); + if (workSequenceTsFileProcessors.get(tsFileProcessor.getTimeRangeId()) == null + && workUnsequenceTsFileProcessors.get(tsFileProcessor.getTimeRangeId()) == null) { + WritingMetrics.getInstance().recordActiveTimePartitionCount(1); + } if (isSeq) { workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor); } else { @@ -1275,7 +1280,6 @@ public class DataRegion implements IDataRegionForQuery { throws IOException, DiskSpaceInsufficientException { TsFileProcessor res = tsFileProcessorTreeMap.get(timeRangeId); - if (null == res) { // build new processor, memory control module will control the number of memtables TimePartitionManager.getInstance() @@ -1284,6 +1288,10 @@ public class DataRegion implements IDataRegionForQuery { res = newTsFileProcessor(sequence, timeRangeId); tsFileProcessorTreeMap.put(timeRangeId, res); tsFileManager.add(res.getTsFileResource(), sequence); + if (!workSequenceTsFileProcessors.containsKey(timeRangeId) + && !workSequenceTsFileProcessors.containsKey(timeRangeId)) { + WritingMetrics.getInstance().recordActiveTimePartitionCount(1); + } } return res; @@ -1431,6 +1439,10 @@ public class DataRegion implements IDataRegionForQuery { timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId()); } } + if (workSequenceTsFileProcessors.get(tsFileProcessor.getTimeRangeId()) == null + && workUnsequenceTsFileProcessors.get(tsFileProcessor.getTimeRangeId()) == null) { + WritingMetrics.getInstance().recordActiveTimePartitionCount(-1); + } return future; } @@ -1496,7 +1508,6 @@ public class DataRegion implements IDataRegionForQuery { } }); deleteAllSGFolders(TierManager.getInstance().getAllFilesFolders()); - this.workSequenceTsFileProcessors.clear(); this.workUnsequenceTsFileProcessors.clear(); this.tsFileManager.clear(); @@ -1577,13 +1588,13 @@ public class DataRegion implements IDataRegionForQuery { } public void timedFlushSeqMemTable() { + int count = 0; writeLock("timedFlushSeqMemTable"); try { // only check sequence tsfiles' memtables List<TsFileProcessor> tsFileProcessors = new ArrayList<>(workSequenceTsFileProcessors.values()); long timeLowerBound = System.currentTimeMillis() - config.getSeqMemtableFlushInterval(); - for (TsFileProcessor tsFileProcessor : tsFileProcessors) { if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) { logger.info( @@ -1592,14 +1603,17 @@ public class DataRegion implements IDataRegionForQuery { databaseName, dataRegionId); fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); + count++; } } } finally { writeUnlock(); } + WritingMetrics.getInstance().recordTimedFlushMemTableCount(dataRegionId, count); } public void timedFlushUnseqMemTable() { + int count = 0; writeLock("timedFlushUnseqMemTable"); try { // only check unsequence tsfiles' memtables @@ -1615,11 +1629,13 @@ public class DataRegion implements IDataRegionForQuery { databaseName, dataRegionId); fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); + count++; } } } finally { writeUnlock(); } + WritingMetrics.getInstance().recordTimedFlushMemTableCount(dataRegionId, count); } /** This method will be blocked until all tsfile processors are closed. */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 21e8bb3e445..bac8a076212 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -244,6 +244,8 @@ public class TsFileProcessor { long startTime = System.nanoTime(); createNewWorkingMemTable(); PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(System.nanoTime() - startTime); + WritingMetrics.getInstance() + .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1); } long[] memIncrements = null; @@ -802,6 +804,8 @@ public class TsFileProcessor { "The avg series points num {} of tsfile {} reaches the threshold", workMemTable.getTotalPointsNum() / workMemTable.getSeriesNumber(), tsFileResource.getTsFile().getAbsolutePath()); + WritingMetrics.getInstance() + .recordSeriesFullFlushMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1); return true; } return false; @@ -1059,6 +1063,8 @@ public class TsFileProcessor { } WritingMetrics.getInstance() .recordMemTableLiveDuration(System.currentTimeMillis() - getWorkMemTableCreatedTime()); + WritingMetrics.getInstance() + .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), -1); workMemTable = null; return FlushManager.getInstance().registerTsFileProcessor(this); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java index c84db55510e..40a94953e03 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java @@ -487,6 +487,7 @@ public class WALNode implements IWALNode { || snapshotCount >= config.getMaxWalMemTableSnapshotNum() || oldestMemTableTVListsRamCost > config.getWalMemTableSnapshotThreshold()) { flushMemTable(dataRegion, oldestTsFile, oldestMemTable); + WRITING_METRICS.recordWalFlushMemTableCount(dataRegion.getDataRegionId(), 1); WRITING_METRICS.recordMemTableRamWhenCauseFlush(identifier, oldestMemTableTVListsRamCost); } else { snapshotMemTable(dataRegion, oldestTsFile, oldestMemTableInfo);
