This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch fix_mem_control_insertRows3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 459db93fa925af74bc189bdf59729be02406d988 Author: HTHou <[email protected]> AuthorDate: Wed Jun 12 16:04:55 2024 +0800 Fix memory calculate error when insertRecords with both aligned and non-aligned devices --- .../db/storageengine/dataregion/DataRegion.java | 2 - .../dataregion/memtable/TsFileProcessor.java | 264 +++++++++++---------- .../dataregion/memtable/TsFileProcessorTest.java | 59 +++++ 3 files changed, 201 insertions(+), 124 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 5a58b3e1dbf..42a94f1bce3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1198,7 +1198,6 @@ public class DataRegion implements IDataRegionForQuery { if (v == null) { v = new InsertRowsNode(insertRowsNode.getPlanNodeId()); v.setSearchIndex(insertRowNode.getSearchIndex()); - v.setAligned(insertRowNode.isAligned()); } v.addOneInsertRowNode(insertRowNode, finalI); return v; @@ -3278,7 +3277,6 @@ public class DataRegion implements IDataRegionForQuery { if (v == null) { v = new InsertRowsNode(insertRowsOfOneDeviceNode.getPlanNodeId()); v.setSearchIndex(insertRowNode.getSearchIndex()); - v.setAligned(insertRowNode.isAligned()); } v.addOneInsertRowNode(insertRowNode, finalI); return v; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 74aed59ab88..86c479f1737 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -345,11 +345,7 @@ public class TsFileProcessor { long[] memIncrements; long memControlStartTime = System.nanoTime(); - if (insertRowsNode.isAligned()) { - memIncrements = checkAlignedMemCostAndAddToTspInfoForRows(insertRowsNode); - } else { - memIncrements = checkMemCostAndAddToTspInfoForRows(insertRowsNode); - } + memIncrements = checkMemCostAndAddToTspInfoForRows(insertRowsNode); // recordScheduleMemoryBlockCost costsForMetrics[1] += System.nanoTime() - memControlStartTime; @@ -561,55 +557,163 @@ public class TsFileProcessor { return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement}; } - @SuppressWarnings("squid:S3776") // High Cognitive Complexity private long[] checkMemCostAndAddToTspInfoForRows(InsertRowsNode insertRowsNode) throws WriteProcessException { - // Memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8 - long memTableIncrement = 0L; - long textDataIncrement = 0L; - long chunkMetadataIncrement = 0L; + + long[] memIncrements = new long[3]; // device -> measurement -> adding TVList size - Map<IDeviceID, Map<String, Integer>> increasingMemTableInfo = new HashMap<>(); + Map<IDeviceID, Map<String, Integer>> increasingMemTableInfoForNonAligned = new HashMap<>(); + // device -> (measurements -> datatype, adding aligned TVList size) + Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> increasingMemTableInfoForAligned = + new HashMap<>(); for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) { - IDeviceID deviceId = insertRowNode.getDeviceID(); - TSDataType[] dataTypes = insertRowNode.getDataTypes(); - Object[] values = insertRowNode.getValues(); - String[] measurements = insertRowNode.getMeasurements(); + if (insertRowNode.isAligned()) { + handleAlignedData(insertRowNode, memIncrements, increasingMemTableInfoForAligned); + } else { + handleUnalignedData(insertRowNode, memIncrements, increasingMemTableInfoForNonAligned); + } + } + updateMemoryInfo(memIncrements[0], memIncrements[2], memIncrements[1]); + return memIncrements; + } + + @SuppressWarnings("squid:S3776") // High Cognitive Complexity + private void handleAlignedData( + InsertRowNode insertRowNode, + long[] memIncrements, + Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> increasingMemTableInfoForAligned) { + long memTableIncrement = memIncrements[0]; + long textDataIncrement = memIncrements[1]; + long chunkMetadataIncrement = memIncrements[2]; + + IDeviceID deviceId = insertRowNode.getDeviceID(); + TSDataType[] dataTypes = insertRowNode.getDataTypes(); + Object[] values = insertRowNode.getValues(); + String[] measurements = insertRowNode.getMeasurements(); + + if (workMemTable.checkIfChunkDoesNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER) + && !increasingMemTableInfoForAligned.containsKey(deviceId)) { + // For new device of this mem table + // ChunkMetadataIncrement + chunkMetadataIncrement += + ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR) + * dataTypes.length; + memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes); for (int i = 0; i < dataTypes.length; i++) { // Skip failed Measurements if (dataTypes[i] == null || measurements[i] == null) { continue; } - if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i]) - && (!increasingMemTableInfo.containsKey(deviceId) - || !increasingMemTableInfo.get(deviceId).containsKey(measurements[i]))) { - // ChunkMetadataIncrement - chunkMetadataIncrement += ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]); - memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]); - increasingMemTableInfo - .computeIfAbsent(deviceId, k -> new HashMap<>()) - .putIfAbsent(measurements[i], 1); - } else { - // here currentChunkPointNum >= 1 - long currentChunkPointNum = workMemTable.getCurrentTVListSize(deviceId, measurements[i]); - int addingPointNum = - increasingMemTableInfo - .computeIfAbsent(deviceId, k -> new HashMap<>()) - .computeIfAbsent(measurements[i], k -> 0); + increasingMemTableInfoForAligned + .computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 1)) + .left + .put(measurements[i], dataTypes[i]); + // TEXT data mem size + if (dataTypes[i] == TSDataType.TEXT && values[i] != null) { + textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); + } + } + + } else { + // For existed device of this mem table + AlignedWritableMemChunkGroup memChunkGroup = + (AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId); + AlignedWritableMemChunk alignedMemChunk = + memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk(); + long currentChunkPointNum = alignedMemChunk == null ? 0 : alignedMemChunk.alignedListSize(); + List<TSDataType> dataTypesInTVList = new ArrayList<>(); + Pair<Map<String, TSDataType>, Integer> addingPointNumInfo = + increasingMemTableInfoForAligned.computeIfAbsent( + deviceId, k -> new Pair<>(new HashMap<>(), 0)); + for (int i = 0; i < dataTypes.length; i++) { + // Skip failed Measurements + if (dataTypes[i] == null || measurements[i] == null) { + continue; + } + + int addingPointNum = addingPointNumInfo.getRight(); + // Extending the column of aligned mem chunk + if ((alignedMemChunk != null && !alignedMemChunk.containsMeasurement(measurements[i])) + && !increasingMemTableInfoForAligned.get(deviceId).left.containsKey(measurements[i])) { memTableIncrement += - ((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE) == 0 - ? TVList.tvListArrayMemCost(dataTypes[i]) - : 0; - increasingMemTableInfo.get(deviceId).computeIfPresent(measurements[i], (k, v) -> v + 1); + ((currentChunkPointNum + addingPointNum) / PrimitiveArrayManager.ARRAY_SIZE + 1) + * AlignedTVList.valueListArrayMemCost(dataTypes[i]); + increasingMemTableInfoForAligned.get(deviceId).left.put(measurements[i], dataTypes[i]); } // TEXT data mem size - if (dataTypes[i].isBinary() && values[i] != null) { + if (dataTypes[i] == TSDataType.TEXT && values[i] != null) { textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); } } + int addingPointNum = increasingMemTableInfoForAligned.get(deviceId).getRight(); + // Here currentChunkPointNum + addingPointNum >= 1 + if (((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE) == 0) { + if (alignedMemChunk != null) { + dataTypesInTVList.addAll(((AlignedTVList) alignedMemChunk.getTVList()).getTsDataTypes()); + } + dataTypesInTVList.addAll(increasingMemTableInfoForAligned.get(deviceId).getLeft().values()); + memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList); + } + increasingMemTableInfoForAligned.get(deviceId).setRight(addingPointNum + 1); } - updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement); - return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement}; + + memIncrements[0] = memTableIncrement; + memIncrements[1] = textDataIncrement; + memIncrements[2] = chunkMetadataIncrement; + } + + @SuppressWarnings("squid:S3776") // High Cognitive Complexity + private void handleUnalignedData( + InsertRowNode insertRowNode, + long[] memIncrements, + Map<IDeviceID, Map<String, Integer>> increasingMemTableInfoForNonAligned) { + + long memTableIncrement = memIncrements[0]; + long textDataIncrement = memIncrements[1]; + long chunkMetadataIncrement = memIncrements[2]; + + IDeviceID deviceId = insertRowNode.getDeviceID(); + TSDataType[] dataTypes = insertRowNode.getDataTypes(); + Object[] values = insertRowNode.getValues(); + String[] measurements = insertRowNode.getMeasurements(); + + for (int i = 0; i < dataTypes.length; i++) { + // Skip failed Measurements + if (dataTypes[i] == null || measurements[i] == null) { + continue; + } + if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i]) + && (!increasingMemTableInfoForNonAligned.containsKey(deviceId) + || !increasingMemTableInfoForNonAligned.get(deviceId).containsKey(measurements[i]))) { + // ChunkMetadataIncrement + chunkMetadataIncrement += ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]); + memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]); + increasingMemTableInfoForNonAligned + .computeIfAbsent(deviceId, k -> new HashMap<>()) + .putIfAbsent(measurements[i], 1); + } else { + // here currentChunkPointNum >= 1 + long currentChunkPointNum = workMemTable.getCurrentTVListSize(deviceId, measurements[i]); + int addingPointNum = + increasingMemTableInfoForNonAligned + .computeIfAbsent(deviceId, k -> new HashMap<>()) + .computeIfAbsent(measurements[i], k -> 0); + memTableIncrement += + ((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE) == 0 + ? TVList.tvListArrayMemCost(dataTypes[i]) + : 0; + increasingMemTableInfoForNonAligned + .get(deviceId) + .computeIfPresent(measurements[i], (k, v) -> v + 1); + } + // TEXT data mem size + if (dataTypes[i].isBinary() && values[i] != null) { + textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); + } + } + memIncrements[0] = memTableIncrement; + memIncrements[1] = textDataIncrement; + memIncrements[2] = chunkMetadataIncrement; } @SuppressWarnings("squid:S3776") // high Cognitive Complexity @@ -672,90 +776,6 @@ public class TsFileProcessor { return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement}; } - @SuppressWarnings("squid:S3776") // high Cognitive Complexity - private long[] checkAlignedMemCostAndAddToTspInfoForRows(InsertRowsNode insertRowsNode) - throws WriteProcessException { - // Memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8 - long memTableIncrement = 0L; - long textDataIncrement = 0L; - long chunkMetadataIncrement = 0L; - // device -> (measurements -> datatype, adding aligned TVList size) - Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> increasingMemTableInfo = new HashMap<>(); - for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) { - IDeviceID deviceId = insertRowNode.getDeviceID(); - TSDataType[] dataTypes = insertRowNode.getDataTypes(); - Object[] values = insertRowNode.getValues(); - String[] measurements = insertRowNode.getMeasurements(); - if (workMemTable.checkIfChunkDoesNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER) - && !increasingMemTableInfo.containsKey(deviceId)) { - // For new device of this mem table - // ChunkMetadataIncrement - chunkMetadataIncrement += - ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR) - * dataTypes.length; - memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes); - for (int i = 0; i < dataTypes.length; i++) { - // Skip failed Measurements - if (dataTypes[i] == null || measurements[i] == null) { - continue; - } - increasingMemTableInfo - .computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 1)) - .left - .put(measurements[i], dataTypes[i]); - // TEXT data mem size - if (dataTypes[i].isBinary() && values[i] != null) { - textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); - } - } - - } else { - // For existed device of this mem table - AlignedWritableMemChunkGroup memChunkGroup = - (AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId); - AlignedWritableMemChunk alignedMemChunk = - memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk(); - long currentChunkPointNum = alignedMemChunk == null ? 0 : alignedMemChunk.alignedListSize(); - List<TSDataType> dataTypesInTVList = new ArrayList<>(); - Pair<Map<String, TSDataType>, Integer> addingPointNumInfo = - increasingMemTableInfo.computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 0)); - for (int i = 0; i < dataTypes.length; i++) { - // Skip failed Measurements - if (dataTypes[i] == null || measurements[i] == null) { - continue; - } - - int addingPointNum = addingPointNumInfo.getRight(); - // Extending the column of aligned mem chunk - if ((alignedMemChunk != null && !alignedMemChunk.containsMeasurement(measurements[i])) - && !increasingMemTableInfo.get(deviceId).left.containsKey(measurements[i])) { - memTableIncrement += - ((currentChunkPointNum + addingPointNum) / PrimitiveArrayManager.ARRAY_SIZE + 1) - * AlignedTVList.valueListArrayMemCost(dataTypes[i]); - increasingMemTableInfo.get(deviceId).left.put(measurements[i], dataTypes[i]); - } - // TEXT data mem size - if (dataTypes[i].isBinary() && values[i] != null) { - textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); - } - } - int addingPointNum = increasingMemTableInfo.get(deviceId).getRight(); - // Here currentChunkPointNum + addingPointNum >= 1 - if (((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE) == 0) { - if (alignedMemChunk != null) { - dataTypesInTVList.addAll( - ((AlignedTVList) alignedMemChunk.getTVList()).getTsDataTypes()); - } - dataTypesInTVList.addAll(increasingMemTableInfo.get(deviceId).getLeft().values()); - memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList); - } - increasingMemTableInfo.get(deviceId).setRight(addingPointNum + 1); - } - } - updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement); - return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement}; - } - private long[] checkMemCostAndAddToTspInfoForTablet( IDeviceID deviceId, String[] measurements, diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java index 50b2a5d3ea6..6c263c26e57 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java @@ -613,6 +613,65 @@ public class TsFileProcessorTest { Assert.assertEquals(memTable1.memSize(), memTable2.memSize()); } + @Test + public void testRamCostInsertSameDataBy2Ways() + throws MetadataException, WriteProcessException, IOException { + TsFileProcessor processor1 = + new TsFileProcessor( + storageGroup, + SystemFileFactory.INSTANCE.getFile(filePath), + sgInfo, + this::closeTsFileProcessor, + (tsFileProcessor, updateMap, systemFlushTime) -> {}, + true); + TsFileProcessorInfo tsFileProcessorInfo1 = new TsFileProcessorInfo(sgInfo); + processor1.setTsFileProcessorInfo(tsFileProcessorInfo1); + this.sgInfo.initTsFileProcessorInfo(processor1); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor1); + // insert 100 rows (50 aligned, 50 non-aligned) by insertRow + for (int i = 1; i <= 100; i++) { + TSRecord record = new TSRecord(i, i <= 50 ? deviceId : "root.vehicle.d2"); + record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); + InsertRowNode node = buildInsertRowNodeByTSRecord(record); + if (i <= 50) { + node.setAligned(true); + } + processor1.insert(node, new long[4]); + } + IMemTable memTable1 = processor1.getWorkMemTable(); + + TsFileProcessor processor2 = + new TsFileProcessor( + storageGroup, + SystemFileFactory.INSTANCE.getFile(filePath), + sgInfo, + this::closeTsFileProcessor, + (tsFileProcessor, updateMap, systemFlushTime) -> {}, + true); + TsFileProcessorInfo tsFileProcessorInfo2 = new TsFileProcessorInfo(sgInfo); + processor2.setTsFileProcessorInfo(tsFileProcessorInfo2); + this.sgInfo.initTsFileProcessorInfo(processor2); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor2); + InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId("")); + insertRowsNode.setAligned(true); + // insert 100 rows (50 aligned, 50 non-aligned) by insertRows + for (int i = 1; i <= 100; i++) { + TSRecord record = new TSRecord(i, i <= 50 ? deviceId : "root.vehicle.d2"); + record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); + InsertRowNode node = buildInsertRowNodeByTSRecord(record); + if (i <= 50) { + node.setAligned(true); + } + insertRowsNode.addOneInsertRowNode(node, i - 1); + } + processor2.insert(insertRowsNode, new long[4]); + IMemTable memTable2 = processor2.getWorkMemTable(); + + Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); + Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum()); + Assert.assertEquals(memTable1.memSize(), memTable2.memSize()); + } + @Test public void testWriteAndClose() throws IOException, WriteProcessException, MetadataException { logger.info("testWriteAndRestoreMetadata begin..");
