This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new be003c2  [IOTDB-1541][To rel/0.12] Change sequence of wal and memtable in insert (#3705)
be003c2 is described below

commit be003c24148aeefd62e7cc794fb44525be0b2920
Author: Alan Choo <[email protected]>
AuthorDate: Tue Aug 10 16:06:05 2021 +0800

    [IOTDB-1541][To rel/0.12] Change sequence of wal and memtable in insert (#3705)
---
 .../iotdb/db/engine/memtable/AbstractMemTable.java | 14 ++++++-
 .../apache/iotdb/db/engine/memtable/IMemTable.java |  6 +++
 .../db/engine/storagegroup/TsFileProcessor.java    | 49 ++++++++++++++++++----
 3 files changed, 59 insertions(+), 10 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 3865ab9..b49fc31 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -304,13 +304,23 @@ public abstract class AbstractMemTable implements IMemTable {
   }
 
   @Override
+  public void releaseTVListRamCost(long cost) {
+    this.tvListRamCost -= cost;
+  }
+
+  @Override
   public long getTVListsRamCost() {
     return tvListRamCost;
   }
 
   @Override
-  public void addTextDataSize(long testDataSize) {
-    this.memSize += testDataSize;
+  public void addTextDataSize(long textDataSize) {
+    this.memSize += textDataSize;
+  }
+
+  @Override
+  public void releaseTextDataSize(long textDataSize) {
+    this.memSize -= textDataSize;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 07e7cd0..ce5de4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -65,6 +65,9 @@ public interface IMemTable {
   void addTVListRamCost(long cost);
 
   /** only used when mem control enabled */
+  void releaseTVListRamCost(long cost);
+
+  /** only used when mem control enabled */
   long getTVListsRamCost();
 
   /**
@@ -134,6 +137,9 @@ public interface IMemTable {
   /** only used when mem control enabled */
   void addTextDataSize(long textDataIncrement);
 
+  /** only used when mem control enabled */
+  void releaseTextDataSize(long textDataDecrement);
+
   long getMaxPlanIndex();
 
   long getMinPlanIndex();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 0eca8a2..e35cc3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -184,16 +184,18 @@ public class TsFileProcessor {
       }
     }
 
+    long[] memIncrements = null;
     if (enableMemControl) {
-      checkMemCostAndAddToTspInfo(insertRowPlan);
+      memIncrements = checkMemCostAndAddToTspInfo(insertRowPlan);
     }
 
-    workMemTable.insert(insertRowPlan);
-
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
       try {
         getLogNode().write(insertRowPlan);
       } catch (Exception e) {
+        if (enableMemControl && memIncrements != null) {
+          rollbackMemoryInfo(memIncrements);
+        }
         throw new WriteProcessException(
             String.format(
                 "%s: %s write WAL failed",
@@ -202,6 +204,8 @@ public class TsFileProcessor {
       }
     }
 
+    workMemTable.insert(insertRowPlan);
+
     // update start time of this memtable
     tsFileResource.updateStartTime(
         insertRowPlan.getDeviceId().getFullPath(), insertRowPlan.getTime());
@@ -236,9 +240,10 @@ public class TsFileProcessor {
       }
     }
 
+    long[] memIncrements = null;
     try {
       if (enableMemControl) {
-        checkMemCostAndAddToTspInfo(insertTabletPlan, start, end);
+        memIncrements = checkMemCostAndAddToTspInfo(insertTabletPlan, start, 
end);
       }
     } catch (WriteProcessException e) {
       for (int i = start; i < end; i++) {
@@ -246,8 +251,8 @@ public class TsFileProcessor {
       }
       throw new WriteProcessException(e);
     }
+
     try {
-      workMemTable.insertTablet(insertTabletPlan, start, end);
       long startTime = System.currentTimeMillis();
       if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
         insertTabletPlan.setStart(start);
@@ -262,8 +267,21 @@ public class TsFileProcessor {
       for (int i = start; i < end; i++) {
         results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, 
e.getMessage());
       }
+      if (enableMemControl && memIncrements != null) {
+        rollbackMemoryInfo(memIncrements);
+      }
+      throw new WriteProcessException(e);
+    }
+
+    try {
+      workMemTable.insertTablet(insertTabletPlan, start, end);
+    } catch (WriteProcessException e) {
+      for (int i = start; i < end; i++) {
+        results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, 
e.getMessage());
+      }
       throw new WriteProcessException(e);
     }
+
     for (int i = start; i < end; i++) {
       results[i] = RpcUtils.SUCCESS_STATUS;
     }
@@ -279,7 +297,7 @@ public class TsFileProcessor {
     tsFileResource.updatePlanIndexes(insertTabletPlan.getIndex());
   }
 
-  private void checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan)
+  private long[] checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan)
       throws WriteProcessException {
     // memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
     long memTableIncrement = 0L;
@@ -312,12 +330,13 @@ public class TsFileProcessor {
       }
     }
     updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, 
textDataIncrement);
+    return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
   }
 
-  private void checkMemCostAndAddToTspInfo(InsertTabletPlan insertTabletPlan, 
int start, int end)
+  private long[] checkMemCostAndAddToTspInfo(InsertTabletPlan 
insertTabletPlan, int start, int end)
       throws WriteProcessException {
     if (start >= end) {
-      return;
+      return new long[] {0, 0, 0};
     }
     long[] memIncrements = new long[3]; // memTable, text, chunk metadata
 
@@ -337,6 +356,7 @@ public class TsFileProcessor {
     long textDataIncrement = memIncrements[1];
     long chunkMetadataIncrement = memIncrements[2];
     updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, 
textDataIncrement);
+    return memIncrements;
   }
 
   private void updateMemCost(
@@ -398,6 +418,19 @@ public class TsFileProcessor {
     workMemTable.addTextDataSize(textDataIncrement);
   }
 
+  private void rollbackMemoryInfo(long[] memIncrements) {
+    long memTableIncrement = memIncrements[0];
+    long textDataIncrement = memIncrements[1];
+    long chunkMetadataIncrement = memIncrements[2];
+
+    memTableIncrement += textDataIncrement;
+    storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
+    tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
+    SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+    workMemTable.releaseTVListRamCost(memTableIncrement);
+    workMemTable.releaseTextDataSize(textDataIncrement);
+  }
+
   /**
    * Delete data which belongs to the timeseries `deviceId.measurementId` and 
the timestamp of which
    * <= 'timestamp' in the deletion. <br>

Reply via email to