This is an automated email from the ASF dual-hosted git repository. spricoder pushed a commit to branch fix/flush_1.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8e05b6ceaeeb9610245c7d11587c441c9dcf57cd Author: ZhangHongYin <[email protected]> AuthorDate: Tue Aug 22 19:18:30 2023 +0800 [Metric] Fix flush point statistics (#10915) (cherry picked from commit 1dce787c39e14973918ee390c470b901b68be73e) --- .../dataregion/flush/MemTableFlushTask.java | 58 +++++++++++++++------- .../iotdb/metrics/AbstractMetricService.java | 45 +++++++++-------- .../metrics/reporter/iotdb/IoTDBReporter.java | 6 +-- 3 files changed, 69 insertions(+), 40 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java index 9a5db18bb28..750fb51e0d6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java @@ -30,6 +30,7 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk; import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; +import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; @@ -42,6 +43,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -58,6 +60,8 @@ public class MemTableFlushTask { FlushSubTaskPoolManager.getInstance(); private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance(); private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + /* storage group name -> last time */ + private static final Map<String, Long> flushPointsCache = new ConcurrentHashMap<>(); private final Future<?> encodingTaskFuture; private final Future<?> ioTaskFuture; private RestorableTsFileIOWriter writer; @@ -278,23 +282,7 @@ public class MemTableFlushTask { Thread.currentThread().interrupt(); } - if (!storageGroup.startsWith(IoTDBConfig.SYSTEM_DATABASE)) { - int lastIndex = storageGroup.lastIndexOf("-"); - if (lastIndex == -1) { - lastIndex = storageGroup.length(); - } - MetricService.getInstance() - .gaugeWithInternalReportAsync( - memTable.getTotalPointsNum(), - Metric.POINTS.toString(), - MetricLevel.CORE, - Tag.DATABASE.toString(), - storageGroup.substring(0, lastIndex), - Tag.TYPE.toString(), - "flush", - Tag.REGION.toString(), - dataRegionId); - } + recordFlushPointsMetric(); LOGGER.info( "Database {}, flushing memtable {} into disk: Encoding data cost " + "{} ms.", @@ -305,6 +293,42 @@ public class MemTableFlushTask { } }; + private void recordFlushPointsMetric() { + if (storageGroup.startsWith(SchemaConstant.SYSTEM_DATABASE)) { + return; + } + int lastIndex = storageGroup.lastIndexOf("-"); + if (lastIndex == -1) { + lastIndex = storageGroup.length(); + } + String storageGroupName = storageGroup.substring(0, lastIndex); + long currentTime = DateTimeUtils.currentTime(); + // compute the flush points + long writeTime = + flushPointsCache.compute( + storageGroupName, + (storageGroup, lastTime) -> { + if (lastTime == null || lastTime != currentTime) { + return currentTime; + } else { + return currentTime + 1; + } + }); + // record the flush points + MetricService.getInstance() + .gaugeWithInternalReportAsync( + memTable.getTotalPointsNum(), + Metric.POINTS.toString(), + MetricLevel.CORE, + writeTime, + Tag.DATABASE.toString(), + storageGroup.substring(0, lastIndex), + Tag.TYPE.toString(), + "flush", + Tag.REGION.toString(), + dataRegionId); + } + /** io task (third task of pipeline) */ @SuppressWarnings("squid:S135") private Runnable ioTask = diff --git a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java index 5ac5a3737f2..06e5632ae6a 100644 --- a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java +++ b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java @@ -271,9 +271,9 @@ public abstract class AbstractMetricService { /** GetOrCreateCounter with internal report. */ public Counter getOrCreateCounterWithInternalReport( - String metric, MetricLevel metricLevel, String... tags) { + String metric, MetricLevel metricLevel, long time, String... tags) { Counter counter = metricManager.getOrCreateCounter(metric, metricLevel, tags); - internalReporter.writeMetricToIoTDB(counter, metric, tags); + internalReporter.writeMetricToIoTDB(counter, metric, time, tags); return counter; } @@ -287,69 +287,74 @@ public abstract class AbstractMetricService { /** GetOrCreateGauge with internal report. */ public Gauge getOrCreateGaugeWithInternalReport( - String metric, MetricLevel metricLevel, String... tags) { + String metric, MetricLevel metricLevel, long time, String... tags) { Gauge gauge = metricManager.getOrCreateGauge(metric, metricLevel, tags); - internalReporter.writeMetricToIoTDB(gauge, metric, tags); + internalReporter.writeMetricToIoTDB(gauge, metric, time, tags); return gauge; } /** GetOrCreateRate with internal report. */ public Rate getOrCreateRateWithInternalReport( - String metric, MetricLevel metricLevel, String... tags) { + String metric, MetricLevel metricLevel, long time, String... tags) { Rate rate = metricManager.getOrCreateRate(metric, metricLevel, tags); - internalReporter.writeMetricToIoTDB(rate, metric, tags); + internalReporter.writeMetricToIoTDB(rate, metric, time, tags); return rate; } /** GetOrCreateHistogram with internal report. */ public Histogram getOrCreateHistogramWithInternalReport( - String metric, MetricLevel metricLevel, String... tags) { + String metric, MetricLevel metricLevel, long time, String... tags) { Histogram histogram = metricManager.getOrCreateHistogram(metric, metricLevel, tags); - internalReporter.writeMetricToIoTDB(histogram, metric, tags); + internalReporter.writeMetricToIoTDB(histogram, metric, time, tags); return histogram; } /** GetOrCreateTimer with internal report. */ public Timer getOrCreateTimerWithInternalReport( - String metric, MetricLevel metricLevel, String... tags) { + String metric, MetricLevel metricLevel, long time, String... tags) { Timer timer = metricManager.getOrCreateTimer(metric, metricLevel, tags); - internalReporter.writeMetricToIoTDB(timer, metric, tags); + internalReporter.writeMetricToIoTDB(timer, metric, time, tags); return timer; } /** Count with internal report. */ public void countWithInternalReportAsync( - long delta, String metric, MetricLevel metricLevel, String... tags) { + long delta, String metric, MetricLevel metricLevel, long time, String... tags) { internalReporter.writeMetricToIoTDB( - metricManager.count(delta, metric, metricLevel, tags), metric, tags); + metricManager.count(delta, metric, metricLevel, tags), metric, time, tags); } /** Gauge value with internal report. */ public void gaugeWithInternalReportAsync( - long value, String metric, MetricLevel metricLevel, String... tags) { + long value, String metric, MetricLevel metricLevel, long time, String... tags) { internalReporter.writeMetricToIoTDB( - metricManager.gauge(value, metric, metricLevel, tags), metric, tags); + metricManager.gauge(value, metric, metricLevel, tags), metric, time, tags); } /** Rate with internal report. */ public void rateWithInternalReportAsync( - long value, String metric, MetricLevel metricLevel, String... tags) { + long value, String metric, MetricLevel metricLevel, long time, String... tags) { internalReporter.writeMetricToIoTDB( - metricManager.rate(value, metric, metricLevel, tags), metric, tags); + metricManager.rate(value, metric, metricLevel, tags), metric, time, tags); } /** Histogram with internal report. */ public void histogramWithInternalReportAsync( - long value, String metric, MetricLevel metricLevel, String... tags) { + long value, String metric, MetricLevel metricLevel, long time, String... tags) { internalReporter.writeMetricToIoTDB( - metricManager.histogram(value, metric, metricLevel, tags), metric, tags); + metricManager.histogram(value, metric, metricLevel, tags), metric, time, tags); } /** Timer with internal report. */ public void timerWithInternalReportAsync( - long delta, TimeUnit timeUnit, String metric, MetricLevel metricLevel, String... tags) { + long delta, + TimeUnit timeUnit, + String metric, + MetricLevel metricLevel, + long time, + String... tags) { internalReporter.writeMetricToIoTDB( - metricManager.timer(delta, timeUnit, metric, metricLevel, tags), metric, tags); + metricManager.timer(delta, timeUnit, metric, metricLevel, tags), metric, time, tags); } public List<Pair<String, String[]>> getAllMetricKeys() { diff --git a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java index 2cc67f13a31..db1e915c8e9 100644 --- a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java +++ b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java @@ -36,14 +36,14 @@ public abstract class IoTDBReporter implements Reporter { * * @param metric the target metric * @param name the name of metric + * @param time the target time of metric * @param tags the tags of metric */ - public void writeMetricToIoTDB(IMetric metric, String name, String... tags) { + public void writeMetricToIoTDB(IMetric metric, String name, long time, String... tags) { if (!(metric instanceof DoNothingMetric)) { Map<String, Object> values = new HashMap<>(); metric.constructValueMap(values); - writeMetricToIoTDB( - values, IoTDBMetricsUtils.generatePath(name, tags), System.currentTimeMillis()); + writeMetricToIoTDB(values, IoTDBMetricsUtils.generatePath(name, tags), time); } }
