This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new 3ad77ac892c Fix memory calculate error when insertRecords with both aligned and non-aligned devices (#12720) (#13527)
3ad77ac892c is described below

commit 3ad77ac892c5daf66b5b98ba9785a91bdbe711f5
Author: Haonan <[email protected]>
AuthorDate: Wed Sep 18 09:30:55 2024 +0800

    Fix memory calculate error when insertRecords with both aligned and non-aligned devices (#12720) (#13527)
---
 .../planner/plan/node/write/InsertRowsNode.java    |  10 ++
 .../db/storageengine/dataregion/DataRegion.java    |   3 +
 .../memtable/AlignedWritableMemChunk.java          |   2 +-
 .../dataregion/memtable/TsFileProcessor.java       |  59 +++++++---
 .../dataregion/memtable/TsFileProcessorTest.java   | 124 ++++++++++++++++++++-
 5 files changed, 180 insertions(+), 18 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 2e86672aaa8..f5cd98c8f95 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -65,6 +65,8 @@ public class InsertRowsNode extends InsertNode implements WALEntryValue {
   /** The {@link InsertRowNode} list */
   private List<InsertRowNode> insertRowNodeList;
 
+  private boolean isMixingAlignment = false;
+
   public InsertRowsNode(PlanNodeId id) {
     super(id);
     insertRowNodeList = new ArrayList<>();
@@ -102,6 +104,14 @@ public class InsertRowsNode extends InsertNode implements WALEntryValue {
     insertRowNodeIndexList.add(index);
   }
 
+  public boolean isMixingAlignment() {
+    return isMixingAlignment;
+  }
+
+  public void setMixingAlignment(boolean mixingAlignment) {
+    isMixingAlignment = mixingAlignment;
+  }
+
   @Override
   public void setSearchIndex(long index) {
     searchIndex = index;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 0ecf7f7c81c..90b603d0b06 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1344,6 +1344,9 @@ public class DataRegion implements IDataRegionForQuery {
                 v.markAsGeneratedByRemoteConsensusLeader();
               }
             }
+            if (v.isAligned() != insertRowNode.isAligned()) {
+              v.setMixingAlignment(true);
+            }
             v.addOneInsertRowNode(insertRowNode, finalI);
             v.updateProgressIndex(insertRowNode.getProgressIndex());
             return v;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 77b98b8b70e..d5d9bdce4ec 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -244,7 +244,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
     return (long) list.rowCount() * measurementIndexMap.size();
   }
 
-  public long alignedListSize() {
+  public int alignedListSize() {
     return list.rowCount();
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index bc69d29d10d..f39cb9f0bb0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -355,10 +355,29 @@ public class TsFileProcessor {
     long[] memIncrements;
 
     long memControlStartTime = System.nanoTime();
-    if (insertRowsNode.isAligned()) {
-      memIncrements = checkAlignedMemCostAndAddToTspInfoForRows(insertRowsNode);
+    if (insertRowsNode.isMixingAlignment()) {
+      List<InsertRowNode> alignedList = new ArrayList<>();
+      List<InsertRowNode> nonAlignedList = new ArrayList<>();
+      for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
+        if (insertRowNode.isAligned()) {
+          alignedList.add(insertRowNode);
+        } else {
+          nonAlignedList.add(insertRowNode);
+        }
+      }
+      long[] alignedMemIncrements = checkAlignedMemCostAndAddToTspInfoForRows(alignedList);
+      long[] nonAlignedMemIncrements = checkMemCostAndAddToTspInfoForRows(nonAlignedList);
+      memIncrements = new long[3];
+      for (int i = 0; i < 3; i++) {
+        memIncrements[i] = alignedMemIncrements[i] + nonAlignedMemIncrements[i];
+      }
     } else {
-      memIncrements = checkMemCostAndAddToTspInfoForRows(insertRowsNode);
+      if (insertRowsNode.isAligned()) {
+        memIncrements =
+            checkAlignedMemCostAndAddToTspInfoForRows(insertRowsNode.getInsertRowNodeList());
+      } else {
+        memIncrements = checkMemCostAndAddToTspInfoForRows(insertRowsNode.getInsertRowNodeList());
+      }
     }
     // recordScheduleMemoryBlockCost
     costsForMetrics[1] += System.nanoTime() - memControlStartTime;
@@ -573,7 +592,7 @@ public class TsFileProcessor {
   }
 
   @SuppressWarnings("squid:S3776") // High Cognitive Complexity
-  private long[] checkMemCostAndAddToTspInfoForRows(InsertRowsNode insertRowsNode)
+  private long[] checkMemCostAndAddToTspInfoForRows(List<InsertRowNode> insertRowNodeList)
       throws WriteProcessException {
     // Memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
     long memTableIncrement = 0L;
@@ -581,7 +600,7 @@ public class TsFileProcessor {
     long chunkMetadataIncrement = 0L;
     // device -> measurement -> adding TVList size
     Map<IDeviceID, Map<String, Integer>> increasingMemTableInfo = new HashMap<>();
-    for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
+    for (InsertRowNode insertRowNode : insertRowNodeList) {
       IDeviceID deviceId = insertRowNode.getDeviceID();
       TSDataType[] dataTypes = insertRowNode.getDataTypes();
       Object[] values = insertRowNode.getValues();
@@ -684,7 +703,7 @@ public class TsFileProcessor {
   }
 
   @SuppressWarnings("squid:S3776") // high Cognitive Complexity
-  private long[] checkAlignedMemCostAndAddToTspInfoForRows(InsertRowsNode insertRowsNode)
+  private long[] checkAlignedMemCostAndAddToTspInfoForRows(List<InsertRowNode> insertRowNodeList)
       throws WriteProcessException {
     // Memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
     long memTableIncrement = 0L;
@@ -692,7 +711,7 @@ public class TsFileProcessor {
     long chunkMetadataIncrement = 0L;
     // device -> (measurements -> datatype, adding aligned TVList size)
     Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> increasingMemTableInfo = new HashMap<>();
-    for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
+    for (InsertRowNode insertRowNode : insertRowNodeList) {
       IDeviceID deviceId = insertRowNode.getDeviceID();
       TSDataType[] dataTypes = insertRowNode.getDataTypes();
       Object[] values = insertRowNode.getValues();
@@ -726,7 +745,7 @@ public class TsFileProcessor {
             (AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId);
         AlignedWritableMemChunk alignedMemChunk =
             memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk();
-        long currentChunkPointNum = alignedMemChunk == null ? 0 : alignedMemChunk.alignedListSize();
+        int currentChunkPointNum = alignedMemChunk == null ? 0 : alignedMemChunk.alignedListSize();
         List<TSDataType> dataTypesInTVList = new ArrayList<>();
         Pair<Map<String, TSDataType>, Integer> addingPointNumInfo =
             increasingMemTableInfo.computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 0));
@@ -738,29 +757,37 @@ public class TsFileProcessor {
 
           int addingPointNum = addingPointNumInfo.getRight();
           // Extending the column of aligned mem chunk
-          if ((alignedMemChunk != null && !alignedMemChunk.containsMeasurement(measurements[i]))
-              && !increasingMemTableInfo.get(deviceId).left.containsKey(measurements[i])) {
+          boolean currentMemChunkContainsMeasurement =
+              alignedMemChunk != null && alignedMemChunk.containsMeasurement(measurements[i]);
+          if (!currentMemChunkContainsMeasurement
+              && !addingPointNumInfo.left.containsKey(measurements[i])) {
+            addingPointNumInfo.left.put(measurements[i], dataTypes[i]);
+            int currentArrayNum =
+                (currentChunkPointNum + addingPointNum) / PrimitiveArrayManager.ARRAY_SIZE
+                    + ((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE
+                            > 0
+                        ? 1
+                        : 0);
             memTableIncrement +=
-                ((currentChunkPointNum + addingPointNum) / PrimitiveArrayManager.ARRAY_SIZE + 1)
-                    * AlignedTVList.valueListArrayMemCost(dataTypes[i]);
-            increasingMemTableInfo.get(deviceId).left.put(measurements[i], dataTypes[i]);
+                currentArrayNum * AlignedTVList.valueListArrayMemCost(dataTypes[i]);
+            addingPointNumInfo.left.put(measurements[i], dataTypes[i]);
           }
           // TEXT data mem size
           if (dataTypes[i].isBinary() && values[i] != null) {
             textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
           }
         }
-        int addingPointNum = increasingMemTableInfo.get(deviceId).getRight();
+        int addingPointNum = addingPointNumInfo.right;
         // Here currentChunkPointNum + addingPointNum >= 1
         if (((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE) == 0) {
           if (alignedMemChunk != null) {
             dataTypesInTVList.addAll(
                 ((AlignedTVList) alignedMemChunk.getTVList()).getTsDataTypes());
           }
-          dataTypesInTVList.addAll(increasingMemTableInfo.get(deviceId).getLeft().values());
+          dataTypesInTVList.addAll(addingPointNumInfo.left.values());
           memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
         }
-        increasingMemTableInfo.get(deviceId).setRight(addingPointNum + 1);
+        addingPointNumInfo.setRight(addingPointNum + 1);
       }
     }
     updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
index 50b2a5d3ea6..89266fa632f 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
@@ -587,11 +587,11 @@ public class TsFileProcessorTest {
 
     // insert more rows by insertRows
     insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+    insertRowsNode.setAligned(true);
     insertRowsNode.addOneInsertRowNode(insertRowNode1, 0);
     insertRowsNode.addOneInsertRowNode(insertRowNode2, 1);
     insertRowsNode.addOneInsertRowNode(insertRowNode3, 2);
     insertRowsNode.addOneInsertRowNode(insertRowNode4, 3);
-    insertRowsNode.setAligned(true);
     processor2.insert(insertRowsNode, new long[4]);
 
     Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost());
@@ -613,6 +613,128 @@ public class TsFileProcessorTest {
     Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
   }
 
+  @Test
+  public void testRamCostInsertSameDataBy2Ways()
+      throws MetadataException, WriteProcessException, IOException {
+    TsFileProcessor processor1 =
+        new TsFileProcessor(
+            storageGroup,
+            SystemFileFactory.INSTANCE.getFile(filePath),
+            sgInfo,
+            this::closeTsFileProcessor,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
+            true);
+    TsFileProcessorInfo tsFileProcessorInfo1 = new TsFileProcessorInfo(sgInfo);
+    processor1.setTsFileProcessorInfo(tsFileProcessorInfo1);
+    this.sgInfo.initTsFileProcessorInfo(processor1);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor1);
+    // insert 100 rows (50 aligned, 50 non-aligned) by insertRow
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, i <= 50 ? deviceId : "root.vehicle.d2");
+      record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
+      InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+      if (i <= 50) {
+        node.setAligned(true);
+      }
+      processor1.insert(node, new long[4]);
+    }
+    IMemTable memTable1 = processor1.getWorkMemTable();
+
+    TsFileProcessor processor2 =
+        new TsFileProcessor(
+            storageGroup,
+            SystemFileFactory.INSTANCE.getFile(filePath),
+            sgInfo,
+            this::closeTsFileProcessor,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
+            true);
+    TsFileProcessorInfo tsFileProcessorInfo2 = new TsFileProcessorInfo(sgInfo);
+    processor2.setTsFileProcessorInfo(tsFileProcessorInfo2);
+    this.sgInfo.initTsFileProcessorInfo(processor2);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor2);
+    InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+    insertRowsNode.setAligned(true);
+    // insert 100 rows (50 aligned, 50 non-aligned) by insertRows
+    insertRowsNode.setMixingAlignment(true);
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, i <= 50 ? deviceId : "root.vehicle.d2");
+      record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
+      InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+      if (i <= 50) {
+        node.setAligned(true);
+      }
+      insertRowsNode.addOneInsertRowNode(node, i - 1);
+    }
+    processor2.insert(insertRowsNode, new long[4]);
+    IMemTable memTable2 = processor2.getWorkMemTable();
+
+    Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost());
+    Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum());
+    Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+  }
+
+  @Test
+  public void testRamCostInsertSameDataBy2Ways2()
+      throws MetadataException, WriteProcessException, IOException {
+    TsFileProcessor processor1 =
+        new TsFileProcessor(
+            storageGroup,
+            SystemFileFactory.INSTANCE.getFile(filePath),
+            sgInfo,
+            this::closeTsFileProcessor,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
+            true);
+    TsFileProcessorInfo tsFileProcessorInfo1 = new TsFileProcessorInfo(sgInfo);
+    processor1.setTsFileProcessorInfo(tsFileProcessorInfo1);
+    this.sgInfo.initTsFileProcessorInfo(processor1);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor1);
+    // insert 100 rows (50 aligned, 50 non-aligned) by insertRow
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, i <= 50 ? deviceId : "root.vehicle.d2");
+      record.addTuple(DataPoint.getDataPoint(dataType, "s" + i, String.valueOf(i)));
+      InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+      node.setAligned(true);
+      if (i <= 50) {
+        node.setAligned(true);
+      }
+      processor1.insert(node, new long[4]);
+    }
+    IMemTable memTable1 = processor1.getWorkMemTable();
+
+    TsFileProcessor processor2 =
+        new TsFileProcessor(
+            storageGroup,
+            SystemFileFactory.INSTANCE.getFile(filePath),
+            sgInfo,
+            this::closeTsFileProcessor,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
+            true);
+    TsFileProcessorInfo tsFileProcessorInfo2 = new TsFileProcessorInfo(sgInfo);
+    processor2.setTsFileProcessorInfo(tsFileProcessorInfo2);
+    this.sgInfo.initTsFileProcessorInfo(processor2);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor2);
+    InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+    insertRowsNode.setAligned(true);
+    // insert 100 rows (50 aligned, 50 non-aligned) by insertRows
+    insertRowsNode.setMixingAlignment(true);
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, i <= 50 ? deviceId : "root.vehicle.d2");
+      record.addTuple(DataPoint.getDataPoint(dataType, "s" + i, String.valueOf(i)));
+      InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+      node.setAligned(true);
+      if (i <= 50) {
+        node.setAligned(true);
+      }
+      insertRowsNode.addOneInsertRowNode(node, i - 1);
+    }
+    processor2.insert(insertRowsNode, new long[4]);
+    IMemTable memTable2 = processor2.getWorkMemTable();
+
+    Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost());
+    Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum());
+    Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+  }
+
   @Test
   public void testWriteAndClose() throws IOException, WriteProcessException, MetadataException {
     logger.info("testWriteAndRestoreMetadata begin..");

Reply via email to