This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a389dd647c Fix memory calculate error when insertRecords with both 
aligned and non-aligned devices (#12720)
7a389dd647c is described below

commit 7a389dd647c57a756ceccf5640ba37413303047d
Author: Haonan <[email protected]>
AuthorDate: Sat Sep 14 21:45:22 2024 +0800

    Fix memory calculate error when insertRecords with both aligned and 
non-aligned devices (#12720)
---
 .../planner/plan/node/write/InsertRowsNode.java    |  11 ++
 .../db/storageengine/dataregion/DataRegion.java    |   5 +-
 .../dataregion/memtable/TsFileProcessor.java       |  47 +++++---
 .../dataregion/memtable/TsFileProcessorTest.java   | 124 ++++++++++++++++++++-
 4 files changed, 172 insertions(+), 15 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 5bc87b63925..dee94ff246e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -65,6 +65,8 @@ public class InsertRowsNode extends InsertNode implements 
WALEntryValue {
   /** The {@link InsertRowNode} list */
   private List<InsertRowNode> insertRowNodeList;
 
+  private boolean isMixingAlignment = false;
+
   public InsertRowsNode(PlanNodeId id) {
     super(id);
     insertRowNodeList = new ArrayList<>();
@@ -102,6 +104,14 @@ public class InsertRowsNode extends InsertNode implements 
WALEntryValue {
     insertRowNodeIndexList.add(index);
   }
 
+  public boolean isMixingAlignment() {
+    return isMixingAlignment;
+  }
+
+  public void setMixingAlignment(boolean mixingAlignment) {
+    isMixingAlignment = mixingAlignment;
+  }
+
   @Override
   public void setSearchIndex(long index) {
     searchIndex = index;
@@ -375,5 +385,6 @@ public class InsertRowsNode extends InsertNode implements 
WALEntryValue {
   public InsertRowsNode emptyClone() {
     return new InsertRowsNode(this.getPlanNodeId());
   }
+
   // endregion
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index e47ac78c0b6..5915be05207 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1455,6 +1455,9 @@ public class DataRegion implements IDataRegionForQuery {
                 v.markAsGeneratedByRemoteConsensusLeader();
               }
             }
+            if (v.isAligned() != insertRowNode.isAligned()) {
+              v.setMixingAlignment(true);
+            }
             v.addOneInsertRowNode(insertRowNode, finalI);
             v.updateProgressIndex(insertRowNode.getProgressIndex());
             return v;
@@ -3517,7 +3520,7 @@ public class DataRegion implements IDataRegionForQuery {
           TimePartitionManager.getInstance()
               .registerTimePartitionInfo(
                   new TimePartitionInfo(
-                      new DataRegionId(Integer.valueOf(dataRegionId)),
+                      new DataRegionId(Integer.parseInt(dataRegionId)),
                       timePartitionId,
                       true,
                       Long.MAX_VALUE,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index b4ceea6dfda..7ae1186fe14 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -358,10 +358,29 @@ public class TsFileProcessor {
     long[] memIncrements;
 
     long memControlStartTime = System.nanoTime();
-    if (insertRowsNode.isAligned()) {
-      memIncrements = 
checkAlignedMemCostAndAddToTspInfoForRows(insertRowsNode);
+    if (insertRowsNode.isMixingAlignment()) {
+      List<InsertRowNode> alignedList = new ArrayList<>();
+      List<InsertRowNode> nonAlignedList = new ArrayList<>();
+      for (InsertRowNode insertRowNode : 
insertRowsNode.getInsertRowNodeList()) {
+        if (insertRowNode.isAligned()) {
+          alignedList.add(insertRowNode);
+        } else {
+          nonAlignedList.add(insertRowNode);
+        }
+      }
+      long[] alignedMemIncrements = 
checkAlignedMemCostAndAddToTspInfoForRows(alignedList);
+      long[] nonAlignedMemIncrements = 
checkMemCostAndAddToTspInfoForRows(nonAlignedList);
+      memIncrements = new long[3];
+      for (int i = 0; i < 3; i++) {
+        memIncrements[i] = alignedMemIncrements[i] + 
nonAlignedMemIncrements[i];
+      }
     } else {
-      memIncrements = checkMemCostAndAddToTspInfoForRows(insertRowsNode);
+      if (insertRowsNode.isAligned()) {
+        memIncrements =
+            
checkAlignedMemCostAndAddToTspInfoForRows(insertRowsNode.getInsertRowNodeList());
+      } else {
+        memIncrements = 
checkMemCostAndAddToTspInfoForRows(insertRowsNode.getInsertRowNodeList());
+      }
     }
     // recordScheduleMemoryBlockCost
     costsForMetrics[1] += System.nanoTime() - memControlStartTime;
@@ -627,7 +646,7 @@ public class TsFileProcessor {
   }
 
   @SuppressWarnings("squid:S3776") // High Cognitive Complexity
-  private long[] checkMemCostAndAddToTspInfoForRows(InsertRowsNode 
insertRowsNode)
+  private long[] checkMemCostAndAddToTspInfoForRows(List<InsertRowNode> 
insertRowNodeList)
       throws WriteProcessException {
     // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
     long memTableIncrement = 0L;
@@ -635,7 +654,7 @@ public class TsFileProcessor {
     long chunkMetadataIncrement = 0L;
     // device -> measurement -> adding TVList size
     Map<IDeviceID, Map<String, Integer>> increasingMemTableInfo = new 
HashMap<>();
-    for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
+    for (InsertRowNode insertRowNode : insertRowNodeList) {
       IDeviceID deviceId = insertRowNode.getDeviceID();
       TSDataType[] dataTypes = insertRowNode.getDataTypes();
       Object[] values = insertRowNode.getValues();
@@ -741,7 +760,7 @@ public class TsFileProcessor {
   }
 
   @SuppressWarnings("squid:S3776") // high Cognitive Complexity
-  private long[] checkAlignedMemCostAndAddToTspInfoForRows(InsertRowsNode 
insertRowsNode)
+  private long[] checkAlignedMemCostAndAddToTspInfoForRows(List<InsertRowNode> 
insertRowNodeList)
       throws WriteProcessException {
     // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
     long memTableIncrement = 0L;
@@ -749,7 +768,7 @@ public class TsFileProcessor {
     long chunkMetadataIncrement = 0L;
     // device -> (measurements -> datatype, adding aligned TVList size)
     Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> 
increasingMemTableInfo = new HashMap<>();
-    for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
+    for (InsertRowNode insertRowNode : insertRowNodeList) {
       IDeviceID deviceId = insertRowNode.getDeviceID();
       TSDataType[] dataTypes = insertRowNode.getDataTypes();
       Object[] values = insertRowNode.getValues();
@@ -797,9 +816,11 @@ public class TsFileProcessor {
 
           int addingPointNum = addingPointNumInfo.getRight();
           // Extending the column of aligned mem chunk
-          if ((alignedMemChunk != null && 
!alignedMemChunk.containsMeasurement(measurements[i]))
-              && 
!increasingMemTableInfo.get(deviceId).left.containsKey(measurements[i])) {
-            increasingMemTableInfo.get(deviceId).left.put(measurements[i], 
dataTypes[i]);
+          boolean currentMemChunkContainsMeasurement =
+              alignedMemChunk != null && 
alignedMemChunk.containsMeasurement(measurements[i]);
+          if (!currentMemChunkContainsMeasurement
+              && !addingPointNumInfo.left.containsKey(measurements[i])) {
+            addingPointNumInfo.left.put(measurements[i], dataTypes[i]);
             int currentArrayNum =
                 (currentChunkPointNum + addingPointNum) / 
PrimitiveArrayManager.ARRAY_SIZE
                     + ((currentChunkPointNum + addingPointNum) % 
PrimitiveArrayManager.ARRAY_SIZE
@@ -810,17 +831,17 @@ public class TsFileProcessor {
                 currentArrayNum * 
AlignedTVList.valueListArrayMemCost(dataTypes[i]);
           }
         }
-        int addingPointNum = increasingMemTableInfo.get(deviceId).getRight();
+        int addingPointNum = addingPointNumInfo.right;
         // Here currentChunkPointNum + addingPointNum >= 1
         if (((currentChunkPointNum + addingPointNum) % 
PrimitiveArrayManager.ARRAY_SIZE) == 0) {
           if (alignedMemChunk != null) {
             dataTypesInTVList.addAll(
                 ((AlignedTVList) 
alignedMemChunk.getTVList()).getTsDataTypes());
           }
-          
dataTypesInTVList.addAll(increasingMemTableInfo.get(deviceId).getLeft().values());
+          dataTypesInTVList.addAll(addingPointNumInfo.left.values());
           memTableIncrement += 
AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
         }
-        increasingMemTableInfo.get(deviceId).setRight(addingPointNum + 1);
+        addingPointNumInfo.setRight(addingPointNum + 1);
       }
 
       for (int i = 0; i < dataTypes.length; i++) {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
index ea4632452b1..59fa1ff764b 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
@@ -588,11 +588,11 @@ public class TsFileProcessorTest {
 
     // insert more rows by insertRows
     insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+    insertRowsNode.setAligned(true);
     insertRowsNode.addOneInsertRowNode(insertRowNode1, 0);
     insertRowsNode.addOneInsertRowNode(insertRowNode2, 1);
     insertRowsNode.addOneInsertRowNode(insertRowNode3, 2);
     insertRowsNode.addOneInsertRowNode(insertRowNode4, 3);
-    insertRowsNode.setAligned(true);
     processor2.insert(insertRowsNode, new long[4]);
 
     Assert.assertEquals(memTable1.getTVListsRamCost(), 
memTable2.getTVListsRamCost());
@@ -614,6 +614,128 @@ public class TsFileProcessorTest {
     Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
   }
 
+  @Test
+  public void testRamCostInsertSameDataBy2Ways()
+      throws MetadataException, WriteProcessException, IOException {
+    TsFileProcessor processor1 =
+        new TsFileProcessor(
+            storageGroup,
+            SystemFileFactory.INSTANCE.getFile(filePath),
+            sgInfo,
+            this::closeTsFileProcessor,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
+            true);
+    TsFileProcessorInfo tsFileProcessorInfo1 = new TsFileProcessorInfo(sgInfo);
+    processor1.setTsFileProcessorInfo(tsFileProcessorInfo1);
+    this.sgInfo.initTsFileProcessorInfo(processor1);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor1);
+    // insert 100 rows (50 aligned, 50 non-aligned) by insertRow
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, i <= 50 ? deviceId : 
"root.vehicle.d2");
+      record.addTuple(DataPoint.getDataPoint(dataType, measurementId, 
String.valueOf(i)));
+      InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+      if (i <= 50) {
+        node.setAligned(true);
+      }
+      processor1.insert(node, new long[4]);
+    }
+    IMemTable memTable1 = processor1.getWorkMemTable();
+
+    TsFileProcessor processor2 =
+        new TsFileProcessor(
+            storageGroup,
+            SystemFileFactory.INSTANCE.getFile(filePath),
+            sgInfo,
+            this::closeTsFileProcessor,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
+            true);
+    TsFileProcessorInfo tsFileProcessorInfo2 = new TsFileProcessorInfo(sgInfo);
+    processor2.setTsFileProcessorInfo(tsFileProcessorInfo2);
+    this.sgInfo.initTsFileProcessorInfo(processor2);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor2);
+    InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+    insertRowsNode.setAligned(true);
+    // insert 100 rows (50 aligned, 50 non-aligned) by insertRows
+    insertRowsNode.setMixingAlignment(true);
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, i <= 50 ? deviceId : 
"root.vehicle.d2");
+      record.addTuple(DataPoint.getDataPoint(dataType, measurementId, 
String.valueOf(i)));
+      InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+      if (i <= 50) {
+        node.setAligned(true);
+      }
+      insertRowsNode.addOneInsertRowNode(node, i - 1);
+    }
+    processor2.insert(insertRowsNode, new long[4]);
+    IMemTable memTable2 = processor2.getWorkMemTable();
+
+    Assert.assertEquals(memTable1.getTVListsRamCost(), 
memTable2.getTVListsRamCost());
+    Assert.assertEquals(memTable1.getTotalPointsNum(), 
memTable2.getTotalPointsNum());
+    Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+  }
+
+  @Test
+  public void testRamCostInsertSameDataBy2Ways2()
+      throws MetadataException, WriteProcessException, IOException {
+    TsFileProcessor processor1 =
+        new TsFileProcessor(
+            storageGroup,
+            SystemFileFactory.INSTANCE.getFile(filePath),
+            sgInfo,
+            this::closeTsFileProcessor,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
+            true);
+    TsFileProcessorInfo tsFileProcessorInfo1 = new TsFileProcessorInfo(sgInfo);
+    processor1.setTsFileProcessorInfo(tsFileProcessorInfo1);
+    this.sgInfo.initTsFileProcessorInfo(processor1);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor1);
+    // insert 100 rows (50 per device, every row marked aligned) by insertRow
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, i <= 50 ? deviceId : 
"root.vehicle.d2");
+      record.addTuple(DataPoint.getDataPoint(dataType, "s" + i, 
String.valueOf(i)));
+      InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+      node.setAligned(true);
+      if (i <= 50) {
+        node.setAligned(true);
+      }
+      processor1.insert(node, new long[4]);
+    }
+    IMemTable memTable1 = processor1.getWorkMemTable();
+
+    TsFileProcessor processor2 =
+        new TsFileProcessor(
+            storageGroup,
+            SystemFileFactory.INSTANCE.getFile(filePath),
+            sgInfo,
+            this::closeTsFileProcessor,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
+            true);
+    TsFileProcessorInfo tsFileProcessorInfo2 = new TsFileProcessorInfo(sgInfo);
+    processor2.setTsFileProcessorInfo(tsFileProcessorInfo2);
+    this.sgInfo.initTsFileProcessorInfo(processor2);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor2);
+    InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+    insertRowsNode.setAligned(true);
+    // insert 100 rows (50 per device, every row marked aligned) by insertRows
+    insertRowsNode.setMixingAlignment(true);
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, i <= 50 ? deviceId : 
"root.vehicle.d2");
+      record.addTuple(DataPoint.getDataPoint(dataType, "s" + i, 
String.valueOf(i)));
+      InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+      node.setAligned(true);
+      if (i <= 50) {
+        node.setAligned(true);
+      }
+      insertRowsNode.addOneInsertRowNode(node, i - 1);
+    }
+    processor2.insert(insertRowsNode, new long[4]);
+    IMemTable memTable2 = processor2.getWorkMemTable();
+
+    Assert.assertEquals(memTable1.getTVListsRamCost(), 
memTable2.getTVListsRamCost());
+    Assert.assertEquals(memTable1.getTotalPointsNum(), 
memTable2.getTotalPointsNum());
+    Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+  }
+
   @Test
   public void testWriteAndClose() throws IOException, WriteProcessException, 
MetadataException {
     logger.info("testWriteAndRestoreMetadata begin..");

Reply via email to