This is an automated email from the ASF dual-hosted git repository.

spricoder pushed a commit to branch fix/flush
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7dbc913668feffd6edbb665bebeac7a23958446f
Author: spricoder <[email protected]>
AuthorDate: Mon Aug 21 14:47:15 2023 +0800

    Fix flush
---
 .../dataregion/flush/MemTableFlushTask.java        | 23 ++++++++---
 .../iotdb/metrics/AbstractMetricService.java       | 45 ++++++++++++----------
 .../metrics/reporter/iotdb/IoTDBReporter.java      |  6 +--
 3 files changed, 46 insertions(+), 28 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 9a5db18bb28..7aa325333a2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -30,7 +30,9 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
+import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 
@@ -42,10 +44,7 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 /**
  * flush task to flush one memtable using a pipeline model to flush, which is sort memtable ->
@@ -67,6 +66,7 @@ public class MemTableFlushTask {
       (config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo())
           ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
           : new LinkedBlockingQueue<>();
+  private final Map<String, Pair<Long, Long>> flushPointsCache;
 
   private String storageGroup;
   private String dataRegionId;
@@ -92,6 +92,7 @@ public class MemTableFlushTask {
     this.dataRegionId = dataRegionId;
     this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask);
     this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
+    this.flushPointsCache = new ConcurrentHashMap<>();
     LOGGER.debug(
         "flush task of database {} memtable is created, flushing to file {}.",
         storageGroup,
@@ -283,11 +284,23 @@ public class MemTableFlushTask {
             if (lastIndex == -1) {
               lastIndex = storageGroup.length();
             }
+            String storageGroupName = storageGroup.substring(0, lastIndex);
+            long currentTime = DateTimeUtils.currentTime();
+            long points = memTable.getTotalPointsNum();
+            Pair<Long, Long> previousPair = flushPointsCache.get(storageGroupName);
+            if (previousPair != null) {
+              if (previousPair.left == currentTime) {
+                points += previousPair.right;
+              } else {
+                flushPointsCache.put(storageGroupName, new Pair<>(currentTime, points));
+              }
+            }
             MetricService.getInstance()
                 .gaugeWithInternalReportAsync(
-                    memTable.getTotalPointsNum(),
+                    points,
                     Metric.POINTS.toString(),
                     MetricLevel.CORE,
+                    currentTime,
                     Tag.DATABASE.toString(),
                     storageGroup.substring(0, lastIndex),
                     Tag.TYPE.toString(),
diff --git a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java
index 5ac5a3737f2..06e5632ae6a 100644
--- a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java
+++ b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java
@@ -271,9 +271,9 @@ public abstract class AbstractMetricService {
 
   /** GetOrCreateCounter with internal report. */
   public Counter getOrCreateCounterWithInternalReport(
-      String metric, MetricLevel metricLevel, String... tags) {
+      String metric, MetricLevel metricLevel, long time, String... tags) {
     Counter counter = metricManager.getOrCreateCounter(metric, metricLevel, tags);
-    internalReporter.writeMetricToIoTDB(counter, metric, tags);
+    internalReporter.writeMetricToIoTDB(counter, metric, time, tags);
     return counter;
   }
 
@@ -287,69 +287,74 @@ public abstract class AbstractMetricService {
 
   /** GetOrCreateGauge with internal report. */
   public Gauge getOrCreateGaugeWithInternalReport(
-      String metric, MetricLevel metricLevel, String... tags) {
+      String metric, MetricLevel metricLevel, long time, String... tags) {
     Gauge gauge = metricManager.getOrCreateGauge(metric, metricLevel, tags);
-    internalReporter.writeMetricToIoTDB(gauge, metric, tags);
+    internalReporter.writeMetricToIoTDB(gauge, metric, time, tags);
     return gauge;
   }
 
   /** GetOrCreateRate with internal report. */
   public Rate getOrCreateRateWithInternalReport(
-      String metric, MetricLevel metricLevel, String... tags) {
+      String metric, MetricLevel metricLevel, long time, String... tags) {
     Rate rate = metricManager.getOrCreateRate(metric, metricLevel, tags);
-    internalReporter.writeMetricToIoTDB(rate, metric, tags);
+    internalReporter.writeMetricToIoTDB(rate, metric, time, tags);
     return rate;
   }
 
   /** GetOrCreateHistogram with internal report. */
   public Histogram getOrCreateHistogramWithInternalReport(
-      String metric, MetricLevel metricLevel, String... tags) {
+      String metric, MetricLevel metricLevel, long time, String... tags) {
     Histogram histogram = metricManager.getOrCreateHistogram(metric, metricLevel, tags);
-    internalReporter.writeMetricToIoTDB(histogram, metric, tags);
+    internalReporter.writeMetricToIoTDB(histogram, metric, time, tags);
     return histogram;
   }
 
   /** GetOrCreateTimer with internal report. */
   public Timer getOrCreateTimerWithInternalReport(
-      String metric, MetricLevel metricLevel, String... tags) {
+      String metric, MetricLevel metricLevel, long time, String... tags) {
     Timer timer = metricManager.getOrCreateTimer(metric, metricLevel, tags);
-    internalReporter.writeMetricToIoTDB(timer, metric, tags);
+    internalReporter.writeMetricToIoTDB(timer, metric, time, tags);
     return timer;
   }
 
   /** Count with internal report. */
   public void countWithInternalReportAsync(
-      long delta, String metric, MetricLevel metricLevel, String... tags) {
+      long delta, String metric, MetricLevel metricLevel, long time, String... tags) {
     internalReporter.writeMetricToIoTDB(
-        metricManager.count(delta, metric, metricLevel, tags), metric, tags);
+        metricManager.count(delta, metric, metricLevel, tags), metric, time, tags);
   }
 
   /** Gauge value with internal report. */
   public void gaugeWithInternalReportAsync(
-      long value, String metric, MetricLevel metricLevel, String... tags) {
+      long value, String metric, MetricLevel metricLevel, long time, String... tags) {
     internalReporter.writeMetricToIoTDB(
-        metricManager.gauge(value, metric, metricLevel, tags), metric, tags);
+        metricManager.gauge(value, metric, metricLevel, tags), metric, time, tags);
   }
 
   /** Rate with internal report. */
   public void rateWithInternalReportAsync(
-      long value, String metric, MetricLevel metricLevel, String... tags) {
+      long value, String metric, MetricLevel metricLevel, long time, String... tags) {
     internalReporter.writeMetricToIoTDB(
-        metricManager.rate(value, metric, metricLevel, tags), metric, tags);
+        metricManager.rate(value, metric, metricLevel, tags), metric, time, tags);
   }
 
   /** Histogram with internal report. */
   public void histogramWithInternalReportAsync(
-      long value, String metric, MetricLevel metricLevel, String... tags) {
+      long value, String metric, MetricLevel metricLevel, long time, String... tags) {
     internalReporter.writeMetricToIoTDB(
-        metricManager.histogram(value, metric, metricLevel, tags), metric, tags);
+        metricManager.histogram(value, metric, metricLevel, tags), metric, time, tags);
   }
 
   /** Timer with internal report. */
   public void timerWithInternalReportAsync(
-      long delta, TimeUnit timeUnit, String metric, MetricLevel metricLevel, String... tags) {
+      long delta,
+      TimeUnit timeUnit,
+      String metric,
+      MetricLevel metricLevel,
+      long time,
+      String... tags) {
     internalReporter.writeMetricToIoTDB(
-        metricManager.timer(delta, timeUnit, metric, metricLevel, tags), metric, tags);
+        metricManager.timer(delta, timeUnit, metric, metricLevel, tags), metric, time, tags);
   }
 
   public List<Pair<String, String[]>> getAllMetricKeys() {
diff --git a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java
index 2cc67f13a31..db1e915c8e9 100644
--- a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java
+++ b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/reporter/iotdb/IoTDBReporter.java
@@ -36,14 +36,14 @@ public abstract class IoTDBReporter implements Reporter {
    *
    * @param metric the target metric
    * @param name the name of metric
+   * @param time the target time of metric
    * @param tags the tags of metric
    */
-  public void writeMetricToIoTDB(IMetric metric, String name, String... tags) {
+  public void writeMetricToIoTDB(IMetric metric, String name, long time, String... tags) {
     if (!(metric instanceof DoNothingMetric)) {
       Map<String, Object> values = new HashMap<>();
       metric.constructValueMap(values);
-      writeMetricToIoTDB(
-          values, IoTDBMetricsUtils.generatePath(name, tags), System.currentTimeMillis());
+      writeMetricToIoTDB(values, IoTDBMetricsUtils.generatePath(name, tags), time);
     }
   }
 

Reply via email to