This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch refactor_mem_control
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit ea10e9e1092f67d17f1319047b5541d860013191
Author: 江天 <[email protected]>
AuthorDate: Fri Apr 12 14:54:34 2019 +0800

    add java doc
    enhance concurrency
    refine some naming
    add more specific logs
---
 .../engine/bufferwrite/BufferWriteProcessor.java   |  9 +++-
 .../db/engine/memcontrol/BasicMemController.java   | 24 ++++++++-
 .../db/engine/memcontrol/JVMMemController.java     |  4 +-
 .../db/engine/memcontrol/RecordMemController.java  | 58 ++++++++++------------
 .../db/engine/overflow/io/OverflowProcessor.java   | 49 +++++++++++++-----
 .../db/engine/memcontrol/MemControllerTest.java    | 18 +++----
 6 files changed, 102 insertions(+), 60 deletions(-)

diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index e09feec..d819c06 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -170,7 +170,7 @@ public class BufferWriteProcessor extends Processor {
   public boolean write(TSRecord tsRecord) throws BufferWriteProcessorException 
{
     long memUsage = MemUtils.getRecordSize(tsRecord);
     BasicMemController.UsageLevel level = BasicMemController.getInstance()
-        .reportUse(this, memUsage);
+        .acquireUsage(this, memUsage);
     for (DataPoint dataPoint : tsRecord.dataPointList) {
       workMemTable.write(tsRecord.deviceId, dataPoint.getMeasurementId(), 
dataPoint.getType(),
           tsRecord.time,
@@ -355,7 +355,7 @@ public class BufferWriteProcessor extends Processor {
       valueCount = 0;
       switchWorkToFlush();
       long version = versionController.nextVersion();
-      BasicMemController.getInstance().reportFree(this, memSize.get());
+      BasicMemController.getInstance().releaseUsage(this, memSize.get());
       memSize.set(0);
       // switch
       flushFuture = FlushManager.getInstance().submit(() -> 
flushTask("asynchronously",
@@ -559,4 +559,9 @@ public class BufferWriteProcessor extends Processor {
   public int hashCode() {
     return Objects.hash(super.hashCode(), baseDir, fileName);
   }
+
+  @Override
+  public String toString() {
+    return "BufferWriteProcessor in " + insertFilePath;
+  }
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java
index fe6753a..503afa3 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java
@@ -150,9 +150,29 @@ public abstract class BasicMemController implements 
IService {
     logger.info("MemController exited");
   }
 
-  public abstract UsageLevel reportUse(Object user, long usage);
+  /**
+   * Any object (like OverflowProcessor or BufferWriteProcessor) that wants to 
hold some fixed size
+   * of memory should call this method and check the returned memory usage 
level to decide on any
+   * further action.
+   * @param user an object that wants some memory for a buffer or other use.
+   * @param usage how many bytes the object wants.
+   * @return one of the three UsageLevels:
+   *          safe - there is still sufficient memory left, the user may go 
on freely and this
+   *                 usage is recorded.
+   *          warning - there is only a small amount of memory available, 
the user had better
+   *                    try to reduce memory usage but can still proceed and 
this usage is recorded.
+   *          dangerous - there is almost no unused memory, the user cannot 
proceed until enough
+   *                    memory usage is released and this usage is NOT 
recorded.
+   */
+  public abstract UsageLevel acquireUsage(Object user, long usage);
 
-  public abstract void reportFree(Object user, long freeSize);
+  /**
+   * When the memory held by an object (like OverflowProcessor or 
BufferWriteProcessor) is no
+   * longer needed, the object should call this method to release that memory.
+   * @param user an object that holds some memory as a buffer or anything.
+   * @param freeSize how many bytes the object wants to release.
+   */
+  public abstract void releaseUsage(Object user, long freeSize);
 
   public enum ControllerType {
     RECORD, JVM
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/JVMMemController.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/JVMMemController.java
index 34a92a4..31a3ef8 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/JVMMemController.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/JVMMemController.java
@@ -61,7 +61,7 @@ public class JVMMemController extends BasicMemController {
   }
 
   @Override
-  public UsageLevel reportUse(Object user, long usage) {
+  public UsageLevel acquireUsage(Object user, long usage) {
     long memUsage = getTotalUsage() + usage;
     if (memUsage < warningThreshold) {
       return UsageLevel.SAFE;
@@ -82,7 +82,7 @@ public class JVMMemController extends BasicMemController {
   }
 
   @Override
-  public void reportFree(Object user, long freeSize) {
+  public void releaseUsage(Object user, long freeSize) {
     if (LOGGER.isInfoEnabled()) {
       LOGGER.info("{} freed from {}, total usage {}", 
MemUtils.bytesCntToStr(freeSize),
           user.getClass(), MemUtils.bytesCntToStr(getTotalUsage()));
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/RecordMemController.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/RecordMemController.java
index 2ffcc97..e96e280 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/RecordMemController.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/RecordMemController.java
@@ -18,10 +18,9 @@
  */
 package org.apache.iotdb.db.engine.memcontrol;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.utils.MemUtils;
@@ -33,15 +32,15 @@ import org.slf4j.LoggerFactory;
  */
 public class RecordMemController extends BasicMemController {
 
-  private static Logger LOGGER = 
LoggerFactory.getLogger(RecordMemController.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RecordMemController.class);
 
   // the key is the reference of the memory user, while the value is its 
memory usage in byte
-  private Map<Object, Long> memMap;
+  private Map<Object, AtomicLong> memMap;
   private AtomicLong totalMemUsed;
 
   private RecordMemController(IoTDBConfig config) {
     super(config);
-    memMap = new HashMap<>();
+    memMap = new ConcurrentHashMap<>();
     totalMemUsed = new AtomicLong(0);
   }
 
@@ -60,11 +59,6 @@ public class RecordMemController extends BasicMemController {
     totalMemUsed.set(0);
   }
 
-  @Override
-  public void close() {
-    super.close();
-  }
-
   /**
    * get the current memory usage level.
    */
@@ -84,11 +78,10 @@ public class RecordMemController extends BasicMemController 
{
    * report the increased memory usage of the object user.
    */
   @Override
-  public UsageLevel reportUse(Object user, long usage) {
-    Long oldUsage = memMap.get(user);
-    if (oldUsage == null) {
-      oldUsage = 0L;
-    }
+  public UsageLevel acquireUsage(Object user, long usage) {
+    AtomicLong userUsage = memMap.computeIfAbsent(user, k -> new 
AtomicLong(0));
+    long oldUsage = userUsage.get();
+
     long newTotUsage = totalMemUsed.get() + usage;
     // check if the new usage will reach dangerous threshold
     if (newTotUsage > dangerouseThreshold) {
@@ -101,12 +94,12 @@ public class RecordMemController extends 
BasicMemController {
       // double check if updating will reach dangerous threshold
       if (newTotUsage < warningThreshold) {
         // still safe, action taken
-        memMap.put(user, oldUsage + usage);
+        userUsage.addAndGet(usage);
         logSafe(newTotUsage, user, usage, oldUsage);
         return UsageLevel.SAFE;
       } else if (newTotUsage < dangerouseThreshold) {
         // become warning because competition with other threads, still take 
the action
-        memMap.put(user, oldUsage + usage);
+        userUsage.addAndGet(usage);
         logWarn(newTotUsage, user, usage, oldUsage);
         return UsageLevel.WARNING;
       } else {
@@ -121,7 +114,8 @@ public class RecordMemController extends BasicMemController 
{
 
   private void logDangerous(long newTotUsage, Object user) {
     if (LOGGER.isWarnEnabled()) {
-      LOGGER.warn("Memory request from {} is denied, memory usage : {}", 
user.getClass(),
+      LOGGER.warn("Memory request from {} is denied, memory usage : {}",
+          user,
           MemUtils.bytesCntToStr(newTotUsage));
     }
   }
@@ -129,7 +123,7 @@ public class RecordMemController extends BasicMemController 
{
   private void logSafe(long newTotUsage, Object user, long usage, long 
oldUsage) {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Safe Threshold : {} allocated to {}, it is using {}, total 
usage {}",
-          MemUtils.bytesCntToStr(usage), user.getClass(),
+          MemUtils.bytesCntToStr(usage), user,
           MemUtils.bytesCntToStr(oldUsage + usage),
           MemUtils.bytesCntToStr(newTotUsage));
     }
@@ -138,7 +132,7 @@ public class RecordMemController extends BasicMemController 
{
   private void logWarn(long newTotUsage, Object user, long usage, long 
oldUsage) {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Warning Threshold : {} allocated to {}, it is using {}, 
total usage {}",
-          MemUtils.bytesCntToStr(usage), user.getClass(),
+          MemUtils.bytesCntToStr(usage), user,
           MemUtils.bytesCntToStr(oldUsage + usage),
           MemUtils.bytesCntToStr(newTotUsage));
     }
@@ -148,26 +142,24 @@ public class RecordMemController extends 
BasicMemController {
    * report the decreased memory usage of the object user.
    */
   @Override
-  public void reportFree(Object user, long freeSize) {
-    Long usage = memMap.get(user);
+  public void releaseUsage(Object user, long freeSize) {
+    AtomicLong usage = memMap.get(user);
+    long usageLong = 0;
     if (usage == null) {
-      LOGGER.error("Unregistered memory usage from {}", user.getClass());
-    } else if (freeSize > usage) {
+      LOGGER.error("Unregistered memory usage from {}", user);
+    } else if (freeSize > usageLong) {
+      usageLong = usage.get();
       LOGGER
-          .error("Request to free {} bytes while it only registered {} bytes", 
freeSize, usage);
-      totalMemUsed.addAndGet(-usage);
-      memMap.remove(user);
+          .error("{} requests to free {} bytes while it only registered {} 
bytes", user,
+              freeSize, usage);
+      totalMemUsed.addAndGet(-usageLong);
     } else {
       long newTotalMemUsage = totalMemUsed.addAndGet(-freeSize);
-      if (usage - freeSize > 0) {
-        memMap.put(user, usage - freeSize);
-      } else {
-        memMap.remove(user);
-      }
+      usage.addAndGet(-freeSize);
       if (LOGGER.isInfoEnabled()) {
         LOGGER.info("{} freed from {}, it is using {}, total usage {}",
             MemUtils.bytesCntToStr(freeSize),
-            user.getClass(), MemUtils.bytesCntToStr(usage - freeSize),
+            user, MemUtils.bytesCntToStr(usage.get()),
             MemUtils.bytesCntToStr(newTotalMemUsage));
       }
     }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 0debfe4..6a1cfe8 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.engine.bufferwrite.Action;
 import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
 import org.apache.iotdb.db.engine.filenode.FileNodeManager;
 import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
+import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel;
 import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.pool.FlushManager;
@@ -174,18 +175,37 @@ public class OverflowProcessor extends Processor {
   public void insert(TSRecord tsRecord) throws IOException {
     // memory control
     long memUage = MemUtils.getRecordSize(tsRecord);
-    BasicMemController.getInstance().reportUse(this, memUage);
-    // write data
-    workSupport.insert(tsRecord);
-    valueCount++;
-    // check flush
-    memUage = memSize.addAndGet(memUage);
-    if (memUage > memThreshold) {
-      LOGGER.warn("The usage of memory {} in overflow processor {} reaches the 
threshold {}",
-          MemUtils.bytesCntToStr(memUage), getProcessorName(),
-          MemUtils.bytesCntToStr(memThreshold));
-      flush();
+    UsageLevel usageLevel = 
BasicMemController.getInstance().acquireUsage(this, memUage);
+    switch (usageLevel) {
+      case SAFE:
+        // write data
+        workSupport.insert(tsRecord);
+        valueCount++;
+        // check flush
+        memUage = memSize.addAndGet(memUage);
+        if (memUage > memThreshold) {
+          if (LOGGER.isWarnEnabled()) {
+            LOGGER.warn("The usage of memory {} in overflow processor {} 
reaches the threshold {}",
+                MemUtils.bytesCntToStr(memUage), getProcessorName(),
+                MemUtils.bytesCntToStr(memThreshold));
+          }
+          flush();
+        }
+        break;
+      case WARNING:
+        // write data
+        workSupport.insert(tsRecord);
+        valueCount++;
+        // flush
+        memSize.addAndGet(memUage);
+        flush();
+        break;
+      case DANGEROUS:
+        throw new IOException("The insertion is rejected because dangerous 
memory level hit");
     }
+
+
+
   }
 
   /**
@@ -514,7 +534,7 @@ public class OverflowProcessor extends Processor {
               getProcessorName(), e);
         }
       }
-      BasicMemController.getInstance().reportFree(this, memSize.get());
+      BasicMemController.getInstance().releaseUsage(this, memSize.get());
       memSize.set(0);
       valueCount = 0;
       // switch from work to flush
@@ -684,4 +704,9 @@ public class OverflowProcessor extends Processor {
   public long getLastFlushTime() {
     return lastFlushTime;
   }
+
+  @Override
+  public String toString() {
+    return "OverflowProcessor in " + parentPath;
+  }
 }
\ No newline at end of file
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/MemControllerTest.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/MemControllerTest.java
index f9ac4ce..a9d48fa 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/MemControllerTest.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/MemControllerTest.java
@@ -52,41 +52,41 @@ public class MemControllerTest {
 
     // every one request 1 GB, should get 7 safes, 8 warning and 5 dangerous
     for (int i = 0; i < 7; i++) {
-      BasicMemController.UsageLevel level = 
memController.reportUse(dummyUser[i], 1 * GB);
+      BasicMemController.UsageLevel level = 
memController.acquireUsage(dummyUser[i], 1 * GB);
       assertEquals(BasicMemController.UsageLevel.SAFE, level);
     }
     for (int i = 7; i < 15; i++) {
-      BasicMemController.UsageLevel level = 
memController.reportUse(dummyUser[i], 1 * GB);
+      BasicMemController.UsageLevel level = 
memController.acquireUsage(dummyUser[i], 1 * GB);
       assertEquals(BasicMemController.UsageLevel.WARNING, level);
     }
     for (int i = 15; i < 20; i++) {
-      BasicMemController.UsageLevel level = 
memController.reportUse(dummyUser[i], 1 * GB);
+      BasicMemController.UsageLevel level = 
memController.acquireUsage(dummyUser[i], 1 * GB);
       assertEquals(BasicMemController.UsageLevel.DANGEROUS, level);
     }
     assertEquals(15 * GB, memController.getTotalUsage());
     // every one free its mem
     for (int i = 0; i < 7; i++) {
-      memController.reportFree(dummyUser[i], 1 * GB);
+      memController.releaseUsage(dummyUser[i], 1 * GB);
       assertEquals((14 - i) * GB, memController.getTotalUsage());
     }
     for (int i = 7; i < 15; i++) {
-      memController.reportFree(dummyUser[i], 2 * GB);
+      memController.releaseUsage(dummyUser[i], 2 * GB);
       assertEquals((14 - i) * GB, memController.getTotalUsage());
     }
     // ask for a too big mem
-    BasicMemController.UsageLevel level = 
memController.reportUse(dummyUser[0], 100 * GB);
+    BasicMemController.UsageLevel level = 
memController.acquireUsage(dummyUser[0], 100 * GB);
     assertEquals(BasicMemController.UsageLevel.DANGEROUS, level);
     // single user ask continuously
     for (int i = 0; i < 8 * 1024 - 1; i++) {
-      level = memController.reportUse(dummyUser[0], 1 * MB);
+      level = memController.acquireUsage(dummyUser[0], 1 * MB);
       assertEquals(BasicMemController.UsageLevel.SAFE, level);
     }
     for (int i = 8 * 1024 - 1; i < 16 * 1024 - 1; i++) {
-      level = memController.reportUse(dummyUser[0], 1 * MB);
+      level = memController.acquireUsage(dummyUser[0], 1 * MB);
       assertEquals(BasicMemController.UsageLevel.WARNING, level);
     }
     for (int i = 16 * 1024 - 1; i < 17 * 1024; i++) {
-      level = memController.reportUse(dummyUser[0], 1 * MB);
+      level = memController.acquireUsage(dummyUser[0], 1 * MB);
       System.out.println(
           memController.getTotalUsage() / GB + " " + 
memController.getTotalUsage() / MB % 1024);
       assertEquals(BasicMemController.UsageLevel.DANGEROUS, level);

Reply via email to