This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch fix_insertRows_mem_control in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5ee9524535e637c480cfa74cc2655438338a6c61 Author: HTHou <[email protected]> AuthorDate: Fri May 10 19:09:27 2024 +0800 finish non aligned --- .../dataregion/memtable/AbstractMemTable.java | 3 + .../dataregion/memtable/TsFileProcessor.java | 20 +-- .../dataregion/memtable/WritableMemChunkGroup.java | 3 + .../dataregion/memtable/TsFileProcessorTest.java | 171 +++++++++++++++++++++ 4 files changed, 188 insertions(+), 9 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index 7c7e46ec0ce..2182cec0007 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -399,6 +399,9 @@ public abstract class AbstractMemTable implements IMemTable { @Override public long getCurrentTVListSize(IDeviceID deviceId, String measurement) { IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId); + if (null == memChunkGroup) { + return 0; + } return memChunkGroup.getCurrentTVListSize(measurement); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index c3d766256f5..ff57086f31d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -19,8 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable; -import java.util.HashSet; -import java.util.Objects; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.MetadataException; @@ -85,9 +83,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedDeque; @@ -569,8 +569,8 @@ public class TsFileProcessor { continue; } if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i]) - || !increasingMemTableMemInfo.containsKey(deviceId) - || !increasingMemTableMemInfo.get(deviceId).containsKey(measurements[i])) { + && (!increasingMemTableMemInfo.containsKey(deviceId) + || !increasingMemTableMemInfo.get(deviceId).containsKey(measurements[i]))) { // ChunkMetadataIncrement chunkMetadataIncrement += ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]); memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]); @@ -677,7 +677,7 @@ public class TsFileProcessor { Object[] values = insertRowNode.getValues(); String[] measurements = insertRowNode.getMeasurements(); if (workMemTable.checkIfChunkDoesNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER) - || !increasingMemTableMemInfo.containsKey(deviceId)) { + && !increasingMemTableMemInfo.containsKey(deviceId)) { // For new device of this mem table // ChunkMetadataIncrement chunkMetadataIncrement += @@ -690,7 +690,9 @@ public class TsFileProcessor { continue; } Objects.requireNonNull( - increasingMemTableMemInfo.putIfAbsent(deviceId, new Pair<>(new HashSet<>(), 1))).left.add(measurements[i]); + increasingMemTableMemInfo.putIfAbsent(deviceId, new Pair<>(new HashSet<>(), 1))) + .left + .add(measurements[i]); // TEXT data mem size if (dataTypes[i] == TSDataType.TEXT && values[i] != null) { textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); @@ -723,9 +725,9 @@ public class TsFileProcessor { } } // Here currentChunkPointNum >= 1 - if (((alignedMemChunk.alignedListSize() - + increasingMemTableMemInfo.get(deviceId).right) - % PrimitiveArrayManager.ARRAY_SIZE) == 0) { + if (((alignedMemChunk.alignedListSize() + increasingMemTableMemInfo.get(deviceId).right) + % PrimitiveArrayManager.ARRAY_SIZE) + == 0) { dataTypesInTVList.addAll(((AlignedTVList) alignedMemChunk.getTVList()).getTsDataTypes()); memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java index 3b7133d48b2..5d61fe0e3dd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java @@ -161,6 +161,9 @@ public class WritableMemChunkGroup implements IWritableMemChunkGroup { @Override public long getCurrentTVListSize(String measurement) { + if (!memChunkMap.containsKey(measurement)) { + return 0; + } return memChunkMap.get(measurement).getTVList().rowCount(); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java index de25e822d58..e9354a3cf38 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java @@ -29,6 +29,9 @@ import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.storageengine.dataregion.DataRegionInfo; import org.apache.iotdb.db.storageengine.dataregion.DataRegionTest; @@ -413,6 +416,174 @@ public class TsFileProcessorTest { Assert.assertEquals(1441200, memTable.memSize()); } + @Test + public void testRamCostInsertSameNonAlignedDataBy2Ways() + throws MetadataException, WriteProcessException, IOException { + TsFileProcessor processor1 = + new TsFileProcessor( + storageGroup, + SystemFileFactory.INSTANCE.getFile(filePath), + sgInfo, + this::closeTsFileProcessor, + (tsFileProcessor, updateMap, systemFlushTime) -> {}, + true); + TsFileProcessorInfo tsFileProcessorInfo1 = new TsFileProcessorInfo(sgInfo); + processor1.setTsFileProcessorInfo(tsFileProcessorInfo1); + this.sgInfo.initTsFileProcessorInfo(processor1); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor1); + // insert 100 rows by insertRow + for (int i = 1; i <= 100; i++) { + TSRecord record = new TSRecord(i, deviceId); + record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); + processor1.insert(buildInsertRowNodeByTSRecord(record), new long[4]); + } + IMemTable memTable1 = processor1.getWorkMemTable(); + + TsFileProcessor processor2 = + new TsFileProcessor( + storageGroup, + SystemFileFactory.INSTANCE.getFile(filePath), + sgInfo, + this::closeTsFileProcessor, + (tsFileProcessor, updateMap, systemFlushTime) -> {}, + true); + TsFileProcessorInfo tsFileProcessorInfo2 = new TsFileProcessorInfo(sgInfo); + processor2.setTsFileProcessorInfo(tsFileProcessorInfo2); + this.sgInfo.initTsFileProcessorInfo(processor2); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor2); + InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId("")); + // insert 100 rows by insertRows + for (int i = 1; i <= 100; i++) { + TSRecord record = new TSRecord(i, deviceId); + record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); + insertRowsNode.addOneInsertRowNode(buildInsertRowNodeByTSRecord(record), i - 1); + } + processor2.insert(insertRowsNode, new long[4]); + IMemTable memTable2 = processor2.getWorkMemTable(); + + Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); + Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum()); + Assert.assertEquals(memTable1.memSize(), memTable2.memSize()); + + // insert more rows by insertRow + TSRecord record = new TSRecord(101, deviceId); + record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1")); + InsertRowNode insertRowNode1 = buildInsertRowNodeByTSRecord(record); + processor1.insert(insertRowNode1, new long[4]); + record = new TSRecord(101, deviceId); + record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1")); + InsertRowNode insertRowNode2 = buildInsertRowNodeByTSRecord(record); + processor1.insert(insertRowNode2, new long[4]); + record = new TSRecord(102, deviceId); + record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1")); + InsertRowNode insertRowNode3 = buildInsertRowNodeByTSRecord(record); + processor1.insert(insertRowNode3, new long[4]); + record = new TSRecord(102, "root.vehicle.d2"); + record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1")); + InsertRowNode insertRowNode4 = buildInsertRowNodeByTSRecord(record); + processor1.insert(insertRowNode4, new long[4]); + + // insert more rows by insertRows + insertRowsNode = new InsertRowsNode(new PlanNodeId("")); + insertRowsNode.addOneInsertRowNode(insertRowNode1, 0); + insertRowsNode.addOneInsertRowNode(insertRowNode2, 1); + insertRowsNode.addOneInsertRowNode(insertRowNode3, 2); + insertRowsNode.addOneInsertRowNode(insertRowNode4, 3); + processor2.insert(insertRowsNode, new long[4]); + + Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); + Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum()); + Assert.assertEquals(memTable1.memSize(), memTable2.memSize()); + } + + @Test + public void testRamCostInsertSameAlignedDataBy2Ways() + throws MetadataException, WriteProcessException, IOException { + TsFileProcessor processor1 = + new TsFileProcessor( + storageGroup, + SystemFileFactory.INSTANCE.getFile(filePath), + sgInfo, + this::closeTsFileProcessor, + (tsFileProcessor, updateMap, systemFlushTime) -> {}, + true); + TsFileProcessorInfo tsFileProcessorInfo1 = new TsFileProcessorInfo(sgInfo); + processor1.setTsFileProcessorInfo(tsFileProcessorInfo1); + this.sgInfo.initTsFileProcessorInfo(processor1); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor1); + // insert 100 rows by insertRow + for (int i = 1; i <= 100; i++) { + TSRecord record = new TSRecord(i, deviceId); + record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); + InsertRowNode node = buildInsertRowNodeByTSRecord(record); + node.setAligned(true); + processor1.insert(node, new long[4]); + } + IMemTable memTable1 = processor1.getWorkMemTable(); + + TsFileProcessor processor2 = + new TsFileProcessor( + storageGroup, + SystemFileFactory.INSTANCE.getFile(filePath), + sgInfo, + this::closeTsFileProcessor, + (tsFileProcessor, updateMap, systemFlushTime) -> {}, + true); + TsFileProcessorInfo tsFileProcessorInfo2 = new TsFileProcessorInfo(sgInfo); + processor2.setTsFileProcessorInfo(tsFileProcessorInfo2); + this.sgInfo.initTsFileProcessorInfo(processor2); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor2); + InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId("")); + insertRowsNode.setAligned(true); + // insert 100 rows by insertRows + for (int i = 1; i <= 100; i++) { + TSRecord record = new TSRecord(i, deviceId); + record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); + insertRowsNode.addOneInsertRowNode(buildInsertRowNodeByTSRecord(record), i - 1); + } + processor2.insert(insertRowsNode, new long[4]); + IMemTable memTable2 = processor2.getWorkMemTable(); + + Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); + Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum()); + Assert.assertEquals(memTable1.memSize(), memTable2.memSize()); + + // insert more rows by insertRow + TSRecord record = new TSRecord(101, deviceId); + record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1")); + InsertRowNode insertRowNode1 = buildInsertRowNodeByTSRecord(record); + insertRowNode1.setAligned(true); + processor1.insert(insertRowNode1, new long[4]); + record = new TSRecord(101, deviceId); + record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1")); + InsertRowNode insertRowNode2 = buildInsertRowNodeByTSRecord(record); + insertRowNode2.setAligned(true); + processor1.insert(insertRowNode2, new long[4]); + record = new TSRecord(102, deviceId); + record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1")); + InsertRowNode insertRowNode3 = buildInsertRowNodeByTSRecord(record); + insertRowNode3.setAligned(true); + processor1.insert(insertRowNode3, new long[4]); + record = new TSRecord(102, "root.vehicle.d2"); + record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1")); + InsertRowNode insertRowNode4 = buildInsertRowNodeByTSRecord(record); + insertRowNode4.setAligned(true); + processor1.insert(insertRowNode4, new long[4]); + + // insert more rows by insertRows + insertRowsNode = new InsertRowsNode(new PlanNodeId("")); + insertRowsNode.addOneInsertRowNode(insertRowNode1, 0); + insertRowsNode.addOneInsertRowNode(insertRowNode2, 1); + insertRowsNode.addOneInsertRowNode(insertRowNode3, 2); + insertRowsNode.addOneInsertRowNode(insertRowNode4, 3); + insertRowsNode.setAligned(true); + processor2.insert(insertRowsNode, new long[4]); + + Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost()); + Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum()); + Assert.assertEquals(memTable1.memSize(), memTable2.memSize()); + } + @Test public void testWriteAndClose() throws IOException, WriteProcessException, MetadataException { logger.info("testWriteAndRestoreMetadata begin..");
