This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch cp_insertRows_mem133 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d82800c0cf7c8c0a709bb493d1066cd37c1ad5c7 Author: Haonan <[email protected]> AuthorDate: Sat Sep 14 21:45:22 2024 +0800 Fix memory calculate error when insertRecords with both aligned and non-aligned devices (#12720) --- .../planner/plan/node/write/InsertRowsNode.java | 10 ++ .../db/storageengine/dataregion/DataRegion.java | 3 + .../memtable/AlignedWritableMemChunk.java | 2 +- .../dataregion/memtable/TsFileProcessor.java | 59 +++++++--- .../dataregion/memtable/TsFileProcessorTest.java | 124 ++++++++++++++++++++- 5 files changed, 180 insertions(+), 18 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java index b6f16b7d70a..13758064128 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java @@ -65,6 +65,8 @@ public class InsertRowsNode extends InsertNode implements WALEntryValue { /** The {@link InsertRowNode} list */ private List<InsertRowNode> insertRowNodeList; + private boolean isMixingAlignment = false; + public InsertRowsNode(PlanNodeId id) { super(id); insertRowNodeList = new ArrayList<>(); @@ -102,6 +104,14 @@ public class InsertRowsNode extends InsertNode implements WALEntryValue { insertRowNodeIndexList.add(index); } + public boolean isMixingAlignment() { + return isMixingAlignment; + } + + public void setMixingAlignment(boolean mixingAlignment) { + isMixingAlignment = mixingAlignment; + } + @Override public void setSearchIndex(long index) { searchIndex = index; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 0ecf7f7c81c..90b603d0b06 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1344,6 +1344,9 @@ public class DataRegion implements IDataRegionForQuery { v.markAsGeneratedByRemoteConsensusLeader(); } } + if (v.isAligned() != insertRowNode.isAligned()) { + v.setMixingAlignment(true); + } v.addOneInsertRowNode(insertRowNode, finalI); v.updateProgressIndex(insertRowNode.getProgressIndex()); return v; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index 77b98b8b70e..d5d9bdce4ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -244,7 +244,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { return (long) list.rowCount() * measurementIndexMap.size(); } - public long alignedListSize() { + public int alignedListSize() { return list.rowCount(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index bc69d29d10d..f39cb9f0bb0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -355,10 +355,29 @@ public class TsFileProcessor { long[] memIncrements; long memControlStartTime = System.nanoTime(); - if (insertRowsNode.isAligned()) { - memIncrements = checkAlignedMemCostAndAddToTspInfoForRows(insertRowsNode); + if (insertRowsNode.isMixingAlignment()) { + List<InsertRowNode> alignedList = new ArrayList<>(); + List<InsertRowNode> nonAlignedList = new ArrayList<>(); + for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) { + if (insertRowNode.isAligned()) { + alignedList.add(insertRowNode); + } else { + nonAlignedList.add(insertRowNode); + } + } + long[] alignedMemIncrements = checkAlignedMemCostAndAddToTspInfoForRows(alignedList); + long[] nonAlignedMemIncrements = checkMemCostAndAddToTspInfoForRows(nonAlignedList); + memIncrements = new long[3]; + for (int i = 0; i < 3; i++) { + memIncrements[i] = alignedMemIncrements[i] + nonAlignedMemIncrements[i]; + } } else { - memIncrements = checkMemCostAndAddToTspInfoForRows(insertRowsNode); + if (insertRowsNode.isAligned()) { + memIncrements = + checkAlignedMemCostAndAddToTspInfoForRows(insertRowsNode.getInsertRowNodeList()); + } else { + memIncrements = checkMemCostAndAddToTspInfoForRows(insertRowsNode.getInsertRowNodeList()); + } } // recordScheduleMemoryBlockCost costsForMetrics[1] += System.nanoTime() - memControlStartTime; @@ -573,7 +592,7 @@ public class TsFileProcessor { } @SuppressWarnings("squid:S3776") // High Cognitive Complexity - private long[] checkMemCostAndAddToTspInfoForRows(InsertRowsNode insertRowsNode) + private long[] checkMemCostAndAddToTspInfoForRows(List<InsertRowNode> insertRowNodeList) throws WriteProcessException { // Memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8 long memTableIncrement = 0L; @@ -581,7 +600,7 @@ public class TsFileProcessor { long chunkMetadataIncrement = 0L; // device -> measurement -> adding TVList size Map<IDeviceID, Map<String, Integer>> increasingMemTableInfo = new HashMap<>(); - for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) { + for (InsertRowNode insertRowNode : insertRowNodeList) { IDeviceID deviceId = insertRowNode.getDeviceID(); TSDataType[] dataTypes = insertRowNode.getDataTypes(); Object[] values = insertRowNode.getValues(); @@ -684,7 +703,7 @@ public class TsFileProcessor { } @SuppressWarnings("squid:S3776") // high Cognitive Complexity - private long[] checkAlignedMemCostAndAddToTspInfoForRows(InsertRowsNode insertRowsNode) + private long[] checkAlignedMemCostAndAddToTspInfoForRows(List<InsertRowNode> insertRowNodeList) throws WriteProcessException { // Memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8 long memTableIncrement = 0L; @@ -692,7 +711,7 @@ public class TsFileProcessor { long chunkMetadataIncrement = 0L; // device -> (measurements -> datatype, adding aligned TVList size) Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> increasingMemTableInfo = new HashMap<>(); - for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) { + for (InsertRowNode insertRowNode : insertRowNodeList) { IDeviceID deviceId = insertRowNode.getDeviceID(); TSDataType[] dataTypes = insertRowNode.getDataTypes(); Object[] values = insertRowNode.getValues(); @@ -726,7 +745,7 @@ public class TsFileProcessor { (AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId); AlignedWritableMemChunk alignedMemChunk = memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk(); - long currentChunkPointNum = alignedMemChunk == null ? 0 : alignedMemChunk.alignedListSize(); + int currentChunkPointNum = alignedMemChunk == null ? 0 : alignedMemChunk.alignedListSize(); List<TSDataType> dataTypesInTVList = new ArrayList<>(); Pair<Map<String, TSDataType>, Integer> addingPointNumInfo = increasingMemTableInfo.computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 0)); @@ -738,29 +757,37 @@ public class TsFileProcessor { int addingPointNum = addingPointNumInfo.getRight(); // Extending the column of aligned mem chunk - if ((alignedMemChunk != null && !alignedMemChunk.containsMeasurement(measurements[i])) - && !increasingMemTableInfo.get(deviceId).left.containsKey(measurements[i])) { + boolean currentMemChunkContainsMeasurement = + alignedMemChunk != null && alignedMemChunk.containsMeasurement(measurements[i]); + if (!currentMemChunkContainsMeasurement + && !addingPointNumInfo.left.containsKey(measurements[i])) { + addingPointNumInfo.left.put(measurements[i], dataTypes[i]); + int currentArrayNum = + (currentChunkPointNum + addingPointNum) / PrimitiveArrayManager.ARRAY_SIZE + + ((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE + > 0 + ? 1 + : 0); memTableIncrement += - ((currentChunkPointNum + addingPointNum) / PrimitiveArrayManager.ARRAY_SIZE + 1) - * AlignedTVList.valueListArrayMemCost(dataTypes[i]); - increasingMemTableInfo.get(deviceId).left.put(measurements[i], dataTypes[i]); + currentArrayNum * AlignedTVList.valueListArrayMemCost(dataTypes[i]); + addingPointNumInfo.left.put(measurements[i], dataTypes[i]); } // TEXT data mem size if (dataTypes[i].isBinary() && values[i] != null) { textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); } } - int addingPointNum = increasingMemTableInfo.get(deviceId).getRight(); + int addingPointNum = addingPointNumInfo.right; // Here currentChunkPointNum + addingPointNum >= 1 if (((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE) == 0) { if (alignedMemChunk != null) { dataTypesInTVList.addAll( ((AlignedTVList) alignedMemChunk.getTVList()).getTsDataTypes()); } - dataTypesInTVList.addAll(increasingMemTableInfo.get(deviceId).getLeft().values()); + dataTypesInTVList.addAll(addingPointNumInfo.left.values()); memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList); } - increasingMemTableInfo.get(deviceId).setRight(addingPointNum + 1); + addingPointNumInfo.setRight(addingPointNum + 1); } } updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java index 50b2a5d3ea6..89266fa632f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java @@ -587,11 +587,11 @@ public class TsFileProcessorTest { // insert more rows by insertRows insertRowsNode = new InsertRowsNode(new PlanNodeId("")); + insertRowsNode.setAligned(true); insertRowsNode.addOneInsertRowNode(insertRowNode1, 0); insertRowsNode.addOneInsertRowNode(insertRowNode2, 1); insertRowsNode.addOneInsertRowNode(insertRowNode3, 2); insertRowsNode.addOneInsertRowNode(insertRowNode4, 3); - insertRowsNode.setAligned(true); processor2.insert(insertRowsNode, new long[4]); Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); @@ -613,6 +613,128 @@ public class TsFileProcessorTest { Assert.assertEquals(memTable1.memSize(), memTable2.memSize()); } + @Test + public void testRamCostInsertSameDataBy2Ways() + throws MetadataException, WriteProcessException, IOException { + TsFileProcessor processor1 = + new TsFileProcessor( + storageGroup, + SystemFileFactory.INSTANCE.getFile(filePath), + sgInfo, + this::closeTsFileProcessor, + (tsFileProcessor, updateMap, systemFlushTime) -> {}, + true); + TsFileProcessorInfo tsFileProcessorInfo1 = new TsFileProcessorInfo(sgInfo); + processor1.setTsFileProcessorInfo(tsFileProcessorInfo1); + this.sgInfo.initTsFileProcessorInfo(processor1); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor1); + // insert 100 rows (50 aligned, 50 non-aligned) by insertRow + for (int i = 1; i <= 100; i++) { + TSRecord record = new TSRecord(i, i <= 50 ? deviceId : "root.vehicle.d2"); + record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); + InsertRowNode node = buildInsertRowNodeByTSRecord(record); + if (i <= 50) { + node.setAligned(true); + } + processor1.insert(node, new long[4]); + } + IMemTable memTable1 = processor1.getWorkMemTable(); + + TsFileProcessor processor2 = + new TsFileProcessor( + storageGroup, + SystemFileFactory.INSTANCE.getFile(filePath), + sgInfo, + this::closeTsFileProcessor, + (tsFileProcessor, updateMap, systemFlushTime) -> {}, + true); + TsFileProcessorInfo tsFileProcessorInfo2 = new TsFileProcessorInfo(sgInfo); + processor2.setTsFileProcessorInfo(tsFileProcessorInfo2); + this.sgInfo.initTsFileProcessorInfo(processor2); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor2); + InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId("")); + insertRowsNode.setAligned(true); + // insert 100 rows (50 aligned, 50 non-aligned) by insertRows + insertRowsNode.setMixingAlignment(true); + for (int i = 1; i <= 100; i++) { + TSRecord record = new TSRecord(i, i <= 50 ? deviceId : "root.vehicle.d2"); + record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); + InsertRowNode node = buildInsertRowNodeByTSRecord(record); + if (i <= 50) { + node.setAligned(true); + } + insertRowsNode.addOneInsertRowNode(node, i - 1); + } + processor2.insert(insertRowsNode, new long[4]); + IMemTable memTable2 = processor2.getWorkMemTable(); + + Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); + Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum()); + Assert.assertEquals(memTable1.memSize(), memTable2.memSize()); + } + + @Test + public void testRamCostInsertSameDataBy2Ways2() + throws MetadataException, WriteProcessException, IOException { + TsFileProcessor processor1 = + new TsFileProcessor( + storageGroup, + SystemFileFactory.INSTANCE.getFile(filePath), + sgInfo, + this::closeTsFileProcessor, + (tsFileProcessor, updateMap, systemFlushTime) -> {}, + true); + TsFileProcessorInfo tsFileProcessorInfo1 = new TsFileProcessorInfo(sgInfo); + processor1.setTsFileProcessorInfo(tsFileProcessorInfo1); + this.sgInfo.initTsFileProcessorInfo(processor1); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor1); + // insert 100 rows (50 aligned, 50 non-aligned) by insertRow + for (int i = 1; i <= 100; i++) { + TSRecord record = new TSRecord(i, i <= 50 ? deviceId : "root.vehicle.d2"); + record.addTuple(DataPoint.getDataPoint(dataType, "s" + i, String.valueOf(i))); + InsertRowNode node = buildInsertRowNodeByTSRecord(record); + node.setAligned(true); + if (i <= 50) { + node.setAligned(true); + } + processor1.insert(node, new long[4]); + } + IMemTable memTable1 = processor1.getWorkMemTable(); + + TsFileProcessor processor2 = + new TsFileProcessor( + storageGroup, + SystemFileFactory.INSTANCE.getFile(filePath), + sgInfo, + this::closeTsFileProcessor, + (tsFileProcessor, updateMap, systemFlushTime) -> {}, + true); + TsFileProcessorInfo tsFileProcessorInfo2 = new TsFileProcessorInfo(sgInfo); + processor2.setTsFileProcessorInfo(tsFileProcessorInfo2); + this.sgInfo.initTsFileProcessorInfo(processor2); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor2); + InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId("")); + insertRowsNode.setAligned(true); + // insert 100 rows (50 aligned, 50 non-aligned) by insertRows + insertRowsNode.setMixingAlignment(true); + for (int i = 1; i <= 100; i++) { + TSRecord record = new TSRecord(i, i <= 50 ? deviceId : "root.vehicle.d2"); + record.addTuple(DataPoint.getDataPoint(dataType, "s" + i, String.valueOf(i))); + InsertRowNode node = buildInsertRowNodeByTSRecord(record); + node.setAligned(true); + if (i <= 50) { + node.setAligned(true); + } + insertRowsNode.addOneInsertRowNode(node, i - 1); + } + processor2.insert(insertRowsNode, new long[4]); + IMemTable memTable2 = processor2.getWorkMemTable(); + + Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); + Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum()); + Assert.assertEquals(memTable1.memSize(), memTable2.memSize()); + } + @Test public void testWriteAndClose() throws IOException, WriteProcessException, MetadataException { logger.info("testWriteAndRestoreMetadata begin..");
