This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch npe-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/npe-13 by this push:
     new 9353e008ae3 Pipe: Optimized the memory occupation of pipe realtime 
source (#17450)
9353e008ae3 is described below

commit 9353e008ae3577fb9a69ee96635b7ed93e67118a
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 9 17:29:07 2026 +0800

    Pipe: Optimized the memory occupation of pipe realtime source (#17450)
    
    * fix
    
    * fix
    
    * fix
---
 .../pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java  | 1 -
 .../protocol/thrift/async/handler/PipeTransferTsFileHandler.java   | 5 +----
 .../source/dataregion/realtime/PipeRealtimeDataRegionSource.java   | 7 +++++++
 .../pipe/agent/task/connection/UnboundedBlockingPendingQueue.java  | 4 ++++
 .../org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java  | 4 ++--
 5 files changed, 14 insertions(+), 7 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 6dc9a695b90..07381d15e28 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -647,7 +647,6 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
         addFailureEventToRetryQueue(tsFileInsertionEvent, null);
       }
     } catch (final Exception e) {
-      
tsFileInsertionEvent.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(),
 false);
       addFailureEventToRetryQueue(tsFileInsertionEvent, e);
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 1152710ec52..a9426ed7b8b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -150,10 +150,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     }
 
     if (reader == null) {
-      reader =
-          Objects.nonNull(modFile)
-              ? new RandomAccessFile(modFile, "r")
-              : new RandomAccessFile(tsFile, "r");
+      reader = transferMod ? new RandomAccessFile(modFile, "r") : new 
RandomAccessFile(tsFile, "r");
     }
 
     this.clientManager = clientManager;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index b6dcf3f615b..f3ac85483bb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -371,6 +371,13 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
   }
 
   protected void extractProgressReportEvent(final PipeRealtimeEvent event) {
+    // Remove any heartbeat events in front of this event to avoid OOM
+    // Since the batch and retry queue no longer need the heartbeat event to 
trigger
+    // And the progress report event can trigger the processor calculation 
because it's not reported
+    // yet
+    while (((PipeRealtimeEvent) pendingQueue.peekLast()).getEvent() instanceof 
PipeHeartbeatEvent) {
+      pendingQueue.pollLast();
+    }
     if (pendingQueue.peekLast() instanceof ProgressReportEvent) {
       final ProgressReportEvent oldEvent = (ProgressReportEvent) 
pendingQueue.peekLast();
       oldEvent.bindProgressIndex(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java
index 785e89cfb9a..43fa64c158e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java
@@ -37,4 +37,8 @@ public class UnboundedBlockingPendingQueue<E extends Event> 
extends BlockingPend
   public E peekLast() {
     return pendingDeque.peekLast();
   }
+
+  public E pollLast() {
+    return pendingDeque.pollLast();
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 82b85e3f828..4d53d9ea9db 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -740,7 +740,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
               String.format(
                   "Failed to seal file %s, because the length of file is not 
correct. "
                       + "The original file has length %s, but receiver file 
has length %s.",
-                  fileName, fileLength, writingFileWriter.length()));
+                  fileName, fileLength, file.length()));
       PipeLogger.log(
           LOGGER::warn,
           "Receiver id = %s: Failed to seal file %s, because the length of 
file is not correct. "
@@ -748,7 +748,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
           receiverId.get(),
           fileName,
           fileLength,
-          writingFileWriter.length());
+          file.length());
       return new TPipeTransferResp(status);
     }
 

Reply via email to