This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new be003c2 [IOTDB-1541][To rel/0.12] Change sequence of wal and memtable
in insert (#3705)
be003c2 is described below
commit be003c24148aeefd62e7cc794fb44525be0b2920
Author: Alan Choo <[email protected]>
AuthorDate: Tue Aug 10 16:06:05 2021 +0800
[IOTDB-1541][To rel/0.12] Change sequence of wal and memtable in insert
(#3705)
---
.../iotdb/db/engine/memtable/AbstractMemTable.java | 14 ++++++-
.../apache/iotdb/db/engine/memtable/IMemTable.java | 6 +++
.../db/engine/storagegroup/TsFileProcessor.java | 49 ++++++++++++++++++----
3 files changed, 59 insertions(+), 10 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 3865ab9..b49fc31 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -304,13 +304,23 @@ public abstract class AbstractMemTable implements
IMemTable {
}
@Override
+ public void releaseTVListRamCost(long cost) {
+ this.tvListRamCost -= cost;
+ }
+
+ @Override
public long getTVListsRamCost() {
return tvListRamCost;
}
@Override
- public void addTextDataSize(long testDataSize) {
- this.memSize += testDataSize;
+ public void addTextDataSize(long textDataSize) {
+ this.memSize += textDataSize;
+ }
+
+ @Override
+ public void releaseTextDataSize(long textDataSize) {
+ this.memSize -= textDataSize;
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 07e7cd0..ce5de4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -65,6 +65,9 @@ public interface IMemTable {
void addTVListRamCost(long cost);
/** only used when mem control enabled */
+ void releaseTVListRamCost(long cost);
+
+ /** only used when mem control enabled */
long getTVListsRamCost();
/**
@@ -134,6 +137,9 @@ public interface IMemTable {
/** only used when mem control enabled */
void addTextDataSize(long textDataIncrement);
+ /** only used when mem control enabled */
+ void releaseTextDataSize(long textDataDecrement);
+
long getMaxPlanIndex();
long getMinPlanIndex();
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 0eca8a2..e35cc3a 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -184,16 +184,18 @@ public class TsFileProcessor {
}
}
+ long[] memIncrements = null;
if (enableMemControl) {
- checkMemCostAndAddToTspInfo(insertRowPlan);
+ memIncrements = checkMemCostAndAddToTspInfo(insertRowPlan);
}
- workMemTable.insert(insertRowPlan);
-
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
try {
getLogNode().write(insertRowPlan);
} catch (Exception e) {
+ if (enableMemControl && memIncrements != null) {
+ rollbackMemoryInfo(memIncrements);
+ }
throw new WriteProcessException(
String.format(
"%s: %s write WAL failed",
@@ -202,6 +204,8 @@ public class TsFileProcessor {
}
}
+ workMemTable.insert(insertRowPlan);
+
// update start time of this memtable
tsFileResource.updateStartTime(
insertRowPlan.getDeviceId().getFullPath(), insertRowPlan.getTime());
@@ -236,9 +240,10 @@ public class TsFileProcessor {
}
}
+ long[] memIncrements = null;
try {
if (enableMemControl) {
- checkMemCostAndAddToTspInfo(insertTabletPlan, start, end);
+ memIncrements = checkMemCostAndAddToTspInfo(insertTabletPlan, start,
end);
}
} catch (WriteProcessException e) {
for (int i = start; i < end; i++) {
@@ -246,8 +251,8 @@ public class TsFileProcessor {
}
throw new WriteProcessException(e);
}
+
try {
- workMemTable.insertTablet(insertTabletPlan, start, end);
long startTime = System.currentTimeMillis();
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
insertTabletPlan.setStart(start);
@@ -262,8 +267,21 @@ public class TsFileProcessor {
for (int i = start; i < end; i++) {
results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR,
e.getMessage());
}
+ if (enableMemControl && memIncrements != null) {
+ rollbackMemoryInfo(memIncrements);
+ }
+ throw new WriteProcessException(e);
+ }
+
+ try {
+ workMemTable.insertTablet(insertTabletPlan, start, end);
+ } catch (WriteProcessException e) {
+ for (int i = start; i < end; i++) {
+ results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR,
e.getMessage());
+ }
throw new WriteProcessException(e);
}
+
for (int i = start; i < end; i++) {
results[i] = RpcUtils.SUCCESS_STATUS;
}
@@ -279,7 +297,7 @@ public class TsFileProcessor {
tsFileResource.updatePlanIndexes(insertTabletPlan.getIndex());
}
- private void checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan)
+ private long[] checkMemCostAndAddToTspInfo(InsertRowPlan insertRowPlan)
throws WriteProcessException {
// memory of increased PrimitiveArray and TEXT values, e.g., add a
long[128], add 128*8
long memTableIncrement = 0L;
@@ -312,12 +330,13 @@ public class TsFileProcessor {
}
}
updateMemoryInfo(memTableIncrement, chunkMetadataIncrement,
textDataIncrement);
+ return new long[] {memTableIncrement, textDataIncrement,
chunkMetadataIncrement};
}
- private void checkMemCostAndAddToTspInfo(InsertTabletPlan insertTabletPlan,
int start, int end)
+ private long[] checkMemCostAndAddToTspInfo(InsertTabletPlan
insertTabletPlan, int start, int end)
throws WriteProcessException {
if (start >= end) {
- return;
+ return new long[] {0, 0, 0};
}
long[] memIncrements = new long[3]; // memTable, text, chunk metadata
@@ -337,6 +356,7 @@ public class TsFileProcessor {
long textDataIncrement = memIncrements[1];
long chunkMetadataIncrement = memIncrements[2];
updateMemoryInfo(memTableIncrement, chunkMetadataIncrement,
textDataIncrement);
+ return memIncrements;
}
private void updateMemCost(
@@ -398,6 +418,19 @@ public class TsFileProcessor {
workMemTable.addTextDataSize(textDataIncrement);
}
+ private void rollbackMemoryInfo(long[] memIncrements) {
+ long memTableIncrement = memIncrements[0];
+ long textDataIncrement = memIncrements[1];
+ long chunkMetadataIncrement = memIncrements[2];
+
+ memTableIncrement += textDataIncrement;
+ storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
+ tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
+ SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+ workMemTable.releaseTVListRamCost(memTableIncrement);
+ workMemTable.releaseTextDataSize(textDataIncrement);
+ }
+
/**
* Delete data which belongs to the timeseries `deviceId.measurementId` and
the timestamp of which
* <= 'timestamp' in the deletion. <br>