This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch dev_sliding_mem_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev_sliding_mem_table by this 
push:
     new cc039ef  finish sliding mem table version 3
cc039ef is described below

commit cc039ef01773adb604c346f613bb1905dc4f5dae
Author: EJTTianyu <[email protected]>
AuthorDate: Tue Dec 8 10:19:32 2020 +0800

    finish sliding mem table version 3
---
 .../java/org/apache/iotdb/tsfile/TsFileSequenceRead.java  |  2 +-
 .../src/assembly/resources/conf/iotdb-engine.properties   |  6 +++---
 .../org/apache/iotdb/db/engine/flush/FlushManager.java    | 15 +++++++++------
 .../db/engine/storagegroup/StorageGroupProcessor.java     | 12 +++++++++---
 .../iotdb/db/engine/storagegroup/TsFileProcessor.java     |  4 ++--
 5 files changed, 24 insertions(+), 15 deletions(-)

diff --git 
a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java 
b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
index 4cbf862..da5bae6 100644
--- 
a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
+++ 
b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
@@ -41,7 +41,7 @@ public class TsFileSequenceRead {
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
   public static void main(String[] args) throws IOException {
-    String filename = 
"/Users/tianyu/2019秋季学期/iotdb/server/target/data/sequence/root.vehicle.d0/0/1607075644401-1-0.tsfile";
+    String filename = "test.tsfile";
     if (args.length >= 1) {
       filename = args[0];
     }
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties 
b/server/src/assembly/resources/conf/iotdb-engine.properties
index 1f296ef..6137629 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -232,9 +232,6 @@ primitive_array_size=128
 # Ratio of write memory for invoking flush disk, 0.3 by default
 flush_proportion=0.3
 
-# Ratio of write memory for invoking flush immediately, 0.6 by default
-force_flush_proportion=0.6
-
 # Ratio of write memory allocated for buffered arrays, 0.6 by default
 buffered_arrays_memory_proportion=0.6
 
@@ -267,6 +264,9 @@ enable_sliding_mem_table=true
 # Save the flushing memtable in the memory during the period, can help reduce 
the unseq ratio, Unit: millis.
 flush_wait_time=60000
 
+# Ratio of write memory for invoking flush immediately to avoid OOM, 0.6 by 
default
+force_flush_proportion=0.6
+
 ####################
 ### Upgrade Configurations
 ####################
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index b102d03..2672f47 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -92,9 +92,9 @@ public class FlushManager implements FlushManagerMBean, 
IService {
       if (tsFileProcessor.isSequence() && 
IoTDBDescriptor.getInstance().getConfig()
           .isEnableSlidingMemTable() && 
!SystemInfo.getInstance().forceFlush()) {
         long startTime = System.currentTimeMillis();
-        while (!tsFileProcessor.isShouldClose()
-            && System.currentTimeMillis() - startTime < 
IoTDBDescriptor.getInstance().getConfig()
-            .getFlushWaitTime()) {
+        while (!tsFileProcessor.isShouldClose() &&
+            System.currentTimeMillis() - startTime < 
IoTDBDescriptor.getInstance().getConfig()
+                .getFlushWaitTime()) {
           // wait
           try {
             TimeUnit.MILLISECONDS
@@ -104,9 +104,12 @@ public class FlushManager implements FlushManagerMBean, 
IService {
           }
         }
       }
-      tsFileProcessor.setFlushingMemTable(null);
-      tsFileProcessor.setFlushMemTableAlive(false);
-      tsFileProcessor.getUpdateLatestFlushTimeCallback().call(tsFileProcessor);
+      if (tsFileProcessor.isSequence() && 
IoTDBDescriptor.getInstance().getConfig()
+          .isEnableSlidingMemTable()) {
+        
tsFileProcessor.getUpdateLatestFlushTimeCallback().call(tsFileProcessor);
+        tsFileProcessor.setFlushingMemTable(null);
+        tsFileProcessor.setFlushMemTableAlive(false);
+      }
       tsFileProcessor.flushOneMemTable();
       tsFileProcessor.setManagedByFlushManager(false);
       if (logger.isDebugEnabled()) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index f3896a6..537b36a 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -881,7 +881,8 @@ public class StorageGroupProcessor {
       return;
     }
 
-    if (sequence && config.isEnableSlidingMemTable()) {
+    // judge whether to insert into flushing mem table or not when flushing 
mem table is alive
+    if (tsFileProcessor.isFlushMemTableAlive()) {
       insertRowPlan
           .setToFlushingMemTable(isInsertToFlushingMemTable(timePartitionId, 
insertRowPlan));
     }
@@ -1668,17 +1669,22 @@ public class StorageGroupProcessor {
       }
 
       if (processor.isFlushMemTableAlive()) {
+        // if flushing mem table is alive, use latestTimeForEachDevice to 
update flushingLatestTimeForEachDevice
         for (Entry<String, Long> entry : 
curPartitionDeviceLatestTime.entrySet()) {
           flushingLatestTimeForEachDevice
               .computeIfAbsent(processor.getTimeRangeId(), id -> new 
HashMap<>())
               .put(entry.getKey(), entry.getValue());
         }
+        // when the processor is closing, update 
partitionLatestFlushedTimeForEachDevice
         if (processor.isUpdateLatestTime()) {
           updateLatestTime(processor, curPartitionDeviceLatestTime);
         }
       } else {
-        curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
-            .get(processor.getTimeRangeId());
+        // if sliding window is enabled, use flushingLatestTimeForEachDevice 
to update
+        if (config.isEnableSlidingMemTable()) {
+          curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
+              .get(processor.getTimeRangeId());
+        }
         updateLatestTime(processor, curPartitionDeviceLatestTime);
       }
     } finally {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index a4d0a0a..e982933 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -605,11 +605,11 @@ public class TsFileProcessor {
    */
   public void asyncFlush() {
     if (config.isEnableSlidingMemTable()){
-      while (isManagedByFlushManager() && flushingMemTable != null) {
+      while (flushingMemTable != null) {
         try {
           TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
         } catch (InterruptedException e) {
-          e.printStackTrace();
+          logger.error("async flush failed because the flushing mem table is 
still alive", e);
         }
       }
     }

Reply via email to