This is an automated email from the ASF dual-hosted git repository. spricoder pushed a commit to branch fix/flush in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7dbc913668feffd6edbb665bebeac7a23958446f Author: spricoder <[email protected]> AuthorDate: Mon Aug 21 14:47:15 2023 +0800 Fix flush --- .../dataregion/flush/MemTableFlushTask.java | 23 ++++++++--- .../iotdb/metrics/AbstractMetricService.java | 45 ++++++++++++---------- .../metrics/reporter/iotdb/IoTDBReporter.java | 6 +-- 3 files changed, 46 insertions(+), 28 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java index 9a5db18bb28..7aa325333a2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java @@ -30,7 +30,9 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk; import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; +import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.metrics.utils.MetricLevel; +import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; @@ -42,10 +44,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * flush task to flush one memtable using a pipeline model to flush, which is sort memtable -> @@ -67,6 +66,7 @@ public class MemTableFlushTask { (config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()) ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing()) : new LinkedBlockingQueue<>(); + private final Map<String, Pair<Long, Long>> flushPointsCache; private String storageGroup; private String dataRegionId; @@ -92,6 +92,7 @@ public class MemTableFlushTask { this.dataRegionId = dataRegionId; this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask); this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask); + this.flushPointsCache = new ConcurrentHashMap<>(); LOGGER.debug( "flush task of database {} memtable is created, flushing to file {}.", storageGroup, @@ -283,11 +284,23 @@ public class MemTableFlushTask { if (lastIndex == -1) { lastIndex = storageGroup.length(); } + String storageGroupName = storageGroup.substring(0, lastIndex); + long currentTime = DateTimeUtils.currentTime(); + long points = memTable.getTotalPointsNum(); + Pair<Long, Long> previousPair = flushPointsCache.get(storageGroupName); + if (previousPair != null) { + if (previousPair.left == currentTime) { + points += previousPair.right; + } else { + flushPointsCache.put(storageGroupName, new Pair<>(currentTime, points)); + } + } MetricService.getInstance() .gaugeWithInternalReportAsync( - memTable.getTotalPointsNum(), + points, Metric.POINTS.toString(), MetricLevel.CORE, + currentTime, Tag.DATABASE.toString(), storageGroup.substring(0, lastIndex), Tag.TYPE.toString(), diff --git a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java index 5ac5a3737f2..06e5632ae6a 100644 --- a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java +++ b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java @@ -271,9 +271,9 @@ public abstract class AbstractMetricService { /** GetOrCreateCounter with internal report. */ public Counter getOrCreateCounterWithInternalReport( - String metric, MetricLevel metricLevel, String... tags) { + String metric, MetricLevel metricLevel, long time, String... tags) { Counter counter = metricManager.getOrCreateCounter(metric, metricLevel, tags); - internalReporter.writeMetricToIoTDB(counter, metric, tags); + internalReporter.writeMetricToIoTDB(counter, metric, time, tags); return counter; } @@ -287,69 +287,74 @@ public abstract class AbstractMetricService { /** GetOrCreateGauge with internal report. */ public Gauge getOrCreateGaugeWithInternalReport( - String metric, MetricLevel metricLevel, String... tags) { + String metric, MetricLevel metricLevel, long time, String... tags) { Gauge gauge = metricManager.getOrCreateGauge(metric, metricLevel, tags); - internalReporter.writeMetricToIoTDB(gauge, metric, tags); + internalReporter.writeMetricToIoTDB(gauge, metric, time, tags); return gauge; } /** GetOrCreateRate with internal report. */ public Rate getOrCreateRateWithInternalReport( - String metric, MetricLevel metricLevel, String... tags) { + String metric, MetricLevel metricLevel, long time, String... tags) { Rate rate = metricManager.getOrCreateRate(metric, metricLevel, tags); - internalReporter.writeMetricToIoTDB(rate, metric, tags); + internalReporter.writeMetricToIoTDB(rate, metric, time, tags); return rate; } /** GetOrCreateHistogram with internal report. */ public Histogram getOrCreateHistogramWithInternalReport( - String metric, MetricLevel metricLevel, String... tags) { + String metric, MetricLevel metricLevel, long time, String... tags) { Histogram histogram = metricManager.getOrCreateHistogram(metric, metricLevel, tags); - internalReporter.writeMetricToIoTDB(histogram, metric, tags); + internalReporter.writeMetricToIoTDB(histogram, metric, time, tags); return histogram; } /** GetOrCreateTimer with internal report. */ public Timer getOrCreateTimerWithInternalReport( - String metric, MetricLevel metricLevel, String... tags) { + String metric, MetricLevel metricLevel, long time, String... tags) { Timer timer = metricManager.getOrCreateTimer(metric, metricLevel, tags); - internalReporter.writeMetricToIoTDB(timer, metric, tags); + internalReporter.writeMetricToIoTDB(timer, metric, time, tags); return timer; } /** Count with internal report. */ public void countWithInternalReportAsync( - long delta, String metric, MetricLevel metricLevel, String... tags) { + long delta, String metric, MetricLevel metricLevel, long time, String... tags) { internalReporter.writeMetricToIoTDB( - metricManager.count(delta, metric, metricLevel, tags), metric, tags); + metricManager.count(delta, metric, metricLevel, tags), metric, time, tags); } /** Gauge value with internal report. */ public void gaugeWithInternalReportAsync( - long value, String metric, MetricLevel metricLevel, String... tags) { + long value, String metric, MetricLevel metricLevel, long time, String... tags) { internalReporter.writeMetricToIoTDB( - metricManager.gauge(value, metric, metricLevel, tags), metric, tags); + metricManager.gauge(value, metric, metricLevel, tags), metric, time, tags); } /** Rate with internal report. */ public void rateWithInternalReportAsync( - long value, String metric, MetricLevel metricLevel, String... tags) { + long value, String metric, MetricLevel metricLevel, long time, String... tags) { internalReporter.writeMetricToIoTDB( - metricManager.rate(value, metric, metricLevel, tags), metric, tags); + metricManager.rate(value, metric, metricLevel, tags), metric, time, tags); } /** Histogram with internal report. */ public void histogramWithInternalReportAsync( - long value, String metric, MetricLevel metricLevel, String... tags) { + long value, String metric, MetricLevel metricLevel, long time, String... tags) { internalReporter.writeMetricToIoTDB( - metricManager.histogram(value, metric, metricLevel, tags), metric, tags); + metricManager.histogram(value, metric, metricLevel, tags), metric, time, tags); } /** Timer with internal report. */ public void timerWithInternalReportAsync( - long delta, TimeUnit timeUnit, String metric, MetricLevel metricLevel, String... tags) { + long delta, + TimeUnit timeUnit, + String metric, + MetricLevel metricLevel, + long time, + String... tags) { internalReporter.writeMetricToIoTDB( - metricManager.timer(delta, timeUnit, metric, metricLevel, tags), metric, tags); + metricManager.timer(delta, timeUnit, metric, metricLevel, tags), metric, time, tags); } public List<Pair<String, String[]>> getAllMetricKeys() { diff --git a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java index 2cc67f13a31..db1e915c8e9 100644 --- a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java +++ b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java @@ -36,14 +36,14 @@ public abstract class IoTDBReporter implements Reporter { * * @param metric the target metric * @param name the name of metric + * @param time the target time of metric * @param tags the tags of metric */ - public void writeMetricToIoTDB(IMetric metric, String name, String... tags) { + public void writeMetricToIoTDB(IMetric metric, String name, long time, String... tags) { if (!(metric instanceof DoNothingMetric)) { Map<String, Object> values = new HashMap<>(); metric.constructValueMap(values); - writeMetricToIoTDB( - values, IoTDBMetricsUtils.generatePath(name, tags), System.currentTimeMillis()); + writeMetricToIoTDB(values, IoTDBMetricsUtils.generatePath(name, tags), time); } }
