This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch fix_mem_control_insertRows3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 459db93fa925af74bc189bdf59729be02406d988
Author: HTHou <[email protected]>
AuthorDate: Wed Jun 12 16:04:55 2024 +0800

    Fix memory calculation error when insertRecords contains both aligned and
    non-aligned devices
---
 .../db/storageengine/dataregion/DataRegion.java    |   2 -
 .../dataregion/memtable/TsFileProcessor.java       | 264 +++++++++++----------
 .../dataregion/memtable/TsFileProcessorTest.java   |  59 +++++
 3 files changed, 201 insertions(+), 124 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 5a58b3e1dbf..42a94f1bce3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1198,7 +1198,6 @@ public class DataRegion implements IDataRegionForQuery {
             if (v == null) {
               v = new InsertRowsNode(insertRowsNode.getPlanNodeId());
               v.setSearchIndex(insertRowNode.getSearchIndex());
-              v.setAligned(insertRowNode.isAligned());
             }
             v.addOneInsertRowNode(insertRowNode, finalI);
             return v;
@@ -3278,7 +3277,6 @@ public class DataRegion implements IDataRegionForQuery {
               if (v == null) {
                 v = new 
InsertRowsNode(insertRowsOfOneDeviceNode.getPlanNodeId());
                 v.setSearchIndex(insertRowNode.getSearchIndex());
-                v.setAligned(insertRowNode.isAligned());
               }
               v.addOneInsertRowNode(insertRowNode, finalI);
               return v;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 74aed59ab88..86c479f1737 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -345,11 +345,7 @@ public class TsFileProcessor {
     long[] memIncrements;
 
     long memControlStartTime = System.nanoTime();
-    if (insertRowsNode.isAligned()) {
-      memIncrements = 
checkAlignedMemCostAndAddToTspInfoForRows(insertRowsNode);
-    } else {
-      memIncrements = checkMemCostAndAddToTspInfoForRows(insertRowsNode);
-    }
+    memIncrements = checkMemCostAndAddToTspInfoForRows(insertRowsNode);
     // recordScheduleMemoryBlockCost
     costsForMetrics[1] += System.nanoTime() - memControlStartTime;
 
@@ -561,55 +557,163 @@ public class TsFileProcessor {
     return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
   }
 
-  @SuppressWarnings("squid:S3776") // High Cognitive Complexity
   private long[] checkMemCostAndAddToTspInfoForRows(InsertRowsNode 
insertRowsNode)
       throws WriteProcessException {
-    // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
-    long memTableIncrement = 0L;
-    long textDataIncrement = 0L;
-    long chunkMetadataIncrement = 0L;
+
+    long[] memIncrements = new long[3];
     // device -> measurement -> adding TVList size
-    Map<IDeviceID, Map<String, Integer>> increasingMemTableInfo = new 
HashMap<>();
+    Map<IDeviceID, Map<String, Integer>> increasingMemTableInfoForNonAligned = 
new HashMap<>();
+    // device -> (measurements -> datatype, adding aligned TVList size)
+    Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> 
increasingMemTableInfoForAligned =
+        new HashMap<>();
     for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
-      IDeviceID deviceId = insertRowNode.getDeviceID();
-      TSDataType[] dataTypes = insertRowNode.getDataTypes();
-      Object[] values = insertRowNode.getValues();
-      String[] measurements = insertRowNode.getMeasurements();
+      if (insertRowNode.isAligned()) {
+        handleAlignedData(insertRowNode, memIncrements, 
increasingMemTableInfoForAligned);
+      } else {
+        handleUnalignedData(insertRowNode, memIncrements, 
increasingMemTableInfoForNonAligned);
+      }
+    }
+    updateMemoryInfo(memIncrements[0], memIncrements[2], memIncrements[1]);
+    return memIncrements;
+  }
+
+  @SuppressWarnings("squid:S3776") // High Cognitive Complexity
+  private void handleAlignedData(
+      InsertRowNode insertRowNode,
+      long[] memIncrements,
+      Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> 
increasingMemTableInfoForAligned) {
+    long memTableIncrement = memIncrements[0];
+    long textDataIncrement = memIncrements[1];
+    long chunkMetadataIncrement = memIncrements[2];
+
+    IDeviceID deviceId = insertRowNode.getDeviceID();
+    TSDataType[] dataTypes = insertRowNode.getDataTypes();
+    Object[] values = insertRowNode.getValues();
+    String[] measurements = insertRowNode.getMeasurements();
+
+    if (workMemTable.checkIfChunkDoesNotExist(deviceId, 
AlignedPath.VECTOR_PLACEHOLDER)
+        && !increasingMemTableInfoForAligned.containsKey(deviceId)) {
+      // For new device of this mem table
+      // ChunkMetadataIncrement
+      chunkMetadataIncrement +=
+          ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, 
TSDataType.VECTOR)
+              * dataTypes.length;
+      memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes);
       for (int i = 0; i < dataTypes.length; i++) {
         // Skip failed Measurements
         if (dataTypes[i] == null || measurements[i] == null) {
           continue;
         }
-        if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i])
-            && (!increasingMemTableInfo.containsKey(deviceId)
-                || 
!increasingMemTableInfo.get(deviceId).containsKey(measurements[i]))) {
-          // ChunkMetadataIncrement
-          chunkMetadataIncrement += 
ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
-          memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
-          increasingMemTableInfo
-              .computeIfAbsent(deviceId, k -> new HashMap<>())
-              .putIfAbsent(measurements[i], 1);
-        } else {
-          // here currentChunkPointNum >= 1
-          long currentChunkPointNum = 
workMemTable.getCurrentTVListSize(deviceId, measurements[i]);
-          int addingPointNum =
-              increasingMemTableInfo
-                  .computeIfAbsent(deviceId, k -> new HashMap<>())
-                  .computeIfAbsent(measurements[i], k -> 0);
+        increasingMemTableInfoForAligned
+            .computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 1))
+            .left
+            .put(measurements[i], dataTypes[i]);
+        // TEXT data mem size
+        if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
+          textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+        }
+      }
+
+    } else {
+      // For existed device of this mem table
+      AlignedWritableMemChunkGroup memChunkGroup =
+          (AlignedWritableMemChunkGroup) 
workMemTable.getMemTableMap().get(deviceId);
+      AlignedWritableMemChunk alignedMemChunk =
+          memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk();
+      long currentChunkPointNum = alignedMemChunk == null ? 0 : 
alignedMemChunk.alignedListSize();
+      List<TSDataType> dataTypesInTVList = new ArrayList<>();
+      Pair<Map<String, TSDataType>, Integer> addingPointNumInfo =
+          increasingMemTableInfoForAligned.computeIfAbsent(
+              deviceId, k -> new Pair<>(new HashMap<>(), 0));
+      for (int i = 0; i < dataTypes.length; i++) {
+        // Skip failed Measurements
+        if (dataTypes[i] == null || measurements[i] == null) {
+          continue;
+        }
+
+        int addingPointNum = addingPointNumInfo.getRight();
+        // Extending the column of aligned mem chunk
+        if ((alignedMemChunk != null && 
!alignedMemChunk.containsMeasurement(measurements[i]))
+            && 
!increasingMemTableInfoForAligned.get(deviceId).left.containsKey(measurements[i]))
 {
           memTableIncrement +=
-              ((currentChunkPointNum + addingPointNum) % 
PrimitiveArrayManager.ARRAY_SIZE) == 0
-                  ? TVList.tvListArrayMemCost(dataTypes[i])
-                  : 0;
-          
increasingMemTableInfo.get(deviceId).computeIfPresent(measurements[i], (k, v) 
-> v + 1);
+              ((currentChunkPointNum + addingPointNum) / 
PrimitiveArrayManager.ARRAY_SIZE + 1)
+                  * AlignedTVList.valueListArrayMemCost(dataTypes[i]);
+          
increasingMemTableInfoForAligned.get(deviceId).left.put(measurements[i], 
dataTypes[i]);
         }
         // TEXT data mem size
-        if (dataTypes[i].isBinary() && values[i] != null) {
+        if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
           textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
         }
       }
+      int addingPointNum = 
increasingMemTableInfoForAligned.get(deviceId).getRight();
+      // Here currentChunkPointNum + addingPointNum >= 1
+      if (((currentChunkPointNum + addingPointNum) % 
PrimitiveArrayManager.ARRAY_SIZE) == 0) {
+        if (alignedMemChunk != null) {
+          dataTypesInTVList.addAll(((AlignedTVList) 
alignedMemChunk.getTVList()).getTsDataTypes());
+        }
+        
dataTypesInTVList.addAll(increasingMemTableInfoForAligned.get(deviceId).getLeft().values());
+        memTableIncrement += 
AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
+      }
+      increasingMemTableInfoForAligned.get(deviceId).setRight(addingPointNum + 
1);
     }
-    updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, 
textDataIncrement);
-    return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
+
+    memIncrements[0] = memTableIncrement;
+    memIncrements[1] = textDataIncrement;
+    memIncrements[2] = chunkMetadataIncrement;
+  }
+
+  @SuppressWarnings("squid:S3776") // High Cognitive Complexity
+  private void handleUnalignedData(
+      InsertRowNode insertRowNode,
+      long[] memIncrements,
+      Map<IDeviceID, Map<String, Integer>> 
increasingMemTableInfoForNonAligned) {
+
+    long memTableIncrement = memIncrements[0];
+    long textDataIncrement = memIncrements[1];
+    long chunkMetadataIncrement = memIncrements[2];
+
+    IDeviceID deviceId = insertRowNode.getDeviceID();
+    TSDataType[] dataTypes = insertRowNode.getDataTypes();
+    Object[] values = insertRowNode.getValues();
+    String[] measurements = insertRowNode.getMeasurements();
+
+    for (int i = 0; i < dataTypes.length; i++) {
+      // Skip failed Measurements
+      if (dataTypes[i] == null || measurements[i] == null) {
+        continue;
+      }
+      if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i])
+          && (!increasingMemTableInfoForNonAligned.containsKey(deviceId)
+              || 
!increasingMemTableInfoForNonAligned.get(deviceId).containsKey(measurements[i])))
 {
+        // ChunkMetadataIncrement
+        chunkMetadataIncrement += 
ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
+        memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
+        increasingMemTableInfoForNonAligned
+            .computeIfAbsent(deviceId, k -> new HashMap<>())
+            .putIfAbsent(measurements[i], 1);
+      } else {
+        // here currentChunkPointNum >= 1
+        long currentChunkPointNum = 
workMemTable.getCurrentTVListSize(deviceId, measurements[i]);
+        int addingPointNum =
+            increasingMemTableInfoForNonAligned
+                .computeIfAbsent(deviceId, k -> new HashMap<>())
+                .computeIfAbsent(measurements[i], k -> 0);
+        memTableIncrement +=
+            ((currentChunkPointNum + addingPointNum) % 
PrimitiveArrayManager.ARRAY_SIZE) == 0
+                ? TVList.tvListArrayMemCost(dataTypes[i])
+                : 0;
+        increasingMemTableInfoForNonAligned
+            .get(deviceId)
+            .computeIfPresent(measurements[i], (k, v) -> v + 1);
+      }
+      // TEXT data mem size
+      if (dataTypes[i].isBinary() && values[i] != null) {
+        textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+      }
+    }
+    memIncrements[0] = memTableIncrement;
+    memIncrements[1] = textDataIncrement;
+    memIncrements[2] = chunkMetadataIncrement;
   }
 
   @SuppressWarnings("squid:S3776") // high Cognitive Complexity
@@ -672,90 +776,6 @@ public class TsFileProcessor {
     return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
   }
 
-  @SuppressWarnings("squid:S3776") // high Cognitive Complexity
-  private long[] checkAlignedMemCostAndAddToTspInfoForRows(InsertRowsNode 
insertRowsNode)
-      throws WriteProcessException {
-    // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
-    long memTableIncrement = 0L;
-    long textDataIncrement = 0L;
-    long chunkMetadataIncrement = 0L;
-    // device -> (measurements -> datatype, adding aligned TVList size)
-    Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> 
increasingMemTableInfo = new HashMap<>();
-    for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
-      IDeviceID deviceId = insertRowNode.getDeviceID();
-      TSDataType[] dataTypes = insertRowNode.getDataTypes();
-      Object[] values = insertRowNode.getValues();
-      String[] measurements = insertRowNode.getMeasurements();
-      if (workMemTable.checkIfChunkDoesNotExist(deviceId, 
AlignedPath.VECTOR_PLACEHOLDER)
-          && !increasingMemTableInfo.containsKey(deviceId)) {
-        // For new device of this mem table
-        // ChunkMetadataIncrement
-        chunkMetadataIncrement +=
-            ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, 
TSDataType.VECTOR)
-                * dataTypes.length;
-        memTableIncrement += 
AlignedTVList.alignedTvListArrayMemCost(dataTypes);
-        for (int i = 0; i < dataTypes.length; i++) {
-          // Skip failed Measurements
-          if (dataTypes[i] == null || measurements[i] == null) {
-            continue;
-          }
-          increasingMemTableInfo
-              .computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 1))
-              .left
-              .put(measurements[i], dataTypes[i]);
-          // TEXT data mem size
-          if (dataTypes[i].isBinary() && values[i] != null) {
-            textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
-          }
-        }
-
-      } else {
-        // For existed device of this mem table
-        AlignedWritableMemChunkGroup memChunkGroup =
-            (AlignedWritableMemChunkGroup) 
workMemTable.getMemTableMap().get(deviceId);
-        AlignedWritableMemChunk alignedMemChunk =
-            memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk();
-        long currentChunkPointNum = alignedMemChunk == null ? 0 : 
alignedMemChunk.alignedListSize();
-        List<TSDataType> dataTypesInTVList = new ArrayList<>();
-        Pair<Map<String, TSDataType>, Integer> addingPointNumInfo =
-            increasingMemTableInfo.computeIfAbsent(deviceId, k -> new 
Pair<>(new HashMap<>(), 0));
-        for (int i = 0; i < dataTypes.length; i++) {
-          // Skip failed Measurements
-          if (dataTypes[i] == null || measurements[i] == null) {
-            continue;
-          }
-
-          int addingPointNum = addingPointNumInfo.getRight();
-          // Extending the column of aligned mem chunk
-          if ((alignedMemChunk != null && 
!alignedMemChunk.containsMeasurement(measurements[i]))
-              && 
!increasingMemTableInfo.get(deviceId).left.containsKey(measurements[i])) {
-            memTableIncrement +=
-                ((currentChunkPointNum + addingPointNum) / 
PrimitiveArrayManager.ARRAY_SIZE + 1)
-                    * AlignedTVList.valueListArrayMemCost(dataTypes[i]);
-            increasingMemTableInfo.get(deviceId).left.put(measurements[i], 
dataTypes[i]);
-          }
-          // TEXT data mem size
-          if (dataTypes[i].isBinary() && values[i] != null) {
-            textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
-          }
-        }
-        int addingPointNum = increasingMemTableInfo.get(deviceId).getRight();
-        // Here currentChunkPointNum + addingPointNum >= 1
-        if (((currentChunkPointNum + addingPointNum) % 
PrimitiveArrayManager.ARRAY_SIZE) == 0) {
-          if (alignedMemChunk != null) {
-            dataTypesInTVList.addAll(
-                ((AlignedTVList) 
alignedMemChunk.getTVList()).getTsDataTypes());
-          }
-          
dataTypesInTVList.addAll(increasingMemTableInfo.get(deviceId).getLeft().values());
-          memTableIncrement += 
AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
-        }
-        increasingMemTableInfo.get(deviceId).setRight(addingPointNum + 1);
-      }
-    }
-    updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, 
textDataIncrement);
-    return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
-  }
-
   private long[] checkMemCostAndAddToTspInfoForTablet(
       IDeviceID deviceId,
       String[] measurements,
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
index 50b2a5d3ea6..6c263c26e57 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
@@ -613,6 +613,65 @@ public class TsFileProcessorTest {
     Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
   }
 
+  @Test
+  public void testRamCostInsertSameDataBy2Ways()
+      throws MetadataException, WriteProcessException, IOException {
+    TsFileProcessor processor1 =
+        new TsFileProcessor(
+            storageGroup,
+            SystemFileFactory.INSTANCE.getFile(filePath),
+            sgInfo,
+            this::closeTsFileProcessor,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
+            true);
+    TsFileProcessorInfo tsFileProcessorInfo1 = new TsFileProcessorInfo(sgInfo);
+    processor1.setTsFileProcessorInfo(tsFileProcessorInfo1);
+    this.sgInfo.initTsFileProcessorInfo(processor1);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor1);
+    // insert 100 rows (50 aligned, 50 non-aligned) by insertRow
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, i <= 50 ? deviceId : 
"root.vehicle.d2");
+      record.addTuple(DataPoint.getDataPoint(dataType, measurementId, 
String.valueOf(i)));
+      InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+      if (i <= 50) {
+        node.setAligned(true);
+      }
+      processor1.insert(node, new long[4]);
+    }
+    IMemTable memTable1 = processor1.getWorkMemTable();
+
+    TsFileProcessor processor2 =
+        new TsFileProcessor(
+            storageGroup,
+            SystemFileFactory.INSTANCE.getFile(filePath),
+            sgInfo,
+            this::closeTsFileProcessor,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
+            true);
+    TsFileProcessorInfo tsFileProcessorInfo2 = new TsFileProcessorInfo(sgInfo);
+    processor2.setTsFileProcessorInfo(tsFileProcessorInfo2);
+    this.sgInfo.initTsFileProcessorInfo(processor2);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor2);
+    InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+    insertRowsNode.setAligned(true);
+    // insert 100 rows (50 aligned, 50 non-aligned) by insertRows
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, i <= 50 ? deviceId : 
"root.vehicle.d2");
+      record.addTuple(DataPoint.getDataPoint(dataType, measurementId, 
String.valueOf(i)));
+      InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+      if (i <= 50) {
+        node.setAligned(true);
+      }
+      insertRowsNode.addOneInsertRowNode(node, i - 1);
+    }
+    processor2.insert(insertRowsNode, new long[4]);
+    IMemTable memTable2 = processor2.getWorkMemTable();
+
+    Assert.assertEquals(memTable1.getTVListsRamCost(), 
memTable2.getTVListsRamCost());
+    Assert.assertEquals(memTable1.getTotalPointsNum(), 
memTable2.getTotalPointsNum());
+    Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+  }
+
   @Test
   public void testWriteAndClose() throws IOException, WriteProcessException, 
MetadataException {
     logger.info("testWriteAndRestoreMetadata begin..");

Reply via email to