This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch fix_insertRows_mem_control
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 829725fcb2bfd636ce55ec09495d03fe3fd6f120
Author: HTHou <[email protected]>
AuthorDate: Fri May 10 23:06:47 2024 +0800

    Fix memory calculation for aligned insertRows
---
 .../dataregion/memtable/TsFileProcessor.java       | 43 +++++++++++++---------
 .../dataregion/memtable/TsFileProcessorTest.java   |  4 +-
 2 files changed, 29 insertions(+), 18 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index ff57086f31d..7e046388c39 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -83,11 +83,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedDeque;
@@ -669,8 +667,9 @@ public class TsFileProcessor {
     long memTableIncrement = 0L;
     long textDataIncrement = 0L;
     long chunkMetadataIncrement = 0L;
-    // device -> (measurements, adding aligned TVList size)
-    Map<IDeviceID, Pair<Set<String>, Integer>> increasingMemTableMemInfo = new 
HashMap<>();
+    // device -> (measurements -> datatype, adding aligned TVList size)
+    Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> 
increasingMemTableMemInfo =
+        new HashMap<>();
     for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
       IDeviceID deviceId = insertRowNode.getDeviceID();
       TSDataType[] dataTypes = insertRowNode.getDataTypes();
@@ -689,10 +688,10 @@ public class TsFileProcessor {
           if (dataTypes[i] == null || measurements[i] == null) {
             continue;
           }
-          Objects.requireNonNull(
-                  increasingMemTableMemInfo.putIfAbsent(deviceId, new 
Pair<>(new HashSet<>(), 1)))
+          increasingMemTableMemInfo
+              .computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 1))
               .left
-              .add(measurements[i]);
+              .put(measurements[i], dataTypes[i]);
           // TEXT data mem size
           if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
             textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
@@ -701,9 +700,11 @@ public class TsFileProcessor {
 
       } else {
         // For existed device of this mem table
+        AlignedWritableMemChunkGroup memChunkGroup =
+            (AlignedWritableMemChunkGroup) 
workMemTable.getMemTableMap().get(deviceId);
         AlignedWritableMemChunk alignedMemChunk =
-            ((AlignedWritableMemChunkGroup) 
workMemTable.getMemTableMap().get(deviceId))
-                .getAlignedMemChunk();
+            memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk();
+        long currentChunkPointNum = alignedMemChunk == null ? 0 : 
alignedMemChunk.alignedListSize();
         List<TSDataType> dataTypesInTVList = new ArrayList<>();
         for (int i = 0; i < dataTypes.length; i++) {
           // Skip failed Measurements
@@ -711,26 +712,34 @@ public class TsFileProcessor {
             continue;
           }
 
+          Pair<Map<String, TSDataType>, Integer> addingPointNumInfo =
+              increasingMemTableMemInfo.computeIfAbsent(
+                  deviceId, k -> new Pair<>(new HashMap<>(), 0));
+          int addingPointNum = addingPointNumInfo.getRight();
           // Extending the column of aligned mem chunk
-          if (!alignedMemChunk.containsMeasurement(measurements[i])
-              || 
!increasingMemTableMemInfo.get(deviceId).left.contains(measurements[i])) {
+          if ((alignedMemChunk != null && 
!alignedMemChunk.containsMeasurement(measurements[i]))
+              && 
!increasingMemTableMemInfo.get(deviceId).left.containsKey(measurements[i])) {
             memTableIncrement +=
-                (alignedMemChunk.alignedListSize() / 
PrimitiveArrayManager.ARRAY_SIZE + 1)
+                ((currentChunkPointNum + addingPointNum) / 
PrimitiveArrayManager.ARRAY_SIZE + 1)
                     * AlignedTVList.valueListArrayMemCost(dataTypes[i]);
-            dataTypesInTVList.add(dataTypes[i]);
+            increasingMemTableMemInfo.get(deviceId).left.put(measurements[i], 
dataTypes[i]);
           }
           // TEXT data mem size
           if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
             textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
           }
         }
+        int addingPointNum = 
increasingMemTableMemInfo.get(deviceId).getRight();
         // Here currentChunkPointNum >= 1
-        if (((alignedMemChunk.alignedListSize() + 
increasingMemTableMemInfo.get(deviceId).right)
-                % PrimitiveArrayManager.ARRAY_SIZE)
-            == 0) {
-          dataTypesInTVList.addAll(((AlignedTVList) 
alignedMemChunk.getTVList()).getTsDataTypes());
+        if (((currentChunkPointNum + addingPointNum) % 
PrimitiveArrayManager.ARRAY_SIZE) == 0) {
+          if (alignedMemChunk != null) {
+            dataTypesInTVList.addAll(
+                ((AlignedTVList) 
alignedMemChunk.getTVList()).getTsDataTypes());
+          }
+          
dataTypesInTVList.addAll(increasingMemTableMemInfo.get(deviceId).getLeft().values());
           memTableIncrement += 
AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
         }
+        increasingMemTableMemInfo.get(deviceId).setRight(addingPointNum + 1);
       }
     }
     updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, 
textDataIncrement);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
index e9354a3cf38..5555b27fd45 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
@@ -539,7 +539,9 @@ public class TsFileProcessorTest {
     for (int i = 1; i <= 100; i++) {
       TSRecord record = new TSRecord(i, deviceId);
       record.addTuple(DataPoint.getDataPoint(dataType, measurementId, 
String.valueOf(i)));
-      insertRowsNode.addOneInsertRowNode(buildInsertRowNodeByTSRecord(record), 
i - 1);
+      InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+      node.setAligned(true);
+      insertRowsNode.addOneInsertRowNode(node, i - 1);
     }
     processor2.insert(insertRowsNode, new long[4]);
     IMemTable memTable2 = processor2.getWorkMemTable();

Reply via email to