This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch pipe-dynamic-memory-allocation
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b8a28f54586309e3d5fa09ac77672be0f0a4815a
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 29 12:33:11 2026 +0800

    Improve pipe runtime memory allocation
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |   5 +-
 .../tsfile/parser/TsFileInsertionEventParser.java  |   5 +-
 .../scan/TsFileInsertionEventScanParser.java       |   8 +-
 .../table/TsFileInsertionEventTableParser.java     |  21 +---
 .../evolvable/batch/PipeTabletEventBatch.java      |  70 ++++++++++---
 .../evolvable/batch/PipeTabletEventPlainBatch.java |  51 ++++++---
 .../batch/PipeTabletEventTsFileBatch.java          |  30 +++---
 .../protocol/airgap/IoTDBDataRegionAirGapSink.java |  18 +++-
 .../iotconsensusv2/IoTConsensusV2SyncSink.java     |  20 +++-
 .../IoTConsensusV2TsFileInsertionEventHandler.java |  34 +++++-
 .../IoTConsensusV2TransferBatchReqBuilder.java     | 116 +++++++++++++--------
 .../sink/protocol/legacy/IoTDBLegacyPipeSink.java  |  17 ++-
 .../async/handler/PipeTransferTsFileHandler.java   |  42 +++++---
 .../thrift/sync/IoTDBDataRegionSyncSink.java       |  88 ++++++++++++++++
 14 files changed, 378 insertions(+), 147 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 3509e6b29ce..21fc71da969 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -794,9 +794,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
     long needMemory = 0;
 
-    needMemory += calculateTsFileParserMemory(sourceParameters, 
sinkParameters);
-    needMemory += calculateSinkBatchMemory(sinkParameters);
-    needMemory += calculateSendTsFileReadBufferMemory(sourceParameters, 
sinkParameters);
+    // TsFile parser, sink batch, and TsFile read buffer memory are allocated dynamically
+    // from PipeMemoryManager only while they are active.
     needMemory += calculateAssignerMemory(sourceParameters);
 
     PipeMemoryManager pipeMemoryManager = PipeDataNodeResourceManager.memory();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
index 627731fa7ed..e53e0a94827 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.path.PatternTreeMap;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.i18n.DataNodePipeMessages;
 import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser;
@@ -109,9 +108,7 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable {
     this.sourceEvent = sourceEvent;
 
     this.allocatedMemoryBlockForTablet =
-        PipeDataNodeResourceManager.memory()
-            .forceAllocateForTabletWithRetry(
-                
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
+        
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
 
     LOGGER.debug(
         
DataNodePipeMessages.TSFILE_HAS_INITIALIZED_PIPENAME_CREATION_TIME_PATTERN,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index e3af5aaa0c1..9f6ccf728d9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.db.auth.AuthorityChecker;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.i18n.DataNodePipeMessages;
 import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -144,12 +143,9 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
     filter = Objects.nonNull(timeFilterExpression) ? 
timeFilterExpression.getFilter() : null;
 
     this.allocatedMemoryBlockForBatchData =
-        PipeDataNodeResourceManager.memory()
-            .forceAllocateForTabletWithRetry(
-                
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
+        
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
     this.allocatedMemoryBlockForChunk =
-        PipeDataNodeResourceManager.memory()
-            
.forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize());
+        
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
 
     try {
       currentModifications =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
index 8ecdcc0cec5..7c80e44cc1e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
@@ -22,11 +22,9 @@ package org.apache.iotdb.db.pipe.event.common.tsfile.parser.table;
 import org.apache.iotdb.commons.audit.IAuditEntity;
 import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import 
org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName;
 import org.apache.iotdb.db.auth.AuthorityChecker;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.i18n.DataNodePipeMessages;
 import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -94,25 +92,14 @@ public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser
       allocatedMemoryBlockForModifications =
           PipeDataNodeResourceManager.memory()
               
.forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed());
-      long tableSize =
-          Math.min(
-              
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
-              IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize());
-
       this.allocatedMemoryBlockForChunk =
-          PipeDataNodeResourceManager.memory()
-              .forceAllocateForTabletWithRetry(
-                  PipeConfig.getInstance().getPipeMaxReaderChunkSize());
+          
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
       this.allocatedMemoryBlockForBatchData =
-          
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize);
+          
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
       this.allocatedMemoryBlockForChunkMeta =
-          
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize);
+          
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
       this.allocatedMemoryBlockForTableSchemas =
-          PipeDataNodeResourceManager.memory()
-              .forceAllocateForTabletWithRetry(
-                  IoTDBDescriptor.getInstance()
-                      .getConfig()
-                      .getPipeDataStructureTabletSizeInBytes());
+          
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
 
       this.startTime = startTime;
       this.endTime = endTime;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
index aede0e994d9..a751f798a0c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -61,8 +61,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable {
 
     // limit in buffer size
     this.maxBatchSizeInBytes = requestMaxBatchSizeInBytes;
-    this.allocatedMemoryBlock =
-        
PipeDataNodeResourceManager.memory().forceAllocate(requestMaxBatchSizeInBytes);
+    this.allocatedMemoryBlock = 
PipeDataNodeResourceManager.memory().forceAllocate(0);
     if (recordMetric != null) {
       this.recordMetric = recordMetric;
     } else {
@@ -97,6 +96,10 @@ public abstract class PipeTabletEventBatch implements AutoCloseable {
             events.add((EnrichedEvent) event);
           }
         } catch (final Exception e) {
+          if (events.isEmpty()) {
+            clearBatchData();
+            resetMemoryUsage();
+          }
           // If the event is not added to the batch, we need to decrease the reference count.
           ((EnrichedEvent) event)
               
.decreaseReferenceCount(PipeTransferBatchReqBuilder.class.getName(), false);
@@ -126,7 +129,28 @@ public abstract class PipeTabletEventBatch implements AutoCloseable {
   protected abstract boolean constructBatch(final TabletInsertionEvent event)
       throws WALPipeException, IOException;
 
+  protected void increaseTotalBufferSizeAndUpdateMemoryBlock(final long 
bufferSize) {
+    if (bufferSize <= 0) {
+      return;
+    }
+
+    final long newTotalBufferSize = totalBufferSize + bufferSize;
+    PipeDataNodeResourceManager.memory()
+        .forceResize(allocatedMemoryBlock, Math.min(newTotalBufferSize, 
maxBatchSizeInBytes));
+    totalBufferSize = newTotalBufferSize;
+  }
+
+  protected void releaseAllocatedMemoryBlock() {
+    PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlock, 0);
+  }
+
+  protected void clearBatchData() {}
+
   public boolean shouldEmit() {
+    if (events.isEmpty()) {
+      return false;
+    }
+
     final long diff = System.currentTimeMillis() - firstEventProcessingTime;
     if (totalBufferSize >= maxBatchSizeInBytes || diff >= maxDelayInMs) {
       recordMetric.accept(diff, totalBufferSize, events.size());
@@ -138,23 +162,26 @@ public abstract class PipeTabletEventBatch implements AutoCloseable {
   public synchronized void onSuccess() {
     events.clear();
 
-    totalBufferSize = 0;
-
-    firstEventProcessingTime = Long.MIN_VALUE;
+    resetMemoryUsage();
   }
 
   @Override
   public synchronized void close() {
+    if (isClosed) {
+      return;
+    }
     isClosed = true;
 
     clearEventsReferenceCount(PipeTabletEventBatch.class.getName());
     events.clear();
+    clearBatchData();
+    resetMemoryUsage();
     allocatedMemoryBlock.close();
   }
 
   /**
-   * Discard all events of the given pipe. This method only clears the reference count of the events
-   * and discard them, but do not modify other objects (such as buffers) for simplicity.
+   * Discard all events of the given pipe. This method only clears the reference count of the
+   * events. If some events remain, cached batch data is kept unchanged for simplicity.
    */
   public synchronized void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
@@ -162,14 +189,27 @@ public abstract class PipeTabletEventBatch implements AutoCloseable {
   }
 
   public synchronized void discardEventsOfPipe(final CommitterKey 
committerKey) {
-    events.removeIf(
-        event -> {
-          if (isEventFromPipe(event, committerKey)) {
-            
event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
-            return true;
-          }
-          return false;
-        });
+    final boolean hasDiscardedEvents =
+        events.removeIf(
+            event -> {
+              if (isEventFromPipe(event, committerKey)) {
+                
event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
+                return true;
+              }
+              return false;
+            });
+    if (hasDiscardedEvents && events.isEmpty()) {
+      clearBatchData();
+      resetMemoryUsage();
+    }
+  }
+
+  private void resetMemoryUsage() {
+    totalBufferSize = 0;
+
+    releaseAllocatedMemoryBlock();
+
+    firstEventProcessingTime = Long.MIN_VALUE;
   }
 
   private static boolean isEventFromPipe(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index b32479e2f1a..05b348f3237 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -74,7 +74,6 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
   @Override
   protected boolean constructBatch(final TabletInsertionEvent event) throws 
IOException {
     final long bufferSize = buildTabletInsertionBuffer(event);
-    totalBufferSize += bufferSize;
     pipe2BytesAccumulated.compute(
         new Pair<>(
             ((EnrichedEvent) event).getPipeName(), ((EnrichedEvent) 
event).getCreationTime()),
@@ -85,8 +84,13 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
 
   @Override
   public synchronized void onSuccess() {
+    clearBatchData();
+
     super.onSuccess();
+  }
 
+  @Override
+  protected void clearBatchData() {
     insertNodeBuffers.clear();
     tabletBuffers.clear();
 
@@ -161,24 +165,21 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
       final InsertNode insertNode = 
pipeInsertNodeTabletInsertionEvent.getInsertNode();
       if (!(insertNode instanceof RelationalInsertTabletNode)) {
         buffer = insertNode.serializeToByteBuffer();
+        final String databaseName =
+            pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
+                ? 
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
+                : 
pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName();
+        estimateSize = RamUsageEstimator.sizeOf(databaseName) + buffer.limit();
+        increaseTotalBufferSizeAndUpdateMemoryBlock(estimateSize);
         insertNodeBuffers.add(buffer);
-        if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) {
-          final String databaseName =
-              pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName();
-          estimateSize = RamUsageEstimator.sizeOf(databaseName);
-          insertNodeDataBases.add(databaseName);
-        } else {
-          final String databaseName = 
pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName();
-          estimateSize = RamUsageEstimator.sizeOf(databaseName);
-          insertNodeDataBases.add(databaseName);
-        }
-        estimateSize += buffer.limit();
+        insertNodeDataBases.add(databaseName);
       } else {
-        for (final Tablet tablet :
-            ((PipeInsertNodeTabletInsertionEvent) event).convertToTablets()) {
-          estimateSize +=
-              constructTabletBatch(
-                  tablet, 
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
+        final List<Tablet> tablets = 
pipeInsertNodeTabletInsertionEvent.convertToTablets();
+        estimateSize = calculateTabletsSizeInBytes(tablets);
+        increaseTotalBufferSizeAndUpdateMemoryBlock(estimateSize);
+        for (final Tablet tablet : tablets) {
+          constructTabletBatchWithoutMemoryReservation(
+              tablet, 
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
         }
       }
     } else {
@@ -198,6 +199,7 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
         }
         final String databaseName = 
pipeRawTabletInsertionEvent.getTreeModelDatabaseName();
         estimateSize = RamUsageEstimator.sizeOf(databaseName) + buffer.limit();
+        increaseTotalBufferSizeAndUpdateMemoryBlock(estimateSize);
         tabletBuffers.add(buffer);
         tabletDataBases.add(databaseName);
       }
@@ -207,12 +209,27 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
   }
 
   private long constructTabletBatch(final Tablet tablet, final String 
databaseName) {
+    final long estimateSize = calculateTabletSizeInBytes(tablet);
+    increaseTotalBufferSizeAndUpdateMemoryBlock(estimateSize);
+    constructTabletBatchWithoutMemoryReservation(tablet, databaseName);
+    return estimateSize;
+  }
+
+  private void constructTabletBatchWithoutMemoryReservation(
+      final Tablet tablet, final String databaseName) {
     final Pair<Integer, List<Tablet>> currentBatch =
         tableModelTabletMap
             .computeIfAbsent(databaseName, k -> new HashMap<>())
             .computeIfAbsent(tablet.getTableName(), k -> new Pair<>(0, new 
ArrayList<>()));
     currentBatch.setLeft(currentBatch.getLeft() + tablet.getRowSize());
     currentBatch.getRight().add(tablet);
+  }
+
+  private long calculateTabletsSizeInBytes(final List<Tablet> tablets) {
+    return 
tablets.stream().mapToLong(PipeTabletEventPlainBatch::calculateTabletSizeInBytes).sum();
+  }
+
+  private static long calculateTabletSizeInBytes(final Tablet tablet) {
     return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 4;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index 7b511e23fc6..053b42b2c78 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -86,6 +86,7 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
           (PipeInsertNodeTabletInsertionEvent) event;
       final boolean isTableModel = 
insertNodeTabletInsertionEvent.isTableModelEvent();
       final List<Tablet> tablets = 
insertNodeTabletInsertionEvent.convertToTablets();
+      
increaseTotalBufferSizeAndUpdateMemoryBlock(calculateTabletsSizeInBytes(tablets));
       for (int i = 0; i < tablets.size(); ++i) {
         final Tablet tablet = tablets.get(i);
         if (isTabletEmpty(tablet)) {
@@ -114,6 +115,7 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
       if (isTabletEmpty(tablet)) {
         return true;
       }
+      
increaseTotalBufferSizeAndUpdateMemoryBlock(calculateTabletSizeInBytes(tablet));
       if (rawTabletInsertionEvent.isTableModelEvent()) {
         // table Model
         bufferTableModelTablet(
@@ -139,6 +141,17 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
     return true;
   }
 
+  private long calculateTabletsSizeInBytes(final List<Tablet> tablets) {
+    return tablets.stream()
+        .filter(tablet -> !isTabletEmpty(tablet))
+        .mapToLong(PipeTabletEventTsFileBatch::calculateTabletSizeInBytes)
+        .sum();
+  }
+
+  private static long calculateTabletSizeInBytes(final Tablet tablet) {
+    return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) * 2;
+  }
+
   private void bufferTreeModelTablet(
       final String pipeName,
       final long creationTime,
@@ -146,11 +159,6 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
       final boolean isAligned) {
     new 
PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
 
-    // TODO: Currently, PipeTreeModelTsFileBuilderV2 still uses PipeTreeModelTsFileBuilder as a
-    // fallback builder, so memory table writing and storing temporary tablets require double the
-    // memory.
-    totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) * 2;
-
     pipeName2WeightMap.compute(
         new Pair<>(pipeName, creationTime),
         (pipe, weight) -> Objects.nonNull(weight) ? ++weight : 1);
@@ -162,11 +170,6 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
       final String pipeName, final long creationTime, final Tablet tablet, 
final String dataBase) {
     new 
PipeTableModelTabletEventSorter(tablet).sortAndDeduplicateByDevIdTimestamp();
 
-    // TODO: Currently, PipeTableModelTsFileBuilderV2 still uses PipeTableModelTsFileBuilder as a
-    // fallback builder, so memory table writing and storing temporary tablets require double the
-    // memory.
-    totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) * 2;
-
     pipeName2WeightMap.compute(
         new Pair<>(pipeName, creationTime),
         (pipe, weight) -> Objects.nonNull(weight) ? ++weight : 1);
@@ -209,8 +212,13 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
 
   @Override
   public synchronized void onSuccess() {
+    clearBatchData();
+
     super.onSuccess();
+  }
 
+  @Override
+  protected void clearBatchData() {
     pipeName2WeightMap.clear();
     tableModeTsFileBuilder.onSuccess();
     treeModeTsFileBuilder.onSuccess();
@@ -220,8 +228,6 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
   public synchronized void close() {
     super.close();
 
-    pipeName2WeightMap.clear();
-
     tableModeTsFileBuilder.close();
     treeModeTsFileBuilder.close();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 4f2dab1bfa8..92a8c731fbe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -33,6 +33,8 @@ import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
 import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatch;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventTsFileBatch;
@@ -497,10 +499,13 @@ public class IoTDBDataRegionAirGapSink extends IoTDBDataNodeAirGapSink {
       final AirGapSocket socket,
       final boolean isMultiFile)
       throws PipeException, IOException {
-    final int readFileBufferSize = PIPE_CONFIG.getPipeSinkReadFileBufferSize();
-    final byte[] readBuffer = new byte[readFileBufferSize];
-    long position = 0;
-    try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
+    final int readFileBufferSize = getReadFileBufferSize(file);
+    try (final PipeTsFileMemoryBlock ignored =
+            PipeDataNodeResourceManager.memory()
+                .forceAllocateForTsFileWithRetry(readFileBufferSize);
+        final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
+      final byte[] readBuffer = new byte[readFileBufferSize];
+      long position = 0;
       while (true) {
         mayLimitRateAndRecordIO(readFileBufferSize);
         final int readLength = reader.read(readBuffer);
@@ -532,6 +537,11 @@ public class IoTDBDataRegionAirGapSink extends IoTDBDataNodeAirGapSink {
     }
   }
 
+  private int getReadFileBufferSize(final File file) {
+    return (int)
+        Math.min((long) PIPE_CONFIG.getPipeSinkReadFileBufferSize(), 
Math.max(file.length(), 1L));
+  }
+
   private boolean sendBatch(
       final AirGapSocket socket,
       byte[] bytes,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java
index 481e340a739..9c3104c0ad1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java
@@ -39,6 +39,8 @@ import org.apache.iotdb.db.pipe.consensus.metric.IoTConsensusV2SinkMetrics;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock;
 import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.builder.IoTConsensusV2SyncBatchReqBuilder;
 import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2DeleteNodeReq;
 import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletInsertNodeReq;
@@ -435,10 +437,13 @@ public class IoTConsensusV2SyncSink extends IoTDBSink {
       final TCommitId tCommitId,
       final TConsensusGroupId tConsensusGroupId)
       throws PipeException, IOException {
-    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
-    final byte[] readBuffer = new byte[readFileBufferSize];
-    long position = 0;
-    try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
+    final int readFileBufferSize = getReadFileBufferSize(file);
+    try (final PipeTsFileMemoryBlock ignored =
+            PipeDataNodeResourceManager.memory()
+                .forceAllocateForTsFileWithRetry(readFileBufferSize);
+        final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
+      final byte[] readBuffer = new byte[readFileBufferSize];
+      long position = 0;
       while (true) {
         final int readLength = reader.read(readBuffer);
         if (readLength == -1) {
@@ -501,6 +506,13 @@ public class IoTConsensusV2SyncSink extends IoTDBSink {
     }
   }
 
+  private int getReadFileBufferSize(final File file) {
+    return (int)
+        Math.min(
+            (long) PipeConfig.getInstance().getPipeSinkReadFileBufferSize(),
+            Math.max(file.length(), 1L));
+  }
+
   private TEndPoint getFollowerUrl() {
     // In current iotConsensusV2 design, one connector corresponds to one follower, so the peers is
     // actually a singleton list
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java
index 4e269aaa7e8..52815e645bf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.consensus.iotconsensusv2.thrift.TIoTConsensusV2TransferR
 import org.apache.iotdb.db.i18n.DataNodePipeMessages;
 import org.apache.iotdb.db.pipe.consensus.metric.IoTConsensusV2SinkMetrics;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock;
 import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.IoTConsensusV2AsyncSink;
 import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFilePieceReq;
 import 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFilePieceWithModReq;
@@ -70,7 +72,8 @@ public class IoTConsensusV2TsFileInsertionEventHandler
   private final boolean transferMod;
 
   private final int readFileBufferSize;
-  private final byte[] readBuffer;
+  private PipeTsFileMemoryBlock memoryBlock;
+  private byte[] readBuffer;
   private long position;
 
   private RandomAccessFile reader;
@@ -106,8 +109,15 @@ public class IoTConsensusV2TsFileInsertionEventHandler
     transferMod = event.isWithMod();
     currentFile = transferMod ? modFile : tsFile;
 
-    readFileBufferSize = 
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
-    readBuffer = new byte[readFileBufferSize];
+    final long maxFileLength =
+        transferMod && Objects.nonNull(modFile)
+            ? Math.max(tsFile.length(), modFile.length())
+            : tsFile.length();
+    readFileBufferSize =
+        (int)
+            Math.min(
+                (long) 
PipeConfig.getInstance().getPipeSinkReadFileBufferSize(),
+                Math.max(maxFileLength, 1L));
     position = 0;
 
     reader =
@@ -128,6 +138,12 @@ public class IoTConsensusV2TsFileInsertionEventHandler
     this.client = client;
     client.setShouldReturnSelf(false);
 
+    if (readBuffer == null) {
+      memoryBlock =
+          
PipeDataNodeResourceManager.memory().forceAllocateForTsFileWithRetry(readFileBufferSize);
+      readBuffer = new byte[readFileBufferSize];
+    }
+
     final int readLength = reader.read(readBuffer);
     if (readLength == -1) {
       if (currentFile == modFile) {
@@ -246,6 +262,8 @@ public class IoTConsensusV2TsFileInsertionEventHandler
           client.returnSelf();
         }
 
+        releaseReadBufferMemoryBlock();
+
         long duration = System.nanoTime() - createTime;
         metric.recordConnectorTsFileTransferTimer(duration);
       }
@@ -330,10 +348,20 @@ public class IoTConsensusV2TsFileInsertionEventHandler
       connector.addFailureEventToRetryQueue(event);
       metric.recordRetryCounter();
 
+      releaseReadBufferMemoryBlock();
+
       if (client != null) {
         client.setShouldReturnSelf(true);
         client.returnSelf();
       }
     }
   }
+
+  private void releaseReadBufferMemoryBlock() {
+    if (memoryBlock != null) {
+      memoryBlock.close();
+      memoryBlock = null;
+      readBuffer = null;
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java
index 677c77e0540..b55cb1233f5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java
@@ -70,6 +70,7 @@ public abstract class IoTConsensusV2TransferBatchReqBuilder 
implements AutoClose
   protected long firstEventProcessingTime = Long.MIN_VALUE;
 
   // limit in buffer size
+  protected final long maxBatchSizeInBytes;
   protected final PipeMemoryBlock allocatedMemoryBlock;
   protected long totalBufferSize = 0;
 
@@ -92,37 +93,12 @@ public abstract class IoTConsensusV2TransferBatchReqBuilder 
implements AutoClose
     this.consensusGroupId = consensusGroupId;
     this.thisDataNodeId = thisDataNodeId;
 
-    final long requestMaxBatchSizeInBytes =
+    maxBatchSizeInBytes =
         parameters.getLongOrDefault(
             Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
SINK_IOTDB_BATCH_SIZE_KEY),
             CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
 
-    allocatedMemoryBlock =
-        PipeDataNodeResourceManager.memory()
-            .tryAllocate(requestMaxBatchSizeInBytes)
-            .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0))
-            .setShrinkCallback(
-                (oldMemory, newMemory) ->
-                    LOGGER.info(
-                        
DataNodePipeMessages.THE_BATCH_SIZE_LIMIT_HAS_SHRUNK_FROM,
-                        oldMemory,
-                        newMemory))
-            .setExpandMethod(
-                oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, 
requestMaxBatchSizeInBytes))
-            .setExpandCallback(
-                (oldMemory, newMemory) ->
-                    LOGGER.info(
-                        
DataNodePipeMessages.THE_BATCH_SIZE_LIMIT_HAS_EXPANDED_FROM,
-                        oldMemory,
-                        newMemory));
-
-    if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
-      LOGGER.info(
-          "IoTConsensusV2TransferBatchReqBuilder: the max batch size is 
adjusted from {} to {} due to the "
-              + "memory restriction",
-          requestMaxBatchSizeInBytes,
-          getMaxBatchSizeInBytes());
-    }
+    allocatedMemoryBlock = 
PipeDataNodeResourceManager.memory().forceAllocate(0);
   }
 
   /**
@@ -137,27 +113,80 @@ public abstract class 
IoTConsensusV2TransferBatchReqBuilder implements AutoClose
       return false;
     }
 
-    final long requestCommitId = ((EnrichedEvent) 
event).getReplicateIndexForIoTV2();
+    final EnrichedEvent enrichedEvent = (EnrichedEvent) event;
+    final long requestCommitId = enrichedEvent.getReplicateIndexForIoTV2();
 
     // The deduplication logic here is to avoid the accumulation of the same 
event in a batch when
     // retrying.
     if ((events.isEmpty() || !events.get(events.size() - 1).equals(event))) {
-      events.add((EnrichedEvent) event);
-      requestCommitIds.add(requestCommitId);
-      final int bufferSize = buildTabletInsertionBuffer(event);
-
-      ((EnrichedEvent) event)
-          
.increaseReferenceCount(IoTConsensusV2TransferBatchReqBuilder.class.getName());
+      if (!enrichedEvent.increaseReferenceCount(
+          IoTConsensusV2TransferBatchReqBuilder.class.getName())) {
+        
LOGGER.warn(DataNodePipeMessages.CANNOT_INCREASE_REFERENCE_COUNT_FOR_EVENT_IGNORE,
 event);
+        return shouldEmit();
+      }
 
-      if (firstEventProcessingTime == Long.MIN_VALUE) {
-        firstEventProcessingTime = System.currentTimeMillis();
+      final int previousEventsSize = events.size();
+      final int previousRequestCommitIdsSize = requestCommitIds.size();
+      final int previousBatchReqsSize = batchReqs.size();
+      try {
+        events.add(enrichedEvent);
+        requestCommitIds.add(requestCommitId);
+        final int bufferSize = buildTabletInsertionBuffer(event);
+        increaseTotalBufferSizeAndUpdateMemoryBlock(bufferSize);
+
+        if (firstEventProcessingTime == Long.MIN_VALUE) {
+          firstEventProcessingTime = System.currentTimeMillis();
+        }
+      } catch (final Exception e) {
+        rollbackTo(previousEventsSize, previousRequestCommitIdsSize, 
previousBatchReqsSize);
+        if (events.isEmpty()) {
+          resetMemoryUsage();
+        }
+        enrichedEvent.decreaseReferenceCount(
+            IoTConsensusV2TransferBatchReqBuilder.class.getName(), false);
+        throw e;
       }
+    }
+
+    return shouldEmit();
+  }
 
-      totalBufferSize += bufferSize;
+  private boolean shouldEmit() {
+    return !events.isEmpty()
+        && (totalBufferSize >= getMaxBatchSizeInBytes()
+            || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs);
+  }
+
+  private void increaseTotalBufferSizeAndUpdateMemoryBlock(final long 
bufferSize) {
+    if (bufferSize <= 0) {
+      return;
     }
 
-    return totalBufferSize >= getMaxBatchSizeInBytes()
-        || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
+    final long newTotalBufferSize = totalBufferSize + bufferSize;
+    PipeDataNodeResourceManager.memory()
+        .forceResize(allocatedMemoryBlock, Math.min(newTotalBufferSize, 
getMaxBatchSizeInBytes()));
+    totalBufferSize = newTotalBufferSize;
+  }
+
+  private void rollbackTo(
+      final int previousEventsSize,
+      final int previousRequestCommitIdsSize,
+      final int previousBatchReqsSize) {
+    while (events.size() > previousEventsSize) {
+      events.remove(events.size() - 1);
+    }
+    while (requestCommitIds.size() > previousRequestCommitIdsSize) {
+      requestCommitIds.remove(requestCommitIds.size() - 1);
+    }
+    while (batchReqs.size() > previousBatchReqsSize) {
+      batchReqs.remove(batchReqs.size() - 1);
+    }
+  }
+
+  private void resetMemoryUsage() {
+    firstEventProcessingTime = Long.MIN_VALUE;
+    totalBufferSize = 0;
+    PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlock, 0);
   }
 
   public synchronized void onSuccess() {
@@ -166,9 +195,7 @@ public abstract class IoTConsensusV2TransferBatchReqBuilder 
implements AutoClose
     events.clear();
     requestCommitIds.clear();
 
-    firstEventProcessingTime = Long.MIN_VALUE;
-
-    totalBufferSize = 0;
+    resetMemoryUsage();
   }
 
   public IoTConsensusV2TabletBatchReq toTIoTConsensusV2BatchTransferReq() 
throws IOException {
@@ -176,7 +203,7 @@ public abstract class IoTConsensusV2TransferBatchReqBuilder 
implements AutoClose
   }
 
   protected long getMaxBatchSizeInBytes() {
-    return allocatedMemoryBlock.getMemoryUsageInBytes();
+    return maxBatchSizeInBytes;
   }
 
   public boolean isEmpty() {
@@ -220,6 +247,9 @@ public abstract class IoTConsensusV2TransferBatchReqBuilder 
implements AutoClose
         ((EnrichedEvent) event).clearReferenceCount(this.getClass().getName());
       }
     }
+    batchReqs.clear();
+    events.clear();
+    requestCommitIds.clear();
     allocatedMemoryBlock.close();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
index c0a9eb6a79d..735e6c48dbb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
@@ -37,6 +37,8 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock;
 import org.apache.iotdb.db.pipe.sink.payload.legacy.TsFilePipeData;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -417,8 +419,12 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
     long position = 0;
 
     // Try small piece to rebase the file position.
-    final byte[] buffer = new 
byte[PipeConfig.getInstance().getPipeSinkReadFileBufferSize()];
-    try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, 
"r")) {
+    final int readFileBufferSize = getReadFileBufferSize(file);
+    try (final PipeTsFileMemoryBlock ignored =
+            PipeDataNodeResourceManager.memory()
+                .forceAllocateForTsFileWithRetry(readFileBufferSize);
+        final RandomAccessFile randomAccessFile = new RandomAccessFile(file, 
"r")) {
+      final byte[] buffer = new byte[readFileBufferSize];
       while (true) {
         final int dataLength = randomAccessFile.read(buffer);
         if (dataLength == -1) {
@@ -456,6 +462,13 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
     }
   }
 
+  private int getReadFileBufferSize(final File file) {
+    return (int)
+        Math.min(
+            (long) PipeConfig.getInstance().getPipeSinkReadFileBufferSize(),
+            Math.max(file.length(), 1L));
+  }
+
   @Override
   public void close() throws Exception {
     if (client != null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 2467c3ce143..27cb81d6cda 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -124,11 +124,15 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     // the memory of the TsFile event is not released, so the memory is not 
enough for slicing. This
     // will cause a deadlock.
     waitForResourceEnough4Slicing((long) ((1 + Math.random()) * 20 * 1000)); 
// 20 - 40 seconds
+    final long maxFileLength =
+        transferMod && Objects.nonNull(modFile)
+            ? Math.max(tsFile.length(), modFile.length())
+            : tsFile.length();
     readFileBufferSize =
         (int)
             Math.min(
-                PipeConfig.getInstance().getPipeSinkReadFileBufferSize(),
-                transferMod ? Math.max(tsFile.length(), modFile.length()) : 
tsFile.length());
+                (long) 
PipeConfig.getInstance().getPipeSinkReadFileBufferSize(),
+                Math.max(maxFileLength, 1L));
     position = 0;
 
     isSealSignalSent = new AtomicBoolean(false);
@@ -142,21 +146,6 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
       final IoTDBDataNodeAsyncClientManager clientManager,
       final AsyncPipeDataTransferServiceClient client)
       throws TException, IOException {
-    // Delay creation of resources to avoid OOM or too many open files
-    if (readBuffer == null) {
-      memoryBlock =
-          PipeDataNodeResourceManager.memory()
-              .forceAllocateForTsFileWithRetry(
-                  
PipeConfig.getInstance().isPipeSinkReadFileBufferMemoryControlEnabled()
-                      ? readFileBufferSize
-                      : 0);
-      readBuffer = new byte[readFileBufferSize];
-    }
-
-    if (reader == null) {
-      reader = transferMod ? new RandomAccessFile(modFile, "r") : new 
RandomAccessFile(tsFile, "r");
-    }
-
     this.clientManager = clientManager;
     this.client = client;
 
@@ -173,6 +162,17 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
       return;
     }
 
+    // Delay creation of resources to avoid OOM or too many open files
+    if (readBuffer == null) {
+      memoryBlock =
+          
PipeDataNodeResourceManager.memory().forceAllocateForTsFileWithRetry(readFileBufferSize);
+      readBuffer = new byte[readFileBufferSize];
+    }
+
+    if (reader == null) {
+      reader = transferMod ? new RandomAccessFile(modFile, "r") : new 
RandomAccessFile(tsFile, "r");
+    }
+
     client.setShouldReturnSelf(false);
     client.setTimeoutDynamically(clientManager.getConnectionTimeout());
 
@@ -256,6 +256,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
       super.onComplete(response);
     } finally {
       if (sink.isClosed()) {
+        releaseReadBufferMemoryBlock();
         returnClientIfNecessary();
       }
     }
@@ -319,6 +320,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
               referenceCount);
         }
 
+        releaseReadBufferMemoryBlock();
         returnClientIfNecessary();
       }
 
@@ -361,6 +363,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     try {
       super.onError(exception);
     } finally {
+      releaseReadBufferMemoryBlock();
       returnClientIfNecessary();
     }
   }
@@ -412,6 +415,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
       LOGGER.warn(DataNodePipeMessages.FAILED_TO_CLOSE_FILE_READER_OR_DELETE, 
e);
     } finally {
       try {
+        releaseReadBufferMemoryBlock();
         returnClientIfNecessary();
       } finally {
         if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
@@ -473,10 +477,14 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
   @Override
   public void close() {
     super.close();
+    releaseReadBufferMemoryBlock();
+  }
 
+  private void releaseReadBufferMemoryBlock() {
     if (memoryBlock != null) {
       memoryBlock.close();
       memoryBlock = null;
+      readBuffer = null;
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index eb7d39864c4..7775092c270 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -22,10 +22,12 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.sync;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient;
 import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFilePieceReq;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp;
 import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.db.i18n.DataNodePipeMessages;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
@@ -36,6 +38,8 @@ import 
org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
 import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock;
 import org.apache.iotdb.db.pipe.sink.client.IoTDBDataNodeSyncClientManager;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatch;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch;
@@ -71,6 +75,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.file.NoSuchFileException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -593,6 +598,89 @@ public class IoTDBDataRegionSyncSink extends 
IoTDBDataNodeSyncSink {
     LOGGER.info(DataNodePipeMessages.SUCCESSFULLY_TRANSFERRED_FILE, tsFile);
   }
 
+  @Override
+  protected void transferFilePieces(
+      final Map<Pair<String, Long>, Double> pipe2WeightMap,
+      final File file,
+      final Pair<IoTDBSyncClient, Boolean> clientAndStatus,
+      final boolean isMultiFile)
+      throws PipeException, IOException {
+    final int readFileBufferSize = getReadFileBufferSize(file);
+    try (final PipeTsFileMemoryBlock ignored =
+            PipeDataNodeResourceManager.memory()
+                .forceAllocateForTsFileWithRetry(readFileBufferSize);
+        final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
+      final byte[] readBuffer = new byte[readFileBufferSize];
+      long position = 0;
+      while (true) {
+        mayLimitRateAndRecordIO(readFileBufferSize);
+        final int readLength = reader.read(readBuffer);
+        if (readLength == -1) {
+          break;
+        }
+
+        final byte[] payLoad =
+            readLength == readFileBufferSize
+                ? readBuffer
+                : Arrays.copyOfRange(readBuffer, 0, readLength);
+        final PipeTransferFilePieceResp resp;
+        try {
+          final TPipeTransferReq req =
+              compressIfNeeded(
+                  isMultiFile
+                      ? getTransferMultiFilePieceReq(file.getName(), position, 
payLoad)
+                      : getTransferSingleFilePieceReq(file.getName(), 
position, payLoad));
+          pipe2WeightMap.forEach(
+              (namePair, weight) ->
+                  rateLimitIfNeeded(
+                      namePair.getLeft(),
+                      namePair.getRight(),
+                      clientAndStatus.getLeft().getEndPoint(),
+                      (long) (req.getBody().length * weight)));
+          resp =
+              PipeTransferFilePieceResp.fromTPipeTransferResp(
+                  clientAndStatus.getLeft().pipeTransfer(req));
+        } catch (final Exception e) {
+          clientAndStatus.setRight(false);
+          throw new PipeConnectionException(
+              String.format(
+                  "Network error when transfer file %s, because %s.", file, 
e.getMessage()),
+              e);
+        }
+
+        position += readLength;
+
+        final TSStatus status = resp.getStatus();
+        if (status.getCode() == 
TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
+          position = resp.getEndWritingOffset();
+          reader.seek(position);
+          LOGGER.info(DataNodePipeMessages.REDIRECT_FILE_POSITION_TO, 
position);
+          continue;
+        }
+
+        if (status.getCode()
+            == 
TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
+          getClientManager().sendHandshakeReq(clientAndStatus);
+        }
+
+        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+            && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+          receiverStatusHandler.handle(
+              resp.getStatus(),
+              String.format("Transfer file %s error, result status %s.", file, 
resp.getStatus()),
+              file.getName());
+        }
+      }
+    }
+  }
+
+  private int getReadFileBufferSize(final File file) {
+    return (int)
+        Math.min(
+            (long) PipeConfig.getInstance().getPipeSinkReadFileBufferSize(),
+            Math.max(file.length(), 1L));
+  }
+
   @Override
   public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws 
IOException {
     if (Objects.isNull(compressionTimer) && Objects.nonNull(sinkTaskId)) {

Reply via email to