This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch fix_insertRows_mem_control
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5ee9524535e637c480cfa74cc2655438338a6c61
Author: HTHou <[email protected]>
AuthorDate: Fri May 10 19:09:27 2024 +0800

    finish non aligned
---
 .../dataregion/memtable/AbstractMemTable.java      |   3 +
 .../dataregion/memtable/TsFileProcessor.java       |  20 +--
 .../dataregion/memtable/WritableMemChunkGroup.java |   3 +
 .../dataregion/memtable/TsFileProcessorTest.java   | 171 +++++++++++++++++++++
 4 files changed, 188 insertions(+), 9 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 7c7e46ec0ce..2182cec0007 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -399,6 +399,9 @@ public abstract class AbstractMemTable implements IMemTable 
{
   @Override
   public long getCurrentTVListSize(IDeviceID deviceId, String measurement) {
     IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId);
+    if (null == memChunkGroup) {
+      return 0;
+    }
     return memChunkGroup.getCurrentTVListSize(measurement);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index c3d766256f5..ff57086f31d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
-import java.util.HashSet;
-import java.util.Objects;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.MetadataException;
@@ -85,9 +83,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedDeque;
@@ -569,8 +569,8 @@ public class TsFileProcessor {
           continue;
         }
         if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i])
-            || !increasingMemTableMemInfo.containsKey(deviceId)
-            || 
!increasingMemTableMemInfo.get(deviceId).containsKey(measurements[i])) {
+            && (!increasingMemTableMemInfo.containsKey(deviceId)
+                || 
!increasingMemTableMemInfo.get(deviceId).containsKey(measurements[i]))) {
           // ChunkMetadataIncrement
           chunkMetadataIncrement += 
ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
           memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
@@ -677,7 +677,7 @@ public class TsFileProcessor {
       Object[] values = insertRowNode.getValues();
       String[] measurements = insertRowNode.getMeasurements();
       if (workMemTable.checkIfChunkDoesNotExist(deviceId, 
AlignedPath.VECTOR_PLACEHOLDER)
-          || !increasingMemTableMemInfo.containsKey(deviceId)) {
+          && !increasingMemTableMemInfo.containsKey(deviceId)) {
         // For new device of this mem table
         // ChunkMetadataIncrement
         chunkMetadataIncrement +=
@@ -690,7 +690,9 @@ public class TsFileProcessor {
             continue;
           }
           Objects.requireNonNull(
-              increasingMemTableMemInfo.putIfAbsent(deviceId, new Pair<>(new 
HashSet<>(), 1))).left.add(measurements[i]);
+                  increasingMemTableMemInfo.putIfAbsent(deviceId, new 
Pair<>(new HashSet<>(), 1)))
+              .left
+              .add(measurements[i]);
           // TEXT data mem size
           if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
             textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
@@ -723,9 +725,9 @@ public class TsFileProcessor {
           }
         }
         // Here currentChunkPointNum >= 1
-        if (((alignedMemChunk.alignedListSize()
-            + increasingMemTableMemInfo.get(deviceId).right)
-            % PrimitiveArrayManager.ARRAY_SIZE) == 0) {
+        if (((alignedMemChunk.alignedListSize() + 
increasingMemTableMemInfo.get(deviceId).right)
+                % PrimitiveArrayManager.ARRAY_SIZE)
+            == 0) {
           dataTypesInTVList.addAll(((AlignedTVList) 
alignedMemChunk.getTVList()).getTsDataTypes());
           memTableIncrement += 
AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
         }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
index 3b7133d48b2..5d61fe0e3dd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
@@ -161,6 +161,9 @@ public class WritableMemChunkGroup implements 
IWritableMemChunkGroup {
 
   @Override
   public long getCurrentTVListSize(String measurement) {
+    if (!memChunkMap.containsKey(measurement)) {
+      return 0;
+    }
     return memChunkMap.get(measurement).getTVList().rowCount();
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
index de25e822d58..e9354a3cf38 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
@@ -29,6 +29,9 @@ import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegionInfo;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegionTest;
@@ -413,6 +416,174 @@ public class TsFileProcessorTest {
     Assert.assertEquals(1441200, memTable.memSize());
   }
 
+  @Test
+  public void testRamCostInsertSameNonAlignedDataBy2Ways()
+      throws MetadataException, WriteProcessException, IOException {
+    TsFileProcessor processor1 =
+        new TsFileProcessor(
+            storageGroup,
+            SystemFileFactory.INSTANCE.getFile(filePath),
+            sgInfo,
+            this::closeTsFileProcessor,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
+            true);
+    TsFileProcessorInfo tsFileProcessorInfo1 = new TsFileProcessorInfo(sgInfo);
+    processor1.setTsFileProcessorInfo(tsFileProcessorInfo1);
+    this.sgInfo.initTsFileProcessorInfo(processor1);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor1);
+    // insert 100 rows by insertRow
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, deviceId);
+      record.addTuple(DataPoint.getDataPoint(dataType, measurementId, 
String.valueOf(i)));
+      processor1.insert(buildInsertRowNodeByTSRecord(record), new long[4]);
+    }
+    IMemTable memTable1 = processor1.getWorkMemTable();
+
+    TsFileProcessor processor2 =
+        new TsFileProcessor(
+            storageGroup,
+            SystemFileFactory.INSTANCE.getFile(filePath),
+            sgInfo,
+            this::closeTsFileProcessor,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
+            true);
+    TsFileProcessorInfo tsFileProcessorInfo2 = new TsFileProcessorInfo(sgInfo);
+    processor2.setTsFileProcessorInfo(tsFileProcessorInfo2);
+    this.sgInfo.initTsFileProcessorInfo(processor2);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor2);
+    InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+    // insert 100 rows by insertRows
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, deviceId);
+      record.addTuple(DataPoint.getDataPoint(dataType, measurementId, 
String.valueOf(i)));
+      insertRowsNode.addOneInsertRowNode(buildInsertRowNodeByTSRecord(record), 
i - 1);
+    }
+    processor2.insert(insertRowsNode, new long[4]);
+    IMemTable memTable2 = processor2.getWorkMemTable();
+
+    Assert.assertEquals(memTable1.getTVListsRamCost(), 
memTable2.getTVListsRamCost());
+    Assert.assertEquals(memTable1.getTotalPointsNum(), 
memTable2.getTotalPointsNum());
+    Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+
+    // insert more rows by insertRow
+    TSRecord record = new TSRecord(101, deviceId);
+    record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1"));
+    InsertRowNode insertRowNode1 = buildInsertRowNodeByTSRecord(record);
+    processor1.insert(insertRowNode1, new long[4]);
+    record = new TSRecord(101, deviceId);
+    record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1"));
+    InsertRowNode insertRowNode2 = buildInsertRowNodeByTSRecord(record);
+    processor1.insert(insertRowNode2, new long[4]);
+    record = new TSRecord(102, deviceId);
+    record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1"));
+    InsertRowNode insertRowNode3 = buildInsertRowNodeByTSRecord(record);
+    processor1.insert(insertRowNode3, new long[4]);
+    record = new TSRecord(102, "root.vehicle.d2");
+    record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1"));
+    InsertRowNode insertRowNode4 = buildInsertRowNodeByTSRecord(record);
+    processor1.insert(insertRowNode4, new long[4]);
+
+    // insert more rows by insertRows
+    insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+    insertRowsNode.addOneInsertRowNode(insertRowNode1, 0);
+    insertRowsNode.addOneInsertRowNode(insertRowNode2, 1);
+    insertRowsNode.addOneInsertRowNode(insertRowNode3, 2);
+    insertRowsNode.addOneInsertRowNode(insertRowNode4, 3);
+    processor2.insert(insertRowsNode, new long[4]);
+
+    Assert.assertEquals(memTable1.getTVListsRamCost(), 
memTable2.getTVListsRamCost());
+    Assert.assertEquals(memTable1.getTotalPointsNum(), 
memTable2.getTotalPointsNum());
+    Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+  }
+
+  @Test
+  public void testRamCostInsertSameAlignedDataBy2Ways()
+      throws MetadataException, WriteProcessException, IOException {
+    TsFileProcessor processor1 =
+        new TsFileProcessor(
+            storageGroup,
+            SystemFileFactory.INSTANCE.getFile(filePath),
+            sgInfo,
+            this::closeTsFileProcessor,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
+            true);
+    TsFileProcessorInfo tsFileProcessorInfo1 = new TsFileProcessorInfo(sgInfo);
+    processor1.setTsFileProcessorInfo(tsFileProcessorInfo1);
+    this.sgInfo.initTsFileProcessorInfo(processor1);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor1);
+    // insert 100 rows by insertRow
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, deviceId);
+      record.addTuple(DataPoint.getDataPoint(dataType, measurementId, 
String.valueOf(i)));
+      InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+      node.setAligned(true);
+      processor1.insert(node, new long[4]);
+    }
+    IMemTable memTable1 = processor1.getWorkMemTable();
+
+    TsFileProcessor processor2 =
+        new TsFileProcessor(
+            storageGroup,
+            SystemFileFactory.INSTANCE.getFile(filePath),
+            sgInfo,
+            this::closeTsFileProcessor,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
+            true);
+    TsFileProcessorInfo tsFileProcessorInfo2 = new TsFileProcessorInfo(sgInfo);
+    processor2.setTsFileProcessorInfo(tsFileProcessorInfo2);
+    this.sgInfo.initTsFileProcessorInfo(processor2);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor2);
+    InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+    insertRowsNode.setAligned(true);
+    // insert 100 rows by insertRows
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, deviceId);
+      record.addTuple(DataPoint.getDataPoint(dataType, measurementId, 
String.valueOf(i)));
+      insertRowsNode.addOneInsertRowNode(buildInsertRowNodeByTSRecord(record), 
i - 1);
+    }
+    processor2.insert(insertRowsNode, new long[4]);
+    IMemTable memTable2 = processor2.getWorkMemTable();
+
+    Assert.assertEquals(memTable1.getTVListsRamCost(), 
memTable2.getTVListsRamCost());
+    Assert.assertEquals(memTable1.getTotalPointsNum(), 
memTable2.getTotalPointsNum());
+    Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+
+    // insert more rows by insertRow
+    TSRecord record = new TSRecord(101, deviceId);
+    record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1"));
+    InsertRowNode insertRowNode1 = buildInsertRowNodeByTSRecord(record);
+    insertRowNode1.setAligned(true);
+    processor1.insert(insertRowNode1, new long[4]);
+    record = new TSRecord(101, deviceId);
+    record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1"));
+    InsertRowNode insertRowNode2 = buildInsertRowNodeByTSRecord(record);
+    insertRowNode2.setAligned(true);
+    processor1.insert(insertRowNode2, new long[4]);
+    record = new TSRecord(102, deviceId);
+    record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1"));
+    InsertRowNode insertRowNode3 = buildInsertRowNodeByTSRecord(record);
+    insertRowNode3.setAligned(true);
+    processor1.insert(insertRowNode3, new long[4]);
+    record = new TSRecord(102, "root.vehicle.d2");
+    record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1"));
+    InsertRowNode insertRowNode4 = buildInsertRowNodeByTSRecord(record);
+    insertRowNode4.setAligned(true);
+    processor1.insert(insertRowNode4, new long[4]);
+
+    // insert more rows by insertRows
+    insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+    insertRowsNode.addOneInsertRowNode(insertRowNode1, 0);
+    insertRowsNode.addOneInsertRowNode(insertRowNode2, 1);
+    insertRowsNode.addOneInsertRowNode(insertRowNode3, 2);
+    insertRowsNode.addOneInsertRowNode(insertRowNode4, 3);
+    insertRowsNode.setAligned(true);
+    processor2.insert(insertRowsNode, new long[4]);
+
+    Assert.assertEquals(memTable1.getTVListsRamCost(), 
memTable2.getTVListsRamCost());
+    Assert.assertEquals(memTable1.getTotalPointsNum(), 
memTable2.getTotalPointsNum());
+    Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+  }
+
   @Test
   public void testWriteAndClose() throws IOException, WriteProcessException, 
MetadataException {
     logger.info("testWriteAndRestoreMetadata begin..");

Reply via email to