This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch fix_insertRows_mem_control in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bf091679585f4b519f73adb514542e21b3b46cc7 Author: HTHou <[email protected]> AuthorDate: Fri May 10 00:40:42 2024 +0800 init --- .../dataregion/memtable/TsFileProcessor.java | 157 ++++++++++++++++++--- 1 file changed, 136 insertions(+), 21 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 8cbca98b754..f6fc4a2e4a6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable; +import java.util.HashSet; +import java.util.Objects; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.MetadataException; @@ -253,12 +255,12 @@ public class TsFileProcessor { long memControlStartTime = System.nanoTime(); if (insertRowNode.isAligned()) { memIncrements = - checkAlignedMemCostAndAddToTspInfo( + checkAlignedMemCostAndAddToTspInfoForRow( insertRowNode.getDeviceID(), insertRowNode.getMeasurements(), insertRowNode.getDataTypes(), insertRowNode.getValues()); } else { memIncrements = - checkMemCostAndAddToTspInfo( + checkMemCostAndAddToTspInfoForRow( insertRowNode.getDeviceID(), insertRowNode.getMeasurements(), insertRowNode.getDataTypes(), insertRowNode.getValues()); } @@ -328,23 +330,13 @@ public class TsFileProcessor { .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1); } - long[] memIncrements = null; + long[] memIncrements; long memControlStartTime = System.nanoTime(); if (insertRowsNode.isAligned()) { - for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) { - memIncrements = - checkAlignedMemCostAndAddToTspInfo( - insertRowNode.getDeviceID(), insertRowNode.getMeasurements(), - insertRowNode.getDataTypes(), insertRowNode.getValues()); - } + memIncrements = checkAlignedMemCostAndAddToTspInfoForRows(insertRowsNode); } else { - for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) { - memIncrements = - checkMemCostAndAddToTspInfo( - insertRowNode.getDeviceID(), insertRowNode.getMeasurements(), - insertRowNode.getDataTypes(), insertRowNode.getValues()); - } + memIncrements = checkMemCostAndAddToTspInfoForRows(insertRowsNode); } // recordScheduleMemoryBlockCost costsForMetrics[1] += System.nanoTime() - memControlStartTime; @@ -437,7 +429,7 @@ public class TsFileProcessor { long startTime = System.nanoTime(); if (insertTabletNode.isAligned()) { memIncrements = - checkAlignedMemCostAndAddToTsp( + checkAlignedMemCostAndAddToTspForTablet( insertTabletNode.getDeviceID(), insertTabletNode.getMeasurements(), insertTabletNode.getDataTypes(), @@ -446,7 +438,7 @@ public class TsFileProcessor { end); } else { memIncrements = - checkMemCostAndAddToTspInfo( + checkMemCostAndAddToTspInfoForTablet( insertTabletNode.getDeviceID(), insertTabletNode.getMeasurements(), insertTabletNode.getDataTypes(), @@ -523,7 +515,7 @@ public class TsFileProcessor { } @SuppressWarnings("squid:S3776") // High Cognitive Complexity - private long[] checkMemCostAndAddToTspInfo( + private long[] checkMemCostAndAddToTspInfoForRow( IDeviceID deviceId, String[] measurements, TSDataType[] dataTypes, Object[] values) throws WriteProcessException { // Memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8 @@ -557,8 +549,61 @@ public class TsFileProcessor { return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement}; } + @SuppressWarnings("squid:S3776") // High Cognitive Complexity + private long[] checkMemCostAndAddToTspInfoForRows(InsertRowsNode insertRowsNode) + throws WriteProcessException { + // Memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8 + long memTableIncrement = 0L; + long textDataIncrement = 0L; + long chunkMetadataIncrement = 0L; + // device -> measurement -> adding TVList size + Map<IDeviceID, Map<String, Integer>> increasingMemTableMemInfo = new HashMap<>(); + for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) { + IDeviceID deviceId = insertRowNode.getDeviceID(); + TSDataType[] dataTypes = insertRowNode.getDataTypes(); + Object[] values = insertRowNode.getValues(); + String[] measurements = insertRowNode.getMeasurements(); + for (int i = 0; i < dataTypes.length; i++) { + // Skip failed Measurements + if (dataTypes[i] == null || measurements[i] == null) { + continue; + } + if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i]) + || !increasingMemTableMemInfo.containsKey(deviceId) + || !increasingMemTableMemInfo.get(deviceId).containsKey(measurements[i])) { + // ChunkMetadataIncrement + chunkMetadataIncrement += ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]); + memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]); + increasingMemTableMemInfo + .computeIfAbsent(deviceId, k -> new HashMap<>()) + .putIfAbsent(measurements[i], 1); + } else { + // here currentChunkPointNum >= 1 + long currentChunkPointNum = workMemTable.getCurrentTVListSize(deviceId, measurements[i]); + int addingPointNum = + increasingMemTableMemInfo + .computeIfAbsent(deviceId, k -> new HashMap<>()) + .computeIfAbsent(measurements[i], k -> 0); + memTableIncrement += + ((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE) == 0 + ? TVList.tvListArrayMemCost(dataTypes[i]) + : 0; + increasingMemTableMemInfo + .get(deviceId) + .computeIfPresent(measurements[i], (k, v) -> v + 1); + } + // TEXT data mem size + if (dataTypes[i] == TSDataType.TEXT && values[i] != null) { + textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); + } + } + } + updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement); + return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement}; + } + @SuppressWarnings("squid:S3776") // high Cognitive Complexity - private long[] checkAlignedMemCostAndAddToTspInfo( + private long[] checkAlignedMemCostAndAddToTspInfoForRow( IDeviceID deviceId, String[] measurements, TSDataType[] dataTypes, Object[] values) throws WriteProcessException { // Memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8 @@ -617,7 +662,77 @@ public class TsFileProcessor { return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement}; } - private long[] checkMemCostAndAddToTspInfo( + @SuppressWarnings("squid:S3776") // high Cognitive Complexity + private long[] checkAlignedMemCostAndAddToTspInfoForRows(InsertRowsNode insertRowsNode) + throws WriteProcessException { + // Memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8 + long memTableIncrement = 0L; + long textDataIncrement = 0L; + long chunkMetadataIncrement = 0L; + // device -> (measurements, adding aligned TVList size) + Map<IDeviceID, Pair<Set<String>, Integer>> increasingMemTableMemInfo = new HashMap<>(); + for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) { + IDeviceID deviceId = insertRowNode.getDeviceID(); + TSDataType[] dataTypes = insertRowNode.getDataTypes(); + Object[] values = insertRowNode.getValues(); + String[] measurements = insertRowNode.getMeasurements(); + if (workMemTable.checkIfChunkDoesNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER) + || !increasingMemTableMemInfo.containsKey(deviceId)) { + // For new device of this mem table + // ChunkMetadataIncrement + chunkMetadataIncrement += + ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR) + * dataTypes.length; + memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes); + for (int i = 0; i < dataTypes.length; i++) { + // Skip failed Measurements + if (dataTypes[i] == null || measurements[i] == null) { + continue; + } + Objects.requireNonNull( + increasingMemTableMemInfo.putIfAbsent(deviceId, new Pair<>(new HashSet<>(), 1))).left.add(measurements[i]); + // TEXT data mem size + if (dataTypes[i] == TSDataType.TEXT && values[i] != null) { + textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); + } + } + + } else { + // For existed device of this mem table + AlignedWritableMemChunk alignedMemChunk = + ((AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId)) + .getAlignedMemChunk(); + List<TSDataType> dataTypesInTVList = new ArrayList<>(); + for (int i = 0; i < dataTypes.length; i++) { + // Skip failed Measurements + if (dataTypes[i] == null || measurements[i] == null) { + continue; + } + + // Extending the column of aligned mem chunk + if (!alignedMemChunk.containsMeasurement(measurements[i])) { + memTableIncrement += + (alignedMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1) + * AlignedTVList.valueListArrayMemCost(dataTypes[i]); + dataTypesInTVList.add(dataTypes[i]); + } + // TEXT data mem size + if (dataTypes[i] == TSDataType.TEXT && values[i] != null) { + textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); + } + } + // Here currentChunkPointNum >= 1 + if ((alignedMemChunk.alignedListSize() % PrimitiveArrayManager.ARRAY_SIZE) == 0) { + dataTypesInTVList.addAll(((AlignedTVList) alignedMemChunk.getTVList()).getTsDataTypes()); + memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList); + } + } + } + updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement); + return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement}; + } + + private long[] checkMemCostAndAddToTspInfoForTablet( IDeviceID deviceId, String[] measurements, TSDataType[] dataTypes, @@ -644,7 +759,7 @@ public class TsFileProcessor { return memIncrements; } - private long[] checkAlignedMemCostAndAddToTsp( + private long[] checkAlignedMemCostAndAddToTspForTablet( IDeviceID deviceId, String[] measurements, TSDataType[] dataTypes,
