This is an automated email from the ASF dual-hosted git repository.

spricoder pushed a commit to branch fix/flush
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/fix/flush by this push:
     new 282184058ef Fix concurrent problem
282184058ef is described below

commit 282184058ef0c242bd028ec29bb47f47d3fb29d9
Author: spricoder <[email protected]>
AuthorDate: Tue Aug 22 00:28:26 2023 +0800

    Fix concurrent problem
---
 .../dataregion/flush/MemTableFlushTask.java        | 75 +++++++++++++---------
 1 file changed, 44 insertions(+), 31 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 141c18d7a04..5f1cea00c48 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -61,6 +61,8 @@ public class MemTableFlushTask {
       FlushSubTaskPoolManager.getInstance();
   private static final WritingMetrics WRITING_METRICS = 
WritingMetrics.getInstance();
   private static IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+  /* storage group name -> <latest time, points>*/
+  private static final Map<String, Pair<Long, Long>> flushPointsCache = new 
ConcurrentHashMap<>();
   private final Future<?> encodingTaskFuture;
   private final Future<?> ioTaskFuture;
   private RestorableTsFileIOWriter writer;
@@ -70,7 +72,6 @@ public class MemTableFlushTask {
       (config.isEnableMemControl() && 
SystemInfo.getInstance().isEncodingFasterThanIo())
           ? new LinkedBlockingQueue<>(config.getIoTaskQueueSizeForFlushing())
           : new LinkedBlockingQueue<>();
-  private final Map<String, Pair<Long, Long>> flushPointsCache;
 
   private String storageGroup;
   private String dataRegionId;
@@ -96,7 +97,6 @@ public class MemTableFlushTask {
     this.dataRegionId = dataRegionId;
     this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask);
     this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
-    this.flushPointsCache = new ConcurrentHashMap<>();
     LOGGER.debug(
         "flush task of database {} memtable is created, flushing to file {}.",
         storageGroup,
@@ -283,35 +283,7 @@ public class MemTableFlushTask {
             Thread.currentThread().interrupt();
           }
 
-          if (!storageGroup.startsWith(IoTDBConfig.SYSTEM_DATABASE)) {
-            int lastIndex = storageGroup.lastIndexOf("-");
-            if (lastIndex == -1) {
-              lastIndex = storageGroup.length();
-            }
-            String storageGroupName = storageGroup.substring(0, lastIndex);
-            long currentTime = DateTimeUtils.currentTime();
-            long points = memTable.getTotalPointsNum();
-            Pair<Long, Long> previousPair = 
flushPointsCache.get(storageGroupName);
-            if (previousPair != null) {
-              if (previousPair.left == currentTime) {
-                points += previousPair.right;
-              } else {
-                flushPointsCache.put(storageGroupName, new Pair<>(currentTime, 
points));
-              }
-            }
-            MetricService.getInstance()
-                .gaugeWithInternalReportAsync(
-                    points,
-                    Metric.POINTS.toString(),
-                    MetricLevel.CORE,
-                    currentTime,
-                    Tag.DATABASE.toString(),
-                    storageGroup.substring(0, lastIndex),
-                    Tag.TYPE.toString(),
-                    "flush",
-                    Tag.REGION.toString(),
-                    dataRegionId);
-          }
+          recordFlushPointsMetric();
 
           LOGGER.info(
               "Database {}, flushing memtable {} into disk: Encoding data cost 
" + "{} ms.",
@@ -322,6 +294,47 @@ public class MemTableFlushTask {
         }
       };
 
+  private void recordFlushPointsMetric() {
+    if (storageGroup.startsWith(IoTDBConfig.SYSTEM_DATABASE)) {
+      return;
+    }
+    int lastIndex = storageGroup.lastIndexOf("-");
+    if (lastIndex == -1) {
+      lastIndex = storageGroup.length();
+    }
+    String storageGroupName = storageGroup.substring(0, lastIndex);
+    long currentTime = DateTimeUtils.currentTime();
+    long currentPoints = memTable.getTotalPointsNum();
+    // compute the flush points
+    long points =
+        flushPointsCache.compute(
+                storageGroupName,
+                (storageGroup, previousPair) -> {
+                  if (previousPair == null || previousPair.left != 
currentTime) {
+                    // if previousPair is null or previousPair.latestTime not 
equals currentTime,
+                    // then create a new pair
+                    return new Pair<>(currentTime, currentPoints);
+                  } else {
+                    // if previousPair.latestTime equals currentTime, then 
accumulate the points
+                    return new Pair<>(currentTime, previousPair.right + 
currentPoints);
+                  }
+                })
+            .right;
+    // record the flush points
+    MetricService.getInstance()
+        .gaugeWithInternalReportAsync(
+            points,
+            Metric.POINTS.toString(),
+            MetricLevel.CORE,
+            currentTime,
+            Tag.DATABASE.toString(),
+            storageGroup.substring(0, lastIndex),
+            Tag.TYPE.toString(),
+            "flush",
+            Tag.REGION.toString(),
+            dataRegionId);
+  }
+
   /** io task (third task of pipeline) */
   @SuppressWarnings("squid:S135")
   private Runnable ioTask =

Reply via email to