This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch NewMemControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f3c298a873516e8b1f6711e38bcd6d6dd59f933f
Author: HTHou <[email protected]>
AuthorDate: Sun Apr 25 16:04:13 2021 +0800

    new mem control strategy
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  30 ++-
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  11 ++
 .../apache/iotdb/db/engine/memtable/IMemTable.java |   4 +
 .../db/engine/storagegroup/StorageGroupInfo.java   |   6 +-
 .../engine/storagegroup/StorageGroupProcessor.java |  29 ---
 .../db/engine/storagegroup/TsFileProcessor.java    |  36 ++--
 .../engine/storagegroup/TsFileProcessorInfo.java   |   5 +-
 .../db/engine/storagegroup/TsFileResource.java     |  11 +-
 .../storagegroup/timeindex/DeviceTimeIndex.java    |  18 --
 .../storagegroup/timeindex/FileTimeIndex.java      |   5 -
 .../engine/storagegroup/timeindex/ITimeIndex.java  |   8 -
 .../org/apache/iotdb/db/rescon/SystemInfo.java     | 208 ++++++++++-----------
 .../engine/storagegroup/TsFileProcessorTest.java   |   4 -
 13 files changed, 161 insertions(+), 214 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java 
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 6ae0edf..277fd1f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -61,7 +61,9 @@ import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.db.utils.UpgradeUtils;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -72,6 +74,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
@@ -103,8 +106,8 @@ public class StorageEngine implements IService {
   @ServerConfigConsistent private static long timePartitionInterval = -1;
   /** whether enable data partition if disabled, all data belongs to partition 
0 */
   @ServerConfigConsistent
-  private static boolean enablePartition =
-      IoTDBDescriptor.getInstance().getConfig().isEnablePartition();
+  private static boolean enablePartition = config.isEnablePartition();
+  private final boolean enableMemControl = config.isEnableMemControl();
 
   /**
    * a folder (system/storage_groups/ by default) that persist system info. 
Each Storage Processor
@@ -504,6 +507,13 @@ public class StorageEngine implements IService {
    * @param insertRowPlan physical plan of insertion
    */
   public void insert(InsertRowPlan insertRowPlan) throws 
StorageEngineException {
+    if (enableMemControl) {
+      try {
+        blockInsertionIfReject();
+      } catch (WriteProcessException e) {
+        throw new StorageEngineException(e);
+      }
+    }
     StorageGroupProcessor storageGroupProcessor = 
getProcessor(insertRowPlan.getDeviceId());
 
     try {
@@ -525,6 +535,13 @@ public class StorageEngine implements IService {
 
   public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
       throws StorageEngineException {
+    if (enableMemControl) {
+      try {
+        blockInsertionIfReject();
+      } catch (WriteProcessException e) {
+        throw new StorageEngineException(e);
+      }
+    }
     StorageGroupProcessor storageGroupProcessor =
         getProcessor(insertRowsOfOneDevicePlan.getDeviceId());
 
@@ -539,6 +556,15 @@ public class StorageEngine implements IService {
   /** insert a InsertTabletPlan to a storage group */
   public void insertTablet(InsertTabletPlan insertTabletPlan)
       throws StorageEngineException, BatchProcessException {
+    if (enableMemControl) {
+      try {
+        blockInsertionIfReject();
+      } catch (WriteProcessRejectException e) {
+        TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
+        Arrays.fill(results, 
RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT));
+        throw new BatchProcessException(results);
+      }
+    }
     StorageGroupProcessor storageGroupProcessor;
     try {
       storageGroupProcessor = getProcessor(insertTabletPlan.getDeviceId());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 0eb04c8..ee72aff 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -52,6 +52,7 @@ public abstract class AbstractMemTable implements IMemTable {
    */
   protected boolean disableMemControl = true;
 
+  private boolean shouldFlush = false;
   private int avgSeriesPointNumThreshold =
       
IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
   /** memory size of data points, including TEXT values */
@@ -388,6 +389,16 @@ public abstract class AbstractMemTable implements 
IMemTable {
   }
 
   @Override
+  public void setShouldFlush() {
+    shouldFlush = true;
+  }
+
+  @Override
+  public boolean shouldFlush() {
+    return shouldFlush;
+  }
+
+  @Override
   public void release() {
     for (Entry<String, Map<String, IWritableMemChunk>> entry : 
memTableMap.entrySet()) {
       for (Entry<String, IWritableMemChunk> subEntry : 
entry.getValue().entrySet()) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 8724e83..5f74b01 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -110,6 +110,10 @@ public interface IMemTable {
 
   boolean isSignalMemTable();
 
+  void setShouldFlush();
+
+  boolean shouldFlush();
+
   void release();
 
   /** must guarantee the device exists in the work memtable only used when mem 
control enabled */
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index c1f6713..c5ee561 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -62,9 +62,7 @@ public class StorageGroupInfo {
 
   /** When create a new TsFileProcessor, call this method */
   public void initTsFileProcessorInfo(TsFileProcessor tsFileProcessor) {
-    if (reportedTsps.add(tsFileProcessor)) {
-      
memoryCost.getAndAdd(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize());
-    }
+    reportedTsps.add(tsFileProcessor);
   }
 
   public void addStorageGroupMemCost(long cost) {
@@ -99,7 +97,7 @@ public class StorageGroupInfo {
    */
   public void closeTsFileProcessorAndReportToSystem(TsFileProcessor 
tsFileProcessor) {
     reportedTsps.remove(tsFileProcessor);
-    SystemInfo.getInstance().resetStorageGroupStatus(this, true);
+    SystemInfo.getInstance().resetStorageGroupStatus(this);
   }
 
   public Supplier<ByteBuffer[]> getWalSupplier() {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 88979e2..69277b1 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -727,8 +727,6 @@ public class StorageGroupProcessor {
             TsFileProcessorInfo tsFileProcessorInfo = new 
TsFileProcessorInfo(storageGroupInfo);
             tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
             this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
-            tsFileProcessorInfo.addTSPMemCost(
-                tsFileProcessor.getTsFileResource().calculateRamSize());
           }
           workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
         } else {
@@ -745,8 +743,6 @@ public class StorageGroupProcessor {
             TsFileProcessorInfo tsFileProcessorInfo = new 
TsFileProcessorInfo(storageGroupInfo);
             tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
             this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
-            tsFileProcessorInfo.addTSPMemCost(
-                tsFileProcessor.getTsFileResource().calculateRamSize());
           }
           workUnsequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
         }
@@ -791,9 +787,6 @@ public class StorageGroupProcessor {
     if (!isAlive(insertRowPlan.getTime())) {
       throw new OutOfTTLException(insertRowPlan.getTime(), 
(System.currentTimeMillis() - dataTTL));
     }
-    if (enableMemControl) {
-      StorageEngine.blockInsertionIfReject();
-    }
     writeLock();
     try {
       // init map
@@ -835,15 +828,6 @@ public class StorageGroupProcessor {
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
   public void insertTablet(InsertTabletPlan insertTabletPlan)
       throws BatchProcessException, TriggerExecutionException {
-    if (enableMemControl) {
-      try {
-        StorageEngine.blockInsertionIfReject();
-      } catch (WriteProcessRejectException e) {
-        TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
-        Arrays.fill(results, 
RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT));
-        throw new BatchProcessException(results);
-      }
-    }
 
     writeLock();
     try {
@@ -1126,18 +1110,6 @@ public class StorageGroupProcessor {
     }
   }
 
-  public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor 
tsFileProcessor) {
-    writeLock();
-    try {
-      if (!closingSequenceTsFileProcessor.contains(tsFileProcessor)
-          && !closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
-        fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
-      }
-    } finally {
-      writeUnlock();
-    }
-  }
-
   private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean 
sequence) {
     TsFileProcessor tsFileProcessor = null;
     try {
@@ -1252,7 +1224,6 @@ public class StorageGroupProcessor {
       TsFileProcessorInfo tsFileProcessorInfo = new 
TsFileProcessorInfo(storageGroupInfo);
       tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
       this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
-      
tsFileProcessorInfo.addTSPMemCost(tsFileProcessor.getTsFileResource().calculateRamSize());
     }
 
     tsFileProcessor.addCloseFileListeners(customCloseFileListeners);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 0207971..8cc6b50 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -121,7 +121,6 @@ public class TsFileProcessor {
   private WriteLogNode logNode;
   private final boolean sequence;
   private long totalMemTableSize;
-  private boolean shouldFlush = false;
 
   private static final String FLUSH_QUERY_WRITE_LOCKED = "{}: {} get 
flushQueryLock write lock";
   private static final String FLUSH_QUERY_WRITE_RELEASE =
@@ -285,7 +284,6 @@ public class TsFileProcessor {
     long textDataIncrement = 0L;
     long chunkMetadataIncrement = 0L;
     String deviceId = insertRowPlan.getDeviceId().getFullPath();
-    long unsealedResourceIncrement = 
tsFileResource.estimateRamIncrement(deviceId);
     int columnIndex = 0;
     for (int i = 0; i < insertRowPlan.getMeasurementMNodes().length; i++) {
       // skip failed Measurements
@@ -326,7 +324,7 @@ public class TsFileProcessor {
       }
     }
     updateMemoryInfo(
-        memTableIncrement, unsealedResourceIncrement, chunkMetadataIncrement, 
textDataIncrement);
+        memTableIncrement, chunkMetadataIncrement, textDataIncrement);
   }
 
   private void checkMemCostAndAddToTspInfo(InsertTabletPlan insertTabletPlan, 
int start, int end)
@@ -337,7 +335,6 @@ public class TsFileProcessor {
     long[] memIncrements = new long[3]; // memTable, text, chunk metadata
 
     String deviceId = insertTabletPlan.getDeviceId().getFullPath();
-    long unsealedResourceIncrement = 
tsFileResource.estimateRamIncrement(deviceId);
 
     int columnIndex = 0;
     for (int i = 0; i < insertTabletPlan.getMeasurementMNodes().length; i++) {
@@ -368,7 +365,7 @@ public class TsFileProcessor {
     long textDataIncrement = memIncrements[1];
     long chunkMetadataIncrement = memIncrements[2];
     updateMemoryInfo(
-        memTableIncrement, unsealedResourceIncrement, chunkMetadataIncrement, 
textDataIncrement);
+        memTableIncrement, chunkMetadataIncrement, textDataIncrement);
   }
 
   private void updateMemCost(
@@ -453,21 +450,21 @@ public class TsFileProcessor {
 
   private void updateMemoryInfo(
       long memTableIncrement,
-      long unsealedResourceIncrement,
       long chunkMetadataIncrement,
       long textDataIncrement)
       throws WriteProcessException {
     memTableIncrement += textDataIncrement;
     storageGroupInfo.addStorageGroupMemCost(memTableIncrement);
-    tsFileProcessorInfo.addTSPMemCost(unsealedResourceIncrement + 
chunkMetadataIncrement);
+    tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement);
     if (storageGroupInfo.needToReportToSystem()) {
-      SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo);
       try {
-        StorageEngine.blockInsertionIfReject();
+        if 
(!SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo, this)) {
+          StorageEngine.blockInsertionIfReject();
+        }
       } catch (WriteProcessRejectException e) {
         storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
-        tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + 
chunkMetadataIncrement);
-        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, 
false);
+        tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
+        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
         throw e;
       }
     }
@@ -520,7 +517,7 @@ public class TsFileProcessor {
     if (workMemTable == null) {
       return false;
     }
-    if (shouldFlush) {
+    if (workMemTable.shouldFlush()) {
       logger.info(
           "The memtable size {} of tsfile {} reaches the mem control 
threshold",
           workMemTable.memSize(),
@@ -766,6 +763,9 @@ public class TsFileProcessor {
       flushListener.onFlushStart(tobeFlushed);
     }
 
+    if (enableMemControl) {
+      
SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost());
+    }
     flushingMemTables.addLast(tobeFlushed);
     if (logger.isDebugEnabled()) {
       logger.debug(
@@ -780,7 +780,6 @@ public class TsFileProcessor {
       totalMemTableSize += tobeFlushed.memSize();
     }
     workMemTable = null;
-    shouldFlush = false;
     FlushManager.getInstance().registerTsFileProcessor(this);
   }
 
@@ -821,7 +820,8 @@ public class TsFileProcessor {
               flushingMemTables.size());
         }
         // report to System
-        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, 
true);
+        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+        
SystemInfo.getInstance().resetFlushingMemTableCost(memTable.getTVListsRamCost());
       }
       if (logger.isDebugEnabled()) {
         logger.debug(
@@ -1265,12 +1265,8 @@ public class TsFileProcessor {
     return sequence;
   }
 
-  public void startAsyncFlush() {
-    
storageGroupInfo.getStorageGroupProcessor().asyncFlushMemTableInTsFileProcessor(this);
-  }
-
-  public void setFlush() {
-    shouldFlush = true;
+  public void setWorkMemTableShouldFlush() {
+    workMemTable.setShouldFlush();
   }
 
   public void addFlushListener(FlushListener listener) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java
index 94e8def..409227f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 
 /** The TsFileProcessorInfo records the memory cost of this TsFileProcessor. */
 public class TsFileProcessorInfo {
@@ -31,7 +30,7 @@ public class TsFileProcessorInfo {
 
   public TsFileProcessorInfo(StorageGroupInfo storageGroupInfo) {
     this.storageGroupInfo = storageGroupInfo;
-    this.memCost = 
IoTDBDescriptor.getInstance().getConfig().getWalBufferSize();
+    this.memCost = 0L;
   }
 
   /** called in each insert */
@@ -49,6 +48,6 @@ public class TsFileProcessorInfo {
   /** called when closing TSP */
   public void clear() {
     storageGroupInfo.releaseStorageGroupMemCost(memCost);
-    memCost = 0;
+    memCost = 0L;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index c6a7abc..0a36c53 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -772,20 +772,11 @@ public class TsFileResource {
     this.modFile = modFile;
   }
 
-  /** @return initial resource map size */
+  /** @return resource map size */
   public long calculateRamSize() {
     return timeIndex.calculateRamSize();
   }
 
-  /**
-   * Calculate the resource ram increment when insert data in TsFileProcessor
-   *
-   * @return ramIncrement
-   */
-  public long estimateRamIncrement(String deviceToBeChecked) {
-    return timeIndex.estimateRamIncrement(deviceToBeChecked);
-  }
-
   public void delete() throws IOException {
     if (file.exists()) {
       Files.delete(file.toPath());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
index bade0d3..381d4b3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
@@ -184,24 +184,6 @@ public class DeviceTimeIndex implements ITimeIndex {
         + RamUsageEstimator.sizeOf(endTimes);
   }
 
-  @Override
-  public long estimateRamIncrement(String deviceToBeChecked) {
-    long ramIncrement = 0L;
-    if (!deviceToIndex.containsKey(deviceToBeChecked)) {
-      // 80 is the Map.Entry header ram size
-      if (deviceToIndex.isEmpty()) {
-        ramIncrement += 80;
-      }
-      // Map.Entry ram size
-      ramIncrement += RamUsageEstimator.sizeOf(deviceToBeChecked) + 16;
-      // if needs to extend the startTimes and endTimes arrays
-      if (deviceToIndex.size() >= startTimes.length) {
-        ramIncrement += startTimes.length * Long.BYTES;
-      }
-    }
-    return ramIncrement;
-  }
-
   private int getDeviceIndex(String deviceId) {
     int index;
     if (deviceToIndex.containsKey(deviceId)) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
index 4e3f601..e1e5251 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
@@ -135,11 +135,6 @@ public class FileTimeIndex implements ITimeIndex {
   }
 
   @Override
-  public long estimateRamIncrement(String deviceToBeChecked) {
-    return devices.contains(deviceToBeChecked) ? 0L : 
RamUsageEstimator.sizeOf(deviceToBeChecked);
-  }
-
-  @Override
   public long getTimePartition(String tsFilePath) {
     try {
       if (devices != null && !devices.isEmpty()) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
index 7738c19..4c87408 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
@@ -77,14 +77,6 @@ public interface ITimeIndex {
   long calculateRamSize();
 
   /**
-   * Calculate file index ram increment when insert data in TsFileProcessor
-   *
-   * @param deviceToBeChecked device to be checked
-   * @return ramIncrement
-   */
-  long estimateRamIncrement(String deviceToBeChecked);
-
-  /**
    * get time partition
    *
    * @param tsFilePath tsFile absolute path
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java 
b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 25ad6be..e9b1312 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -24,13 +24,11 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo;
 import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
-
+import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 
@@ -39,50 +37,63 @@ public class SystemInfo {
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
   private static final Logger logger = 
LoggerFactory.getLogger(SystemInfo.class);
 
-  private long totalSgMemCost = 0L;
+  private long totalStorageGroupMemCost = 0L;
   private volatile boolean rejected = false;
 
   private static long memorySizeForWrite = config.getAllocateMemoryForWrite();
-  private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
+  private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new 
HashMap<>();
 
+  private long flushingMemTablesCost = 0L;
   private static double FLUSH_THERSHOLD = memorySizeForWrite * 
config.getFlushProportion();
   private static double REJECT_THERSHOLD = memorySizeForWrite * 
config.getRejectProportion();
 
-  private boolean isEncodingFasterThanIo = true;
+  private volatile boolean isEncodingFasterThanIo = true;
 
   /**
    * Report current mem cost of storage group to system. Called when the 
memory of storage group
    * newly accumulates to IoTDBConfig.getStorageGroupSizeReportThreshold()
    *
    * @param storageGroupInfo storage group
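+   * @throws WriteProcessRejectException if the current TsFileProcessor's memtable is marked to flush but the total storage group mem cost has still reached memorySizeForWrite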
    */
-  public synchronized void reportStorageGroupStatus(StorageGroupInfo 
storageGroupInfo) {
+  public synchronized boolean reportStorageGroupStatus(StorageGroupInfo 
storageGroupInfo, TsFileProcessor tsFileProcessor) throws 
WriteProcessRejectException {
     long delta =
-        storageGroupInfo.getMemCost() - 
reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
-    totalSgMemCost += delta;
+        storageGroupInfo.getMemCost() - 
reportedStorageGroupMemCostMap.getOrDefault(storageGroupInfo, 0L);
+    totalStorageGroupMemCost += delta;
     if (logger.isDebugEnabled()) {
       logger.debug(
           "Report Storage Group Status to the system. "
               + "After adding {}, current sg mem cost is {}.",
           delta,
-          totalSgMemCost);
+          totalStorageGroupMemCost);
     }
-    reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
+    reportedStorageGroupMemCostMap.put(storageGroupInfo, 
storageGroupInfo.getMemCost());
     storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
-    if (totalSgMemCost >= FLUSH_THERSHOLD) {
+    if (totalStorageGroupMemCost < FLUSH_THERSHOLD) {
+      return true;
+    } else if (totalStorageGroupMemCost >= FLUSH_THERSHOLD && 
totalStorageGroupMemCost < REJECT_THERSHOLD) {
       logger.debug(
           "The total storage group mem costs are too large, call for flushing. 
"
               + "Current sg cost is {}",
-          totalSgMemCost);
-      chooseTSPToMarkFlush();
-    }
-    if (totalSgMemCost >= REJECT_THERSHOLD) {
+              totalStorageGroupMemCost);
+      chooseMemTablesToMarkFlush(tsFileProcessor);
+      return true;
+    } else {
       logger.info(
           "Change system to reject status. Triggered by: logical SG ({}), mem 
cost delta ({}), totalSgMemCost ({}).",
           
storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(),
           delta,
-          totalSgMemCost);
+          totalStorageGroupMemCost);
       rejected = true;
+      if (chooseMemTablesToMarkFlush(tsFileProcessor)) {
+        if (totalStorageGroupMemCost < memorySizeForWrite) {
+          return true;
+        } else {
+          throw new WriteProcessRejectException("Total Storage Group MemCost 
"+ totalStorageGroupMemCost +" is over than memorySizeForWrite");
+        }
+      } else {
+        return false;
+      }
     }
   }
 
@@ -92,119 +103,94 @@ public class SystemInfo {
    *
    * @param storageGroupInfo storage group
    */
-  public void resetStorageGroupStatus(
-      StorageGroupInfo storageGroupInfo, boolean shouldInvokeFlush) {
-    boolean needForceAsyncFlush = false;
-    synchronized (this) {
-      long delta = 0;
-
-      if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
-        delta = reportedSgMemCostMap.get(storageGroupInfo) - 
storageGroupInfo.getMemCost();
-        this.totalSgMemCost -= delta;
-        storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
-        reportedSgMemCostMap.put(storageGroupInfo, 
storageGroupInfo.getMemCost());
-      }
+  public synchronized void resetStorageGroupStatus(StorageGroupInfo 
storageGroupInfo) {
+    long delta = 0;
+
+    if (reportedStorageGroupMemCostMap.containsKey(storageGroupInfo)) {
+      delta = reportedStorageGroupMemCostMap.get(storageGroupInfo) - 
storageGroupInfo.getMemCost();
+      this.totalStorageGroupMemCost -= delta;
+      storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
+      reportedStorageGroupMemCostMap.put(storageGroupInfo, 
storageGroupInfo.getMemCost());
+    }
 
-      if (totalSgMemCost >= FLUSH_THERSHOLD && totalSgMemCost < 
REJECT_THERSHOLD) {
-        logger.debug(
-            "SG ({}) released memory (delta: {}) but still exceeding flush 
proportion (totalSgMemCost: {}), call flush.",
-            
storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(),
-            delta,
-            totalSgMemCost);
-        if (rejected) {
-          logger.info(
-              "SG ({}) released memory (delta: {}), set system to normal 
status (totalSgMemCost: {}).",
-              
storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(),
-              delta,
-              totalSgMemCost);
-        }
-        logCurrentTotalSGMemory();
-        rejected = false;
-        needForceAsyncFlush = true;
-      } else if (totalSgMemCost >= REJECT_THERSHOLD) {
-        logger.warn(
-            "SG ({}) released memory (delta: {}), but system is still in 
reject status (totalSgMemCost: {}).",
-            
storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(),
-            delta,
-            totalSgMemCost);
-        logCurrentTotalSGMemory();
-        rejected = true;
-        needForceAsyncFlush = true;
-      } else {
-        logger.debug(
-            "SG ({}) released memory (delta: {}), system is in normal status 
(totalSgMemCost: {}).",
+    if (totalStorageGroupMemCost >= FLUSH_THERSHOLD && 
totalStorageGroupMemCost < REJECT_THERSHOLD) {
+      logger.debug(
+          "SG ({}) released memory (delta: {}) but still exceeding flush 
proportion (totalSgMemCost: {}), call flush.",
+          
storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(),
+          delta,
+          totalStorageGroupMemCost);
+      if (rejected) {
+        logger.info(
+            "SG ({}) released memory (delta: {}), set system to normal status 
(totalSgMemCost: {}).",
             
storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(),
             delta,
-            totalSgMemCost);
-        logCurrentTotalSGMemory();
-        rejected = false;
+            totalStorageGroupMemCost);
       }
+      logCurrentTotalSGMemory();
+      rejected = false;
+    } else if (totalStorageGroupMemCost >= REJECT_THERSHOLD) {
+      logger.warn(
+          "SG ({}) released memory (delta: {}), but system is still in reject 
status (totalSgMemCost: {}).",
+          
storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(),
+          delta,
+          totalStorageGroupMemCost);
+      logCurrentTotalSGMemory();
+      rejected = true;
+    } else {
+      logger.debug(
+          "SG ({}) released memory (delta: {}), system is in normal status 
(totalSgMemCost: {}).",
+          
storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(),
+          delta,
+          totalStorageGroupMemCost);
+      logCurrentTotalSGMemory();
+      rejected = false;
     }
-    if (shouldInvokeFlush && needForceAsyncFlush) {
-      forceAsyncFlush();
-    }
+  }
+
+  public synchronized void addFlushingMemTableCost(long flushingMemTableCost) {
+    this.flushingMemTablesCost += flushingMemTableCost;
+  }
+
+  public synchronized void resetFlushingMemTableCost(long 
flushingMemTableCost) {
+    this.flushingMemTablesCost -= flushingMemTableCost;
   }
 
   private void logCurrentTotalSGMemory() {
-    logger.debug("Current Sg cost is {}", totalSgMemCost);
+    logger.debug("Current Sg cost is {}", totalStorageGroupMemCost);
   }
 
   /**
-   * Order all tsfileProcessors in system by memory cost of actual data points 
in memtable. Mark the
+   * Order all working memtables in system by memory cost of actual data 
points in memtable. Mark the
    * top K TSPs as to be flushed, so that after flushing the K TSPs, the 
memory cost should be less
    * than FLUSH_THRESHOLD
    */
-  private void chooseTSPToMarkFlush() {
-    if (FlushManager.getInstance().getNumberOfWorkingTasks() > 0) {
-      return;
-    }
+  private boolean chooseMemTablesToMarkFlush(TsFileProcessor 
currentTsFileProcessor) {
     // If invoke flush by replaying logs, do not flush now!
-    if (reportedSgMemCostMap.size() == 0) {
-      return;
-    }
-    // get the tsFile processors which has the max work MemTable size
-    List<TsFileProcessor> processors = getTsFileProcessorsToFlush();
-    for (TsFileProcessor processor : processors) {
-      if (processor != null) {
-        processor.setFlush();
-      }
+    if (reportedStorageGroupMemCostMap.size() == 0) {
+      return false;
     }
-  }
-
-  /** Be Careful!! This method can only be called by flush thread! */
-  private void forceAsyncFlush() {
-    if (FlushManager.getInstance().getNumberOfWorkingTasks() > 1) {
-      return;
-    }
-    List<TsFileProcessor> processors = getTsFileProcessorsToFlush();
-    if (logger.isDebugEnabled()) {
-      logger.debug("[mem control] get {} tsp to flush", processors.size());
-    }
-    for (TsFileProcessor processor : processors) {
-      if (processor != null) {
-        processor.startAsyncFlush();
-      }
-    }
-  }
-
-  private List<TsFileProcessor> getTsFileProcessorsToFlush() {
-    PriorityQueue<TsFileProcessor> tsps =
+    PriorityQueue<TsFileProcessor> allTsFileProcessors =
         new PriorityQueue<>(
             (o1, o2) -> Long.compare(o2.getWorkMemTableRamCost(), 
o1.getWorkMemTableRamCost()));
-    for (StorageGroupInfo sgInfo : reportedSgMemCostMap.keySet()) {
-      tsps.addAll(sgInfo.getAllReportedTsp());
+    for (StorageGroupInfo storageGroupInfo : 
reportedStorageGroupMemCostMap.keySet()) {
+      allTsFileProcessors.addAll(storageGroupInfo.getAllReportedTsp());
     }
-    List<TsFileProcessor> processors = new ArrayList<>();
+    boolean isCurrentTsFileProcessorSelected = false;
     long memCost = 0;
-    while (totalSgMemCost - memCost > FLUSH_THERSHOLD / 2) {
-      if (tsps.isEmpty() || tsps.peek().getWorkMemTableRamCost() == 0) {
-        return processors;
+    long activeMemSize = totalStorageGroupMemCost - flushingMemTablesCost;
+    while (activeMemSize - memCost > FLUSH_THERSHOLD) {
+      if (allTsFileProcessors.isEmpty() || 
allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) {
+        return false;
+      }
+      TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.peek();
+      memCost += selectedTsFileProcessor.getWorkMemTableRamCost();
+      selectedTsFileProcessor.setWorkMemTableShouldFlush();
+      if (selectedTsFileProcessor == currentTsFileProcessor) {
+        isCurrentTsFileProcessorSelected = true;
       }
-      processors.add(tsps.peek());
-      memCost += tsps.peek().getWorkMemTableRamCost();
-      tsps.poll();
+      allTsFileProcessors.poll();
     }
-    return processors;
+    return isCurrentTsFileProcessorSelected;
   }
 
   public boolean isRejected() {
@@ -220,8 +206,8 @@ public class SystemInfo {
   }
 
   public void close() {
-    reportedSgMemCostMap.clear();
-    totalSgMemCost = 0;
+    reportedStorageGroupMemCostMap.clear();
+    totalStorageGroupMemCost = 0;
     rejected = false;
   }
 
@@ -249,7 +235,7 @@ public class SystemInfo {
   }
 
   public long getTotalMemTableSize() {
-    return totalSgMemCost;
+    return totalStorageGroupMemCost;
   }
 
   public double getFlushThershold() {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index 5f43331..b3d20c6 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -102,7 +102,6 @@ public class TsFileProcessorTest {
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
     this.sgInfo.initTsFileProcessorInfo(processor);
-    tsFileProcessorInfo.addTSPMemCost(processor.getTsFileResource().calculateRamSize());
     SystemInfo.getInstance().reportStorageGroupStatus(sgInfo);
     List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
     processor.query(
@@ -179,7 +178,6 @@ public class TsFileProcessorTest {
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
     this.sgInfo.initTsFileProcessorInfo(processor);
-    tsFileProcessorInfo.addTSPMemCost(processor.getTsFileResource().calculateRamSize());
     SystemInfo.getInstance().reportStorageGroupStatus(sgInfo);
     List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
     processor.query(
@@ -282,7 +280,6 @@ public class TsFileProcessorTest {
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
     this.sgInfo.initTsFileProcessorInfo(processor);
-    tsFileProcessorInfo.addTSPMemCost(processor.getTsFileResource().calculateRamSize());
     SystemInfo.getInstance().reportStorageGroupStatus(sgInfo);
     List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
     processor.query(
@@ -339,7 +336,6 @@ public class TsFileProcessorTest {
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
     this.sgInfo.initTsFileProcessorInfo(processor);
-    tsFileProcessorInfo.addTSPMemCost(processor.getTsFileResource().calculateRamSize());
     SystemInfo.getInstance().reportStorageGroupStatus(sgInfo);
     List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
 

Reply via email to