This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch fix_insertRows_mem_control in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 829725fcb2bfd636ce55ec09495d03fe3fd6f120 Author: HTHou <[email protected]> AuthorDate: Fri May 10 23:06:47 2024 +0800 fix aligned insertRows mem calculate --- .../dataregion/memtable/TsFileProcessor.java | 43 +++++++++++++--------- .../dataregion/memtable/TsFileProcessorTest.java | 4 +- 2 files changed, 29 insertions(+), 18 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index ff57086f31d..7e046388c39 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -83,11 +83,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedDeque; @@ -669,8 +667,9 @@ public class TsFileProcessor { long memTableIncrement = 0L; long textDataIncrement = 0L; long chunkMetadataIncrement = 0L; - // device -> (measurements, adding aligned TVList size) - Map<IDeviceID, Pair<Set<String>, Integer>> increasingMemTableMemInfo = new HashMap<>(); + // device -> (measurements -> datatype, adding aligned TVList size) + Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> increasingMemTableMemInfo = + new HashMap<>(); for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) { IDeviceID deviceId = insertRowNode.getDeviceID(); TSDataType[] dataTypes = insertRowNode.getDataTypes(); @@ -689,10 +688,10 @@ public class TsFileProcessor { if (dataTypes[i] == null || measurements[i] == null) { continue; } - Objects.requireNonNull( - increasingMemTableMemInfo.putIfAbsent(deviceId, new Pair<>(new HashSet<>(), 1))) + increasingMemTableMemInfo + .computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 1)) .left - .add(measurements[i]); + .put(measurements[i], dataTypes[i]); // TEXT data mem size if (dataTypes[i] == TSDataType.TEXT && values[i] != null) { textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); @@ -701,9 +700,11 @@ public class TsFileProcessor { } else { // For existed device of this mem table + AlignedWritableMemChunkGroup memChunkGroup = + (AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId); AlignedWritableMemChunk alignedMemChunk = - ((AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId)) - .getAlignedMemChunk(); + memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk(); + long currentChunkPointNum = alignedMemChunk == null ? 0 : alignedMemChunk.alignedListSize(); List<TSDataType> dataTypesInTVList = new ArrayList<>(); for (int i = 0; i < dataTypes.length; i++) { // Skip failed Measurements @@ -711,26 +712,34 @@ public class TsFileProcessor { continue; } + Pair<Map<String, TSDataType>, Integer> addingPointNumInfo = + increasingMemTableMemInfo.computeIfAbsent( + deviceId, k -> new Pair<>(new HashMap<>(), 0)); + int addingPointNum = addingPointNumInfo.getRight(); // Extending the column of aligned mem chunk - if (!alignedMemChunk.containsMeasurement(measurements[i]) - || !increasingMemTableMemInfo.get(deviceId).left.contains(measurements[i])) { + if ((alignedMemChunk != null && !alignedMemChunk.containsMeasurement(measurements[i])) + && !increasingMemTableMemInfo.get(deviceId).left.containsKey(measurements[i])) { memTableIncrement += - (alignedMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1) + ((currentChunkPointNum + addingPointNum) / PrimitiveArrayManager.ARRAY_SIZE + 1) * AlignedTVList.valueListArrayMemCost(dataTypes[i]); - dataTypesInTVList.add(dataTypes[i]); + increasingMemTableMemInfo.get(deviceId).left.put(measurements[i], dataTypes[i]); } // TEXT data mem size if (dataTypes[i] == TSDataType.TEXT && values[i] != null) { textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); } } + int addingPointNum = increasingMemTableMemInfo.get(deviceId).getRight(); // Here currentChunkPointNum >= 1 - if (((alignedMemChunk.alignedListSize() + increasingMemTableMemInfo.get(deviceId).right) - % PrimitiveArrayManager.ARRAY_SIZE) - == 0) { - dataTypesInTVList.addAll(((AlignedTVList) alignedMemChunk.getTVList()).getTsDataTypes()); + if (((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE) == 0) { + if (alignedMemChunk != null) { + dataTypesInTVList.addAll( + ((AlignedTVList) alignedMemChunk.getTVList()).getTsDataTypes()); + } + dataTypesInTVList.addAll(increasingMemTableMemInfo.get(deviceId).getLeft().values()); memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList); } + increasingMemTableMemInfo.get(deviceId).setRight(addingPointNum + 1); } } updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java index e9354a3cf38..5555b27fd45 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java @@ -539,7 +539,9 @@ public class TsFileProcessorTest { for (int i = 1; i <= 100; i++) { TSRecord record = new TSRecord(i, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); - insertRowsNode.addOneInsertRowNode(buildInsertRowNodeByTSRecord(record), i - 1); + InsertRowNode node = buildInsertRowNodeByTSRecord(record); + node.setAligned(true); + insertRowsNode.addOneInsertRowNode(node, i - 1); } processor2.insert(insertRowsNode, new long[4]); IMemTable memTable2 = processor2.getWorkMemTable();
