This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new dc323ffed6e Fix insert rows mem control error
dc323ffed6e is described below
commit dc323ffed6e89e60f71e871091f407035700d9af
Author: Haonan <[email protected]>
AuthorDate: Sat May 11 11:22:21 2024 +0800
Fix insert rows mem control error
---
.../dataregion/memtable/AbstractMemTable.java | 3 +
.../dataregion/memtable/TsFileProcessor.java | 167 ++++++++++++++---
.../dataregion/memtable/WritableMemChunkGroup.java | 3 +
.../dataregion/memtable/TsFileProcessorTest.java | 200 +++++++++++++++++++++
4 files changed, 352 insertions(+), 21 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 7c7e46ec0ce..2182cec0007 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -399,6 +399,9 @@ public abstract class AbstractMemTable implements IMemTable {
@Override
public long getCurrentTVListSize(IDeviceID deviceId, String measurement) {
IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId);
+ if (null == memChunkGroup) {
+ return 0;
+ }
return memChunkGroup.getCurrentTVListSize(measurement);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 8cbca98b754..819d5bae8a3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -253,12 +253,12 @@ public class TsFileProcessor {
long memControlStartTime = System.nanoTime();
if (insertRowNode.isAligned()) {
memIncrements =
- checkAlignedMemCostAndAddToTspInfo(
+ checkAlignedMemCostAndAddToTspInfoForRow(
insertRowNode.getDeviceID(), insertRowNode.getMeasurements(),
insertRowNode.getDataTypes(), insertRowNode.getValues());
} else {
memIncrements =
- checkMemCostAndAddToTspInfo(
+ checkMemCostAndAddToTspInfoForRow(
insertRowNode.getDeviceID(), insertRowNode.getMeasurements(),
insertRowNode.getDataTypes(), insertRowNode.getValues());
}
@@ -328,23 +328,13 @@ public class TsFileProcessor {
.recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
}
- long[] memIncrements = null;
+ long[] memIncrements;
long memControlStartTime = System.nanoTime();
if (insertRowsNode.isAligned()) {
- for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
- memIncrements =
- checkAlignedMemCostAndAddToTspInfo(
- insertRowNode.getDeviceID(), insertRowNode.getMeasurements(),
- insertRowNode.getDataTypes(), insertRowNode.getValues());
- }
+ memIncrements = checkAlignedMemCostAndAddToTspInfoForRows(insertRowsNode);
} else {
- for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
- memIncrements =
- checkMemCostAndAddToTspInfo(
- insertRowNode.getDeviceID(), insertRowNode.getMeasurements(),
- insertRowNode.getDataTypes(), insertRowNode.getValues());
- }
+ memIncrements = checkMemCostAndAddToTspInfoForRows(insertRowsNode);
}
// recordScheduleMemoryBlockCost
costsForMetrics[1] += System.nanoTime() - memControlStartTime;
@@ -437,7 +427,7 @@ public class TsFileProcessor {
long startTime = System.nanoTime();
if (insertTabletNode.isAligned()) {
memIncrements =
- checkAlignedMemCostAndAddToTsp(
+ checkAlignedMemCostAndAddToTspForTablet(
insertTabletNode.getDeviceID(),
insertTabletNode.getMeasurements(),
insertTabletNode.getDataTypes(),
@@ -446,7 +436,7 @@ public class TsFileProcessor {
end);
} else {
memIncrements =
- checkMemCostAndAddToTspInfo(
+ checkMemCostAndAddToTspInfoForTablet(
insertTabletNode.getDeviceID(),
insertTabletNode.getMeasurements(),
insertTabletNode.getDataTypes(),
@@ -523,7 +513,7 @@ public class TsFileProcessor {
}
@SuppressWarnings("squid:S3776") // High Cognitive Complexity
- private long[] checkMemCostAndAddToTspInfo(
+ private long[] checkMemCostAndAddToTspInfoForRow(
IDeviceID deviceId, String[] measurements, TSDataType[] dataTypes, Object[] values)
throws WriteProcessException {
// Memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
@@ -557,8 +547,59 @@ public class TsFileProcessor {
return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement};
}
+ @SuppressWarnings("squid:S3776") // High Cognitive Complexity
+ private long[] checkMemCostAndAddToTspInfoForRows(InsertRowsNode insertRowsNode)
+ throws WriteProcessException {
+ // Memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
+ long memTableIncrement = 0L;
+ long textDataIncrement = 0L;
+ long chunkMetadataIncrement = 0L;
+ // device -> measurement -> adding TVList size
+ Map<IDeviceID, Map<String, Integer>> increasingMemTableInfo = new HashMap<>();
+ for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
+ IDeviceID deviceId = insertRowNode.getDeviceID();
+ TSDataType[] dataTypes = insertRowNode.getDataTypes();
+ Object[] values = insertRowNode.getValues();
+ String[] measurements = insertRowNode.getMeasurements();
+ for (int i = 0; i < dataTypes.length; i++) {
+ // Skip failed Measurements
+ if (dataTypes[i] == null || measurements[i] == null) {
+ continue;
+ }
+ if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i])
+ && (!increasingMemTableInfo.containsKey(deviceId)
+ || !increasingMemTableInfo.get(deviceId).containsKey(measurements[i]))) {
+ // ChunkMetadataIncrement
+ chunkMetadataIncrement += ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
+ memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
+ increasingMemTableInfo
+ .computeIfAbsent(deviceId, k -> new HashMap<>())
+ .putIfAbsent(measurements[i], 1);
+ } else {
+ // here currentChunkPointNum >= 1
+ long currentChunkPointNum = workMemTable.getCurrentTVListSize(deviceId, measurements[i]);
+ int addingPointNum =
+ increasingMemTableInfo
+ .computeIfAbsent(deviceId, k -> new HashMap<>())
+ .computeIfAbsent(measurements[i], k -> 0);
+ memTableIncrement +=
+ ((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE) == 0
+ ? TVList.tvListArrayMemCost(dataTypes[i])
+ : 0;
+ increasingMemTableInfo.get(deviceId).computeIfPresent(measurements[i], (k, v) -> v + 1);
+ }
+ // TEXT data mem size
+ if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
+ textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+ }
+ }
+ }
+ updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
+ return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement};
+ }
+
@SuppressWarnings("squid:S3776") // high Cognitive Complexity
- private long[] checkAlignedMemCostAndAddToTspInfo(
+ private long[] checkAlignedMemCostAndAddToTspInfoForRow(
IDeviceID deviceId, String[] measurements, TSDataType[] dataTypes, Object[] values)
throws WriteProcessException {
// Memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
@@ -617,7 +658,91 @@ public class TsFileProcessor {
return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement};
}
- private long[] checkMemCostAndAddToTspInfo(
+ @SuppressWarnings("squid:S3776") // high Cognitive Complexity
+ private long[] checkAlignedMemCostAndAddToTspInfoForRows(InsertRowsNode insertRowsNode)
+ throws WriteProcessException {
+ // Memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
+ long memTableIncrement = 0L;
+ long textDataIncrement = 0L;
+ long chunkMetadataIncrement = 0L;
+ // device -> (measurements -> datatype, adding aligned TVList size)
+ Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> increasingMemTableInfo = new HashMap<>();
+ for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
+ IDeviceID deviceId = insertRowNode.getDeviceID();
+ TSDataType[] dataTypes = insertRowNode.getDataTypes();
+ Object[] values = insertRowNode.getValues();
+ String[] measurements = insertRowNode.getMeasurements();
+ if (workMemTable.checkIfChunkDoesNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)
+ && !increasingMemTableInfo.containsKey(deviceId)) {
+ // For new device of this mem table
+ // ChunkMetadataIncrement
+ chunkMetadataIncrement +=
+ ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR)
+ * dataTypes.length;
+ memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes);
+ for (int i = 0; i < dataTypes.length; i++) {
+ // Skip failed Measurements
+ if (dataTypes[i] == null || measurements[i] == null) {
+ continue;
+ }
+ increasingMemTableInfo
+ .computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 1))
+ .left
+ .put(measurements[i], dataTypes[i]);
+ // TEXT data mem size
+ if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
+ textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+ }
+ }
+
+ } else {
+ // For existed device of this mem table
+ AlignedWritableMemChunkGroup memChunkGroup =
+ (AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId);
+ AlignedWritableMemChunk alignedMemChunk =
+ memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk();
+ long currentChunkPointNum = alignedMemChunk == null ? 0 : alignedMemChunk.alignedListSize();
+ List<TSDataType> dataTypesInTVList = new ArrayList<>();
+ Pair<Map<String, TSDataType>, Integer> addingPointNumInfo =
+ increasingMemTableInfo.computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 0));
+ for (int i = 0; i < dataTypes.length; i++) {
+ // Skip failed Measurements
+ if (dataTypes[i] == null || measurements[i] == null) {
+ continue;
+ }
+
+ int addingPointNum = addingPointNumInfo.getRight();
+ // Extending the column of aligned mem chunk
+ if ((alignedMemChunk != null && !alignedMemChunk.containsMeasurement(measurements[i]))
+ && !increasingMemTableInfo.get(deviceId).left.containsKey(measurements[i])) {
+ memTableIncrement +=
+ ((currentChunkPointNum + addingPointNum) / PrimitiveArrayManager.ARRAY_SIZE + 1)
+ * AlignedTVList.valueListArrayMemCost(dataTypes[i]);
+ increasingMemTableInfo.get(deviceId).left.put(measurements[i], dataTypes[i]);
+ }
+ // TEXT data mem size
+ if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
+ textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+ }
+ }
+ int addingPointNum = increasingMemTableInfo.get(deviceId).getRight();
+ // Here currentChunkPointNum + addingPointNum >= 1
+ if (((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE) == 0) {
+ if (alignedMemChunk != null) {
+ dataTypesInTVList.addAll(
+ ((AlignedTVList) alignedMemChunk.getTVList()).getTsDataTypes());
+ }
+ dataTypesInTVList.addAll(increasingMemTableInfo.get(deviceId).getLeft().values());
+ memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
+ }
+ increasingMemTableInfo.get(deviceId).setRight(addingPointNum + 1);
+ }
+ }
+ updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
+ return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement};
+ }
+
+ private long[] checkMemCostAndAddToTspInfoForTablet(
IDeviceID deviceId,
String[] measurements,
TSDataType[] dataTypes,
@@ -644,7 +769,7 @@ public class TsFileProcessor {
return memIncrements;
}
- private long[] checkAlignedMemCostAndAddToTsp(
+ private long[] checkAlignedMemCostAndAddToTspForTablet(
IDeviceID deviceId,
String[] measurements,
TSDataType[] dataTypes,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
index 3b7133d48b2..5d61fe0e3dd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
@@ -161,6 +161,9 @@ public class WritableMemChunkGroup implements IWritableMemChunkGroup {
@Override
public long getCurrentTVListSize(String measurement) {
+ if (!memChunkMap.containsKey(measurement)) {
+ return 0;
+ }
return memChunkMap.get(measurement).getTVList().rowCount();
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
index de25e822d58..50b2a5d3ea6 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
@@ -29,6 +29,9 @@ import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.storageengine.dataregion.DataRegionInfo;
import org.apache.iotdb.db.storageengine.dataregion.DataRegionTest;
@@ -413,6 +416,203 @@ public class TsFileProcessorTest {
Assert.assertEquals(1441200, memTable.memSize());
}
+ @Test
+ public void testRamCostInsertSameNonAlignedDataBy2Ways()
+ throws MetadataException, WriteProcessException, IOException {
+ TsFileProcessor processor1 =
+ new TsFileProcessor(
+ storageGroup,
+ SystemFileFactory.INSTANCE.getFile(filePath),
+ sgInfo,
+ this::closeTsFileProcessor,
+ (tsFileProcessor, updateMap, systemFlushTime) -> {},
+ true);
+ TsFileProcessorInfo tsFileProcessorInfo1 = new TsFileProcessorInfo(sgInfo);
+ processor1.setTsFileProcessorInfo(tsFileProcessorInfo1);
+ this.sgInfo.initTsFileProcessorInfo(processor1);
+ SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor1);
+ // insert 100 rows by insertRow
+ for (int i = 1; i <= 100; i++) {
+ TSRecord record = new TSRecord(i, deviceId);
+ record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
+ processor1.insert(buildInsertRowNodeByTSRecord(record), new long[4]);
+ }
+ IMemTable memTable1 = processor1.getWorkMemTable();
+
+ TsFileProcessor processor2 =
+ new TsFileProcessor(
+ storageGroup,
+ SystemFileFactory.INSTANCE.getFile(filePath),
+ sgInfo,
+ this::closeTsFileProcessor,
+ (tsFileProcessor, updateMap, systemFlushTime) -> {},
+ true);
+ TsFileProcessorInfo tsFileProcessorInfo2 = new TsFileProcessorInfo(sgInfo);
+ processor2.setTsFileProcessorInfo(tsFileProcessorInfo2);
+ this.sgInfo.initTsFileProcessorInfo(processor2);
+ SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor2);
+ InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+ // insert 100 rows by insertRows
+ for (int i = 1; i <= 100; i++) {
+ TSRecord record = new TSRecord(i, deviceId);
+ record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
+ insertRowsNode.addOneInsertRowNode(buildInsertRowNodeByTSRecord(record), i - 1);
+ }
+ processor2.insert(insertRowsNode, new long[4]);
+ IMemTable memTable2 = processor2.getWorkMemTable();
+
+ Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost());
+ Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum());
+ Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+
+ // insert more rows by insertRow
+ TSRecord record = new TSRecord(101, deviceId);
+ record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1"));
+ InsertRowNode insertRowNode1 = buildInsertRowNodeByTSRecord(record);
+ processor1.insert(insertRowNode1, new long[4]);
+ record = new TSRecord(101, deviceId);
+ record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1"));
+ InsertRowNode insertRowNode2 = buildInsertRowNodeByTSRecord(record);
+ processor1.insert(insertRowNode2, new long[4]);
+ record = new TSRecord(102, deviceId);
+ record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1"));
+ InsertRowNode insertRowNode3 = buildInsertRowNodeByTSRecord(record);
+ processor1.insert(insertRowNode3, new long[4]);
+ record = new TSRecord(102, "root.vehicle.d2");
+ record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1"));
+ InsertRowNode insertRowNode4 = buildInsertRowNodeByTSRecord(record);
+ processor1.insert(insertRowNode4, new long[4]);
+
+ // insert more rows by insertRows
+ insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+ insertRowsNode.addOneInsertRowNode(insertRowNode1, 0);
+ insertRowsNode.addOneInsertRowNode(insertRowNode2, 1);
+ insertRowsNode.addOneInsertRowNode(insertRowNode3, 2);
+ insertRowsNode.addOneInsertRowNode(insertRowNode4, 3);
+ processor2.insert(insertRowsNode, new long[4]);
+
+ Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost());
+ Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum());
+ Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+
+ // Insert rows with all column null
+ insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+ insertRowNode1.setDataTypes(new TSDataType[1]);
+ insertRowNode1.setMeasurements(new String[1]);
+ insertRowNode1.setValues(new String[1]);
+ insertRowsNode.addOneInsertRowNode(insertRowNode1, 0);
+ processor2.insert(insertRowsNode, new long[4]);
+
+ processor1.insert(insertRowNode1, new long[4]);
+ Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost());
+ Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum());
+ Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+ }
+
+ @Test
+ public void testRamCostInsertSameAlignedDataBy2Ways()
+ throws MetadataException, WriteProcessException, IOException {
+ TsFileProcessor processor1 =
+ new TsFileProcessor(
+ storageGroup,
+ SystemFileFactory.INSTANCE.getFile(filePath),
+ sgInfo,
+ this::closeTsFileProcessor,
+ (tsFileProcessor, updateMap, systemFlushTime) -> {},
+ true);
+ TsFileProcessorInfo tsFileProcessorInfo1 = new TsFileProcessorInfo(sgInfo);
+ processor1.setTsFileProcessorInfo(tsFileProcessorInfo1);
+ this.sgInfo.initTsFileProcessorInfo(processor1);
+ SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor1);
+ // insert 100 rows by insertRow
+ for (int i = 1; i <= 100; i++) {
+ TSRecord record = new TSRecord(i, deviceId);
+ record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
+ InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+ node.setAligned(true);
+ processor1.insert(node, new long[4]);
+ }
+ IMemTable memTable1 = processor1.getWorkMemTable();
+
+ TsFileProcessor processor2 =
+ new TsFileProcessor(
+ storageGroup,
+ SystemFileFactory.INSTANCE.getFile(filePath),
+ sgInfo,
+ this::closeTsFileProcessor,
+ (tsFileProcessor, updateMap, systemFlushTime) -> {},
+ true);
+ TsFileProcessorInfo tsFileProcessorInfo2 = new TsFileProcessorInfo(sgInfo);
+ processor2.setTsFileProcessorInfo(tsFileProcessorInfo2);
+ this.sgInfo.initTsFileProcessorInfo(processor2);
+ SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor2);
+ InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+ insertRowsNode.setAligned(true);
+ // insert 100 rows by insertRows
+ for (int i = 1; i <= 100; i++) {
+ TSRecord record = new TSRecord(i, deviceId);
+ record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
+ InsertRowNode node = buildInsertRowNodeByTSRecord(record);
+ node.setAligned(true);
+ insertRowsNode.addOneInsertRowNode(node, i - 1);
+ }
+ processor2.insert(insertRowsNode, new long[4]);
+ IMemTable memTable2 = processor2.getWorkMemTable();
+
+ Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost());
+ Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum());
+ Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+
+ // insert more rows by insertRow
+ TSRecord record = new TSRecord(101, deviceId);
+ record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1"));
+ InsertRowNode insertRowNode1 = buildInsertRowNodeByTSRecord(record);
+ insertRowNode1.setAligned(true);
+ processor1.insert(insertRowNode1, new long[4]);
+ record = new TSRecord(101, deviceId);
+ record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1"));
+ InsertRowNode insertRowNode2 = buildInsertRowNodeByTSRecord(record);
+ insertRowNode2.setAligned(true);
+ processor1.insert(insertRowNode2, new long[4]);
+ record = new TSRecord(102, deviceId);
+ record.addTuple(DataPoint.getDataPoint(dataType, "s99", "1"));
+ InsertRowNode insertRowNode3 = buildInsertRowNodeByTSRecord(record);
+ insertRowNode3.setAligned(true);
+ processor1.insert(insertRowNode3, new long[4]);
+ record = new TSRecord(102, "root.vehicle.d2");
+ record.addTuple(DataPoint.getDataPoint(dataType, measurementId, "1"));
+ InsertRowNode insertRowNode4 = buildInsertRowNodeByTSRecord(record);
+ insertRowNode4.setAligned(true);
+ processor1.insert(insertRowNode4, new long[4]);
+
+ // insert more rows by insertRows
+ insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+ insertRowsNode.addOneInsertRowNode(insertRowNode1, 0);
+ insertRowsNode.addOneInsertRowNode(insertRowNode2, 1);
+ insertRowsNode.addOneInsertRowNode(insertRowNode3, 2);
+ insertRowsNode.addOneInsertRowNode(insertRowNode4, 3);
+ insertRowsNode.setAligned(true);
+ processor2.insert(insertRowsNode, new long[4]);
+
+ Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost());
+ Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum());
+ Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+
+ // Insert rows with all column null
+ insertRowsNode = new InsertRowsNode(new PlanNodeId(""));
+ insertRowNode1.setDataTypes(new TSDataType[1]);
+ insertRowNode1.setMeasurements(new String[1]);
+ insertRowNode1.setValues(new String[1]);
+ insertRowsNode.addOneInsertRowNode(insertRowNode1, 0);
+ insertRowsNode.setAligned(true);
+ processor2.insert(insertRowsNode, new long[4]);
+
+ processor1.insert(insertRowNode1, new long[4]);
+ Assert.assertEquals(memTable1.getTVListsRamCost(), memTable2.getTVListsRamCost());
+ Assert.assertEquals(memTable1.getTotalPointsNum(), memTable2.getTotalPointsNum());
+ Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
+ }
+
@Test
public void testWriteAndClose() throws IOException, WriteProcessException, MetadataException {
logger.info("testWriteAndRestoreMetadata begin..");