This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 35039c5fcc3 [To dev/1.3] Pipe: Optimized the memory usage of progress
report event (#16197) (#16200)
35039c5fcc3 is described below
commit 35039c5fcc37e5a35de9001d5fd7fc2ec3622169
Author: Caideyipi <[email protected]>
AuthorDate: Tue Aug 19 09:41:24 2025 +0800
[To dev/1.3] Pipe: Optimized the memory usage of progress report event
(#16197) (#16200)
* Pipe: Optimized the memory usage of progress report event (#16197)
* may_complete
* simplify
* Update PipeHistoricalDataRegionTsFileSource.java
---
.../historical/PipeHistoricalDataRegionTsFileSource.java | 8 +-------
.../dataregion/realtime/PipeRealtimeDataRegionSource.java | 14 +++++++++++++-
.../realtime/assigner/PipeDataRegionAssigner.java | 10 ++--------
.../iotdb/commons/pipe/event/ProgressReportEvent.java | 12 +++---------
.../commons/pipe/source/IoTDBNonDataRegionSource.java | 3 +--
5 files changed, 20 insertions(+), 27 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
index d4d44d8f591..e3278977647 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
@@ -561,13 +561,7 @@ public class PipeHistoricalDataRegionTsFileSource implements PipeHistoricalDataR
if (!filteredTsFileResources.contains(resource)) {
final ProgressReportEvent progressReportEvent =
- new ProgressReportEvent(
- pipeName,
- creationTime,
- pipeTaskMeta,
- pipePattern,
- historicalDataExtractionStartTime,
- historicalDataExtractionEndTime);
+ new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
progressReportEvent.bindProgressIndex(resource.getMaxProgressIndex());
final boolean isReferenceCountIncreased =
progressReportEvent.increaseReferenceCount(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index b41000a3554..4f30452fb8b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -295,7 +295,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor {
public final void extract(final PipeRealtimeEvent event) {
// The progress report event shall be directly extracted
if (event.getEvent() instanceof ProgressReportEvent) {
- extractDirectly(event);
+ extractProgressReportEvent(event);
return;
}
@@ -381,6 +381,18 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor {
}
}
+ protected void extractProgressReportEvent(final PipeRealtimeEvent event) {
+ if (pendingQueue.peekLast() instanceof ProgressReportEvent) {
+ final ProgressReportEvent oldEvent = (ProgressReportEvent) pendingQueue.peekLast();
+ oldEvent.bindProgressIndex(
+ oldEvent
+ .getProgressIndex()
+ .updateToMinimumEqualOrIsAfterProgressIndex(event.getProgressIndex()));
+ return;
+ }
+ extractDirectly(event);
+ }
+
protected void extractDirectly(final PipeRealtimeEvent event) {
if (!pendingQueue.waitedOffer(event)) {
// This would not happen, but just in case.
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 2e478b6ff35..6a4f877bf47 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -127,10 +127,7 @@ public class PipeDataRegionAssigner implements Closeable {
new ProgressReportEvent(
extractor.getPipeName(),
extractor.getCreationTime(),
- extractor.getPipeTaskMeta(),
- extractor.getPipePattern(),
- extractor.getRealtimeDataExtractionStartTime(),
- extractor.getRealtimeDataExtractionEndTime());
+ extractor.getPipeTaskMeta());
reportEvent.bindProgressIndex(event.getProgressIndex());
if (!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
LOGGER.warn(
@@ -182,10 +179,7 @@ public class PipeDataRegionAssigner implements Closeable {
new ProgressReportEvent(
extractor.getPipeName(),
extractor.getCreationTime(),
- extractor.getPipeTaskMeta(),
- extractor.getPipePattern(),
- extractor.getRealtimeDataExtractionStartTime(),
- extractor.getRealtimeDataExtractionEndTime());
+ extractor.getPipeTaskMeta());
reportEvent.bindProgressIndex(event.getProgressIndex());
if (!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
LOGGER.warn(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
index d87a5196b72..2c89f88709a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
@@ -33,13 +33,8 @@ public class ProgressReportEvent extends EnrichedEvent {
private ProgressIndex progressIndex;
public ProgressReportEvent(
- final String pipeName,
- final long creationTime,
- final PipeTaskMeta pipeTaskMeta,
- final PipePattern pipePattern,
- final long startTime,
- final long endTime) {
- super(pipeName, creationTime, pipeTaskMeta, pipePattern, startTime, endTime);
+ final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta) {
+ super(pipeName, creationTime, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE);
}
@Override
@@ -70,8 +65,7 @@ public class ProgressReportEvent extends EnrichedEvent {
final PipePattern pattern,
final long startTime,
final long endTime) {
- return new ProgressReportEvent(
- pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
+ return new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
}
@Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
index 6c5b1a246d3..1639c9e759f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
@@ -189,8 +189,7 @@ public abstract class IoTDBNonDataRegionSource extends IoTDBSource {
|| !isTypeListened(realtimeEvent)
|| (!isForwardingPipeRequests && realtimeEvent.isGeneratedByPipe())) {
final ProgressReportEvent event =
- new ProgressReportEvent(
- pipeName, creationTime, pipeTaskMeta, pipePattern, Long.MIN_VALUE, Long.MAX_VALUE);
+ new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
event.bindProgressIndex(new MetaProgressIndex(iterator.getNextIndex() - 1));
event.increaseReferenceCount(IoTDBNonDataRegionSource.class.getName());
return event;