This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new d34bd8d5aec [To dev/1.3] Pipe: Optimized the memory occupation of pipe
realtime source (#17450)(#17474) (#17486)
d34bd8d5aec is described below
commit d34bd8d5aec7fedbd0e8d08def044d6908585206
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 15 17:42:22 2026 +0800
[To dev/1.3] Pipe: Optimized the memory occupation of pipe realtime source
(#17450)(#17474) (#17486)
* Pipe: Optimized the memory occupation of pipe realtime source (#17450)
* fix
* fix
* fix
* Pipe: Fixed the NPE of progress report event (#17474)
---
.../sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java | 1 -
.../thrift/async/handler/PipeTransferTsFileHandler.java | 5 +----
.../dataregion/realtime/PipeRealtimeDataRegionSource.java | 11 +++++++++++
.../agent/task/connection/UnboundedBlockingPendingQueue.java | 4 ++++
.../apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java | 4 ++--
5 files changed, 18 insertions(+), 7 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 6dc9a695b90..07381d15e28 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -647,7 +647,6 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
addFailureEventToRetryQueue(tsFileInsertionEvent, null);
}
} catch (final Exception e) {
-
tsFileInsertionEvent.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(),
false);
addFailureEventToRetryQueue(tsFileInsertionEvent, e);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 1152710ec52..a9426ed7b8b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -150,10 +150,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
}
if (reader == null) {
- reader =
- Objects.nonNull(modFile)
- ? new RandomAccessFile(modFile, "r")
- : new RandomAccessFile(tsFile, "r");
+ reader = transferMod ? new RandomAccessFile(modFile, "r") : new
RandomAccessFile(tsFile, "r");
}
this.clientManager = clientManager;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index b6dcf3f615b..3c03ee732d0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -371,6 +371,17 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
}
protected void extractProgressReportEvent(final PipeRealtimeEvent event) {
+ // Remove any heartbeat events in front of this event to avoid OOM
+ // Since the batch and retry queue no longer need the heartbeat event to
trigger
+ // And the progress report event can trigger the processor calculation
because it's not reported
+ // yet
+ while (true) {
+ final PipeRealtimeEvent lastEvent = ((PipeRealtimeEvent)
pendingQueue.peekLast());
+ if (lastEvent == null || !(lastEvent.getEvent() instanceof
PipeHeartbeatEvent)) {
+ break;
+ }
+ pendingQueue.pollLast();
+ }
if (pendingQueue.peekLast() instanceof ProgressReportEvent) {
final ProgressReportEvent oldEvent = (ProgressReportEvent)
pendingQueue.peekLast();
oldEvent.bindProgressIndex(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java
index 785e89cfb9a..43fa64c158e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java
@@ -37,4 +37,8 @@ public class UnboundedBlockingPendingQueue<E extends Event>
extends BlockingPend
public E peekLast() {
return pendingDeque.peekLast();
}
+
+ public E pollLast() {
+ return pendingDeque.pollLast();
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 82b85e3f828..4d53d9ea9db 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -740,7 +740,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
String.format(
"Failed to seal file %s, because the length of file is not
correct. "
+ "The original file has length %s, but receiver file
has length %s.",
- fileName, fileLength, writingFileWriter.length()));
+ fileName, fileLength, file.length()));
PipeLogger.log(
LOGGER::warn,
"Receiver id = %s: Failed to seal file %s, because the length of
file is not correct. "
@@ -748,7 +748,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
receiverId.get(),
fileName,
fileLength,
- writingFileWriter.length());
+ file.length());
return new TPipeTransferResp(status);
}