This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new fea9a2f67a6 [To dev/1.3] Pipe: Fixed multiple leakages of
InsertNodeMemoryEstimator (#17657) (#17685)
fea9a2f67a6 is described below
commit fea9a2f67a6fd44473f9bb66e574a79ec23fe675
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 15 16:37:25 2026 +0800
[To dev/1.3] Pipe: Fixed multiple leakages of InsertNodeMemoryEstimator
(#17657) (#17685)
* Pipe: Fixed multiple leakages of InsertNodeMemoryEstimator (#17657)
* Fix
* sptls
* Update InsertNodeMemoryEstimator.java
---
.../resource/memory/InsertNodeMemoryEstimator.java | 358 +++++++++++++--------
.../memory/InsertNodeMemoryEstimatorTest.java | 265 +++++++++++++++
2 files changed, 490 insertions(+), 133 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
index c29eb15371c..9616777b021 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
@@ -44,10 +45,14 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
public class InsertNodeMemoryEstimator {
@@ -58,7 +63,6 @@ public class InsertNodeMemoryEstimator {
private static final String INSERT_ROWS_NODE = "InsertRowsNode";
private static final String INSERT_ROWS_OF_ONE_DEVICE_NODE =
"InsertRowsOfOneDeviceNode";
private static final String INSERT_MULTI_TABLETS_NODE =
"InsertMultiTabletsNode";
- private static final String RELATIONAL_INSERT_ROW_NODE =
"RelationalInsertRowNode";
private static final long NUM_BYTES_OBJECT_REF =
RamUsageEstimator.NUM_BYTES_OBJECT_REF;
private static final long NUM_BYTES_OBJECT_HEADER =
RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
@@ -158,174 +162,104 @@ public class InsertNodeMemoryEstimator {
// =============================InsertNode==================================
private static long calculateFullInsertNodeSize(final InsertNode node) {
+ return calculateFullInsertNodeSize(node, null);
+ }
+
+ private static long calculateFullInsertNodeSize(
+ final InsertNode node, final Set<Object> deduplicatedObjects) {
long size = 0;
+ // PlanNodeId
+ size += sizeOfPlanNodeId(node.getPlanNodeId(), deduplicatedObjects);
// PartialPath
- size += sizeOfPartialPath(node.getDevicePath());
+ size += sizeOfPartialPath(node.getDevicePath(), deduplicatedObjects);
// MeasurementSchemas
- size += sizeOfMeasurementSchemas(node.getMeasurementSchemas());
+ size += sizeOfMeasurementSchemas(node.getMeasurementSchemas(),
deduplicatedObjects);
// Measurement
- size += sizeOfStringArray(node.getMeasurements());
+ size += sizeOfStringArray(node.getMeasurements(), deduplicatedObjects);
// dataTypes
- size += RamUsageEstimator.shallowSizeOf(node.getDataTypes());
+ size += sizeOfShallowObject(node.getDataTypes(), deduplicatedObjects);
// deviceID
if (node.isDeviceIDExists()) {
- size += sizeOfIDeviceID(node.getDeviceID());
+ size += sizeOfIDeviceID(node.getDeviceID(), deduplicatedObjects);
}
// dataRegionReplicaSet
- size += sizeOfTRegionReplicaSet(node.getRegionReplicaSet());
+ size += sizeOfTRegionReplicaSet(node.getRegionReplicaSet(),
deduplicatedObjects);
// progressIndex
- size += sizeOfProgressIndex(node.getProgressIndex());
- return size;
- }
-
- private static long calculateInsertNodeSizeExcludingSchemas(final InsertNode
node) {
- // Measurement
- long size = 2 *
RamUsageEstimator.shallowSizeOf(node.getMeasurementSchemas());
- // dataTypes
- size += RamUsageEstimator.shallowSizeOf(node.getDataTypes());
- // deviceID
- if (node.isDeviceIDExists()) {
- size += sizeOfIDeviceID(node.getDeviceID());
- }
- // dataRegionReplicaSet
- size += sizeOfTRegionReplicaSet(node.getRegionReplicaSet());
- // progressIndex
- size += sizeOfProgressIndex(node.getProgressIndex());
+ size += sizeOfProgressIndex(node.getProgressIndex(), deduplicatedObjects);
return size;
}
private static long sizeOfInsertTabletNode(final InsertTabletNode node) {
- long size = INSERT_TABLET_NODE_SIZE;
- size += calculateFullInsertNodeSize(node);
- size += RamUsageEstimator.sizeOf(node.getTimes());
- size += sizeOfBitMapArray(node.getBitMaps());
- size += sizeOfColumns(node.getColumns(), node.getMeasurementSchemas());
- final List<Integer> range = node.getRange();
- if (range != null) {
- size += NUM_BYTES_OBJECT_HEADER + SIZE_OF_INT * range.size();
- }
- return size;
+ return sizeOfInsertTabletNode(node, newDeduplicatedObjectSet());
}
- private static long calculateInsertTabletNodeSizeExcludingSchemas(final
InsertTabletNode node) {
+ private static long sizeOfInsertTabletNode(
+ final InsertTabletNode node, final Set<Object> deduplicatedObjects) {
long size = INSERT_TABLET_NODE_SIZE;
-
- size += calculateInsertNodeSizeExcludingSchemas(node);
-
+ size += calculateFullInsertNodeSize(node, deduplicatedObjects);
size += RamUsageEstimator.sizeOf(node.getTimes());
-
size += sizeOfBitMapArray(node.getBitMaps());
-
size += sizeOfColumns(node.getColumns(), node.getMeasurementSchemas());
-
- final List<Integer> range = node.getRange();
- if (range != null) {
- size += NUM_BYTES_OBJECT_HEADER + SIZE_OF_INT * range.size();
- }
+ size += sizeOfIntegerList(node.getRange());
return size;
}
private static long sizeOfInsertRowNode(final InsertRowNode node) {
- long size = INSERT_ROW_NODE_SIZE;
- size += calculateFullInsertNodeSize(node);
- size += sizeOfValues(node.getValues(), node.getMeasurementSchemas());
- return size;
+ return sizeOfInsertRowNode(node, newDeduplicatedObjectSet());
}
- private static long calculateInsertRowNodeExcludingSchemas(final
InsertRowNode node) {
+ private static long sizeOfInsertRowNode(
+ final InsertRowNode node, final Set<Object> deduplicatedObjects) {
long size = INSERT_ROW_NODE_SIZE;
- size += calculateInsertNodeSizeExcludingSchemas(node);
+ size += calculateFullInsertNodeSize(node, deduplicatedObjects);
size += sizeOfValues(node.getValues(), node.getMeasurementSchemas());
return size;
}
private static long sizeOfInsertRowsNode(final InsertRowsNode node) {
+ final Set<Object> deduplicatedObjects = newDeduplicatedObjectSet();
long size = INSERT_ROWS_NODE_SIZE;
- size += calculateFullInsertNodeSize(node);
- final List<InsertRowNode> rows = node.getInsertRowNodeList();
- final List<Integer> indexList = node.getInsertRowNodeIndexList();
- if (rows != null && !rows.isEmpty()) {
- // InsertRowNodeList
- size += NUM_BYTES_OBJECT_HEADER;
- size +=
- (calculateInsertRowNodeExcludingSchemas(rows.get(0)) +
NUM_BYTES_OBJECT_REF)
- * rows.size();
- size += sizeOfPartialPath(rows.get(0).getDevicePath());
- size += sizeOfMeasurementSchemas(rows.get(0).getMeasurementSchemas());
- // InsertRowNodeIndexList
- size += NUM_BYTES_OBJECT_HEADER;
- size += (long) indexList.size() * (SIZE_OF_INT + NUM_BYTES_OBJECT_REF);
- }
+ size += calculateFullInsertNodeSize(node, deduplicatedObjects);
+ size += sizeOfInsertRowNodeList(node.getInsertRowNodeList(),
deduplicatedObjects);
+ size += sizeOfIntegerList(node.getInsertRowNodeIndexList());
+ size += sizeOfResults(node.getResults());
return size;
}
private static long sizeOfInsertRowsOfOneDeviceNode(final
InsertRowsOfOneDeviceNode node) {
+ final Set<Object> deduplicatedObjects = newDeduplicatedObjectSet();
long size = INSERT_ROWS_OF_ONE_DEVICE_NODE_SIZE;
- size += calculateFullInsertNodeSize(node);
- final List<InsertRowNode> rows = node.getInsertRowNodeList();
- final List<Integer> indexList = node.getInsertRowNodeIndexList();
- if (rows != null && !rows.isEmpty()) {
- // InsertRowNodeList
- size += NUM_BYTES_OBJECT_HEADER;
- size +=
- (calculateInsertRowNodeExcludingSchemas(rows.get(0)) +
NUM_BYTES_OBJECT_REF)
- * rows.size();
- size += sizeOfPartialPath(rows.get(0).getDevicePath());
- size += sizeOfMeasurementSchemas(rows.get(0).getMeasurementSchemas());
- // InsertRowNodeIndexList
- size += NUM_BYTES_OBJECT_HEADER;
- size += (long) indexList.size() * (SIZE_OF_INT + NUM_BYTES_OBJECT_REF);
- }
- // results
- size += NUM_BYTES_OBJECT_HEADER;
- for (Map.Entry<Integer, TSStatus> entry : node.getResults().entrySet()) {
- size +=
- Integer.BYTES
- + sizeOfTSStatus(entry.getValue())
- + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
- }
+ size += calculateFullInsertNodeSize(node, deduplicatedObjects);
+ size += sizeOfInsertRowNodeList(node.getInsertRowNodeList(),
deduplicatedObjects);
+ size += sizeOfIntegerList(node.getInsertRowNodeIndexList());
+ size += sizeOfResults(node.getResults());
return size;
}
private static long sizeOfInsertMultiTabletsNode(final
InsertMultiTabletsNode node) {
+ final Set<Object> deduplicatedObjects = newDeduplicatedObjectSet();
long size = INSERT_MULTI_TABLETS_NODE_SIZE;
- size += calculateFullInsertNodeSize(node);
- // dataTypes
- size += RamUsageEstimator.shallowSizeOf(node.getDataTypes());
-
- final List<InsertTabletNode> rows = node.getInsertTabletNodeList();
- final List<Integer> indexList = node.getParentInsertTabletNodeIndexList();
- if (rows != null && !rows.isEmpty()) {
- // InsertTabletNodeList
- size += NUM_BYTES_OBJECT_HEADER;
- size +=
- (calculateInsertTabletNodeSizeExcludingSchemas(rows.get(0)) +
NUM_BYTES_OBJECT_REF)
- * rows.size();
- size += sizeOfPartialPath(rows.get(0).getDevicePath());
- size += sizeOfMeasurementSchemas(rows.get(0).getMeasurementSchemas());
- // ParentInsertTabletNodeIndexList
- size += NUM_BYTES_OBJECT_HEADER;
- size += (long) indexList.size() * (SIZE_OF_INT + NUM_BYTES_OBJECT_REF);
- }
- // results
- if (node.getResults() != null) {
- size += NUM_BYTES_OBJECT_HEADER;
- for (Map.Entry<Integer, TSStatus> entry : node.getResults().entrySet()) {
- size +=
- Integer.BYTES
- + sizeOfTSStatus(entry.getValue())
- + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
- }
- }
+ size += calculateFullInsertNodeSize(node, deduplicatedObjects);
+ size += sizeOfInsertTabletNodeList(node.getInsertTabletNodeList(),
deduplicatedObjects);
+ size += sizeOfIntegerList(node.getParentInsertTabletNodeIndexList());
+ size += sizeOfResults(node.getResults());
return size;
}
// ============================Device And
Measurement===================================
public static long sizeOfPartialPath(final PartialPath partialPath) {
+ return sizeOfPartialPath(partialPath, null);
+ }
+
+ private static long sizeOfPartialPath(
+ final PartialPath partialPath, final Set<Object> deduplicatedObjects) {
if (partialPath == null) {
return 0L;
}
+ if (!shouldCountObject(partialPath, deduplicatedObjects)) {
+ return 0L;
+ }
long size = PARTIAL_PATH_SIZE;
final String[] nodes = partialPath.getNodes();
if (nodes != null) {
@@ -338,22 +272,38 @@ public class InsertNodeMemoryEstimator {
}
public static long sizeOfMeasurementSchemas(final MeasurementSchema[]
measurementSchemas) {
+ return sizeOfMeasurementSchemas(measurementSchemas, null);
+ }
+
+ private static long sizeOfMeasurementSchemas(
+ final MeasurementSchema[] measurementSchemas, final Set<Object>
deduplicatedObjects) {
if (measurementSchemas == null) {
return 0L;
}
+ if (!shouldCountObject(measurementSchemas, deduplicatedObjects)) {
+ return 0L;
+ }
long size =
RamUsageEstimator.alignObjectSize(
NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF *
measurementSchemas.length);
for (MeasurementSchema measurementSchema : measurementSchemas) {
- size += sizeOfMeasurementSchema(measurementSchema);
+ size += sizeOfMeasurementSchema(measurementSchema, deduplicatedObjects);
}
return size;
}
- private static long sizeOfMeasurementSchema(final MeasurementSchema
measurementSchema) {
+ public static long sizeOfMeasurementSchema(final MeasurementSchema
measurementSchema) {
+ return sizeOfMeasurementSchema(measurementSchema, null);
+ }
+
+ private static long sizeOfMeasurementSchema(
+ final MeasurementSchema measurementSchema, final Set<Object>
deduplicatedObjects) {
if (measurementSchema == null) {
return 0L;
}
+ if (!shouldCountObject(measurementSchema, deduplicatedObjects)) {
+ return 0L;
+ }
// Header + primitive + reference
long size = MEASUREMENT_SCHEMA_SIZE;
// measurementId
@@ -361,7 +311,7 @@ public class InsertNodeMemoryEstimator {
// props
final Map<String, String> props = measurementSchema.getProps();
if (props != null) {
- size += NUM_BYTES_OBJECT_HEADER;
+ size += RamUsageEstimator.shallowSizeOf(props);
for (Map.Entry<String, String> entry : props.entrySet()) {
size +=
RamUsageEstimator.sizeOf(entry.getKey())
@@ -373,16 +323,31 @@ public class InsertNodeMemoryEstimator {
return size;
}
- private static long sizeOfIDeviceID(final IDeviceID deviceID) {
- return Objects.nonNull(deviceID) ? deviceID.ramBytesUsed() : 0L;
+ public static long sizeOfIDeviceID(final IDeviceID deviceID) {
+ return sizeOfIDeviceID(deviceID, null);
+ }
+
+ private static long sizeOfIDeviceID(
+ final IDeviceID deviceID, final Set<Object> deduplicatedObjects) {
+ return Objects.nonNull(deviceID) && shouldCountObject(deviceID,
deduplicatedObjects)
+ ? deviceID.ramBytesUsed()
+ : 0L;
}
// =============================Thrift==================================
private static long sizeOfTRegionReplicaSet(final TRegionReplicaSet
tRegionReplicaSet) {
+ return sizeOfTRegionReplicaSet(tRegionReplicaSet, null);
+ }
+
+ private static long sizeOfTRegionReplicaSet(
+ final TRegionReplicaSet tRegionReplicaSet, final Set<Object>
deduplicatedObjects) {
if (tRegionReplicaSet == null) {
return 0L;
}
+ if (!shouldCountObject(tRegionReplicaSet, deduplicatedObjects)) {
+ return 0L;
+ }
// Memory alignment of basic types and reference types in structures
long size = T_REGION_REPLICA_SET_SIZE;
// Memory calculation in reference type, cannot get exact value, roughly
estimate
@@ -390,9 +355,9 @@ public class InsertNodeMemoryEstimator {
size += sizeOfTConsensusGroupId();
}
if (tRegionReplicaSet.isSetDataNodeLocations()) {
- size += NUM_BYTES_OBJECT_HEADER;
+ size += sizeOfObjectList(tRegionReplicaSet.getDataNodeLocations());
for (TDataNodeLocation tDataNodeLocation :
tRegionReplicaSet.getDataNodeLocations()) {
- size += sizeOfTDataNodeLocation(tDataNodeLocation);
+ size += sizeOfTDataNodeLocation(tDataNodeLocation,
deduplicatedObjects);
}
}
return size;
@@ -403,25 +368,39 @@ public class InsertNodeMemoryEstimator {
return T_CONSENSUS_GROUP_ID_SIZE;
}
- private static long sizeOfTDataNodeLocation(final TDataNodeLocation
tDataNodeLocation) {
+ private static long sizeOfTDataNodeLocation(
+ final TDataNodeLocation tDataNodeLocation, final Set<Object>
deduplicatedObjects) {
if (tDataNodeLocation == null) {
return 0L;
}
+ if (!shouldCountObject(tDataNodeLocation, deduplicatedObjects)) {
+ return 0L;
+ }
long size = T_DATA_NODE_LOCATION_SIZE;
- size += sizeOfTEndPoint(tDataNodeLocation.getClientRpcEndPoint());
- size += sizeOfTEndPoint(tDataNodeLocation.getInternalEndPoint());
- size += sizeOfTEndPoint(tDataNodeLocation.getMPPDataExchangeEndPoint());
- size +=
sizeOfTEndPoint(tDataNodeLocation.getDataRegionConsensusEndPoint());
- size +=
sizeOfTEndPoint(tDataNodeLocation.getSchemaRegionConsensusEndPoint());
+ size += sizeOfTEndPoint(tDataNodeLocation.getClientRpcEndPoint(),
deduplicatedObjects);
+ size += sizeOfTEndPoint(tDataNodeLocation.getInternalEndPoint(),
deduplicatedObjects);
+ size += sizeOfTEndPoint(tDataNodeLocation.getMPPDataExchangeEndPoint(),
deduplicatedObjects);
+ size +=
+ sizeOfTEndPoint(tDataNodeLocation.getDataRegionConsensusEndPoint(),
deduplicatedObjects);
+ size +=
+ sizeOfTEndPoint(tDataNodeLocation.getSchemaRegionConsensusEndPoint(),
deduplicatedObjects);
return size;
}
private static long sizeOfTEndPoint(final TEndPoint tEndPoint) {
+ return sizeOfTEndPoint(tEndPoint, null);
+ }
+
+ private static long sizeOfTEndPoint(
+ final TEndPoint tEndPoint, final Set<Object> deduplicatedObjects) {
if (tEndPoint == null) {
return 0L;
}
+ if (!shouldCountObject(tEndPoint, deduplicatedObjects)) {
+ return 0L;
+ }
// objectHeader + ip + port
long size = T_END_POINT_SIZE;
@@ -430,18 +409,32 @@ public class InsertNodeMemoryEstimator {
}
private static long sizeOfTSStatus(final TSStatus tSStatus) {
+ return sizeOfTSStatus(tSStatus, null);
+ }
+
+ private static long sizeOfTSStatus(
+ final TSStatus tSStatus, final Set<Object> deduplicatedObjects) {
if (tSStatus == null) {
return 0L;
}
+ if (!shouldCountObject(tSStatus, deduplicatedObjects)) {
+ return 0L;
+ }
long size = TS_STATUS_SIZE;
// message
if (tSStatus.isSetMessage()) {
size += RamUsageEstimator.sizeOf(tSStatus.message);
}
- // ignore subStatus
+ // subStatus
+ if (tSStatus.getSubStatus() != null) {
+ size += sizeOfObjectList(tSStatus.getSubStatus());
+ for (TSStatus subStatus : tSStatus.getSubStatus()) {
+ size += sizeOfTSStatus(subStatus, deduplicatedObjects);
+ }
+ }
// redirectNode
if (tSStatus.isSetRedirectNode()) {
- size += sizeOfTEndPoint(tSStatus.redirectNode);
+ size += sizeOfTEndPoint(tSStatus.redirectNode, deduplicatedObjects);
}
return size;
}
@@ -449,7 +442,14 @@ public class InsertNodeMemoryEstimator {
//
=============================ProgressIndex==================================
private static long sizeOfProgressIndex(final ProgressIndex progressIndex) {
- return Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0L;
+ return sizeOfProgressIndex(progressIndex, null);
+ }
+
+ private static long sizeOfProgressIndex(
+ final ProgressIndex progressIndex, final Set<Object>
deduplicatedObjects) {
+ return Objects.nonNull(progressIndex) && shouldCountObject(progressIndex,
deduplicatedObjects)
+ ? progressIndex.ramBytesUsed()
+ : 0L;
}
// =============================Write==================================
@@ -617,4 +617,96 @@ public class InsertNodeMemoryEstimator {
}
return size;
}
+
+ private static long sizeOfPlanNodeId(
+ final PlanNodeId planNodeId, final Set<Object> deduplicatedObjects) {
+ return planNodeId != null && shouldCountObject(planNodeId,
deduplicatedObjects)
+ ? planNodeId.ramBytesUsed()
+ : 0L;
+ }
+
+ private static long sizeOfStringArray(
+ final String[] strings, final Set<Object> deduplicatedObjects) {
+ return strings != null && shouldCountObject(strings, deduplicatedObjects)
+ ? RamUsageEstimator.sizeOf(strings)
+ : 0L;
+ }
+
+ private static long sizeOfShallowObject(
+ final Object object, final Set<Object> deduplicatedObjects) {
+ return object != null && shouldCountObject(object, deduplicatedObjects)
+ ? RamUsageEstimator.shallowSizeOf(object)
+ : 0L;
+ }
+
+ private static long sizeOfInsertRowNodeList(
+ final List<InsertRowNode> rows, final Set<Object> deduplicatedObjects) {
+ if (rows == null) {
+ return 0L;
+ }
+ long size = sizeOfObjectList(rows);
+ for (InsertRowNode row : rows) {
+ size += sizeOfInsertRowNode(row, deduplicatedObjects);
+ }
+ return size;
+ }
+
+ private static long sizeOfInsertTabletNodeList(
+ final List<InsertTabletNode> tablets, final Set<Object>
deduplicatedObjects) {
+ if (tablets == null) {
+ return 0L;
+ }
+ long size = sizeOfObjectList(tablets);
+ for (InsertTabletNode tablet : tablets) {
+ size += sizeOfInsertTabletNode(tablet, deduplicatedObjects);
+ }
+ return size;
+ }
+
+ private static long sizeOfObjectList(final List<?> list) {
+ if (list == null) {
+ return 0L;
+ }
+ long size = RamUsageEstimator.shallowSizeOf(list);
+ if (list instanceof ArrayList) {
+ size +=
+ RamUsageEstimator.alignObjectSize(
+ NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * list.size());
+ }
+ return size;
+ }
+
+ private static long sizeOfIntegerList(final List<Integer> integers) {
+ if (integers == null) {
+ return 0L;
+ }
+ long size = sizeOfObjectList(integers);
+ for (Integer ignored : integers) {
+ size += SIZE_OF_INT;
+ }
+ return size;
+ }
+
+ private static long sizeOfResults(final Map<Integer, TSStatus> results) {
+ if (results == null) {
+ return 0L;
+ }
+ long size = RamUsageEstimator.shallowSizeOf(results);
+ for (Map.Entry<Integer, TSStatus> entry : results.entrySet()) {
+ size +=
+ SIZE_OF_INT
+ + sizeOfTSStatus(entry.getValue())
+ + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
+ }
+ return size;
+ }
+
+ private static Set<Object> newDeduplicatedObjectSet() {
+ return Collections.newSetFromMap(new IdentityHashMap<>());
+ }
+
+ private static boolean shouldCountObject(
+ final Object object, final Set<Object> deduplicatedObjects) {
+ return object != null && (deduplicatedObjects == null ||
deduplicatedObjects.add(object));
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimatorTest.java
new file mode 100644
index 00000000000..58b2756b16f
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimatorTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.memory;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class InsertNodeMemoryEstimatorTest {
+
+ @Test
+ public void testInsertRowsNodeLaterRowSizeIsEstimated() throws
IllegalPathException {
+ InsertRowNode firstRow =
+ createTextInsertRowNode("child-1", "root.sg.d1", new String[] {"s1"},
new String[] {"v1"});
+ InsertRowNode smallSecondRow =
+ createTextInsertRowNode("child-2", "root.sg.d2", new String[] {"s1"},
new String[] {"v2"});
+ InsertRowNode largeSecondRow =
+ createTextInsertRowNode(
+ "child-3",
+ "root.sg.device_with_a_longer_path_segment",
+ new String[] {"s1", "measurement_with_a_longer_name", "s3"},
+ new String[] {"v2", repeatedString("payload", 32),
repeatedString("payload", 48)});
+
+ long baselineSize =
+ InsertNodeMemoryEstimator.sizeOf(createInsertRowsNode("parent",
firstRow, smallSecondRow));
+ long largerNodeSize =
+ InsertNodeMemoryEstimator.sizeOf(createInsertRowsNode("parent",
firstRow, largeSecondRow));
+
+ Assert.assertTrue(largerNodeSize > baselineSize);
+ }
+
+ @Test
+ public void testInsertRowsNodeResultsAreEstimated() throws
IllegalPathException {
+ InsertRowsNode node =
+ createInsertRowsNode(
+ "parent",
+ createTextInsertRowNode(
+ "child-1", "root.sg.d1", new String[] {"s1"}, new String[]
{"v1"}),
+ createTextInsertRowNode(
+ "child-2", "root.sg.d2", new String[] {"s1"}, new String[]
{"v2"}));
+
+ long sizeWithoutResults = InsertNodeMemoryEstimator.sizeOf(node);
+
+ TSStatus statusWithoutSubStatus = createStatus("outer-message");
+ node.getResults().put(1, statusWithoutSubStatus);
+ long sizeWithResults = InsertNodeMemoryEstimator.sizeOf(node);
+
+ TSStatus statusWithSubStatus = createStatus("outer-message");
+ List<TSStatus> subStatusList = new ArrayList<>();
+ subStatusList.add(createStatus(repeatedString("inner-message", 16)));
+ statusWithSubStatus.setSubStatus(subStatusList);
+ node.getResults().put(1, statusWithSubStatus);
+ long sizeWithSubStatus = InsertNodeMemoryEstimator.sizeOf(node);
+
+ Assert.assertTrue(sizeWithResults > sizeWithoutResults);
+ Assert.assertTrue(sizeWithSubStatus > sizeWithResults);
+ }
+
+ @Test
+ public void testInsertRowsOfOneDeviceNodeLaterRowSizeIsEstimated() throws
IllegalPathException {
+ InsertRowNode firstRow =
+ createTextInsertRowNode(
+ "child-1", "root.sg.d1", new String[] {"s1", "s2"}, new String[]
{"v1", "v2"});
+ InsertRowNode smallSecondRow =
+ createTextInsertRowNode(
+ "child-2", "root.sg.d1", new String[] {"s1", "s2"}, new String[]
{"v3", "v4"});
+ InsertRowNode largeSecondRow =
+ createTextInsertRowNode(
+ "child-3",
+ "root.sg.d1",
+ new String[] {"s1", "s2"},
+ new String[] {repeatedString("payload", 32),
repeatedString("payload", 48)});
+
+ long baselineSize =
+ InsertNodeMemoryEstimator.sizeOf(
+ createInsertRowsOfOneDeviceNode("parent", firstRow,
smallSecondRow));
+ long largerNodeSize =
+ InsertNodeMemoryEstimator.sizeOf(
+ createInsertRowsOfOneDeviceNode("parent", firstRow,
largeSecondRow));
+
+ Assert.assertTrue(largerNodeSize > baselineSize);
+ }
+
+ @Test
+ public void testInsertMultiTabletsNodeLaterTabletSizeIsEstimated() throws
IllegalPathException {
+ InsertTabletNode firstTablet = createTextInsertTabletNode("child-1",
"root.sg.d1", 1, 1, 2);
+ InsertTabletNode smallSecondTablet =
+ createTextInsertTabletNode("child-2", "root.sg.d2", 1, 1, 2);
+ InsertTabletNode largeSecondTablet =
+ createTextInsertTabletNode("child-3", "root.sg.d3", 3, 8, 16);
+
+ long baselineSize =
+ InsertNodeMemoryEstimator.sizeOf(
+ createInsertMultiTabletsNode("parent", firstTablet,
smallSecondTablet));
+ long largerNodeSize =
+ InsertNodeMemoryEstimator.sizeOf(
+ createInsertMultiTabletsNode("parent", firstTablet,
largeSecondTablet));
+
+ Assert.assertTrue(largerNodeSize > baselineSize);
+ }
+
+ @Test
+ public void testPlanNodeIdIsEstimated() throws IllegalPathException {
+ InsertRowNode shortPlanNodeIdRow =
+ createTextInsertRowNode("id", "root.sg.d1", new String[] {"s1"}, new
String[] {"v1"});
+ InsertRowNode longPlanNodeIdRow =
+ createTextInsertRowNode(
+ repeatedString("plan-node-id", 12),
+ "root.sg.d1",
+ new String[] {"s1"},
+ new String[] {"v1"});
+
+ Assert.assertTrue(
+ InsertNodeMemoryEstimator.sizeOf(longPlanNodeIdRow)
+ > InsertNodeMemoryEstimator.sizeOf(shortPlanNodeIdRow));
+ }
+
+ private static InsertRowsNode createInsertRowsNode(
+ String planNodeId, InsertRowNode... insertRowNodes) {
+ InsertRowsNode node = new InsertRowsNode(new PlanNodeId(planNodeId));
+ for (int i = 0; i < insertRowNodes.length; i++) {
+ node.addOneInsertRowNode(insertRowNodes[i], i);
+ }
+ node.setDevicePath(insertRowNodes[0].getDevicePath());
+ node.setMeasurementSchemas(insertRowNodes[0].getMeasurementSchemas());
+ node.setMeasurements(insertRowNodes[0].getMeasurements());
+ node.setDataTypes(insertRowNodes[0].getDataTypes());
+ return node;
+ }
+
+ private static InsertRowsOfOneDeviceNode createInsertRowsOfOneDeviceNode(
+ String planNodeId, InsertRowNode... insertRowNodes) {
+ InsertRowsOfOneDeviceNode node = new InsertRowsOfOneDeviceNode(new
PlanNodeId(planNodeId));
+ List<InsertRowNode> rows = new ArrayList<>(Arrays.asList(insertRowNodes));
+ List<Integer> indexes = new ArrayList<>();
+ for (int i = 0; i < insertRowNodes.length; i++) {
+ indexes.add(i);
+ }
+ node.setInsertRowNodeList(rows);
+ node.setInsertRowNodeIndexList(indexes);
+ return node;
+ }
+
+ private static InsertMultiTabletsNode createInsertMultiTabletsNode(
+ String planNodeId, InsertTabletNode... insertTabletNodes) {
+ InsertMultiTabletsNode node = new InsertMultiTabletsNode(new
PlanNodeId(planNodeId));
+ for (int i = 0; i < insertTabletNodes.length; i++) {
+ node.addInsertTabletNode(insertTabletNodes[i], i);
+ }
+ return node;
+ }
+
+ private static InsertRowNode createTextInsertRowNode(
+ String planNodeId, String devicePath, String[] measurements, String[]
values)
+ throws IllegalPathException {
+ TSDataType[] dataTypes = new TSDataType[measurements.length];
+ MeasurementSchema[] measurementSchemas = new
MeasurementSchema[measurements.length];
+ Object[] rowValues = new Object[measurements.length];
+ for (int i = 0; i < measurements.length; i++) {
+ dataTypes[i] = TSDataType.TEXT;
+ measurementSchemas[i] = new MeasurementSchema(measurements[i],
TSDataType.TEXT);
+ rowValues[i] = new Binary(values[i], TSFileConfig.STRING_CHARSET);
+ }
+ return new InsertRowNode(
+ new PlanNodeId(planNodeId),
+ new PartialPath(devicePath),
+ false,
+ measurements,
+ dataTypes,
+ measurementSchemas,
+ 1L,
+ rowValues,
+ false);
+ }
+
+ private static InsertTabletNode createTextInsertTabletNode(
+ String planNodeId, String devicePath, int measurementCount, int
rowCount, int repeatCount)
+ throws IllegalPathException {
+ String[] measurements = new String[measurementCount];
+ TSDataType[] dataTypes = new TSDataType[measurementCount];
+ MeasurementSchema[] measurementSchemas = new
MeasurementSchema[measurementCount];
+ Object[] columns = new Object[measurementCount];
+ for (int measurementIndex = 0; measurementIndex < measurementCount;
measurementIndex++) {
+ measurements[measurementIndex] = "s" + measurementIndex;
+ dataTypes[measurementIndex] = TSDataType.TEXT;
+ measurementSchemas[measurementIndex] =
+ new MeasurementSchema(measurements[measurementIndex],
TSDataType.TEXT);
+ Binary[] values = new Binary[rowCount];
+ for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
+ values[rowIndex] =
+ new Binary(
+ repeatedString("value-" + measurementIndex + "-" + rowIndex,
repeatCount),
+ TSFileConfig.STRING_CHARSET);
+ }
+ columns[measurementIndex] = values;
+ }
+
+ long[] times = new long[rowCount];
+ for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
+ times[rowIndex] = rowIndex;
+ }
+
+ return new InsertTabletNode(
+ new PlanNodeId(planNodeId),
+ new PartialPath(devicePath),
+ false,
+ measurements,
+ dataTypes,
+ measurementSchemas,
+ times,
+ null,
+ columns,
+ rowCount);
+ }
+
+ private static TSStatus createStatus(String message) {
+ TSStatus status = new TSStatus();
+ status.setCode(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ status.setMessage(message);
+ return status;
+ }
+
+ private static String repeatedString(String unit, int repeatCount) {
+ StringBuilder builder = new StringBuilder(unit.length() * repeatCount);
+ for (int i = 0; i < repeatCount; i++) {
+ builder.append(unit);
+ }
+ return builder.toString();
+ }
+}