This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch fix_insertRows_mem_control
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bf091679585f4b519f73adb514542e21b3b46cc7
Author: HTHou <[email protected]>
AuthorDate: Fri May 10 00:40:42 2024 +0800

    init
---
 .../dataregion/memtable/TsFileProcessor.java       | 157 ++++++++++++++++++---
 1 file changed, 136 insertions(+), 21 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 8cbca98b754..f6fc4a2e4a6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
+import java.util.HashSet;
+import java.util.Objects;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.MetadataException;
@@ -253,12 +255,12 @@ public class TsFileProcessor {
     long memControlStartTime = System.nanoTime();
     if (insertRowNode.isAligned()) {
       memIncrements =
-          checkAlignedMemCostAndAddToTspInfo(
+          checkAlignedMemCostAndAddToTspInfoForRow(
               insertRowNode.getDeviceID(), insertRowNode.getMeasurements(),
               insertRowNode.getDataTypes(), insertRowNode.getValues());
     } else {
       memIncrements =
-          checkMemCostAndAddToTspInfo(
+          checkMemCostAndAddToTspInfoForRow(
               insertRowNode.getDeviceID(), insertRowNode.getMeasurements(),
               insertRowNode.getDataTypes(), insertRowNode.getValues());
     }
@@ -328,23 +330,13 @@ public class TsFileProcessor {
           
.recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
     }
 
-    long[] memIncrements = null;
+    long[] memIncrements;
 
     long memControlStartTime = System.nanoTime();
     if (insertRowsNode.isAligned()) {
-      for (InsertRowNode insertRowNode : 
insertRowsNode.getInsertRowNodeList()) {
-        memIncrements =
-            checkAlignedMemCostAndAddToTspInfo(
-                insertRowNode.getDeviceID(), insertRowNode.getMeasurements(),
-                insertRowNode.getDataTypes(), insertRowNode.getValues());
-      }
+      memIncrements = 
checkAlignedMemCostAndAddToTspInfoForRows(insertRowsNode);
     } else {
-      for (InsertRowNode insertRowNode : 
insertRowsNode.getInsertRowNodeList()) {
-        memIncrements =
-            checkMemCostAndAddToTspInfo(
-                insertRowNode.getDeviceID(), insertRowNode.getMeasurements(),
-                insertRowNode.getDataTypes(), insertRowNode.getValues());
-      }
+      memIncrements = checkMemCostAndAddToTspInfoForRows(insertRowsNode);
     }
     // recordScheduleMemoryBlockCost
     costsForMetrics[1] += System.nanoTime() - memControlStartTime;
@@ -437,7 +429,7 @@ public class TsFileProcessor {
       long startTime = System.nanoTime();
       if (insertTabletNode.isAligned()) {
         memIncrements =
-            checkAlignedMemCostAndAddToTsp(
+            checkAlignedMemCostAndAddToTspForTablet(
                 insertTabletNode.getDeviceID(),
                 insertTabletNode.getMeasurements(),
                 insertTabletNode.getDataTypes(),
@@ -446,7 +438,7 @@ public class TsFileProcessor {
                 end);
       } else {
         memIncrements =
-            checkMemCostAndAddToTspInfo(
+            checkMemCostAndAddToTspInfoForTablet(
                 insertTabletNode.getDeviceID(),
                 insertTabletNode.getMeasurements(),
                 insertTabletNode.getDataTypes(),
@@ -523,7 +515,7 @@ public class TsFileProcessor {
   }
 
   @SuppressWarnings("squid:S3776") // High Cognitive Complexity
-  private long[] checkMemCostAndAddToTspInfo(
+  private long[] checkMemCostAndAddToTspInfoForRow(
       IDeviceID deviceId, String[] measurements, TSDataType[] dataTypes, 
Object[] values)
       throws WriteProcessException {
     // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
@@ -557,8 +549,61 @@ public class TsFileProcessor {
     return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
   }
 
+  /**
+   * Estimates the memory that inserting every row of {@code insertRowsNode} adds to the working
+   * memtable for non-aligned series, then reports the totals through {@code updateMemoryInfo}.
+   *
+   * <p>Points counted for a series earlier in the same batch are tracked in a local map and added
+   * to the size read from {@code workMemTable} when deciding whether one more primitive array has
+   * to be charged.
+   *
+   * @param insertRowsNode the batch of rows about to be inserted
+   * @return {@code {memTableIncrement, textDataIncrement, chunkMetadataIncrement}}
+   * @throws WriteProcessException declared by this method; presumably raised by {@code
+   *     updateMemoryInfo} when the increment is rejected (confirm against that method)
+   */
+  @SuppressWarnings("squid:S3776") // High Cognitive Complexity
+  private long[] checkMemCostAndAddToTspInfoForRows(InsertRowsNode insertRowsNode)
+      throws WriteProcessException {
+    // Memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
+    long memTableIncrement = 0L;
+    long textDataIncrement = 0L;
+    long chunkMetadataIncrement = 0L;
+    // device -> measurement -> number of points already counted for this batch
+    Map<IDeviceID, Map<String, Integer>> increasingMemTableMemInfo = new HashMap<>();
+    for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
+      IDeviceID deviceId = insertRowNode.getDeviceID();
+      TSDataType[] dataTypes = insertRowNode.getDataTypes();
+      Object[] values = insertRowNode.getValues();
+      String[] measurements = insertRowNode.getMeasurements();
+      Map<String, Integer> addingPointNums =
+          increasingMemTableMemInfo.computeIfAbsent(deviceId, k -> new HashMap<>());
+      for (int i = 0; i < dataTypes.length; i++) {
+        // Skip failed Measurements
+        if (dataTypes[i] == null || measurements[i] == null) {
+          continue;
+        }
+        boolean chunkDoesNotExist =
+            workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i]);
+        Integer countedInBatch = addingPointNums.get(measurements[i]);
+        if (chunkDoesNotExist && countedInBatch == null) {
+          // Series is new to both the memtable and this batch: charge its ChunkMetadata and
+          // first array exactly once.
+          chunkMetadataIncrement += ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
+          memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
+          addingPointNums.put(measurements[i], 1);
+        } else {
+          // A chunk that only exists in this batch has no TVList in the memtable yet, so its
+          // current size is 0 and only the points counted so far in the batch matter.
+          long currentChunkPointNum =
+              chunkDoesNotExist
+                  ? 0L
+                  : workMemTable.getCurrentTVListSize(deviceId, measurements[i]);
+          int addingPointNum = countedInBatch == null ? 0 : countedInBatch;
+          // A new array is needed only when the existing arrays are exactly full.
+          if ((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE == 0) {
+            memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
+          }
+          addingPointNums.put(measurements[i], addingPointNum + 1);
+        }
+        // TEXT data mem size
+        if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
+          textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+        }
+      }
+    }
+    updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
+    return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement};
+  }
+
   @SuppressWarnings("squid:S3776") // high Cognitive Complexity
-  private long[] checkAlignedMemCostAndAddToTspInfo(
+  private long[] checkAlignedMemCostAndAddToTspInfoForRow(
       IDeviceID deviceId, String[] measurements, TSDataType[] dataTypes, 
Object[] values)
       throws WriteProcessException {
     // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
@@ -617,7 +662,77 @@ public class TsFileProcessor {
     return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
   }
 
-  private long[] checkMemCostAndAddToTspInfo(
+  /**
+   * Estimates the memory that inserting every row of {@code insertRowsNode} adds to the working
+   * memtable for aligned devices, then reports the totals through {@code updateMemoryInfo}.
+   *
+   * @param insertRowsNode the batch of rows about to be inserted
+   * @return {@code {memTableIncrement, textDataIncrement, chunkMetadataIncrement}}
+   * @throws WriteProcessException declared by this method; presumably raised by {@code
+   *     updateMemoryInfo} when the increment is rejected (confirm against that method)
+   */
+  @SuppressWarnings("squid:S3776") // high Cognitive Complexity
+  private long[] checkAlignedMemCostAndAddToTspInfoForRows(InsertRowsNode insertRowsNode)
+      throws WriteProcessException {
+    // Memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
+    long memTableIncrement = 0L;
+    long textDataIncrement = 0L;
+    long chunkMetadataIncrement = 0L;
+    // device -> (measurements, adding aligned TVList size)
+    Map<IDeviceID, Pair<Set<String>, Integer>> increasingMemTableMemInfo = new HashMap<>();
+    for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
+      IDeviceID deviceId = insertRowNode.getDeviceID();
+      TSDataType[] dataTypes = insertRowNode.getDataTypes();
+      Object[] values = insertRowNode.getValues();
+      String[] measurements = insertRowNode.getMeasurements();
+      // NOTE(review): with `||` this branch is taken for every row until the device is present
+      // in BOTH the memtable and the batch map, so later rows of the same batch are charged as
+      // a new device again. Confirm whether `&&` plus a null-safe else branch is intended.
+      if (workMemTable.checkIfChunkDoesNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)
+          || !increasingMemTableMemInfo.containsKey(deviceId)) {
+        // For new device of this mem table
+        // ChunkMetadataIncrement
+        chunkMetadataIncrement +=
+            ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR)
+                * dataTypes.length;
+        memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes);
+        for (int i = 0; i < dataTypes.length; i++) {
+          // Skip failed Measurements
+          if (dataTypes[i] == null || measurements[i] == null) {
+            continue;
+          }
+          // computeIfAbsent returns the mapped pair whether or not it was just created;
+          // putIfAbsent returns null on first insertion, which made requireNonNull throw.
+          increasingMemTableMemInfo
+              .computeIfAbsent(deviceId, k -> new Pair<>(new HashSet<>(), 1))
+              .left
+              .add(measurements[i]);
+          // TEXT data mem size
+          if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
+            textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+          }
+        }
+
+      } else {
+        // For existed device of this mem table
+        AlignedWritableMemChunk alignedMemChunk =
+            ((AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId))
+                .getAlignedMemChunk();
+        List<TSDataType> dataTypesInTVList = new ArrayList<>();
+        for (int i = 0; i < dataTypes.length; i++) {
+          // Skip failed Measurements
+          if (dataTypes[i] == null || measurements[i] == null) {
+            continue;
+          }
+
+          // Extending the column of aligned mem chunk
+          if (!alignedMemChunk.containsMeasurement(measurements[i])) {
+            memTableIncrement +=
+                (alignedMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1)
+                    * AlignedTVList.valueListArrayMemCost(dataTypes[i]);
+            dataTypesInTVList.add(dataTypes[i]);
+          }
+          // TEXT data mem size
+          if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
+            textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+          }
+        }
+        // Here currentChunkPointNum >= 1
+        if ((alignedMemChunk.alignedListSize() % PrimitiveArrayManager.ARRAY_SIZE) == 0) {
+          dataTypesInTVList.addAll(((AlignedTVList) alignedMemChunk.getTVList()).getTsDataTypes());
+          memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
+        }
+      }
+    }
+    updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
+    return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement};
+  }
+
+  private long[] checkMemCostAndAddToTspInfoForTablet(
       IDeviceID deviceId,
       String[] measurements,
       TSDataType[] dataTypes,
@@ -644,7 +759,7 @@ public class TsFileProcessor {
     return memIncrements;
   }
 
-  private long[] checkAlignedMemCostAndAddToTsp(
+  private long[] checkAlignedMemCostAndAddToTspForTablet(
       IDeviceID deviceId,
       String[] measurements,
       TSDataType[] dataTypes,

Reply via email to