This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new dce37ef8bd0 Pipe: Added memory control for receiver insert statements
(#15046) (#15047)
dce37ef8bd0 is described below
commit dce37ef8bd0bc52c424942eee615988885c9edc4
Author: Caideyipi <[email protected]>
AuthorDate: Mon Mar 31 20:37:34 2025 +0800
Pipe: Added memory control for receiver insert statements (#15046) (#15047)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../protocol/thrift/IoTDBDataNodeReceiver.java | 33 +++
.../resource/memory/InsertNodeMemoryEstimator.java | 301 ++++-----------------
.../plan/statement/crud/InsertBaseStatement.java | 40 ++-
.../crud/InsertMultiTabletsStatement.java | 16 ++
.../plan/statement/crud/InsertRowStatement.java | 12 +
.../crud/InsertRowsOfOneDeviceStatement.java | 16 ++
.../plan/statement/crud/InsertRowsStatement.java | 17 ++
.../plan/statement/crud/InsertTabletStatement.java | 13 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 10 +
.../iotdb/commons/conf/CommonDescriptor.java | 5 +
.../consensus/index/impl/HybridProgressIndex.java | 4 +-
.../iotdb/commons/pipe/config/PipeConfig.java | 7 +
.../commons/schema/view/LogicalViewSchema.java | 13 +-
13 files changed, 235 insertions(+), 252 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 71799be00a8..542731ee37a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.receiver.protocol.thrift;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
@@ -63,6 +64,8 @@ import
org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementExceptionVisitor;
import
org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementPatternParseVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementToBatchVisitor;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
@@ -147,6 +150,10 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
PipeConfig.getInstance().getPipeReceiverLoginPeriodicVerificationIntervalMs();
private long lastSuccessfulLoginTime = Long.MIN_VALUE;
+ private static final double ACTUAL_TO_ESTIMATED_MEMORY_RATIO =
+ PipeConfig.getInstance().getPipeReceiverActualToEstimatedMemoryRatio();
+ private PipeMemoryBlock allocatedMemoryBlock;
+
static {
try {
folderManager =
@@ -647,7 +654,14 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
private TSStatus executeStatementAndClassifyExceptions(final Statement
statement) {
+ long estimatedMemory = 0L;
try {
+ if (statement instanceof InsertBaseStatement) {
+ estimatedMemory = ((InsertBaseStatement) statement).ramBytesUsed();
+ allocatedMemoryBlock =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocate((long) (estimatedMemory *
ACTUAL_TO_ESTIMATED_MEMORY_RATIO));
+ }
final TSStatus result =
executeStatementWithRetryOnDataTypeMismatch(statement);
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| result.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
@@ -660,6 +674,20 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
result);
return statement.accept(STATEMENT_STATUS_VISITOR, result);
}
+ } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
+ final String message =
+ String.format(
+ "Temporarily out of memory when executing statement %s,
Requested memory: %s, used memory: %s, total memory: %s",
+ statement,
+ estimatedMemory,
+ PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(),
+ PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Receiver id = {}: {}", receiverId.get(), message, e);
+ }
+ return new TSStatus(
+
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+ .setMessage(message);
} catch (final Exception e) {
LOGGER.warn(
"Receiver id = {}: Exception encountered while executing statement
{}: ",
@@ -667,6 +695,11 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
statement,
e);
return statement.accept(STATEMENT_EXCEPTION_VISITOR, e);
+ } finally {
+ if (Objects.nonNull(allocatedMemoryBlock)) {
+ allocatedMemoryBlock.close();
+ allocatedMemoryBlock = null;
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
index d577ddd8f2b..1f9a21bfbd2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
@@ -25,13 +25,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
-import
org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
import org.apache.iotdb.commons.path.PartialPath;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -43,20 +36,18 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTablet
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.encoding.encoder.TSEncodingBuilder;
import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
-import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.Objects;
public class InsertNodeMemoryEstimator {
@@ -73,9 +64,6 @@ public class InsertNodeMemoryEstimator {
private static final long NUM_BYTES_OBJECT_HEADER =
RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
private static final long NUM_BYTES_ARRAY_HEADER =
RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
- private static final long REENTRANT_READ_WRITE_LOCK_SIZE =
- RamUsageEstimator.shallowSizeOfInstance(ReentrantReadWriteLock.class);
-
private static final long TS_ENCODING_PLAIN_BUILDER_SIZE =
RamUsageEstimator.shallowSizeOf(TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN));
@@ -104,9 +92,6 @@ public class InsertNodeMemoryEstimator {
private static final long MEASUREMENT_SCHEMA_SIZE =
RamUsageEstimator.shallowSizeOfInstance(MeasurementSchema.class);
- private static final long PLAIN_DEVICE_ID_SIZE =
- RamUsageEstimator.shallowSizeOfInstance(PlainDeviceID.class);
-
// =============================Thrift==================================
private static final long T_REGION_REPLICA_SET_SIZE =
@@ -124,29 +109,6 @@ public class InsertNodeMemoryEstimator {
private static final long T_CONSENSUS_GROUP_ID_SIZE =
RamUsageEstimator.shallowSizeOfInstance(TConsensusGroupId.class);
- //
=============================ProgressIndex==================================
-
- private static final long HYBRID_PROGRESS_INDEX_SIZE =
- RamUsageEstimator.shallowSizeOfInstance(HybridProgressIndex.class);
-
- private static final long IOT_PROGRESS_INDEX_SIZE =
- RamUsageEstimator.shallowSizeOfInstance(IoTProgressIndex.class);
-
- private static final long META_PROGRESS_INDEX_SIZE =
- RamUsageEstimator.shallowSizeOfInstance(MetaProgressIndex.class);
-
- private static final long RECOVER_PROGRESS_INDEX_SIZE =
- RamUsageEstimator.shallowSizeOfInstance(RecoverProgressIndex.class);
-
- private static final long SIMPLE_PROGRESS_INDEX_SIZE =
- RamUsageEstimator.shallowSizeOfInstance(SimpleProgressIndex.class);
-
- private static final long STATE_PROGRESS_INDEX_SIZE =
- RamUsageEstimator.shallowSizeOfInstance(StateProgressIndex.class);
-
- private static final long TIME_WINDOW_STATE_PROGRESS_INDEX_SIZE =
-
RamUsageEstimator.shallowSizeOfInstance(TimeWindowStateProgressIndex.class);
-
// =============================BitMap==================================
private static final long BIT_MAP_SIZE =
RamUsageEstimator.shallowSizeOfInstance(BitMap.class);
@@ -163,8 +125,6 @@ public class InsertNodeMemoryEstimator {
RamUsageEstimator.alignObjectSize(Float.BYTES + NUM_BYTES_OBJECT_HEADER);
private static final long SIZE_OF_BOOLEAN =
RamUsageEstimator.alignObjectSize(1 + NUM_BYTES_OBJECT_HEADER);
- private static final long SIZE_OF_SHORT =
- RamUsageEstimator.alignObjectSize(Short.BYTES + NUM_BYTES_OBJECT_HEADER);
private static final long SIZE_OF_STRING =
RamUsageEstimator.shallowSizeOfInstance(String.class);
// The calculated result needs to be magnified by 1.3 times, which is 1.3
times different
@@ -204,7 +164,7 @@ public class InsertNodeMemoryEstimator {
// MeasurementSchemas
size += sizeOfMeasurementSchemas(node.getMeasurementSchemas());
// Measurement
- size += sizeOfMeasurement(node.getMeasurements());
+ size += sizeOfStringArray(node.getMeasurements());
// dataTypes
size += RamUsageEstimator.shallowSizeOf(node.getDataTypes());
// deviceID
@@ -237,7 +197,7 @@ public class InsertNodeMemoryEstimator {
private static long sizeOfInsertTabletNode(final InsertTabletNode node) {
long size = INSERT_TABLET_NODE_SIZE;
size += calculateFullInsertNodeSize(node);
- size += sizeOfTimes(node.getTimes());
+ size += RamUsageEstimator.sizeOf(node.getTimes());
size += sizeOfBitMapArray(node.getBitMaps());
size += sizeOfColumns(node.getColumns(), node.getMeasurementSchemas());
final List<Integer> range = node.getRange();
@@ -252,7 +212,7 @@ public class InsertNodeMemoryEstimator {
size += calculateInsertNodeSizeExcludingSchemas(node);
- size += sizeOfTimes(node.getTimes());
+ size += RamUsageEstimator.sizeOf(node.getTimes());
size += sizeOfBitMapArray(node.getBitMaps());
@@ -362,7 +322,7 @@ public class InsertNodeMemoryEstimator {
// ============================Device And
Measurement===================================
- private static long sizeOfPartialPath(final PartialPath partialPath) {
+ public static long sizeOfPartialPath(final PartialPath partialPath) {
if (partialPath == null) {
return 0L;
}
@@ -377,7 +337,7 @@ public class InsertNodeMemoryEstimator {
return size;
}
- private static long sizeOfMeasurementSchemas(final MeasurementSchema[]
measurementSchemas) {
+ public static long sizeOfMeasurementSchemas(final MeasurementSchema[]
measurementSchemas) {
if (measurementSchemas == null) {
return 0L;
}
@@ -397,15 +357,15 @@ public class InsertNodeMemoryEstimator {
// Header + primitive + reference
long size = MEASUREMENT_SCHEMA_SIZE;
// measurementId
- size += sizeOfString(measurementSchema.getMeasurementId());
+ size += RamUsageEstimator.sizeOf(measurementSchema.getMeasurementId());
// props
final Map<String, String> props = measurementSchema.getProps();
if (props != null) {
size += NUM_BYTES_OBJECT_HEADER;
for (Map.Entry<String, String> entry : props.entrySet()) {
size +=
- sizeOfString(entry.getKey())
- + sizeOfString(entry.getValue())
+ RamUsageEstimator.sizeOf(entry.getKey())
+ + RamUsageEstimator.sizeOf(entry.getValue())
+ RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
}
}
@@ -413,30 +373,8 @@ public class InsertNodeMemoryEstimator {
return size;
}
- private static long sizeOfMeasurement(final String[] measurement) {
- if (measurement == null) {
- return 0L;
- }
- return sizeOfStringArray(measurement);
- }
-
private static long sizeOfIDeviceID(final IDeviceID deviceID) {
- if (deviceID == null) {
- return 0L;
- }
- return sizeOfPlainDeviceID((PlainDeviceID) deviceID);
- }
-
- private static long sizeOfPlainDeviceID(final PlainDeviceID deviceID) {
- long size = PLAIN_DEVICE_ID_SIZE;
- final String id = deviceID.toString();
-
- if (id != null) {
- // Estimate the sum of the table and segment lengths to be the size of
the id
- size += sizeOfString(id) * 2;
- }
-
- return size;
+ return Objects.nonNull(deviceID) ? deviceID.ramBytesUsed() : 0L;
}
// =============================Thrift==================================
@@ -487,7 +425,7 @@ public class InsertNodeMemoryEstimator {
// objectHeader + ip + port
long size = T_END_POINT_SIZE;
- size += sizeOfString(tEndPoint.ip);
+ size += RamUsageEstimator.sizeOf(tEndPoint.ip);
return size;
}
@@ -498,7 +436,7 @@ public class InsertNodeMemoryEstimator {
long size = TS_STATUS_SIZE;
// message
if (tSStatus.isSetMessage()) {
- size += sizeOfString(tSStatus.message);
+ size += RamUsageEstimator.sizeOf(tSStatus.message);
}
// ignore subStatus
// redirectNode
@@ -511,174 +449,20 @@ public class InsertNodeMemoryEstimator {
//
=============================ProgressIndex==================================
private static long sizeOfProgressIndex(final ProgressIndex progressIndex) {
- if (progressIndex == null) {
- return 0L;
- }
- switch (progressIndex.getType()) {
- case HYBRID_PROGRESS_INDEX:
- return sizeOfHybridProgressIndex((HybridProgressIndex) progressIndex);
- case IOT_PROGRESS_INDEX:
- return sizeOfIoTProgressIndex((IoTProgressIndex) progressIndex);
- case META_PROGRESS_INDEX:
- return sizeOfMetaProgressIndex();
- case STATE_PROGRESS_INDEX:
- return sizeOfStateProgressIndex((StateProgressIndex) progressIndex);
- case SIMPLE_PROGRESS_INDEX:
- return sizeOfSimpleProgressIndex();
- case MINIMUM_PROGRESS_INDEX:
- return 0L;
- case RECOVER_PROGRESS_INDEX:
- return sizeOfRecoverProgressIndex((RecoverProgressIndex)
progressIndex);
- case TIME_WINDOW_STATE_PROGRESS_INDEX:
- return
sizeOfTimeWindowStateProgressIndex((TimeWindowStateProgressIndex)
progressIndex);
- }
- return 0L;
- }
-
- private static long sizeOfHybridProgressIndex(final HybridProgressIndex
progressIndex) {
- // Memory alignment of basic types and reference types in structures
- long size = HYBRID_PROGRESS_INDEX_SIZE;
-
- // Memory calculation in reference type, cannot get exact value, roughly
estimate
- size += REENTRANT_READ_WRITE_LOCK_SIZE;
- if (progressIndex.getType2Index() != null) {
- // ignore ProgressIndex
- size +=
- NUM_BYTES_OBJECT_HEADER
- + progressIndex.getType2Index().size()
- * (SIZE_OF_SHORT +
RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
- }
- return size;
- }
-
- private static long sizeOfIoTProgressIndex(IoTProgressIndex progressIndex) {
- // Memory alignment of basic types and reference types in structures
- long size = IOT_PROGRESS_INDEX_SIZE;
-
- // Memory calculation in reference type, cannot get exact value, roughly
estimate
- size += REENTRANT_READ_WRITE_LOCK_SIZE;
-
- size +=
- NUM_BYTES_OBJECT_HEADER
- + progressIndex.getPeerId2SearchIndexSize()
- * (SIZE_OF_INT + SIZE_OF_LONG +
RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
-
- return size;
- }
-
- private static long sizeOfMetaProgressIndex() {
- // Memory alignment of basic types and reference types in structures
- return META_PROGRESS_INDEX_SIZE + REENTRANT_READ_WRITE_LOCK_SIZE;
- }
-
- private static long sizeOfRecoverProgressIndex(RecoverProgressIndex
progressIndex) {
- // Memory alignment of basic types and reference types in structures
- long size = RECOVER_PROGRESS_INDEX_SIZE;
-
- // Memory calculation in reference type, cannot get exact value, roughly
estimate
- size += REENTRANT_READ_WRITE_LOCK_SIZE;
- if (progressIndex.getDataNodeId2LocalIndex() != null) {
- size +=
- NUM_BYTES_OBJECT_HEADER
- + progressIndex.getDataNodeId2LocalIndex().size()
- * (SIZE_OF_INT + SIZE_OF_LONG +
RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
- }
- return size;
- }
-
- private static long sizeOfSimpleProgressIndex() {
- // Memory alignment of basic types and reference types in structures
- return SIMPLE_PROGRESS_INDEX_SIZE;
- }
-
- private static long sizeOfStateProgressIndex(StateProgressIndex
progressIndex) {
- // Memory alignment of basic types and reference types in structures
- long size = STATE_PROGRESS_INDEX_SIZE;
-
- // Memory calculation in reference type, cannot get exact value, roughly
estimate
- size += REENTRANT_READ_WRITE_LOCK_SIZE;
- if (progressIndex.getState() != null) {
- size += NUM_BYTES_OBJECT_HEADER;
- for (Map.Entry<String, Binary> entry :
progressIndex.getState().entrySet()) {
- size +=
- RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
- + sizeOfString(entry.getKey())
- + sizeOfBinary(entry.getValue());
- }
- }
- return size;
- }
-
- private static long sizeOfTimeWindowStateProgressIndex(
- TimeWindowStateProgressIndex progressIndex) {
- // Memory alignment of basic types and reference types in structures
- long size = TIME_WINDOW_STATE_PROGRESS_INDEX_SIZE;
-
- // Memory calculation in reference type, cannot get exact value, roughly
estimate
- size += REENTRANT_READ_WRITE_LOCK_SIZE;
- if (progressIndex.getTimeSeries2TimestampWindowBufferPairMap() != null) {
- size += NUM_BYTES_OBJECT_HEADER;
- for (Map.Entry<String, Pair<Long, ByteBuffer>> entry :
-
progressIndex.getTimeSeries2TimestampWindowBufferPairMap().entrySet()) {
- size +=
- sizeOfString(entry.getKey())
- + SIZE_OF_LONG * 2
- + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
- }
- }
- return size;
+ return Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0L;
}
// =============================Write==================================
- private static long sizeOfBinary(Binary binary) {
- if (binary == null) {
- return 0;
- }
- // -----header----
- // -----ref-------
- // ---------------
- // --arrayHeader--
- // ----values-----
- return RamUsageEstimator.alignObjectSize(NUM_BYTES_OBJECT_HEADER +
NUM_BYTES_OBJECT_REF)
- + RamUsageEstimator.alignObjectSize(NUM_BYTES_ARRAY_HEADER +
binary.getValues().length);
- }
-
- private static long sizeOfString(String value) {
- if (value == null) {
- return 0;
- }
- // -----header----
- // -----ref-------
- // ---------------
- // --arrayHeader--
- // ----values-----
- return SIZE_OF_STRING
- + RamUsageEstimator.alignObjectSize(NUM_BYTES_ARRAY_HEADER +
value.length());
- }
-
- private static long sizeOfStringArray(final String[] values) {
- if (values == null) {
- return 0;
- }
- long size =
- RamUsageEstimator.alignObjectSize(
- NUM_BYTES_ARRAY_HEADER + values.length * NUM_BYTES_OBJECT_REF);
- for (String value : values) {
- size += sizeOfString(value);
- }
- return size;
+ private static long sizeOfBinary(final Binary binary) {
+ return Objects.nonNull(binary) ? binary.ramBytesUsed() : 0L;
}
- private static long sizeOfTimes(final long[] times) {
- if (times == null) {
- return 0;
- }
- long size = NUM_BYTES_ARRAY_HEADER + (long) Long.BYTES * times.length;
- return RamUsageEstimator.alignObjectSize(size);
+ public static long sizeOfStringArray(final String[] values) {
+ return Objects.nonNull(values) ? RamUsageEstimator.sizeOf(values) : 0L;
}
- private static long sizeOfBitMapArray(BitMap[] bitMaps) {
+ public static long sizeOfBitMapArray(BitMap[] bitMaps) {
if (bitMaps == null) {
return 0L;
}
@@ -702,8 +486,15 @@ public class InsertNodeMemoryEstimator {
return size;
}
- private static long sizeOfColumns(
+ public static long sizeOfColumns(
final Object[] columns, final MeasurementSchema[] measurementSchemas) {
+ // Directly calculate if measurementSchemas are absent
+ if (Objects.isNull(measurementSchemas)) {
+ return RamUsageEstimator.shallowSizeOf(columns)
+ + Arrays.stream(columns)
+ .mapToLong(InsertNodeMemoryEstimator::getNumBytesUnknownObject)
+ .reduce(0L, Long::sum);
+ }
long size =
RamUsageEstimator.alignObjectSize(
NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns.length);
@@ -743,13 +534,7 @@ public class InsertNodeMemoryEstimator {
case TEXT:
case BLOB:
{
- final Binary[] values = (Binary[]) columns[i];
- size +=
- RamUsageEstimator.alignObjectSize(
- NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF *
values.length);
- for (Binary value : values) {
- size += sizeOfBinary(value);
- }
+ size += getBinarySize((Binary[]) columns[i]);
break;
}
}
@@ -757,12 +542,32 @@ public class InsertNodeMemoryEstimator {
return size;
}
- private static long sizeOfValues(
- final Object[] columns, final MeasurementSchema[] measurementSchemas) {
+ private static long getNumBytesUnknownObject(final Object obj) {
+ return obj instanceof Binary[]
+ ? getBinarySize((Binary[]) obj)
+ : RamUsageEstimator.sizeOfObject(obj);
+ }
+
+ private static long getBinarySize(final Binary[] binaries) {
+ return RamUsageEstimator.shallowSizeOf(binaries)
+ + Arrays.stream(binaries)
+ .mapToLong(InsertNodeMemoryEstimator::sizeOfBinary)
+ .reduce(0L, Long::sum);
+ }
+
+ public static long sizeOfValues(
+ final Object[] values, final MeasurementSchema[] measurementSchemas) {
+ // Directly calculate if measurementSchemas are absent
+ if (Objects.isNull(measurementSchemas)) {
+ return RamUsageEstimator.shallowSizeOf(values)
+ + Arrays.stream(values)
+ .mapToLong(InsertNodeMemoryEstimator::getNumBytesUnknownObject)
+ .reduce(0L, Long::sum);
+ }
long size =
RamUsageEstimator.alignObjectSize(
- NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns.length);
- for (int i = 0; i < columns.length; i++) {
+ NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * values.length);
+ for (int i = 0; i < values.length; i++) {
switch (measurementSchemas[i].getType()) {
case INT64:
case TIMESTAMP:
@@ -795,7 +600,7 @@ public class InsertNodeMemoryEstimator {
case TEXT:
case BLOB:
{
- final Binary binary = (Binary) columns[i];
+ final Binary binary = (Binary) values[i];
size += sizeOfBinary(binary);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index 6d94f12cc35..b6c4d6a4382 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -30,13 +30,16 @@ import
org.apache.iotdb.db.exception.metadata.DuplicateInsertException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Accountable;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.schema.MeasurementSchema;
import java.util.ArrayList;
@@ -46,10 +49,11 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
-public abstract class InsertBaseStatement extends Statement {
+public abstract class InsertBaseStatement extends Statement implements
Accountable {
/**
* if use id table, this filed is id form of device path <br>
@@ -87,6 +91,8 @@ public abstract class InsertBaseStatement extends Statement {
/** it is the end of current range. */
protected int recordedEndOfLogicalViewSchemaList = 0;
+ protected long ramBytesUsed = Long.MIN_VALUE;
+
// endregion
public PartialPath getDevicePath() {
@@ -404,5 +410,37 @@ public abstract class InsertBaseStatement extends
Statement {
}
}
}
+
// endregion
+ @Override
+ public long ramBytesUsed() {
+ if (ramBytesUsed > 0) {
+ return ramBytesUsed;
+ }
+ ramBytesUsed =
+ InsertNodeMemoryEstimator.sizeOfPartialPath(devicePath)
+ +
InsertNodeMemoryEstimator.sizeOfMeasurementSchemas(measurementSchemas)
+ + InsertNodeMemoryEstimator.sizeOfStringArray(measurements)
+ + RamUsageEstimator.shallowSizeOf(dataTypes)
+ + shallowSizeOfList(logicalViewSchemaList)
+ + (Objects.nonNull(logicalViewSchemaList)
+ ? logicalViewSchemaList.stream()
+ .mapToLong(LogicalViewSchema::ramBytesUsed)
+ .reduce(0L, Long::sum)
+ : 0L)
+ + shallowSizeOfList(indexOfSourcePathsOfLogicalViews)
+ + calculateBytesUsed();
+ return ramBytesUsed;
+ }
+
+ private long shallowSizeOfList(List<?> list) {
+ return Objects.nonNull(list)
+ ? InsertRowsStatement.LIST_SIZE
+ + RamUsageEstimator.alignObjectSize(
+ RamUsageEstimator.NUM_BYTES_ARRAY_HEADER
+ + (long) RamUsageEstimator.NUM_BYTES_OBJECT_REF *
list.size())
+ : 0L;
+ }
+
+ protected abstract long calculateBytesUsed();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
index bfc4e6406c6..a2b9c63e1e7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
@@ -27,13 +27,18 @@ import
org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.NotImplementedException;
+import org.apache.tsfile.utils.RamUsageEstimator;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
public class InsertMultiTabletsStatement extends InsertBaseStatement {
+ private static final long INSTANCE_SIZE =
+
RamUsageEstimator.shallowSizeOfInstance(InsertMultiTabletsStatement.class);
+
/** The {@link InsertTabletStatement} list */
List<InsertTabletStatement> insertTabletStatementList;
@@ -144,4 +149,15 @@ public class InsertMultiTabletsStatement extends
InsertBaseStatement {
splitResult.setInsertTabletStatementList(mergedList);
return splitResult;
}
+
+ @Override
+ protected long calculateBytesUsed() {
+ return INSTANCE_SIZE
+ + (Objects.nonNull(insertTabletStatementList)
+ ? InsertRowsStatement.LIST_SIZE
+ + insertTabletStatementList.stream()
+ .mapToLong(InsertTabletStatement::calculateBytesUsed)
+ .reduce(0L, Long::sum)
+ : 0);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
index 56835e45adb..3422fd7311f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
@@ -41,6 +42,7 @@ import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
@@ -59,6 +61,9 @@ public class InsertRowStatement extends InsertBaseStatement
implements ISchemaVa
private static final Logger LOGGER =
LoggerFactory.getLogger(InsertRowStatement.class);
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(InsertRowStatement.class);
+
protected static final byte TYPE_RAW_STRING = -1;
protected static final byte TYPE_NULL = -2;
@@ -464,4 +469,11 @@ public class InsertRowStatement extends
InsertBaseStatement implements ISchemaVa
return new Pair<>(
this.recordedBeginOfLogicalViewSchemaList,
this.recordedEndOfLogicalViewSchemaList);
}
+
+ @Override
+ protected long calculateBytesUsed() {
+ return INSTANCE_SIZE
+ + InsertNodeMemoryEstimator.sizeOfValues(values, measurementSchemas)
+ + RamUsageEstimator.sizeOf(measurementIsAligned);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
index 55e7a57beb0..5013a13915c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
@@ -30,15 +30,20 @@ import
org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.NotImplementedException;
+import org.apache.tsfile.utils.RamUsageEstimator;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
public class InsertRowsOfOneDeviceStatement extends InsertBaseStatement {
+ private static final long INSTANCE_SIZE =
+
RamUsageEstimator.shallowSizeOfInstance(InsertRowsOfOneDeviceStatement.class);
+
public InsertRowsOfOneDeviceStatement() {
super();
statementType = StatementType.BATCH_INSERT_ONE_DEVICE;
@@ -167,4 +172,15 @@ public class InsertRowsOfOneDeviceStatement extends
InsertBaseStatement {
}
return this;
}
+
+ @Override
+ protected long calculateBytesUsed() {
+ return INSTANCE_SIZE
+ + (Objects.nonNull(insertRowStatementList)
+ ? InsertRowsStatement.LIST_SIZE
+ + insertRowStatementList.stream()
+ .mapToLong(InsertRowStatement::calculateBytesUsed)
+ .reduce(0L, Long::sum)
+ : 0);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java
index 5dbdba2f228..41ad8516db7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java
@@ -28,13 +28,19 @@ import
org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.NotImplementedException;
+import org.apache.tsfile.utils.RamUsageEstimator;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
public class InsertRowsStatement extends InsertBaseStatement {
+ public static final long LIST_SIZE =
RamUsageEstimator.shallowSizeOfInstance(ArrayList.class);
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(InsertRowsStatement.class);
+
/** the InsertRowsStatement list */
private List<InsertRowStatement> insertRowStatementList;
@@ -162,4 +168,15 @@ public class InsertRowsStatement extends
InsertBaseStatement {
splitResult.setInsertRowStatementList(mergedList);
return splitResult;
}
+
+ @Override
+ protected long calculateBytesUsed() {
+ return INSTANCE_SIZE
+ + (Objects.nonNull(insertRowStatementList)
+ ? LIST_SIZE
+ + insertRowStatementList.stream()
+ .mapToLong(InsertRowStatement::calculateBytesUsed)
+ .reduce(0L, Long::sum)
+ : 0);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index 28146562b03..99ed39caa44 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
import
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
@@ -38,6 +39,7 @@ import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
@@ -51,8 +53,9 @@ import java.util.List;
import java.util.Map;
public class InsertTabletStatement extends InsertBaseStatement implements
ISchemaValidation {
-
private static final Logger LOGGER =
LoggerFactory.getLogger(InsertTabletStatement.class);
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(InsertTabletStatement.class);
private static final String DATATYPE_UNSUPPORTED = "Data type %s is not
supported.";
@@ -412,4 +415,12 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
return new Pair<>(
this.recordedBeginOfLogicalViewSchemaList,
this.recordedEndOfLogicalViewSchemaList);
}
+
+ @Override
+ protected long calculateBytesUsed() {
+ return INSTANCE_SIZE
+ + RamUsageEstimator.sizeOf(times)
+ + InsertNodeMemoryEstimator.sizeOfBitMapArray(bitMaps)
+ + InsertNodeMemoryEstimator.sizeOfColumns(columns, measurementSchemas);
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index dd15c21a48c..6a78fce213a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -261,6 +261,7 @@ public class CommonConfig {
private int pipeAirGapReceiverPort = 9780;
private long pipeReceiverLoginPeriodicVerificationIntervalMs = 300000;
+ private double pipeReceiverActualToEstimatedMemoryRatio = 3;
private int pipeMaxAllowedHistoricalTsFilePerDataRegion = 100;
private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 5;
@@ -1082,6 +1083,15 @@ public class CommonConfig {
return pipeReceiverLoginPeriodicVerificationIntervalMs;
}
+ public void setPipeReceiverActualToEstimatedMemoryRatio(
+ double pipeReceiverActualToEstimatedMemoryRatio) {
+ this.pipeReceiverActualToEstimatedMemoryRatio =
pipeReceiverActualToEstimatedMemoryRatio;
+ }
+
+ public double getPipeReceiverActualToEstimatedMemoryRatio() {
+ return pipeReceiverActualToEstimatedMemoryRatio;
+ }
+
public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
return pipeMaxAllowedHistoricalTsFilePerDataRegion;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index d7d8ba068f4..4634b61e74d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -543,6 +543,11 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_receiver_login_periodic_verification_interval_ms",
Long.toString(config.getPipeReceiverLoginPeriodicVerificationIntervalMs()))));
+ config.setPipeReceiverActualToEstimatedMemoryRatio(
+ Double.parseDouble(
+ properties.getProperty(
+ "pipe_receiver_actual_to_estimated_memory_ratio",
+
Double.toString(config.getPipeReceiverActualToEstimatedMemoryRatio()))));
config.setPipeMaxAllowedHistoricalTsFilePerDataRegion(
Integer.parseInt(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
index b27c2666cc9..2c8895532dc 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
@@ -42,7 +42,9 @@ import java.util.stream.Collectors;
public class HybridProgressIndex extends ProgressIndex {
private static final long INSTANCE_SIZE =
- RamUsageEstimator.shallowSizeOfInstance(HybridProgressIndex.class) +
ProgressIndex.LOCK_SIZE;
+ RamUsageEstimator.shallowSizeOfInstance(HybridProgressIndex.class)
+ + RamUsageEstimator.shallowSizeOfInstance(HashMap.class)
+ + ProgressIndex.LOCK_SIZE;
private static final long ENTRY_SIZE =
RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
+ RamUsageEstimator.alignObjectSize(Short.BYTES);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 24d49da94c1..677ca016eee 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -254,6 +254,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeReceiverLoginPeriodicVerificationIntervalMs();
}
+ public double getPipeReceiverActualToEstimatedMemoryRatio() {
+ return COMMON_CONFIG.getPipeReceiverActualToEstimatedMemoryRatio();
+ }
+
/////////////////////////////// Hybrid Mode ///////////////////////////////
public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
@@ -494,6 +498,9 @@ public class PipeConfig {
LOGGER.info(
"PipeReceiverLoginPeriodicVerificationIntervalMs: {}",
getPipeReceiverLoginPeriodicVerificationIntervalMs());
+ LOGGER.info(
+ "PipeReceiverActualToEstimatedMemoryRatio: {}",
+ getPipeReceiverActualToEstimatedMemoryRatio());
LOGGER.info(
"PipeMaxAllowedHistoricalTsFilePerDataRegion: {}",
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/view/LogicalViewSchema.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/view/LogicalViewSchema.java
index c54bdfb8475..7121c979af2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/view/LogicalViewSchema.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/view/LogicalViewSchema.java
@@ -29,6 +29,8 @@ import org.apache.tsfile.encoding.encoder.Encoder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Accountable;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchemaType;
@@ -43,7 +45,10 @@ import java.util.List;
import java.util.Map;
public class LogicalViewSchema
- implements IMeasurementSchema, Comparable<LogicalViewSchema>, Serializable
{
+ implements IMeasurementSchema, Comparable<LogicalViewSchema>,
Serializable, Accountable {
+
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(LogicalViewSchema.class);
private String measurementId;
@@ -253,4 +258,10 @@ public class LogicalViewSchema
}
return null;
}
+
+ @Override
+ public long ramBytesUsed() {
+ // Roughly estimate the expression size
+ return INSTANCE_SIZE + RamUsageEstimator.sizeOf(measurementId) + 256;
+ }
}