This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new dc323ffed6e Fix insert rows mem control error
dc323ffed6e is described below

commit dc323ffed6e89e60f71e871091f407035700d9af
Author: Haonan <[email protected]>
AuthorDate: Sat May 11 11:22:21 2024 +0800

    Fix insert rows mem control error
---
 .../dataregion/memtable/AbstractMemTable.java      |   3 +
 .../dataregion/memtable/TsFileProcessor.java       | 167 ++++++++++++++---
 .../dataregion/memtable/WritableMemChunkGroup.java |   3 +
 .../dataregion/memtable/TsFileProcessorTest.java   | 200 +++++++++++++++++++++
 4 files changed, 352 insertions(+), 21 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 7c7e46ec0ce..2182cec0007 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -399,6 +399,9 @@ public abstract class AbstractMemTable implements IMemTable 
{
   @Override
   public long getCurrentTVListSize(IDeviceID deviceId, String measurement) {
     IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId);
+    if (null == memChunkGroup) {
+      return 0;
+    }
     return memChunkGroup.getCurrentTVListSize(measurement);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 8cbca98b754..819d5bae8a3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -253,12 +253,12 @@ public class TsFileProcessor {
     long memControlStartTime = System.nanoTime();
     if (insertRowNode.isAligned()) {
       memIncrements =
-          checkAlignedMemCostAndAddToTspInfo(
+          checkAlignedMemCostAndAddToTspInfoForRow(
               insertRowNode.getDeviceID(), insertRowNode.getMeasurements(),
               insertRowNode.getDataTypes(), insertRowNode.getValues());
     } else {
       memIncrements =
-          checkMemCostAndAddToTspInfo(
+          checkMemCostAndAddToTspInfoForRow(
               insertRowNode.getDeviceID(), insertRowNode.getMeasurements(),
               insertRowNode.getDataTypes(), insertRowNode.getValues());
     }
@@ -328,23 +328,13 @@ public class TsFileProcessor {
           
.recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
     }
 
-    long[] memIncrements = null;
+    long[] memIncrements;
 
     long memControlStartTime = System.nanoTime();
     if (insertRowsNode.isAligned()) {
-      for (InsertRowNode insertRowNode : 
insertRowsNode.getInsertRowNodeList()) {
-        memIncrements =
-            checkAlignedMemCostAndAddToTspInfo(
-                insertRowNode.getDeviceID(), insertRowNode.getMeasurements(),
-                insertRowNode.getDataTypes(), insertRowNode.getValues());
-      }
+      memIncrements = 
checkAlignedMemCostAndAddToTspInfoForRows(insertRowsNode);
     } else {
-      for (InsertRowNode insertRowNode : 
insertRowsNode.getInsertRowNodeList()) {
-        memIncrements =
-            checkMemCostAndAddToTspInfo(
-                insertRowNode.getDeviceID(), insertRowNode.getMeasurements(),
-                insertRowNode.getDataTypes(), insertRowNode.getValues());
-      }
+      memIncrements = checkMemCostAndAddToTspInfoForRows(insertRowsNode);
     }
     // recordScheduleMemoryBlockCost
     costsForMetrics[1] += System.nanoTime() - memControlStartTime;
@@ -437,7 +427,7 @@ public class TsFileProcessor {
       long startTime = System.nanoTime();
       if (insertTabletNode.isAligned()) {
         memIncrements =
-            checkAlignedMemCostAndAddToTsp(
+            checkAlignedMemCostAndAddToTspForTablet(
                 insertTabletNode.getDeviceID(),
                 insertTabletNode.getMeasurements(),
                 insertTabletNode.getDataTypes(),
@@ -446,7 +436,7 @@ public class TsFileProcessor {
                 end);
       } else {
         memIncrements =
-            checkMemCostAndAddToTspInfo(
+            checkMemCostAndAddToTspInfoForTablet(
                 insertTabletNode.getDeviceID(),
                 insertTabletNode.getMeasurements(),
                 insertTabletNode.getDataTypes(),
@@ -523,7 +513,7 @@ public class TsFileProcessor {
   }
 
   @SuppressWarnings("squid:S3776") // High Cognitive Complexity
-  private long[] checkMemCostAndAddToTspInfo(
+  private long[] checkMemCostAndAddToTspInfoForRow(
       IDeviceID deviceId, String[] measurements, TSDataType[] dataTypes, 
Object[] values)
       throws WriteProcessException {
     // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
@@ -557,8 +547,59 @@ public class TsFileProcessor {
     return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
   }
 
+  @SuppressWarnings("squid:S3776") // High Cognitive Complexity
+  private long[] checkMemCostAndAddToTspInfoForRows(InsertRowsNode 
insertRowsNode)
+      throws WriteProcessException {
+    // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
+    long memTableIncrement = 0L;
+    long textDataIncrement = 0L;
+    long chunkMetadataIncrement = 0L;
+    // device -> measurement -> adding TVList size
+    Map<IDeviceID, Map<String, Integer>> increasingMemTableInfo = new 
HashMap<>();
+    for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
+      IDeviceID deviceId = insertRowNode.getDeviceID();
+      TSDataType[] dataTypes = insertRowNode.getDataTypes();
+      Object[] values = insertRowNode.getValues();
+      String[] measurements = insertRowNode.getMeasurements();
+      for (int i = 0; i < dataTypes.length; i++) {
+        // Skip failed Measurements
+        if (dataTypes[i] == null || measurements[i] == null) {
+          continue;
+        }
+        if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i])
+            && (!increasingMemTableInfo.containsKey(deviceId)
+                || 
!increasingMemTableInfo.get(deviceId).containsKey(measurements[i]))) {
+          // ChunkMetadataIncrement
+          chunkMetadataIncrement += 
ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
+          memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
+          increasingMemTableInfo
+              .computeIfAbsent(deviceId, k -> new HashMap<>())
+              .putIfAbsent(measurements[i], 1);
+        } else {
+          // here currentChunkPointNum >= 1
+          long currentChunkPointNum = 
workMemTable.getCurrentTVListSize(deviceId, measurements[i]);
+          int addingPointNum =
+              increasingMemTableInfo
+                  .computeIfAbsent(deviceId, k -> new HashMap<>())
+                  .computeIfAbsent(measurements[i], k -> 0);
+          memTableIncrement +=
+              ((currentChunkPointNum + addingPointNum) % 
PrimitiveArrayManager.ARRAY_SIZE) == 0
+                  ? TVList.tvListArrayMemCost(dataTypes[i])
+                  : 0;
+          
increasingMemTableInfo.get(deviceId).computeIfPresent(measurements[i], (k, v) 
-> v + 1);
+        }
+        // TEXT data mem size
+        if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
+          textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+        }
+      }
+    }
+    updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, 
textDataIncrement);
+    return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
+  }
+
   @SuppressWarnings("squid:S3776") // high Cognitive Complexity
-  private long[] checkAlignedMemCostAndAddToTspInfo(
+  private long[] checkAlignedMemCostAndAddToTspInfoForRow(
       IDeviceID deviceId, String[] measurements, TSDataType[] dataTypes, 
Object[] values)
       throws WriteProcessException {
     // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
@@ -617,7 +658,91 @@ public class TsFileProcessor {
     return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
   }
 
-  private long[] checkMemCostAndAddToTspInfo(
+  @SuppressWarnings("squid:S3776") // high Cognitive Complexity
+  private long[] checkAlignedMemCostAndAddToTspInfoForRows(InsertRowsNode 
insertRowsNode)
+      throws WriteProcessException {
+    // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
+    long memTableIncrement = 0L;
+    long textDataIncrement = 0L;
+    long chunkMetadataIncrement = 0L;
+    // device -> (measurements -> datatype, adding aligned TVList size)
+    Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> 
increasingMemTableInfo = new HashMap<>();
+    for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
+      IDeviceID deviceId = insertRowNode.getDeviceID();
+      TSDataType[] dataTypes = insertRowNode.getDataTypes();
+      Object[] values = insertRowNode.getValues();
+      String[] measurements = insertRowNode.getMeasurements();
+      if (workMemTable.checkIfChunkDoesNotExist(deviceId, 
AlignedPath.VECTOR_PLACEHOLDER)
+          && !increasingMemTableInfo.containsKey(deviceId)) {
+        // For new device of this mem table
+        // ChunkMetadataIncrement
+        chunkMetadataIncrement +=
+            ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, 
TSDataType.VECTOR)
+                * dataTypes.length;
+        memTableIncrement += 
AlignedTVList.alignedTvListArrayMemCost(dataTypes);
+        for (int i = 0; i < dataTypes.length; i++) {
+          // Skip failed Measurements
+          if (dataTypes[i] == null || measurements[i] == null) {
+            continue;
+          }
+          increasingMemTableInfo
+              .computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 1))
+              .left
+              .put(measurements[i], dataTypes[i]);
+          // TEXT data mem size
+          if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
+            textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+          }
+        }
+
+      } else {
+        // For existed device of this mem table
+        AlignedWritableMemChunkGroup memChunkGroup =
+            (AlignedWritableMemChunkGroup) 
workMemTable.getMemTableMap().get(deviceId);
+        AlignedWritableMemChunk alignedMemChunk =
+            memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk();
+        long currentChunkPointNum = alignedMemChunk == null ? 0 : 
alignedMemChunk.alignedListSize();
+        List<TSDataType> dataTypesInTVList = new ArrayList<>();
+        Pair<Map<String, TSDataType>, Integer> addingPointNumInfo =
+            increasingMemTableInfo.computeIfAbsent(deviceId, k -> new 
Pair<>(new HashMap<>(), 0));
+        for (int i = 0; i < dataTypes.length; i++) {
+          // Skip failed Measurements
+          if (dataTypes[i] == null || measurements[i] == null) {
+            continue;
+          }
+
+          int addingPointNum = addingPointNumInfo.getRight();
+          // Extending the column of aligned mem chunk
+          if ((alignedMemChunk != null && 
!alignedMemChunk.containsMeasurement(measurements[i]))
+              && 
!increasingMemTableInfo.get(deviceId).left.containsKey(measurements[i])) {
+            memTableIncrement +=
+                ((currentChunkPointNum + addingPointNum) / 
PrimitiveArrayManager.ARRAY_SIZE + 1)
+                    * AlignedTVList.valueListArrayMemCost(dataTypes[i]);
+            increasingMemTableInfo.get(deviceId).left.put(measurements[i], 
dataTypes[i]);
+          }
+          // TEXT data mem size
+          if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
+            textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+          }
+        }
+        int addingPointNum = increasingMemTableInfo.get(deviceId).getRight();
+        // Here currentChunkPointNum + addingPointNum >= 1
+        if (((currentChunkPointNum + addingPointNum) % 
PrimitiveArrayManager.ARRAY_SIZE) == 0) {
+          if (alignedMemChunk != null) {
+            dataTypesInTVList.addAll(
+                ((AlignedTVList) 
alignedMemChunk.getTVList()).getTsDataTypes());
+          }
+          
dataTypesInTVList.addAll(increasingMemTableInfo.get(deviceId).getLeft().values());
+          memTableIncrement += 
AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
+        }
+        increasingMemTableInfo.get(deviceId).setRight(addingPointNum + 1);
+      }
+    }
+    updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, 
textDataIncrement);
+    return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
+  }
+
+  private long[] checkMemCostAndAddToTspInfoForTablet(
       IDeviceID deviceId,
       String[] measurements,
       TSDataType[] dataTypes,
@@ -644,7 +769,7 @@ public class TsFileProcessor {
     return memIncrements;
   }
 
-  private long[] checkAlignedMemCostAndAddToTsp(
+  private long[] checkAlignedMemCostAndAddToTspForTablet(
       IDeviceID deviceId,
       String[] measurements,
       TSDataType[] dataTypes,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
index 3b7133d48b2..5d61fe0e3dd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
@@ -161,6 +161,9 @@ public class WritableMemChunkGroup implements 
IWritableMemChunkGroup {
 
   @Override
   public long getCurrentTVListSize(String measurement) {
+    if (!memChunkMap.containsKey(measurement)) {
+      return 0;
+    }
     return memChunkMap.get(measurement).getTVList().rowCount();
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
index de25e822d58..50b2a5d3ea6 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
@@ -29,6 +29,9 @@ import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegionInfo;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegionTest;
@@ -413,6 +416,203 @@ public class TsFileProcessorTest {
     Assert.assertEquals(1441200, memTable.memSize());
   }
 
+  @Test
+  public void testRamCostInsertSameNonAlignedDataBy2Ways()
+      throws MetadataException, WriteProcessException, IOException {
+    TsFileProcessor processor1 =
+        new TsFileProcessor(
+            storageGroup,
+            SystemFileFactory.INSTANCE.getFile(filePath),
+            sgInfo,
+            this::closeTsFileProcessor,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
+            true);
+    TsFileProcessorInfo tsFileProcessorInfo1 = new TsFileProcessorInfo(sgInfo);
+    processor1.setTsFileProcessorInfo(tsFileProcessorInfo1);
+    this.sgInfo.initTsFileProcessorInfo(processor1);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor1);
+    // insert 100 rows by insertRow
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, deviceId);
+      record.addTuple(DataPoint.getDataPoint(dataType, measurementId, 
String.valueOf(i)));
+      processor1.insert(buildInsertRowNodeByTSRecord(record), new long[4]);
+    }
+    IMemTable memTable1 = processor1.getWorkMemTable();
+
+    TsFileProcessor processor2 =
+        new TsFileProcessor(
+            storageGroup,
+            SystemFileFactory.INSTANCE.getFile(filePath),
+            sgInfo,
+            this::closeTsFileProcessor,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
+            true);
+    TsFileProcessorInfo tsFileProcessorInfo2 = new TsFileProcessorInfo(sgInfo);
+    processor2.setTsFileProcessorInfo(tsFileProcessorInfo2);
+    this.sgInfo.initTsFileProcessorInfo(processor2);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor2);
+    InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+    // insert 100 rows by insertRows
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, deviceId);
+      record.addTuple(DataPoint.getDataPoint(dataType, measurementId, 
String.valueOf(i)));
+      insertRowsNode.addOneInsertRowNode(buildInsertRowNodeByTSRecord(record), 
i - 1);
+    }
+    processor2.insert(insertRowsNode, new long[4]);
+    IMemTable memTable2 = processor2.getWorkMemTable();
+
+    Assert.assertEquals(memTable1.getTVListsRamCost(), 
memTable2.getTVListsRamCost());
+    Assert.assertEquals(memTable1.getTotalPointsNum(), 
memTable2.getTotalPointsNum());
+    Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+
+    // insert more rows by insertRow
+    TSRecord record = new TSRecord(101, deviceId);
+    record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1"));
+    InsertRowNode insertRowNode1 = buildInsertRowNodeByTSRecord(record);
+    processor1.insert(insertRowNode1, new long[4]);
+    record = new TSRecord(101, deviceId);
+    record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1"));
+    InsertRowNode insertRowNode2 = buildInsertRowNodeByTSRecord(record);
+    processor1.insert(insertRowNode2, new long[4]);
+    record = new TSRecord(102, deviceId);
+    record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1"));
+    InsertRowNode insertRowNode3 = buildInsertRowNodeByTSRecord(record);
+    processor1.insert(insertRowNode3, new long[4]);
+    record = new TSRecord(102, "root.vehicle.d2");
+    record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1"));
+    InsertRowNode insertRowNode4 = buildInsertRowNodeByTSRecord(record);
+    processor1.insert(insertRowNode4, new long[4]);
+
+    // insert more rows by insertRows
+    insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+    insertRowsNode.addOneInsertRowNode(insertRowNode1, 0);
+    insertRowsNode.addOneInsertRowNode(insertRowNode2, 1);
+    insertRowsNode.addOneInsertRowNode(insertRowNode3, 2);
+    insertRowsNode.addOneInsertRowNode(insertRowNode4, 3);
+    processor2.insert(insertRowsNode, new long[4]);
+
+    Assert.assertEquals(memTable1.getTVListsRamCost(), 
memTable2.getTVListsRamCost());
+    Assert.assertEquals(memTable1.getTotalPointsNum(), 
memTable2.getTotalPointsNum());
+    Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+
+    // Insert rows with all column null
+    insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+    insertRowNode1.setDataTypes(new TSDataType[1]);
+    insertRowNode1.setMeasurements(new String[1]);
+    insertRowNode1.setValues(new String[1]);
+    insertRowsNode.addOneInsertRowNode(insertRowNode1, 0);
+    processor2.insert(insertRowsNode, new long[4]);
+
+    processor1.insert(insertRowNode1, new long[4]);
+    Assert.assertEquals(memTable1.getTVListsRamCost(), 
memTable2.getTVListsRamCost());
+    Assert.assertEquals(memTable1.getTotalPointsNum(), 
memTable2.getTotalPointsNum());
+    Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+  }
+
+  @Test
+  public void testRamCostInsertSameAlignedDataBy2Ways()
+      throws MetadataException, WriteProcessException, IOException {
+    TsFileProcessor processor1 =
+        new TsFileProcessor(
+            storageGroup,
+            SystemFileFactory.INSTANCE.getFile(filePath),
+            sgInfo,
+            this::closeTsFileProcessor,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
+            true);
+    TsFileProcessorInfo tsFileProcessorInfo1 = new TsFileProcessorInfo(sgInfo);
+    processor1.setTsFileProcessorInfo(tsFileProcessorInfo1);
+    this.sgInfo.initTsFileProcessorInfo(processor1);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor1);
+    // insert 100 rows by insertRow
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, deviceId);
+      record.addTuple(DataPoint.getDataPoint(dataType, measurementId, 
String.valueOf(i)));
+      InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+      node.setAligned(true);
+      processor1.insert(node, new long[4]);
+    }
+    IMemTable memTable1 = processor1.getWorkMemTable();
+
+    TsFileProcessor processor2 =
+        new TsFileProcessor(
+            storageGroup,
+            SystemFileFactory.INSTANCE.getFile(filePath),
+            sgInfo,
+            this::closeTsFileProcessor,
+            (tsFileProcessor, updateMap, systemFlushTime) -> {},
+            true);
+    TsFileProcessorInfo tsFileProcessorInfo2 = new TsFileProcessorInfo(sgInfo);
+    processor2.setTsFileProcessorInfo(tsFileProcessorInfo2);
+    this.sgInfo.initTsFileProcessorInfo(processor2);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor2);
+    InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+    insertRowsNode.setAligned(true);
+    // insert 100 rows by insertRows
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, deviceId);
+      record.addTuple(DataPoint.getDataPoint(dataType, measurementId, 
String.valueOf(i)));
+      InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+      node.setAligned(true);
+      insertRowsNode.addOneInsertRowNode(node, i - 1);
+    }
+    processor2.insert(insertRowsNode, new long[4]);
+    IMemTable memTable2 = processor2.getWorkMemTable();
+
+    Assert.assertEquals(memTable1.getTVListsRamCost(), 
memTable2.getTVListsRamCost());
+    Assert.assertEquals(memTable1.getTotalPointsNum(), 
memTable2.getTotalPointsNum());
+    Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+
+    // insert more rows by insertRow
+    TSRecord record = new TSRecord(101, deviceId);
+    record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1"));
+    InsertRowNode insertRowNode1 = buildInsertRowNodeByTSRecord(record);
+    insertRowNode1.setAligned(true);
+    processor1.insert(insertRowNode1, new long[4]);
+    record = new TSRecord(101, deviceId);
+    record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1"));
+    InsertRowNode insertRowNode2 = buildInsertRowNodeByTSRecord(record);
+    insertRowNode2.setAligned(true);
+    processor1.insert(insertRowNode2, new long[4]);
+    record = new TSRecord(102, deviceId);
+    record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1"));
+    InsertRowNode insertRowNode3 = buildInsertRowNodeByTSRecord(record);
+    insertRowNode3.setAligned(true);
+    processor1.insert(insertRowNode3, new long[4]);
+    record = new TSRecord(102, "root.vehicle.d2");
+    record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1"));
+    InsertRowNode insertRowNode4 = buildInsertRowNodeByTSRecord(record);
+    insertRowNode4.setAligned(true);
+    processor1.insert(insertRowNode4, new long[4]);
+
+    // insert more rows by insertRows
+    insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+    insertRowsNode.addOneInsertRowNode(insertRowNode1, 0);
+    insertRowsNode.addOneInsertRowNode(insertRowNode2, 1);
+    insertRowsNode.addOneInsertRowNode(insertRowNode3, 2);
+    insertRowsNode.addOneInsertRowNode(insertRowNode4, 3);
+    insertRowsNode.setAligned(true);
+    processor2.insert(insertRowsNode, new long[4]);
+
+    Assert.assertEquals(memTable1.getTVListsRamCost(), 
memTable2.getTVListsRamCost());
+    Assert.assertEquals(memTable1.getTotalPointsNum(), 
memTable2.getTotalPointsNum());
+    Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+
+    // Insert rows with all column null
+    insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+    insertRowNode1.setDataTypes(new TSDataType[1]);
+    insertRowNode1.setMeasurements(new String[1]);
+    insertRowNode1.setValues(new String[1]);
+    insertRowsNode.addOneInsertRowNode(insertRowNode1, 0);
+    insertRowsNode.setAligned(true);
+    processor2.insert(insertRowsNode, new long[4]);
+
+    processor1.insert(insertRowNode1, new long[4]);
+    Assert.assertEquals(memTable1.getTVListsRamCost(), 
memTable2.getTVListsRamCost());
+    Assert.assertEquals(memTable1.getTotalPointsNum(), 
memTable2.getTotalPointsNum());
+    Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+  }
+
   @Test
   public void testWriteAndClose() throws IOException, WriteProcessException, 
MetadataException {
     logger.info("testWriteAndRestoreMetadata begin..");

Reply via email to