This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this
push:
new 64ef149aacb finish memory estimation
64ef149aacb is described below
commit 64ef149aacbef5e3c9e8bfb336b8ab6aff71351b
Author: DESKTOP-L0L5GPJ\jt <[email protected]>
AuthorDate: Wed Jun 19 15:25:41 2024 +0800
finish memory estimation
---
.../planner/plan/node/write/InsertTabletNode.java | 46 ++-
.../db/storageengine/dataregion/DataRegion.java | 311 +++++++++------------
.../dataregion/memtable/AbstractMemTable.java | 4 +-
.../memtable/AlignedWritableMemChunk.java | 2 +-
.../dataregion/memtable/IMemTable.java | 4 +-
.../dataregion/memtable/TsFileProcessor.java | 227 ++++++++++-----
.../wal/recover/file/TsFilePlanRedoer.java | 2 +-
.../java/org/apache/iotdb/db/utils/MemUtils.java | 7 +-
.../db/utils/datastructure/AlignedTVList.java | 19 +-
.../dataregion/memtable/PrimitiveMemTableTest.java | 2 +-
10 files changed, 348 insertions(+), 276 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index f0b0725af6e..d0c9f9c5aa6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
import java.util.Map.Entry;
-import java.util.function.Function;
import java.util.function.IntFunction;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
@@ -228,19 +227,19 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
return splitByPartition(analysis, this::getTableDeviceID);
}
- private Map<IDeviceID, SplitInfo> collectSplitRanges(IntFunction<IDeviceID>
rowNumDeviceIdMapper) {
+ private Map<IDeviceID, PartitionSplitInfo>
collectSplitRanges(IntFunction<IDeviceID> rowNumDeviceIdMapper) {
long upperBoundOfTimePartition =
TimePartitionUtils.getTimePartitionUpperBound(times[0]);
TTimePartitionSlot timePartitionSlot =
TimePartitionUtils.getTimePartitionSlot(times[0]);
int startLoc = 0; // included
IDeviceID currDeviceId = rowNumDeviceIdMapper.apply(0);
- Map<IDeviceID, SplitInfo> deviceIDSplitInfoMap = new HashMap<>();
+ Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap = new HashMap<>();
for (int i = 1; i < times.length; i++) { // times are sorted in session
API.
IDeviceID nextDeviceId = rowNumDeviceIdMapper.apply(i);
if (times[i] >= upperBoundOfTimePartition ||
!currDeviceId.equals(nextDeviceId)) {
- final SplitInfo splitInfo =
deviceIDSplitInfoMap.computeIfAbsent(currDeviceId,
- deviceID1 -> new SplitInfo());
+ final PartitionSplitInfo splitInfo =
deviceIDSplitInfoMap.computeIfAbsent(currDeviceId,
+ deviceID1 -> new PartitionSplitInfo());
// a new range.
splitInfo.ranges.add(startLoc); // included
splitInfo.ranges.add(i); // excluded
@@ -253,8 +252,8 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
}
}
- SplitInfo splitInfo = deviceIDSplitInfoMap.computeIfAbsent(currDeviceId,
- deviceID1 -> new SplitInfo());
+ PartitionSplitInfo splitInfo =
deviceIDSplitInfoMap.computeIfAbsent(currDeviceId,
+ deviceID1 -> new PartitionSplitInfo());
// the final range
splitInfo.ranges.add(startLoc); // included
splitInfo.ranges.add(times.length); // excluded
@@ -263,12 +262,12 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
return deviceIDSplitInfoMap;
}
- public Map<TRegionReplicaSet, List<Integer>>
splitByReplicaSet(Map<IDeviceID, SplitInfo> deviceIDSplitInfoMap, IAnalysis
analysis) {
+ public Map<TRegionReplicaSet, List<Integer>>
splitByReplicaSet(Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap,
IAnalysis analysis) {
Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
- for (Entry<IDeviceID, SplitInfo> entry : deviceIDSplitInfoMap.entrySet()) {
+ for (Entry<IDeviceID, PartitionSplitInfo> entry :
deviceIDSplitInfoMap.entrySet()) {
final IDeviceID deviceID = entry.getKey();
- final SplitInfo splitInfo = entry.getValue();
+ final PartitionSplitInfo splitInfo = entry.getValue();
final List<TRegionReplicaSet> replicaSets = analysis
.getDataPartitionInfo()
.getDataRegionReplicaSetForWriting(
@@ -369,7 +368,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
return Collections.emptyList();
}
- final Map<IDeviceID, SplitInfo> deviceIDSplitInfoMap =
collectSplitRanges(rowNumDeviceIdMapper);
+ final Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap =
collectSplitRanges(rowNumDeviceIdMapper);
final Map<TRegionReplicaSet, List<Integer>> splitMap = splitByReplicaSet(
deviceIDSplitInfoMap, analysis);
@@ -1205,10 +1204,33 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
return deviceIDs[rowIdx];
}
- private class SplitInfo {
+ private static class PartitionSplitInfo {
// for each List in split, they are range1.start, range1.end,
range2.start, range2.end, ...
private List<Integer> ranges = new ArrayList<>();
private List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
private List<TRegionReplicaSet> replicaSets;
}
+
+ /**
+ * Split the tablet of the given range according to Table deviceID.
+ * @param start inclusive
+ * @param end exclusive
+ * @return each the number in the pair is the end offset of a device
+ */
+ public List<Pair<IDeviceID, Integer>> splitByDevice(int start, int end) {
+ List<Pair<IDeviceID, Integer>> result = new ArrayList<>();
+ IDeviceID prevDeviceId = getTableDeviceID(start);
+
+ int i = start + 1;
+ for (; i < end; i++) {
+ IDeviceID currentDeviceId = getTableDeviceID(i);
+ if (!currentDeviceId.equals(prevDeviceId)) {
+ result.add(new Pair<>(prevDeviceId, i));
+ prevDeviceId = getTableDeviceID(i);
+ }
+ }
+ result.add(new Pair<>(prevDeviceId, start));
+
+ return result;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 9bab11f9503..688cf838a50 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -343,10 +343,10 @@ public class DataRegion implements IDataRegionForQuery {
/**
* Construct a database processor.
*
- * @param systemDir system dir path
- * @param dataRegionId data region id e.g. 1
+ * @param systemDir system dir path
+ * @param dataRegionId data region id e.g. 1
* @param fileFlushPolicy file flush policy
- * @param databaseName database name e.g. root.sg1
+ * @param databaseName database name e.g. root.sg1
*/
public DataRegion(
String systemDir, String dataRegionId, TsFileFlushPolicy
fileFlushPolicy, String databaseName)
@@ -945,18 +945,7 @@ public class DataRegion implements IDataRegionForQuery {
}
// init map
long timePartitionId =
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
-
- if (config.isEnableSeparateData()
- &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
- TimePartitionManager.getInstance()
- .registerTimePartitionInfo(
- new TimePartitionInfo(
- new DataRegionId(Integer.parseInt(dataRegionId)),
- timePartitionId,
- true,
- Long.MAX_VALUE,
- 0));
- }
+ initFlushTimeMap(timePartitionId);
boolean isSequence =
config.isEnableSeparateData()
@@ -976,28 +965,100 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- public void insertTreeTablet(InsertTabletNode insertTabletNode,
- IntFunction<IDeviceID> rowDeviceIdGetter, IntToLongFunction
rowLastFlushTimeGetter)
+ public void insertTreeTablet(InsertTabletNode insertTabletNode)
throws BatchProcessException, WriteProcessException {
final IDeviceID deviceID = insertTabletNode.getDeviceID();
-
insertTablet(insertTabletNode, i -> deviceID, i ->
config.isEnableSeparateData()
? lastFlushTimeMap.getFlushedTime(
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[i]),
insertTabletNode.getDeviceID())
- : Long.MAX_VALUE
+ : Long.MAX_VALUE,
+ false
+ );
+ }
+
+ public void insertTableTablet(InsertTabletNode insertTabletNode)
+ throws BatchProcessException, WriteProcessException {
+ insertTablet(insertTabletNode, insertTabletNode::getTableDeviceID, i ->
+ config.isEnableSeparateData()
+ ? lastFlushTimeMap.getFlushedTime(
+
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[i]),
+ insertTabletNode.getTableDeviceID(i))
+ : Long.MAX_VALUE,
+ true
);
}
+
+ private boolean splitAndInsert(InsertTabletNode insertTabletNode,
+ IntFunction<IDeviceID> rowDeviceIdGetter, IntToLongFunction
rowLastFlushTimeGetter, int loc,
+ TSStatus[] results)
+ throws BatchProcessException, WriteProcessException {
+ boolean noFailure = true;
+
+ // before is first start point
+ int before = loc;
+ long beforeTime = insertTabletNode.getTimes()[before];
+ // before time partition
+ long beforeTimePartition =
+ TimePartitionUtils.getTimePartitionId(beforeTime);
+ // init flush time map
+ initFlushTimeMap(beforeTimePartition);
+
+ // if is sequence
+ boolean isSequence = false;
+ while (loc < insertTabletNode.getRowCount()) {
+ long lastFlushTime = rowLastFlushTimeGetter.applyAsLong(loc);
+ long time = insertTabletNode.getTimes()[loc];
+ final long timePartitionId = TimePartitionUtils.getTimePartitionId(time);
+ // always in some time partition
+ // judge if we should insert sequence
+ if (timePartitionId != beforeTimePartition) {
+ // a new partition, insert the remaining of the previous partition
+ noFailure =
+ insertTabletToTsFileProcessor(
+ insertTabletNode, before, loc, isSequence, results,
+ beforeTimePartition, rowDeviceIdGetter, noFailure)
+ && noFailure;
+ before = loc;
+ beforeTimePartition = timePartitionId;
+ isSequence = time > lastFlushTime;
+ } else if (!isSequence && time > lastFlushTime) {
+ // the same partition and switch to sequence data
+ // insert previous range into unsequence
+ noFailure =
+ insertTabletToTsFileProcessor(
+ insertTabletNode, before, loc, isSequence, results,
+ beforeTimePartition, rowDeviceIdGetter, noFailure)
+ && noFailure;
+ before = loc;
+ isSequence = true;
+ }
+ // else: the same partition and isSequence not changed, just move the
cursor forward
+ loc++;
+ }
+
+ // do not forget last part
+ if (before < loc) {
+ noFailure =
+ insertTabletToTsFileProcessor(
+ insertTabletNode, before, loc, isSequence, results,
beforeTimePartition, rowDeviceIdGetter, noFailure)
+ && noFailure;
+ }
+
+ return noFailure;
+ }
+
/**
- * Insert a tablet (rows belonging to the same devices) into this database.
+ * Insert a tablet into this database.
*
* @throws BatchProcessException if some of the rows failed to be inserted
*/
@SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive
Complexity warning
- public void insertTablet(InsertTabletNode insertTabletNode,
- IntFunction<IDeviceID> rowDeviceIdGetter, IntToLongFunction
rowLastFlushTimeGetter)
+ private void insertTablet(InsertTabletNode insertTabletNode,
+ IntFunction<IDeviceID> rowDeviceIdGetter, IntToLongFunction
rowLastFlushTimeGetter,
+ boolean checkAllRowTtl)
throws BatchProcessException, WriteProcessException {
StorageEngine.blockInsertionIfReject(null);
long startTime = System.nanoTime();
@@ -1012,57 +1073,11 @@ public class DataRegion implements IDataRegionForQuery {
boolean noFailure;
int loc = checkTTL(insertTabletNode, results, i ->
DataNodeTTLCache.getInstance()
- .getTTL(rowDeviceIdGetter.apply(i)));
+ .getTTL(rowDeviceIdGetter.apply(i)), !checkAllRowTtl);
noFailure = loc != 0;
- // before is first start point
- int before = loc;
- long before
- // before time partition
- long beforeTimePartition =
-
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[before]);
- // init map
+ noFailure = noFailure & splitAndInsert(insertTabletNode,
rowDeviceIdGetter, rowLastFlushTimeGetter, loc, results);
- if (config.isEnableSeparateData()
- &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(beforeTimePartition)) {
- TimePartitionManager.getInstance()
- .registerTimePartitionInfo(
- new TimePartitionInfo(
- new DataRegionId(Integer.parseInt(dataRegionId)),
- beforeTimePartition,
- true,
- Long.MAX_VALUE,
- 0));
- }
-
- // if is sequence
- boolean isSequence = false;
- while (loc < insertTabletNode.getRowCount()) {
- long lastFlushTime = rowLastFlushTimeGetter.applyAsLong(loc);
- long time = insertTabletNode.getTimes()[loc];
- final long timePartitionId =
TimePartitionUtils.getTimePartitionId(time);
- // always in some time partition
- // judge if we should insert sequence
- if (!isSequence && time > lastFlushTime) {
- // insert into unsequence and then start sequence
- noFailure =
- insertTabletToTsFileProcessor(
- insertTabletNode, before, loc, false, results,
- timePartitionId)
- && noFailure;
- before = loc;
- isSequence = true;
- }
- loc++;
- }
-
- // do not forget last part
- if (before < loc) {
- noFailure =
- insertTabletToTsFileProcessor(
- insertTabletNode, before, loc, isSequence, results, time)
- && noFailure;
- }
startTime = System.nanoTime();
tryToUpdateInsertTabletLastCache(insertTabletNode);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime()
- startTime);
@@ -1075,8 +1090,22 @@ public class DataRegion implements IDataRegionForQuery {
}
}
+ private void initFlushTimeMap(long timePartitionId) {
+ if (config.isEnableSeparateData()
+ &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
+ TimePartitionManager.getInstance()
+ .registerTimePartitionInfo(
+ new TimePartitionInfo(
+ new DataRegionId(Integer.parseInt(dataRegionId)),
+ timePartitionId,
+ true,
+ Long.MAX_VALUE,
+ 0));
+ }
+ }
+
private int checkTTL(InsertTabletNode insertTabletNode, TSStatus[] results,
- IntToLongFunction rowTTLGetter)
+ IntToLongFunction rowTTLGetter, boolean breakOnFirstAlive)
throws OutOfTTLException {
/*
@@ -1084,6 +1113,7 @@ public class DataRegion implements IDataRegionForQuery {
*/
int loc = 0;
long ttl = 0;
+ int firstAliveLoc = -1;
while (loc < insertTabletNode.getRowCount()) {
ttl = rowTTLGetter.applyAsLong(loc);
long currTime = insertTabletNode.getTimes()[loc];
@@ -1097,102 +1127,24 @@ public class DataRegion implements IDataRegionForQuery
{
DateTimeUtils.convertLongToDate(currTime),
DateTimeUtils.convertLongToDate(
CommonDateTimeUtils.currentTime() - ttl)));
- loc++;
} else {
- break;
+ if (firstAliveLoc == -1) {
+ firstAliveLoc = loc;
+ }
+ if (breakOnFirstAlive) {
+ break;
+ }
}
+ loc++;
}
- // loc pointing at first legal position
- if (loc == insertTabletNode.getRowCount()) {
+
+ if (firstAliveLoc == -1) {
+ // no alive data
throw new OutOfTTLException(
insertTabletNode.getTimes()[insertTabletNode.getTimes().length - 1],
(CommonDateTimeUtils.currentTime() - ttl));
}
- return loc;
- }
-
- /**
- * Insert a tablet (rows belonging to the same devices) into this database.
- *
- * @throws BatchProcessException if some of the rows failed to be inserted
- */
- @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive
Complexity warning
- public void insertTableTablet(InsertTabletNode insertTabletNode)
- throws BatchProcessException, WriteProcessException {
- StorageEngine.blockInsertionIfReject(null);
- long startTime = System.nanoTime();
- writeLock("insertTablet");
- PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() -
startTime);
- try {
- if (deleted) {
- return;
- }
- TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
- Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
- boolean noFailure = true;
- int loc = checkTTL(insertTabletNode, results,
- i ->
DataNodeTTLCache.getInstance().getTTL(insertTabletNode.getTableDeviceID(i)));
-
- // before is first start point
- int before = loc;
- // before time partition
- long beforeTimePartition =
-
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[before]);
- // init map
-
- if (config.isEnableSeparateData()
- &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(beforeTimePartition)) {
- TimePartitionManager.getInstance()
- .registerTimePartitionInfo(
- new TimePartitionInfo(
- new DataRegionId(Integer.parseInt(dataRegionId)),
- beforeTimePartition,
- true,
- Long.MAX_VALUE,
- 0));
- }
-
- // if is sequence
- boolean isSequence = false;
- while (loc < insertTabletNode.getRowCount()) {
- long lastFlushTime =
- config.isEnableSeparateData()
- ? lastFlushTimeMap.getFlushedTime(beforeTimePartition,
- insertTabletNode.getTableDeviceID(loc))
- : Long.MAX_VALUE;
-
- long time = insertTabletNode.getTimes()[loc];
- // always in some time partition
- // judge if we should insert sequence
- if (!isSequence && time > lastFlushTime) {
- // insert into unsequence and then start sequence
- noFailure =
- insertTabletToTsFileProcessor(
- insertTabletNode, before, loc, false, results,
beforeTimePartition)
- && noFailure;
- before = loc;
- isSequence = true;
- }
- loc++;
- }
-
- // do not forget last part
- if (before < loc) {
- noFailure =
- insertTabletToTsFileProcessor(
- insertTabletNode, before, loc, isSequence, results,
beforeTimePartition)
- && noFailure;
- }
- startTime = System.nanoTime();
- tryToUpdateInsertTabletLastCache(insertTabletNode);
-
PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime()
- startTime);
-
- if (!noFailure) {
- throw new BatchProcessException(results);
- }
- } finally {
- writeUnlock();
- }
+ return firstAliveLoc;
}
/**
@@ -1203,7 +1155,11 @@ public class DataRegion implements IDataRegionForQuery {
@SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive
Complexity warning
public void insertTablet(InsertTabletNode insertTabletNode)
throws BatchProcessException, WriteProcessException {
-
+ if (insertTabletNode.isWriteToTable()) {
+ insertTableTablet(insertTabletNode);
+ } else {
+ insertTreeTablet(insertTabletNode);
+ }
}
/**
@@ -1221,11 +1177,11 @@ public class DataRegion implements IDataRegionForQuery {
* subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5,
null, 5}
*
* @param insertTabletNode insert a tablet of a device
- * @param sequence whether is sequence
- * @param start start index of rows to be inserted in insertTabletPlan
- * @param end end index of rows to be inserted in insertTabletPlan
- * @param results result array
- * @param timePartitionId time partition id
+ * @param sequence whether is sequence
+ * @param start start index of rows to be inserted in
insertTabletPlan
+ * @param end end index of rows to be inserted in
insertTabletPlan
+ * @param results result array
+ * @param timePartitionId time partition id
* @return false if any failure occurs when inserting the tablet, true
otherwise
*/
private boolean insertTabletToTsFileProcessor(
@@ -1234,7 +1190,9 @@ public class DataRegion implements IDataRegionForQuery {
int end,
boolean sequence,
TSStatus[] results,
- long timePartitionId) {
+ long timePartitionId,
+ IntFunction<IDeviceID> rowDeviceIdGetter,
+ boolean noFailure) {
// return when start >= end or all measurement failed
if (start >= end || insertTabletNode.allMeasurementFailed()) {
return true;
@@ -1252,7 +1210,7 @@ public class DataRegion implements IDataRegionForQuery {
}
try {
- tsFileProcessor.insertTablet(insertTabletNode, start, end, results);
+ tsFileProcessor.insertTablet(insertTabletNode, start, end, results,
rowDeviceIdGetter, noFailure);
} catch (WriteProcessRejectException e) {
logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage());
return false;
@@ -1552,9 +1510,9 @@ public class DataRegion implements IDataRegionForQuery {
/**
* get processor from hashmap, flush oldest processor if necessary
*
- * @param timeRangeId time partition range
+ * @param timeRangeId time partition range
* @param tsFileProcessorTreeMap tsFileProcessorTreeMap
- * @param sequence whether is sequence or not
+ * @param sequence whether is sequence or not
*/
private TsFileProcessor getOrCreateTsFileProcessorIntern(
long timeRangeId, TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
boolean sequence)
@@ -1638,7 +1596,7 @@ public class DataRegion implements IDataRegionForQuery {
/**
* close one tsfile processor
*
- * @param sequence whether this tsfile processor is sequence or not
+ * @param sequence whether this tsfile processor is sequence or not
* @param tsFileProcessor tsfile processor
*/
public void syncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor
tsFileProcessor) {
@@ -1670,7 +1628,7 @@ public class DataRegion implements IDataRegionForQuery {
/**
* close one tsfile processor, thread-safety should be ensured by caller
*
- * @param sequence whether this tsfile processor is sequence or not
+ * @param sequence whether this tsfile processor is sequence or not
* @param tsFileProcessor tsfile processor
*/
public Future<?> asyncCloseOneTsFileProcessor(boolean sequence,
TsFileProcessor tsFileProcessor) {
@@ -2884,7 +2842,7 @@ public class DataRegion implements IDataRegionForQuery {
* <p>Then, update the latestTimeForEachDevice and
partitionLatestFlushedTimeForEachDevice.
*
* @param newTsFileResource tsfile resource @UsedBy load external tsfile
module
- * @param deleteOriginFile whether to delete origin tsfile
+ * @param deleteOriginFile whether to delete origin tsfile
* @param isGeneratedByPipe whether the load tsfile request is generated by
pipe
*/
public void loadNewTsFile(
@@ -3071,6 +3029,7 @@ public class DataRegion implements IDataRegionForQuery {
/**
* Update latest time in latestTimeForEachDevice and
partitionLatestFlushedTimeForEachDevice.
+ *
* @UsedBy sync module, load external tsfile module.
*/
protected void updateLastFlushTime(TsFileResource newTsFileResource) {
@@ -3089,8 +3048,8 @@ public class DataRegion implements IDataRegionForQuery {
/**
* Execute the loading process by the type.
*
- * @param tsFileResource tsfile resource to be loaded
- * @param filePartitionId the partition id of the new file
+ * @param tsFileResource tsfile resource to be loaded
+ * @param filePartitionId the partition id of the new file
* @param deleteOriginFile whether to delete the original file
* @return load the file successfully @UsedBy sync module, load external
tsfile module.
*/
@@ -3638,7 +3597,7 @@ public class DataRegion implements IDataRegionForQuery {
}
/**
- * @param folder the folder's path
+ * @param folder the folder's path
* @param diskSize the disk space occupied by this folder, unit is MB
*/
private void countFolderDiskSize(String folder, AtomicLong diskSize) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index a1d1047251e..263edad395b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.memtable;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.AlignedFullPath;
@@ -349,7 +350,8 @@ public abstract class AbstractMemTable implements IMemTable
{
}
@Override
- public void insertAlignedTablet(InsertTabletNode insertTabletNode, int
start, int end)
+ public void insertAlignedTablet(InsertTabletNode insertTabletNode, int
start, int end,
+ TSStatus[] results)
throws WriteProcessException {
try {
writeAlignedTablet(insertTabletNode, start, end);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index b93474fdb92..a219df17908 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -245,7 +245,7 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
return (long) list.rowCount() * measurementIndexMap.size();
}
- public long alignedListSize() {
+ public int alignedListSize() {
return list.rowCount();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index f998a76cf1a..4fbfb451302 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.memtable;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.IFullPath;
import org.apache.iotdb.commons.path.PartialPath;
@@ -115,7 +116,8 @@ public interface IMemTable extends WALEntryValue {
void insertTablet(InsertTabletNode insertTabletNode, int start, int end)
throws WriteProcessException;
- void insertAlignedTablet(InsertTabletNode insertTabletNode, int start, int
end)
+ void insertAlignedTablet(InsertTabletNode insertTabletNode, int start, int
end,
+ TSStatus[] results)
throws WriteProcessException;
ReadOnlyMemChunk query(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index c43078a24e5..08c0431132f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.memtable;
+import java.util.function.IntFunction;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -26,6 +27,7 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.IFullPath;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.TestOnly;
@@ -116,6 +118,7 @@ public class TsFileProcessor {
/** Logger fot this class. */
private static final Logger logger =
LoggerFactory.getLogger(TsFileProcessor.class);
+ private static final int NUM_MEM_TO_ESTIMATE = 3;
/** Storage group name of this tsfile. */
private final String storageGroupName;
@@ -413,6 +416,79 @@ public class TsFileProcessor {
walNode.onMemTableCreated(workMemTable, tsFileResource.getTsFilePath());
}
+ private long[] checkMemCost(InsertTabletNode insertTabletNode, int start,
int end, TSStatus[] results, boolean noFailure)
+ throws WriteProcessException {
+ long[] memIncrements;
+ try {
+ long startTime = System.nanoTime();
+ if (insertTabletNode.isWriteToTable()) {
+ memIncrements = checkTreeMemCost(insertTabletNode, start, end,
noFailure, results);
+ } else {
+ memIncrements = checkTableMemCost(insertTabletNode, start, end,
noFailure, results);
+ }
+
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(System.nanoTime() -
startTime);
+ } catch (WriteProcessException e) {
+ for (int i = start; i < end; i++) {
+ results[i] = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT,
e.getMessage());
+ }
+ throw new WriteProcessException(e);
+ }
+
+ return memIncrements;
+ }
+
+ private long[] checkTreeMemCost(InsertTabletNode insertTabletNode, int
start, int end,
+ boolean noFailure, TSStatus[] results)
+ throws WriteProcessException {
+ long[] memIncrements;
+ if (insertTabletNode.isAligned()) {
+ memIncrements =
+ checkAlignedMemCostAndAddToTspForTablet(
+ insertTabletNode.getDeviceID(),
+ insertTabletNode.getMeasurements(),
+ insertTabletNode.getDataTypes(),
+ insertTabletNode.getColumns(),
+ insertTabletNode.getColumnCategories(),
+ start,
+ end,
+ noFailure, results);
+ } else {
+ memIncrements =
+ checkMemCostAndAddToTspInfoForTablet(
+ insertTabletNode.getDeviceID(),
+ insertTabletNode.getMeasurements(),
+ insertTabletNode.getDataTypes(),
+ insertTabletNode.getColumns(),
+ start,
+ end);
+ }
+ return memIncrements;
+ }
+
+ private long[] checkTableMemCost(InsertTabletNode insertTabletNode, int
start, int end,
+ boolean noFailure, TSStatus[] results)
+ throws WriteProcessException {
+ List<Pair<IDeviceID, Integer>> deviceEndPosList =
insertTabletNode.splitByDevice(start, end);
+ long[] memIncrements = new long[NUM_MEM_TO_ESTIMATE];
+ int splitStart = start;
+ for (Pair<IDeviceID, Integer> iDeviceIDIntegerPair : deviceEndPosList) {
+ int splitEnd = iDeviceIDIntegerPair.getRight();
+ IDeviceID deviceID = iDeviceIDIntegerPair.getLeft();
+ long[] splitMemIncrements = checkAlignedMemCostAndAddToTspForTablet(
+ deviceID,
+ insertTabletNode.getMeasurements(),
+ insertTabletNode.getDataTypes(),
+ insertTabletNode.getColumns(),
+ insertTabletNode.getColumnCategories(), splitStart,
+ splitEnd, noFailure, results);
+ for (int i = 0; i < NUM_MEM_TO_ESTIMATE; i++) {
+ memIncrements[i] += splitMemIncrements[i];
+ }
+ splitStart = splitEnd;
+ }
+ return memIncrements;
+ }
+
/**
* Insert batch data of insertTabletPlan into the workingMemtable. The rows
to be inserted are in
* the range [start, end). Null value in each column values will be replaced
by the subsequent
@@ -424,7 +500,9 @@ public class TsFileProcessor {
* @param results result array
*/
public void insertTablet(
- InsertTabletNode insertTabletNode, int start, int end, TSStatus[]
results)
+ InsertTabletNode insertTabletNode, int start, int end, TSStatus[]
results,
+ IntFunction<IDeviceID> rowDeviceIdGetter,
+ boolean noFailure)
throws WriteProcessException {
if (workMemTable == null) {
@@ -435,35 +513,7 @@ public class TsFileProcessor {
.recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
}
- long[] memIncrements;
- try {
- long startTime = System.nanoTime();
- if (insertTabletNode.isAligned()) {
- memIncrements =
- checkAlignedMemCostAndAddToTspForTablet(
- insertTabletNode.getDeviceID(),
- insertTabletNode.getMeasurements(),
- insertTabletNode.getDataTypes(),
- insertTabletNode.getColumns(),
- start,
- end);
- } else {
- memIncrements =
- checkMemCostAndAddToTspInfoForTablet(
- insertTabletNode.getDeviceID(),
- insertTabletNode.getMeasurements(),
- insertTabletNode.getDataTypes(),
- insertTabletNode.getColumns(),
- start,
- end);
- }
-
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(System.nanoTime() -
startTime);
- } catch (WriteProcessException e) {
- for (int i = start; i < end; i++) {
- results[i] = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT,
e.getMessage());
- }
- throw new WriteProcessException(e);
- }
+ long[] memIncrements = checkMemCost(insertTabletNode, start, end, results,
noFailure);
long startTime = System.nanoTime();
WALFlushListener walFlushListener;
@@ -497,7 +547,7 @@ public class TsFileProcessor {
try {
if (insertTabletNode.isAligned()) {
- workMemTable.insertAlignedTablet(insertTabletNode, start, end);
+ workMemTable.insertAlignedTablet(insertTabletNode, start, end,
noFailure ? null : results);
} else {
workMemTable.insertTablet(insertTabletNode, start, end);
}
@@ -626,7 +676,7 @@ public class TsFileProcessor {
chunkMetadataIncrement +=
ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER,
TSDataType.VECTOR)
* dataTypes.length;
- memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes);
+ memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes,
null);
for (int i = 0; i < dataTypes.length; i++) {
// Skip failed Measurements
if (dataTypes[i] == null || measurements[i] == null) {
@@ -692,7 +742,7 @@ public class TsFileProcessor {
chunkMetadataIncrement +=
ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER,
TSDataType.VECTOR)
* dataTypes.length;
- memTableIncrement +=
AlignedTVList.alignedTvListArrayMemCost(dataTypes);
+ memTableIncrement +=
AlignedTVList.alignedTvListArrayMemCost(dataTypes, null);
for (int i = 0; i < dataTypes.length; i++) {
// Skip failed Measurements
if (dataTypes[i] == null || measurements[i] == null) {
@@ -787,15 +837,15 @@ public class TsFileProcessor {
String[] measurements,
TSDataType[] dataTypes,
Object[] columns,
- int start,
- int end)
+ TsTableColumnCategory[] columnCategories, int start,
+ int end, boolean noFailure, TSStatus[] results)
throws WriteProcessException {
if (start >= end) {
return new long[] {0, 0, 0};
}
long[] memIncrements = new long[3]; // memTable, text, chunk metadata
- updateAlignedMemCost(dataTypes, deviceId, measurements, start, end,
memIncrements, columns);
+ updateAlignedMemCost(dataTypes, deviceId, measurements, start, end,
memIncrements, columns, columnCategories, noFailure, results);
long memTableIncrement = memIncrements[0];
long textDataIncrement = memIncrements[1];
long chunkMetadataIncrement = memIncrements[2];
@@ -837,7 +887,7 @@ public class TsFileProcessor {
// TEXT data size
if (dataType.isBinary()) {
Binary[] binColumn = (Binary[]) column;
- memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start, end);
+ memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start, end,
null);
}
}
@@ -848,72 +898,99 @@ public class TsFileProcessor {
int start,
int end,
long[] memIncrements,
- Object[] columns) {
+ Object[] columns, TsTableColumnCategory[] columnCategories, boolean
noFailure,
+ TSStatus[] results) {
+ int incomingPointNum;
+ if (noFailure) {
+ incomingPointNum = end - start;
+ } else {
+ incomingPointNum = end - start;
+ for (TSStatus result : results) {
+ if (result != null) {
+ incomingPointNum --;
+ }
+ }
+ }
+
+ int measurementColumnNum = 0;
+ if (columnCategories == null) {
+ measurementColumnNum = dataTypes.length;
+ } else {
+ for (TsTableColumnCategory columnCategory : columnCategories) {
+ if (columnCategory == TsTableColumnCategory.MEASUREMENT) {
+ measurementColumnNum++;
+ }
+ }
+ }
+
// memIncrements = [memTable, text, chunk metadata] respectively
if (workMemTable.checkIfChunkDoesNotExist(deviceId,
AlignedPath.VECTOR_PLACEHOLDER)) {
- // ChunkMetadataIncrement
+ // new devices introduce new ChunkMetadata
+ // ChunkMetadata memory Increment
memIncrements[2] +=
- dataTypes.length
+ measurementColumnNum
* ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER,
TSDataType.VECTOR);
- memIncrements[0] +=
- ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
- * AlignedTVList.alignedTvListArrayMemCost(dataTypes);
- for (int i = 0; i < dataTypes.length; i++) {
- TSDataType dataType = dataTypes[i];
- String measurement = measurementIds[i];
- Object column = columns[i];
- if (dataType == null || column == null || measurement == null) {
- continue;
- }
- // TEXT data size
- if (dataType.isBinary()) {
- Binary[] binColumn = (Binary[]) columns[i];
- memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start,
end);
- }
- }
+ // TVList memory
+ int numArraysToAdd = incomingPointNum / PrimitiveArrayManager.ARRAY_SIZE
+
+ incomingPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0;
+ memIncrements[0] +=
+ numArraysToAdd * AlignedTVList.alignedTvListArrayMemCost(dataTypes,
columnCategories);
} else {
AlignedWritableMemChunk alignedMemChunk =
((AlignedWritableMemChunkGroup)
workMemTable.getMemTableMap().get(deviceId))
.getAlignedMemChunk();
List<TSDataType> dataTypesInTVList = new ArrayList<>();
+ int currentPointNum = alignedMemChunk.alignedListSize();
+ int newPointNum = currentPointNum + incomingPointNum;
for (int i = 0; i < dataTypes.length; i++) {
TSDataType dataType = dataTypes[i];
String measurement = measurementIds[i];
Object column = columns[i];
- if (dataType == null || column == null || measurement == null) {
+ if (dataType == null || column == null || measurement == null ||
+ (columnCategories != null && columnCategories[i] !=
TsTableColumnCategory.MEASUREMENT)) {
continue;
}
- // Extending the column of aligned mem chunk
+
if (!alignedMemChunk.containsMeasurement(measurementIds[i])) {
+ // add a new column in the TVList, the new column should be as long
as existing ones
memIncrements[0] +=
- (alignedMemChunk.alignedListSize() /
PrimitiveArrayManager.ARRAY_SIZE + 1)
+ (currentPointNum / PrimitiveArrayManager.ARRAY_SIZE + 1)
* AlignedTVList.valueListArrayMemCost(dataType);
dataTypesInTVList.add(dataType);
}
- // TEXT data size
- if (dataType.isBinary()) {
- Binary[] binColumn = (Binary[]) columns[i];
- memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start,
end);
- }
- }
- long acquireArray;
- if (alignedMemChunk.alignedListSize() % PrimitiveArrayManager.ARRAY_SIZE
== 0) {
- acquireArray = (end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1L;
- } else {
- acquireArray =
- (end
- - start
- - 1
- + (alignedMemChunk.alignedListSize() %
PrimitiveArrayManager.ARRAY_SIZE))
- / PrimitiveArrayManager.ARRAY_SIZE;
}
+
+ // calculate how many new arrays will be added after this insertion
+ int currentArrayCnt = currentPointNum /
PrimitiveArrayManager.ARRAY_SIZE +
+ currentPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0;
+ int newArrayCnt = newPointNum / PrimitiveArrayManager.ARRAY_SIZE +
+ newPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0;
+ long acquireArray = newArrayCnt - currentArrayCnt;
+
if (acquireArray != 0) {
+ // memory of extending the TVList
dataTypesInTVList.addAll(((AlignedTVList)
alignedMemChunk.getTVList()).getTsDataTypes());
memIncrements[0] +=
acquireArray *
AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
}
}
+
+ // flexible-length data size
+ for (int i = 0; i < dataTypes.length; i++) {
+ TSDataType dataType = dataTypes[i];
+ String measurement = measurementIds[i];
+ Object column = columns[i];
+ if (dataType == null || column == null || measurement == null ||
+ (columnCategories != null && columnCategories[i] !=
TsTableColumnCategory.MEASUREMENT)) {
+ continue;
+ }
+
+ if (dataType.isBinary()) {
+ Binary[] binColumn = (Binary[]) columns[i];
+ memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start,
end, results);
+ }
+ }
}
private void updateMemoryInfo(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
index 162b1ac02b2..1340554af99 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
@@ -100,7 +100,7 @@ public class TsFilePlanRedoer {
} else {
if (node.isAligned()) {
recoveryMemTable.insertAlignedTablet(
- (InsertTabletNode) node, 0, ((InsertTabletNode)
node).getRowCount());
+ (InsertTabletNode) node, 0, ((InsertTabletNode)
node).getRowCount(), null);
} else {
recoveryMemTable.insertTablet(
(InsertTabletNode) node, 0, ((InsertTabletNode)
node).getRowCount());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index 89af1e78427..2665efa8e68 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.utils;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
@@ -95,11 +96,13 @@ public class MemUtils {
return RamUsageEstimator.NUM_BYTES_OBJECT_HEADER +
RamUsageEstimator.sizeOf(value.getValues());
}
- public static long getBinaryColumnSize(Binary[] column, int start, int end) {
+ public static long getBinaryColumnSize(Binary[] column, int start, int end,
TSStatus[] results) {
long memSize = 0;
memSize += (long) (end - start) *
RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
for (int i = start; i < end; i++) {
- memSize += RamUsageEstimator.sizeOf(column[i].getValues());
+ if (results == null || results[i] == null) {
+ memSize += RamUsageEstimator.sizeOf(column[i].getValues());
+ }
}
return memSize;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 3aa1f204c14..8372468e3cd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.utils.datastructure;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
@@ -851,18 +852,24 @@ public abstract class AlignedTVList extends TVList {
return TSDataType.VECTOR;
}
+
+
/**
* Get the single alignedTVList array mem cost by give types.
*
* @param types the types in the vector
* @return AlignedTvListArrayMemSize
*/
- public static long alignedTvListArrayMemCost(TSDataType[] types) {
+ public static long alignedTvListArrayMemCost(TSDataType[] types,
TsTableColumnCategory[] columnCategories) {
+
+ int measurementColumnNum = 0;
long size = 0;
// value array mem size
- for (TSDataType type : types) {
- if (type != null) {
- size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long)
type.getDataTypeSize();
+ for (int i = 0; i < types.length; i++) {
+ TSDataType type = types[i];
+ if (type != null || columnCategories != null || columnCategories[i] ==
TsTableColumnCategory.MEASUREMENT) {
+ size += (long) ARRAY_SIZE * (long) type.getDataTypeSize();
+ measurementColumnNum++;
}
}
// size is 0 when all types are null
@@ -874,9 +881,9 @@ public abstract class AlignedTVList extends TVList {
// index array mem size
size += PrimitiveArrayManager.ARRAY_SIZE * 4L;
// array headers mem size
- size += (long) NUM_BYTES_ARRAY_HEADER * (2 + types.length);
+ size += (long) NUM_BYTES_ARRAY_HEADER * (2 + measurementColumnNum);
// Object references size in ArrayList
- size += (long) NUM_BYTES_OBJECT_REF * (2 + types.length);
+ size += (long) NUM_BYTES_OBJECT_REF * (2 + measurementColumnNum);
return size;
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
index 5dece7fd940..451a0ad364e 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
@@ -376,7 +376,7 @@ public class PrimitiveMemTableTest {
private void writeVector(IMemTable memTable)
throws IOException, QueryProcessException, MetadataException,
WriteProcessException {
- memTable.insertAlignedTablet(genInsertTableNode(), 0, 100);
+ memTable.insertAlignedTablet(genInsertTableNode(), 0, 100, null);
IDeviceID tmpDeviceId =
IDeviceID.Factory.DEFAULT_FACTORY.create("root.sg.device5");