This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new dce37ef8bd0 Pipe: Added memory control for receiver insert statements 
(#15046) (#15047)
dce37ef8bd0 is described below

commit dce37ef8bd0bc52c424942eee615988885c9edc4
Author: Caideyipi <[email protected]>
AuthorDate: Mon Mar 31 20:37:34 2025 +0800

    Pipe: Added memory control for receiver insert statements (#15046) (#15047)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  33 +++
 .../resource/memory/InsertNodeMemoryEstimator.java | 301 ++++-----------------
 .../plan/statement/crud/InsertBaseStatement.java   |  40 ++-
 .../crud/InsertMultiTabletsStatement.java          |  16 ++
 .../plan/statement/crud/InsertRowStatement.java    |  12 +
 .../crud/InsertRowsOfOneDeviceStatement.java       |  16 ++
 .../plan/statement/crud/InsertRowsStatement.java   |  17 ++
 .../plan/statement/crud/InsertTabletStatement.java |  13 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  10 +
 .../iotdb/commons/conf/CommonDescriptor.java       |   5 +
 .../consensus/index/impl/HybridProgressIndex.java  |   4 +-
 .../iotdb/commons/pipe/config/PipeConfig.java      |   7 +
 .../commons/schema/view/LogicalViewSchema.java     |  13 +-
 13 files changed, 235 insertions(+), 252 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 71799be00a8..542731ee37a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.receiver.protocol.thrift;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import 
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
@@ -63,6 +64,8 @@ import 
org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementExceptionVisitor;
 import 
org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementPatternParseVisitor;
 import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor;
 import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementToBatchVisitor;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp;
 import org.apache.iotdb.db.protocol.session.IClientSession;
 import org.apache.iotdb.db.protocol.session.SessionManager;
@@ -147,6 +150,10 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
       
PipeConfig.getInstance().getPipeReceiverLoginPeriodicVerificationIntervalMs();
   private long lastSuccessfulLoginTime = Long.MIN_VALUE;
 
+  private static final double ACTUAL_TO_ESTIMATED_MEMORY_RATIO =
+      PipeConfig.getInstance().getPipeReceiverActualToEstimatedMemoryRatio();
+  private PipeMemoryBlock allocatedMemoryBlock;
+
   static {
     try {
       folderManager =
@@ -647,7 +654,14 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   }
 
   private TSStatus executeStatementAndClassifyExceptions(final Statement 
statement) {
+    long estimatedMemory = 0L;
     try {
+      if (statement instanceof InsertBaseStatement) {
+        estimatedMemory = ((InsertBaseStatement) statement).ramBytesUsed();
+        allocatedMemoryBlock =
+            PipeDataNodeResourceManager.memory()
+                .forceAllocate((long) (estimatedMemory * 
ACTUAL_TO_ESTIMATED_MEMORY_RATIO));
+      }
       final TSStatus result = 
executeStatementWithRetryOnDataTypeMismatch(statement);
       if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
           || result.getCode() == 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
@@ -660,6 +674,20 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
             result);
         return statement.accept(STATEMENT_STATUS_VISITOR, result);
       }
+    } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
+      final String message =
+          String.format(
+              "Temporarily out of memory when executing statement %s, 
Requested memory: %s, used memory: %s, total memory: %s",
+              statement,
+              estimatedMemory,
+              PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(),
+              PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes());
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Receiver id = {}: {}", receiverId.get(), message, e);
+      }
+      return new TSStatus(
+              
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+          .setMessage(message);
     } catch (final Exception e) {
       LOGGER.warn(
           "Receiver id = {}: Exception encountered while executing statement 
{}: ",
@@ -667,6 +695,11 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           statement,
           e);
       return statement.accept(STATEMENT_EXCEPTION_VISITOR, e);
+    } finally {
+      if (Objects.nonNull(allocatedMemoryBlock)) {
+        allocatedMemoryBlock.close();
+        allocatedMemoryBlock = null;
+      }
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
index d577ddd8f2b..1f9a21bfbd2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
@@ -25,13 +25,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
-import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
-import 
org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
 import org.apache.iotdb.commons.path.PartialPath;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -43,20 +36,18 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTablet
 import org.apache.tsfile.common.constant.TsFileConstant;
 import org.apache.tsfile.encoding.encoder.TSEncodingBuilder;
 import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.PlainDeviceID;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
-import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.Objects;
 
 public class InsertNodeMemoryEstimator {
 
@@ -73,9 +64,6 @@ public class InsertNodeMemoryEstimator {
   private static final long NUM_BYTES_OBJECT_HEADER = 
RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
   private static final long NUM_BYTES_ARRAY_HEADER = 
RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
 
-  private static final long REENTRANT_READ_WRITE_LOCK_SIZE =
-      RamUsageEstimator.shallowSizeOfInstance(ReentrantReadWriteLock.class);
-
   private static final long TS_ENCODING_PLAIN_BUILDER_SIZE =
       
RamUsageEstimator.shallowSizeOf(TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN));
 
@@ -104,9 +92,6 @@ public class InsertNodeMemoryEstimator {
   private static final long MEASUREMENT_SCHEMA_SIZE =
       RamUsageEstimator.shallowSizeOfInstance(MeasurementSchema.class);
 
-  private static final long PLAIN_DEVICE_ID_SIZE =
-      RamUsageEstimator.shallowSizeOfInstance(PlainDeviceID.class);
-
   // =============================Thrift==================================
 
   private static final long T_REGION_REPLICA_SET_SIZE =
@@ -124,29 +109,6 @@ public class InsertNodeMemoryEstimator {
   private static final long T_CONSENSUS_GROUP_ID_SIZE =
       RamUsageEstimator.shallowSizeOfInstance(TConsensusGroupId.class);
 
-  // 
=============================ProgressIndex==================================
-
-  private static final long HYBRID_PROGRESS_INDEX_SIZE =
-      RamUsageEstimator.shallowSizeOfInstance(HybridProgressIndex.class);
-
-  private static final long IOT_PROGRESS_INDEX_SIZE =
-      RamUsageEstimator.shallowSizeOfInstance(IoTProgressIndex.class);
-
-  private static final long META_PROGRESS_INDEX_SIZE =
-      RamUsageEstimator.shallowSizeOfInstance(MetaProgressIndex.class);
-
-  private static final long RECOVER_PROGRESS_INDEX_SIZE =
-      RamUsageEstimator.shallowSizeOfInstance(RecoverProgressIndex.class);
-
-  private static final long SIMPLE_PROGRESS_INDEX_SIZE =
-      RamUsageEstimator.shallowSizeOfInstance(SimpleProgressIndex.class);
-
-  private static final long STATE_PROGRESS_INDEX_SIZE =
-      RamUsageEstimator.shallowSizeOfInstance(StateProgressIndex.class);
-
-  private static final long TIME_WINDOW_STATE_PROGRESS_INDEX_SIZE =
-      
RamUsageEstimator.shallowSizeOfInstance(TimeWindowStateProgressIndex.class);
-
   // =============================BitMap==================================
 
   private static final long BIT_MAP_SIZE = 
RamUsageEstimator.shallowSizeOfInstance(BitMap.class);
@@ -163,8 +125,6 @@ public class InsertNodeMemoryEstimator {
       RamUsageEstimator.alignObjectSize(Float.BYTES + NUM_BYTES_OBJECT_HEADER);
   private static final long SIZE_OF_BOOLEAN =
       RamUsageEstimator.alignObjectSize(1 + NUM_BYTES_OBJECT_HEADER);
-  private static final long SIZE_OF_SHORT =
-      RamUsageEstimator.alignObjectSize(Short.BYTES + NUM_BYTES_OBJECT_HEADER);
   private static final long SIZE_OF_STRING = 
RamUsageEstimator.shallowSizeOfInstance(String.class);
 
   // The calculated result needs to be magnified by 1.3 times, which is 1.3 
times different
@@ -204,7 +164,7 @@ public class InsertNodeMemoryEstimator {
     // MeasurementSchemas
     size += sizeOfMeasurementSchemas(node.getMeasurementSchemas());
     // Measurement
-    size += sizeOfMeasurement(node.getMeasurements());
+    size += sizeOfStringArray(node.getMeasurements());
     // dataTypes
     size += RamUsageEstimator.shallowSizeOf(node.getDataTypes());
     // deviceID
@@ -237,7 +197,7 @@ public class InsertNodeMemoryEstimator {
   private static long sizeOfInsertTabletNode(final InsertTabletNode node) {
     long size = INSERT_TABLET_NODE_SIZE;
     size += calculateFullInsertNodeSize(node);
-    size += sizeOfTimes(node.getTimes());
+    size += RamUsageEstimator.sizeOf(node.getTimes());
     size += sizeOfBitMapArray(node.getBitMaps());
     size += sizeOfColumns(node.getColumns(), node.getMeasurementSchemas());
     final List<Integer> range = node.getRange();
@@ -252,7 +212,7 @@ public class InsertNodeMemoryEstimator {
 
     size += calculateInsertNodeSizeExcludingSchemas(node);
 
-    size += sizeOfTimes(node.getTimes());
+    size += RamUsageEstimator.sizeOf(node.getTimes());
 
     size += sizeOfBitMapArray(node.getBitMaps());
 
@@ -362,7 +322,7 @@ public class InsertNodeMemoryEstimator {
 
   // ============================Device And 
Measurement===================================
 
-  private static long sizeOfPartialPath(final PartialPath partialPath) {
+  public static long sizeOfPartialPath(final PartialPath partialPath) {
     if (partialPath == null) {
       return 0L;
     }
@@ -377,7 +337,7 @@ public class InsertNodeMemoryEstimator {
     return size;
   }
 
-  private static long sizeOfMeasurementSchemas(final MeasurementSchema[] 
measurementSchemas) {
+  public static long sizeOfMeasurementSchemas(final MeasurementSchema[] 
measurementSchemas) {
     if (measurementSchemas == null) {
       return 0L;
     }
@@ -397,15 +357,15 @@ public class InsertNodeMemoryEstimator {
     // Header + primitive + reference
     long size = MEASUREMENT_SCHEMA_SIZE;
     // measurementId
-    size += sizeOfString(measurementSchema.getMeasurementId());
+    size += RamUsageEstimator.sizeOf(measurementSchema.getMeasurementId());
     // props
     final Map<String, String> props = measurementSchema.getProps();
     if (props != null) {
       size += NUM_BYTES_OBJECT_HEADER;
       for (Map.Entry<String, String> entry : props.entrySet()) {
         size +=
-            sizeOfString(entry.getKey())
-                + sizeOfString(entry.getValue())
+            RamUsageEstimator.sizeOf(entry.getKey())
+                + RamUsageEstimator.sizeOf(entry.getValue())
                 + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
       }
     }
@@ -413,30 +373,8 @@ public class InsertNodeMemoryEstimator {
     return size;
   }
 
-  private static long sizeOfMeasurement(final String[] measurement) {
-    if (measurement == null) {
-      return 0L;
-    }
-    return sizeOfStringArray(measurement);
-  }
-
   private static long sizeOfIDeviceID(final IDeviceID deviceID) {
-    if (deviceID == null) {
-      return 0L;
-    }
-    return sizeOfPlainDeviceID((PlainDeviceID) deviceID);
-  }
-
-  private static long sizeOfPlainDeviceID(final PlainDeviceID deviceID) {
-    long size = PLAIN_DEVICE_ID_SIZE;
-    final String id = deviceID.toString();
-
-    if (id != null) {
-      // Estimate the sum of the table and segment lengths to be the size of 
the id
-      size += sizeOfString(id) * 2;
-    }
-
-    return size;
+    return Objects.nonNull(deviceID) ? deviceID.ramBytesUsed() : 0L;
   }
 
   // =============================Thrift==================================
@@ -487,7 +425,7 @@ public class InsertNodeMemoryEstimator {
     // objectHeader + ip + port
     long size = T_END_POINT_SIZE;
 
-    size += sizeOfString(tEndPoint.ip);
+    size += RamUsageEstimator.sizeOf(tEndPoint.ip);
     return size;
   }
 
@@ -498,7 +436,7 @@ public class InsertNodeMemoryEstimator {
     long size = TS_STATUS_SIZE;
     // message
     if (tSStatus.isSetMessage()) {
-      size += sizeOfString(tSStatus.message);
+      size += RamUsageEstimator.sizeOf(tSStatus.message);
     }
     // ignore subStatus
     // redirectNode
@@ -511,174 +449,20 @@ public class InsertNodeMemoryEstimator {
   // 
=============================ProgressIndex==================================
 
   private static long sizeOfProgressIndex(final ProgressIndex progressIndex) {
-    if (progressIndex == null) {
-      return 0L;
-    }
-    switch (progressIndex.getType()) {
-      case HYBRID_PROGRESS_INDEX:
-        return sizeOfHybridProgressIndex((HybridProgressIndex) progressIndex);
-      case IOT_PROGRESS_INDEX:
-        return sizeOfIoTProgressIndex((IoTProgressIndex) progressIndex);
-      case META_PROGRESS_INDEX:
-        return sizeOfMetaProgressIndex();
-      case STATE_PROGRESS_INDEX:
-        return sizeOfStateProgressIndex((StateProgressIndex) progressIndex);
-      case SIMPLE_PROGRESS_INDEX:
-        return sizeOfSimpleProgressIndex();
-      case MINIMUM_PROGRESS_INDEX:
-        return 0L;
-      case RECOVER_PROGRESS_INDEX:
-        return sizeOfRecoverProgressIndex((RecoverProgressIndex) 
progressIndex);
-      case TIME_WINDOW_STATE_PROGRESS_INDEX:
-        return 
sizeOfTimeWindowStateProgressIndex((TimeWindowStateProgressIndex) 
progressIndex);
-    }
-    return 0L;
-  }
-
-  private static long sizeOfHybridProgressIndex(final HybridProgressIndex 
progressIndex) {
-    // Memory alignment of basic types and reference types in structures
-    long size = HYBRID_PROGRESS_INDEX_SIZE;
-
-    // Memory calculation in reference type, cannot get exact value, roughly 
estimate
-    size += REENTRANT_READ_WRITE_LOCK_SIZE;
-    if (progressIndex.getType2Index() != null) {
-      // ignore ProgressIndex
-      size +=
-          NUM_BYTES_OBJECT_HEADER
-              + progressIndex.getType2Index().size()
-                  * (SIZE_OF_SHORT + 
RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
-    }
-    return size;
-  }
-
-  private static long sizeOfIoTProgressIndex(IoTProgressIndex progressIndex) {
-    // Memory alignment of basic types and reference types in structures
-    long size = IOT_PROGRESS_INDEX_SIZE;
-
-    // Memory calculation in reference type, cannot get exact value, roughly 
estimate
-    size += REENTRANT_READ_WRITE_LOCK_SIZE;
-
-    size +=
-        NUM_BYTES_OBJECT_HEADER
-            + progressIndex.getPeerId2SearchIndexSize()
-                * (SIZE_OF_INT + SIZE_OF_LONG + 
RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
-
-    return size;
-  }
-
-  private static long sizeOfMetaProgressIndex() {
-    // Memory alignment of basic types and reference types in structures
-    return META_PROGRESS_INDEX_SIZE + REENTRANT_READ_WRITE_LOCK_SIZE;
-  }
-
-  private static long sizeOfRecoverProgressIndex(RecoverProgressIndex 
progressIndex) {
-    // Memory alignment of basic types and reference types in structures
-    long size = RECOVER_PROGRESS_INDEX_SIZE;
-
-    // Memory calculation in reference type, cannot get exact value, roughly 
estimate
-    size += REENTRANT_READ_WRITE_LOCK_SIZE;
-    if (progressIndex.getDataNodeId2LocalIndex() != null) {
-      size +=
-          NUM_BYTES_OBJECT_HEADER
-              + progressIndex.getDataNodeId2LocalIndex().size()
-                  * (SIZE_OF_INT + SIZE_OF_LONG + 
RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
-    }
-    return size;
-  }
-
-  private static long sizeOfSimpleProgressIndex() {
-    // Memory alignment of basic types and reference types in structures
-    return SIMPLE_PROGRESS_INDEX_SIZE;
-  }
-
-  private static long sizeOfStateProgressIndex(StateProgressIndex 
progressIndex) {
-    // Memory alignment of basic types and reference types in structures
-    long size = STATE_PROGRESS_INDEX_SIZE;
-
-    // Memory calculation in reference type, cannot get exact value, roughly 
estimate
-    size += REENTRANT_READ_WRITE_LOCK_SIZE;
-    if (progressIndex.getState() != null) {
-      size += NUM_BYTES_OBJECT_HEADER;
-      for (Map.Entry<String, Binary> entry : 
progressIndex.getState().entrySet()) {
-        size +=
-            RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
-                + sizeOfString(entry.getKey())
-                + sizeOfBinary(entry.getValue());
-      }
-    }
-    return size;
-  }
-
-  private static long sizeOfTimeWindowStateProgressIndex(
-      TimeWindowStateProgressIndex progressIndex) {
-    // Memory alignment of basic types and reference types in structures
-    long size = TIME_WINDOW_STATE_PROGRESS_INDEX_SIZE;
-
-    // Memory calculation in reference type, cannot get exact value, roughly 
estimate
-    size += REENTRANT_READ_WRITE_LOCK_SIZE;
-    if (progressIndex.getTimeSeries2TimestampWindowBufferPairMap() != null) {
-      size += NUM_BYTES_OBJECT_HEADER;
-      for (Map.Entry<String, Pair<Long, ByteBuffer>> entry :
-          
progressIndex.getTimeSeries2TimestampWindowBufferPairMap().entrySet()) {
-        size +=
-            sizeOfString(entry.getKey())
-                + SIZE_OF_LONG * 2
-                + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
-      }
-    }
-    return size;
+    return Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0L;
   }
 
   // =============================Write==================================
 
-  private static long sizeOfBinary(Binary binary) {
-    if (binary == null) {
-      return 0;
-    }
-    // -----header----
-    // -----ref-------
-    // ---------------
-    // --arrayHeader--
-    // ----values-----
-    return RamUsageEstimator.alignObjectSize(NUM_BYTES_OBJECT_HEADER + 
NUM_BYTES_OBJECT_REF)
-        + RamUsageEstimator.alignObjectSize(NUM_BYTES_ARRAY_HEADER + 
binary.getValues().length);
-  }
-
-  private static long sizeOfString(String value) {
-    if (value == null) {
-      return 0;
-    }
-    // -----header----
-    // -----ref-------
-    // ---------------
-    // --arrayHeader--
-    // ----values-----
-    return SIZE_OF_STRING
-        + RamUsageEstimator.alignObjectSize(NUM_BYTES_ARRAY_HEADER + 
value.length());
-  }
-
-  private static long sizeOfStringArray(final String[] values) {
-    if (values == null) {
-      return 0;
-    }
-    long size =
-        RamUsageEstimator.alignObjectSize(
-            NUM_BYTES_ARRAY_HEADER + values.length * NUM_BYTES_OBJECT_REF);
-    for (String value : values) {
-      size += sizeOfString(value);
-    }
-    return size;
+  private static long sizeOfBinary(final Binary binary) {
+    return Objects.nonNull(binary) ? binary.ramBytesUsed() : 0L;
   }
 
-  private static long sizeOfTimes(final long[] times) {
-    if (times == null) {
-      return 0;
-    }
-    long size = NUM_BYTES_ARRAY_HEADER + (long) Long.BYTES * times.length;
-    return RamUsageEstimator.alignObjectSize(size);
+  public static long sizeOfStringArray(final String[] values) {
+    return Objects.nonNull(values) ? RamUsageEstimator.sizeOf(values) : 0L;
   }
 
-  private static long sizeOfBitMapArray(BitMap[] bitMaps) {
+  public static long sizeOfBitMapArray(BitMap[] bitMaps) {
     if (bitMaps == null) {
       return 0L;
     }
@@ -702,8 +486,15 @@ public class InsertNodeMemoryEstimator {
     return size;
   }
 
-  private static long sizeOfColumns(
+  public static long sizeOfColumns(
       final Object[] columns, final MeasurementSchema[] measurementSchemas) {
+    // Directly calculate if measurementSchemas are absent
+    if (Objects.isNull(measurementSchemas)) {
+      return RamUsageEstimator.shallowSizeOf(columns)
+          + Arrays.stream(columns)
+              .mapToLong(InsertNodeMemoryEstimator::getNumBytesUnknownObject)
+              .reduce(0L, Long::sum);
+    }
     long size =
         RamUsageEstimator.alignObjectSize(
             NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns.length);
@@ -743,13 +534,7 @@ public class InsertNodeMemoryEstimator {
         case TEXT:
         case BLOB:
           {
-            final Binary[] values = (Binary[]) columns[i];
-            size +=
-                RamUsageEstimator.alignObjectSize(
-                    NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * 
values.length);
-            for (Binary value : values) {
-              size += sizeOfBinary(value);
-            }
+            size += getBinarySize((Binary[]) columns[i]);
             break;
           }
       }
@@ -757,12 +542,32 @@ public class InsertNodeMemoryEstimator {
     return size;
   }
 
-  private static long sizeOfValues(
-      final Object[] columns, final MeasurementSchema[] measurementSchemas) {
+  private static long getNumBytesUnknownObject(final Object obj) {
+    return obj instanceof Binary[]
+        ? getBinarySize((Binary[]) obj)
+        : RamUsageEstimator.sizeOfObject(obj);
+  }
+
+  private static long getBinarySize(final Binary[] binaries) {
+    return RamUsageEstimator.shallowSizeOf(binaries)
+        + Arrays.stream(binaries)
+            .mapToLong(InsertNodeMemoryEstimator::sizeOfBinary)
+            .reduce(0L, Long::sum);
+  }
+
+  public static long sizeOfValues(
+      final Object[] values, final MeasurementSchema[] measurementSchemas) {
+    // Directly calculate if measurementSchemas are absent
+    if (Objects.isNull(measurementSchemas)) {
+      return RamUsageEstimator.shallowSizeOf(values)
+          + Arrays.stream(values)
+              .mapToLong(InsertNodeMemoryEstimator::getNumBytesUnknownObject)
+              .reduce(0L, Long::sum);
+    }
     long size =
         RamUsageEstimator.alignObjectSize(
-            NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns.length);
-    for (int i = 0; i < columns.length; i++) {
+            NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * values.length);
+    for (int i = 0; i < values.length; i++) {
       switch (measurementSchemas[i].getType()) {
         case INT64:
         case TIMESTAMP:
@@ -795,7 +600,7 @@ public class InsertNodeMemoryEstimator {
         case TEXT:
         case BLOB:
           {
-            final Binary binary = (Binary) columns[i];
+            final Binary binary = (Binary) values[i];
             size += sizeOfBinary(binary);
           }
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index 6d94f12cc35..b6c4d6a4382 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -30,13 +30,16 @@ import 
org.apache.iotdb.db.exception.metadata.DuplicateInsertException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Accountable;
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 
 import java.util.ArrayList;
@@ -46,10 +49,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-public abstract class InsertBaseStatement extends Statement {
+public abstract class InsertBaseStatement extends Statement implements 
Accountable {
 
   /**
    * if use id table, this filed is id form of device path <br>
@@ -87,6 +91,8 @@ public abstract class InsertBaseStatement extends Statement {
   /** it is the end of current range. */
   protected int recordedEndOfLogicalViewSchemaList = 0;
 
+  protected long ramBytesUsed = Long.MIN_VALUE;
+
   // endregion
 
   public PartialPath getDevicePath() {
@@ -404,5 +410,37 @@ public abstract class InsertBaseStatement extends 
Statement {
       }
     }
   }
+
   // endregion
+  @Override
+  public long ramBytesUsed() {
+    if (ramBytesUsed > 0) {
+      return ramBytesUsed;
+    }
+    ramBytesUsed =
+        InsertNodeMemoryEstimator.sizeOfPartialPath(devicePath)
+            + 
InsertNodeMemoryEstimator.sizeOfMeasurementSchemas(measurementSchemas)
+            + InsertNodeMemoryEstimator.sizeOfStringArray(measurements)
+            + RamUsageEstimator.shallowSizeOf(dataTypes)
+            + shallowSizeOfList(logicalViewSchemaList)
+            + (Objects.nonNull(logicalViewSchemaList)
+                ? logicalViewSchemaList.stream()
+                    .mapToLong(LogicalViewSchema::ramBytesUsed)
+                    .reduce(0L, Long::sum)
+                : 0L)
+            + shallowSizeOfList(indexOfSourcePathsOfLogicalViews)
+            + calculateBytesUsed();
+    return ramBytesUsed;
+  }
+
+  private long shallowSizeOfList(List<?> list) {
+    return Objects.nonNull(list)
+        ? InsertRowsStatement.LIST_SIZE
+            + RamUsageEstimator.alignObjectSize(
+                RamUsageEstimator.NUM_BYTES_ARRAY_HEADER
+                    + (long) RamUsageEstimator.NUM_BYTES_OBJECT_REF * 
list.size())
+        : 0L;
+  }
+
+  protected abstract long calculateBytesUsed();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
index bfc4e6406c6..a2b9c63e1e7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
@@ -27,13 +27,18 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.NotImplementedException;
+import org.apache.tsfile.utils.RamUsageEstimator;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 public class InsertMultiTabletsStatement extends InsertBaseStatement {
 
+  private static final long INSTANCE_SIZE =
+      
RamUsageEstimator.shallowSizeOfInstance(InsertMultiTabletsStatement.class);
+
   /** The {@link InsertTabletStatement} list */
   List<InsertTabletStatement> insertTabletStatementList;
 
@@ -144,4 +149,15 @@ public class InsertMultiTabletsStatement extends 
InsertBaseStatement {
     splitResult.setInsertTabletStatementList(mergedList);
     return splitResult;
   }
+
+  @Override
+  protected long calculateBytesUsed() {
+    return INSTANCE_SIZE
+        + (Objects.nonNull(insertTabletStatementList)
+            ? InsertRowsStatement.LIST_SIZE
+                + insertTabletStatementList.stream()
+                    .mapToLong(InsertTabletStatement::calculateBytesUsed)
+                    .reduce(0L, Long::sum)
+            : 0);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
index 56835e45adb..3422fd7311f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import 
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
@@ -41,6 +42,7 @@ import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
@@ -59,6 +61,9 @@ public class InsertRowStatement extends InsertBaseStatement 
implements ISchemaVa
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(InsertRowStatement.class);
 
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(InsertRowStatement.class);
+
   protected static final byte TYPE_RAW_STRING = -1;
   protected static final byte TYPE_NULL = -2;
 
@@ -464,4 +469,11 @@ public class InsertRowStatement extends 
InsertBaseStatement implements ISchemaVa
     return new Pair<>(
         this.recordedBeginOfLogicalViewSchemaList, 
this.recordedEndOfLogicalViewSchemaList);
   }
+
+  @Override
+  protected long calculateBytesUsed() {
+    return INSTANCE_SIZE
+        + InsertNodeMemoryEstimator.sizeOfValues(values, measurementSchemas)
+        + RamUsageEstimator.sizeOf(measurementIsAligned);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
index 55e7a57beb0..5013a13915c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
@@ -30,15 +30,20 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.NotImplementedException;
+import org.apache.tsfile.utils.RamUsageEstimator;
 
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 public class InsertRowsOfOneDeviceStatement extends InsertBaseStatement {
 
+  private static final long INSTANCE_SIZE =
+      
RamUsageEstimator.shallowSizeOfInstance(InsertRowsOfOneDeviceStatement.class);
+
   public InsertRowsOfOneDeviceStatement() {
     super();
     statementType = StatementType.BATCH_INSERT_ONE_DEVICE;
@@ -167,4 +172,15 @@ public class InsertRowsOfOneDeviceStatement extends 
InsertBaseStatement {
     }
     return this;
   }
+
+  @Override
+  protected long calculateBytesUsed() {
+    return INSTANCE_SIZE
+        + (Objects.nonNull(insertRowStatementList)
+            ? InsertRowsStatement.LIST_SIZE
+                + insertRowStatementList.stream()
+                    .mapToLong(InsertRowStatement::calculateBytesUsed)
+                    .reduce(0L, Long::sum)
+            : 0);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java
index 5dbdba2f228..41ad8516db7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java
@@ -28,13 +28,19 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.NotImplementedException;
+import org.apache.tsfile.utils.RamUsageEstimator;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 public class InsertRowsStatement extends InsertBaseStatement {
 
+  public static final long LIST_SIZE = 
RamUsageEstimator.shallowSizeOfInstance(ArrayList.class);
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(InsertRowsStatement.class);
+
   /** the InsertRowsStatement list */
   private List<InsertRowStatement> insertRowStatementList;
 
@@ -162,4 +168,15 @@ public class InsertRowsStatement extends 
InsertBaseStatement {
     splitResult.setInsertRowStatementList(mergedList);
     return splitResult;
   }
+
+  @Override
+  protected long calculateBytesUsed() {
+    return INSTANCE_SIZE
+        + (Objects.nonNull(insertRowStatementList)
+            ? LIST_SIZE
+                + insertRowStatementList.stream()
+                    .mapToLong(InsertRowStatement::calculateBytesUsed)
+                    .reduce(0L, Long::sum)
+            : 0);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index 28146562b03..99ed39caa44 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
 import 
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
@@ -38,6 +39,7 @@ import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
@@ -51,8 +53,9 @@ import java.util.List;
 import java.util.Map;
 
 public class InsertTabletStatement extends InsertBaseStatement implements 
ISchemaValidation {
-
   private static final Logger LOGGER = 
LoggerFactory.getLogger(InsertTabletStatement.class);
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(InsertTabletStatement.class);
 
   private static final String DATATYPE_UNSUPPORTED = "Data type %s is not 
supported.";
 
@@ -412,4 +415,12 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
     return new Pair<>(
         this.recordedBeginOfLogicalViewSchemaList, 
this.recordedEndOfLogicalViewSchemaList);
   }
+
+  @Override
+  protected long calculateBytesUsed() {
+    return INSTANCE_SIZE
+        + RamUsageEstimator.sizeOf(times)
+        + InsertNodeMemoryEstimator.sizeOfBitMapArray(bitMaps)
+        + InsertNodeMemoryEstimator.sizeOfColumns(columns, measurementSchemas);
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index dd15c21a48c..6a78fce213a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -261,6 +261,7 @@ public class CommonConfig {
   private int pipeAirGapReceiverPort = 9780;
 
   private long pipeReceiverLoginPeriodicVerificationIntervalMs = 300000;
+  private double pipeReceiverActualToEstimatedMemoryRatio = 3;
 
   private int pipeMaxAllowedHistoricalTsFilePerDataRegion = 100;
   private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 5;
@@ -1082,6 +1083,15 @@ public class CommonConfig {
     return pipeReceiverLoginPeriodicVerificationIntervalMs;
   }
 
+  public void setPipeReceiverActualToEstimatedMemoryRatio(
+      double pipeReceiverActualToEstimatedMemoryRatio) {
+    this.pipeReceiverActualToEstimatedMemoryRatio = 
pipeReceiverActualToEstimatedMemoryRatio;
+  }
+
+  public double getPipeReceiverActualToEstimatedMemoryRatio() {
+    return pipeReceiverActualToEstimatedMemoryRatio;
+  }
+
   public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
     return pipeMaxAllowedHistoricalTsFilePerDataRegion;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index d7d8ba068f4..4634b61e74d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -543,6 +543,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_receiver_login_periodic_verification_interval_ms",
                 
Long.toString(config.getPipeReceiverLoginPeriodicVerificationIntervalMs()))));
+    config.setPipeReceiverActualToEstimatedMemoryRatio(
+        Double.parseDouble(
+            properties.getProperty(
+                "pipe_receiver_actual_to_estimated_memory_ratio",
+                
Double.toString(config.getPipeReceiverActualToEstimatedMemoryRatio()))));
 
     config.setPipeMaxAllowedHistoricalTsFilePerDataRegion(
         Integer.parseInt(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
index b27c2666cc9..2c8895532dc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
@@ -42,7 +42,9 @@ import java.util.stream.Collectors;
 public class HybridProgressIndex extends ProgressIndex {
 
   private static final long INSTANCE_SIZE =
-      RamUsageEstimator.shallowSizeOfInstance(HybridProgressIndex.class) + 
ProgressIndex.LOCK_SIZE;
+      RamUsageEstimator.shallowSizeOfInstance(HybridProgressIndex.class)
+          + RamUsageEstimator.shallowSizeOfInstance(HashMap.class)
+          + ProgressIndex.LOCK_SIZE;
   private static final long ENTRY_SIZE =
       RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
           + RamUsageEstimator.alignObjectSize(Short.BYTES);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 24d49da94c1..677ca016eee 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -254,6 +254,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeReceiverLoginPeriodicVerificationIntervalMs();
   }
 
+  public double getPipeReceiverActualToEstimatedMemoryRatio() {
+    return COMMON_CONFIG.getPipeReceiverActualToEstimatedMemoryRatio();
+  }
+
   /////////////////////////////// Hybrid Mode ///////////////////////////////
 
   public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
@@ -494,6 +498,9 @@ public class PipeConfig {
     LOGGER.info(
         "PipeReceiverLoginPeriodicVerificationIntervalMs: {}",
         getPipeReceiverLoginPeriodicVerificationIntervalMs());
+    LOGGER.info(
+        "PipeReceiverActualToEstimatedMemoryRatio: {}",
+        getPipeReceiverActualToEstimatedMemoryRatio());
 
     LOGGER.info(
         "PipeMaxAllowedHistoricalTsFilePerDataRegion: {}",
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/view/LogicalViewSchema.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/view/LogicalViewSchema.java
index c54bdfb8475..7121c979af2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/view/LogicalViewSchema.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/view/LogicalViewSchema.java
@@ -29,6 +29,8 @@ import org.apache.tsfile.encoding.encoder.Encoder;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Accountable;
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchemaType;
@@ -43,7 +45,10 @@ import java.util.List;
 import java.util.Map;
 
 public class LogicalViewSchema
-    implements IMeasurementSchema, Comparable<LogicalViewSchema>, Serializable 
{
+    implements IMeasurementSchema, Comparable<LogicalViewSchema>, 
Serializable, Accountable {
+
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(LogicalViewSchema.class);
 
   private String measurementId;
 
@@ -253,4 +258,10 @@ public class LogicalViewSchema
     }
     return null;
   }
+
+  @Override
+  public long ramBytesUsed() {
+    // Roughly estimate the expression size
+    return INSTANCE_SIZE + RamUsageEstimator.sizeOf(measurementId) + 256;
+  }
 }


Reply via email to