This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 3ad77ac892c Fix memory calculate error when insertRecords with both
aligned and non-aligned devices (#12720) (#13527)
3ad77ac892c is described below
commit 3ad77ac892c5daf66b5b98ba9785a91bdbe711f5
Author: Haonan <[email protected]>
AuthorDate: Wed Sep 18 09:30:55 2024 +0800
Fix memory calculate error when insertRecords with both aligned and
non-aligned devices (#12720) (#13527)
---
.../planner/plan/node/write/InsertRowsNode.java | 10 ++
.../db/storageengine/dataregion/DataRegion.java | 3 +
.../memtable/AlignedWritableMemChunk.java | 2 +-
.../dataregion/memtable/TsFileProcessor.java | 59 +++++++---
.../dataregion/memtable/TsFileProcessorTest.java | 124 ++++++++++++++++++++-
5 files changed, 180 insertions(+), 18 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 2e86672aaa8..f5cd98c8f95 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -65,6 +65,8 @@ public class InsertRowsNode extends InsertNode implements WALEntryValue {
/** The {@link InsertRowNode} list */
private List<InsertRowNode> insertRowNodeList;
+ private boolean isMixingAlignment = false;
+
public InsertRowsNode(PlanNodeId id) {
super(id);
insertRowNodeList = new ArrayList<>();
@@ -102,6 +104,14 @@ public class InsertRowsNode extends InsertNode implements WALEntryValue {
insertRowNodeIndexList.add(index);
}
+ public boolean isMixingAlignment() {
+ return isMixingAlignment;
+ }
+
+ public void setMixingAlignment(boolean mixingAlignment) {
+ isMixingAlignment = mixingAlignment;
+ }
+
@Override
public void setSearchIndex(long index) {
searchIndex = index;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 0ecf7f7c81c..90b603d0b06 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1344,6 +1344,9 @@ public class DataRegion implements IDataRegionForQuery {
v.markAsGeneratedByRemoteConsensusLeader();
}
}
+ if (v.isAligned() != insertRowNode.isAligned()) {
+ v.setMixingAlignment(true);
+ }
v.addOneInsertRowNode(insertRowNode, finalI);
v.updateProgressIndex(insertRowNode.getProgressIndex());
return v;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 77b98b8b70e..d5d9bdce4ec 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -244,7 +244,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
return (long) list.rowCount() * measurementIndexMap.size();
}
- public long alignedListSize() {
+ public int alignedListSize() {
return list.rowCount();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index bc69d29d10d..f39cb9f0bb0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -355,10 +355,29 @@ public class TsFileProcessor {
long[] memIncrements;
long memControlStartTime = System.nanoTime();
- if (insertRowsNode.isAligned()) {
- memIncrements =
checkAlignedMemCostAndAddToTspInfoForRows(insertRowsNode);
+ if (insertRowsNode.isMixingAlignment()) {
+ List<InsertRowNode> alignedList = new ArrayList<>();
+ List<InsertRowNode> nonAlignedList = new ArrayList<>();
+ for (InsertRowNode insertRowNode :
insertRowsNode.getInsertRowNodeList()) {
+ if (insertRowNode.isAligned()) {
+ alignedList.add(insertRowNode);
+ } else {
+ nonAlignedList.add(insertRowNode);
+ }
+ }
+ long[] alignedMemIncrements =
checkAlignedMemCostAndAddToTspInfoForRows(alignedList);
+ long[] nonAlignedMemIncrements =
checkMemCostAndAddToTspInfoForRows(nonAlignedList);
+ memIncrements = new long[3];
+ for (int i = 0; i < 3; i++) {
+ memIncrements[i] = alignedMemIncrements[i] +
nonAlignedMemIncrements[i];
+ }
} else {
- memIncrements = checkMemCostAndAddToTspInfoForRows(insertRowsNode);
+ if (insertRowsNode.isAligned()) {
+ memIncrements =
+
checkAlignedMemCostAndAddToTspInfoForRows(insertRowsNode.getInsertRowNodeList());
+ } else {
+ memIncrements =
checkMemCostAndAddToTspInfoForRows(insertRowsNode.getInsertRowNodeList());
+ }
}
// recordScheduleMemoryBlockCost
costsForMetrics[1] += System.nanoTime() - memControlStartTime;
@@ -573,7 +592,7 @@ public class TsFileProcessor {
}
@SuppressWarnings("squid:S3776") // High Cognitive Complexity
- private long[] checkMemCostAndAddToTspInfoForRows(InsertRowsNode
insertRowsNode)
+ private long[] checkMemCostAndAddToTspInfoForRows(List<InsertRowNode>
insertRowNodeList)
throws WriteProcessException {
// Memory of increased PrimitiveArray and TEXT values, e.g., add a
long[128], add 128*8
long memTableIncrement = 0L;
@@ -581,7 +600,7 @@ public class TsFileProcessor {
long chunkMetadataIncrement = 0L;
// device -> measurement -> adding TVList size
Map<IDeviceID, Map<String, Integer>> increasingMemTableInfo = new
HashMap<>();
- for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
+ for (InsertRowNode insertRowNode : insertRowNodeList) {
IDeviceID deviceId = insertRowNode.getDeviceID();
TSDataType[] dataTypes = insertRowNode.getDataTypes();
Object[] values = insertRowNode.getValues();
@@ -684,7 +703,7 @@ public class TsFileProcessor {
}
@SuppressWarnings("squid:S3776") // high Cognitive Complexity
- private long[] checkAlignedMemCostAndAddToTspInfoForRows(InsertRowsNode
insertRowsNode)
+ private long[] checkAlignedMemCostAndAddToTspInfoForRows(List<InsertRowNode>
insertRowNodeList)
throws WriteProcessException {
// Memory of increased PrimitiveArray and TEXT values, e.g., add a
long[128], add 128*8
long memTableIncrement = 0L;
@@ -692,7 +711,7 @@ public class TsFileProcessor {
long chunkMetadataIncrement = 0L;
// device -> (measurements -> datatype, adding aligned TVList size)
Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>>
increasingMemTableInfo = new HashMap<>();
- for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
+ for (InsertRowNode insertRowNode : insertRowNodeList) {
IDeviceID deviceId = insertRowNode.getDeviceID();
TSDataType[] dataTypes = insertRowNode.getDataTypes();
Object[] values = insertRowNode.getValues();
@@ -726,7 +745,7 @@ public class TsFileProcessor {
(AlignedWritableMemChunkGroup)
workMemTable.getMemTableMap().get(deviceId);
AlignedWritableMemChunk alignedMemChunk =
memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk();
- long currentChunkPointNum = alignedMemChunk == null ? 0 :
alignedMemChunk.alignedListSize();
+ int currentChunkPointNum = alignedMemChunk == null ? 0 :
alignedMemChunk.alignedListSize();
List<TSDataType> dataTypesInTVList = new ArrayList<>();
Pair<Map<String, TSDataType>, Integer> addingPointNumInfo =
increasingMemTableInfo.computeIfAbsent(deviceId, k -> new
Pair<>(new HashMap<>(), 0));
@@ -738,29 +757,37 @@ public class TsFileProcessor {
int addingPointNum = addingPointNumInfo.getRight();
// Extending the column of aligned mem chunk
- if ((alignedMemChunk != null &&
!alignedMemChunk.containsMeasurement(measurements[i]))
- &&
!increasingMemTableInfo.get(deviceId).left.containsKey(measurements[i])) {
+ boolean currentMemChunkContainsMeasurement =
+ alignedMemChunk != null &&
alignedMemChunk.containsMeasurement(measurements[i]);
+ if (!currentMemChunkContainsMeasurement
+ && !addingPointNumInfo.left.containsKey(measurements[i])) {
+ addingPointNumInfo.left.put(measurements[i], dataTypes[i]);
+ int currentArrayNum =
+ (currentChunkPointNum + addingPointNum) /
PrimitiveArrayManager.ARRAY_SIZE
+ + ((currentChunkPointNum + addingPointNum) %
PrimitiveArrayManager.ARRAY_SIZE
+ > 0
+ ? 1
+ : 0);
memTableIncrement +=
- ((currentChunkPointNum + addingPointNum) /
PrimitiveArrayManager.ARRAY_SIZE + 1)
- * AlignedTVList.valueListArrayMemCost(dataTypes[i]);
- increasingMemTableInfo.get(deviceId).left.put(measurements[i],
dataTypes[i]);
+ currentArrayNum *
AlignedTVList.valueListArrayMemCost(dataTypes[i]);
+ addingPointNumInfo.left.put(measurements[i], dataTypes[i]);
}
// TEXT data mem size
if (dataTypes[i].isBinary() && values[i] != null) {
textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
}
}
- int addingPointNum = increasingMemTableInfo.get(deviceId).getRight();
+ int addingPointNum = addingPointNumInfo.right;
// Here currentChunkPointNum + addingPointNum >= 1
if (((currentChunkPointNum + addingPointNum) %
PrimitiveArrayManager.ARRAY_SIZE) == 0) {
if (alignedMemChunk != null) {
dataTypesInTVList.addAll(
((AlignedTVList)
alignedMemChunk.getTVList()).getTsDataTypes());
}
-
dataTypesInTVList.addAll(increasingMemTableInfo.get(deviceId).getLeft().values());
+ dataTypesInTVList.addAll(addingPointNumInfo.left.values());
memTableIncrement +=
AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
}
- increasingMemTableInfo.get(deviceId).setRight(addingPointNum + 1);
+ addingPointNumInfo.setRight(addingPointNum + 1);
}
}
updateMemoryInfo(memTableIncrement, chunkMetadataIncrement,
textDataIncrement);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
index 50b2a5d3ea6..89266fa632f 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
@@ -587,11 +587,11 @@ public class TsFileProcessorTest {
// insert more rows by insertRows
insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+ insertRowsNode.setAligned(true);
insertRowsNode.addOneInsertRowNode(insertRowNode1, 0);
insertRowsNode.addOneInsertRowNode(insertRowNode2, 1);
insertRowsNode.addOneInsertRowNode(insertRowNode3, 2);
insertRowsNode.addOneInsertRowNode(insertRowNode4, 3);
- insertRowsNode.setAligned(true);
processor2.insert(insertRowsNode, new long[4]);
Assert.assertEquals(memTable1.getTVListsRamCost(),
memTable2.getTVListsRamCost());
@@ -613,6 +613,128 @@ public class TsFileProcessorTest {
Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
}
+ @Test
+ public void testRamCostInsertSameDataBy2Ways()
+ throws MetadataException, WriteProcessException, IOException {
+ TsFileProcessor processor1 =
+ new TsFileProcessor(
+ storageGroup,
+ SystemFileFactory.INSTANCE.getFile(filePath),
+ sgInfo,
+ this::closeTsFileProcessor,
+ (tsFileProcessor, updateMap, systemFlushTime) -> {},
+ true);
+ TsFileProcessorInfo tsFileProcessorInfo1 = new TsFileProcessorInfo(sgInfo);
+ processor1.setTsFileProcessorInfo(tsFileProcessorInfo1);
+ this.sgInfo.initTsFileProcessorInfo(processor1);
+ SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor1);
+ // insert 100 rows (50 aligned, 50 non-aligned) by insertRow
+ for (int i = 1; i <= 100; i++) {
+ TSRecord record = new TSRecord(i, i <= 50 ? deviceId :
"root.vehicle.d2");
+ record.addTuple(DataPoint.getDataPoint(dataType, measurementId,
String.valueOf(i)));
+ InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+ if (i <= 50) {
+ node.setAligned(true);
+ }
+ processor1.insert(node, new long[4]);
+ }
+ IMemTable memTable1 = processor1.getWorkMemTable();
+
+ TsFileProcessor processor2 =
+ new TsFileProcessor(
+ storageGroup,
+ SystemFileFactory.INSTANCE.getFile(filePath),
+ sgInfo,
+ this::closeTsFileProcessor,
+ (tsFileProcessor, updateMap, systemFlushTime) -> {},
+ true);
+ TsFileProcessorInfo tsFileProcessorInfo2 = new TsFileProcessorInfo(sgInfo);
+ processor2.setTsFileProcessorInfo(tsFileProcessorInfo2);
+ this.sgInfo.initTsFileProcessorInfo(processor2);
+ SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor2);
+ InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+ insertRowsNode.setAligned(true);
+ // insert 100 rows (50 aligned, 50 non-aligned) by insertRows
+ insertRowsNode.setMixingAlignment(true);
+ for (int i = 1; i <= 100; i++) {
+ TSRecord record = new TSRecord(i, i <= 50 ? deviceId :
"root.vehicle.d2");
+ record.addTuple(DataPoint.getDataPoint(dataType, measurementId,
String.valueOf(i)));
+ InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+ if (i <= 50) {
+ node.setAligned(true);
+ }
+ insertRowsNode.addOneInsertRowNode(node, i - 1);
+ }
+ processor2.insert(insertRowsNode, new long[4]);
+ IMemTable memTable2 = processor2.getWorkMemTable();
+
+ Assert.assertEquals(memTable1.getTVListsRamCost(),
memTable2.getTVListsRamCost());
+ Assert.assertEquals(memTable1.getTotalPointsNum(),
memTable2.getTotalPointsNum());
+ Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+ }
+
+ @Test
+ public void testRamCostInsertSameDataBy2Ways2()
+ throws MetadataException, WriteProcessException, IOException {
+ TsFileProcessor processor1 =
+ new TsFileProcessor(
+ storageGroup,
+ SystemFileFactory.INSTANCE.getFile(filePath),
+ sgInfo,
+ this::closeTsFileProcessor,
+ (tsFileProcessor, updateMap, systemFlushTime) -> {},
+ true);
+ TsFileProcessorInfo tsFileProcessorInfo1 = new TsFileProcessorInfo(sgInfo);
+ processor1.setTsFileProcessorInfo(tsFileProcessorInfo1);
+ this.sgInfo.initTsFileProcessorInfo(processor1);
+ SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor1);
+ // insert 100 aligned rows (50 for deviceId, 50 for root.vehicle.d2; a new measurement per row) by insertRow
+ for (int i = 1; i <= 100; i++) {
+ TSRecord record = new TSRecord(i, i <= 50 ? deviceId :
"root.vehicle.d2");
+ record.addTuple(DataPoint.getDataPoint(dataType, "s" + i,
String.valueOf(i)));
+ InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+ node.setAligned(true);
+ if (i <= 50) {
+ node.setAligned(true);
+ }
+ processor1.insert(node, new long[4]);
+ }
+ IMemTable memTable1 = processor1.getWorkMemTable();
+
+ TsFileProcessor processor2 =
+ new TsFileProcessor(
+ storageGroup,
+ SystemFileFactory.INSTANCE.getFile(filePath),
+ sgInfo,
+ this::closeTsFileProcessor,
+ (tsFileProcessor, updateMap, systemFlushTime) -> {},
+ true);
+ TsFileProcessorInfo tsFileProcessorInfo2 = new TsFileProcessorInfo(sgInfo);
+ processor2.setTsFileProcessorInfo(tsFileProcessorInfo2);
+ this.sgInfo.initTsFileProcessorInfo(processor2);
+ SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor2);
+ InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+ insertRowsNode.setAligned(true);
+ // insert 100 aligned rows (50 for deviceId, 50 for root.vehicle.d2; a new measurement per row) by insertRows
+ insertRowsNode.setMixingAlignment(true);
+ for (int i = 1; i <= 100; i++) {
+ TSRecord record = new TSRecord(i, i <= 50 ? deviceId :
"root.vehicle.d2");
+ record.addTuple(DataPoint.getDataPoint(dataType, "s" + i,
String.valueOf(i)));
+ InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+ node.setAligned(true);
+ if (i <= 50) {
+ node.setAligned(true);
+ }
+ insertRowsNode.addOneInsertRowNode(node, i - 1);
+ }
+ processor2.insert(insertRowsNode, new long[4]);
+ IMemTable memTable2 = processor2.getWorkMemTable();
+
+ Assert.assertEquals(memTable1.getTVListsRamCost(),
memTable2.getTVListsRamCost());
+ Assert.assertEquals(memTable1.getTotalPointsNum(),
memTable2.getTotalPointsNum());
+ Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+ }
+
@Test
public void testWriteAndClose() throws IOException, WriteProcessException,
MetadataException {
logger.info("testWriteAndRestoreMetadata begin..");