This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 35039c5fcc3 [To dev/1.3] Pipe: Optimized the memory usage of progress report event (#16197) (#16200)
35039c5fcc3 is described below

commit 35039c5fcc37e5a35de9001d5fd7fc2ec3622169
Author: Caideyipi <[email protected]>
AuthorDate: Tue Aug 19 09:41:24 2025 +0800

    [To dev/1.3] Pipe: Optimized the memory usage of progress report event (#16197) (#16200)
    
    * Pipe: Optimized the memory usage of progress report event (#16197)
    
    * may_complete
    
    * simplify
    
    * Update PipeHistoricalDataRegionTsFileSource.java
---
 .../historical/PipeHistoricalDataRegionTsFileSource.java   |  8 +-------
 .../dataregion/realtime/PipeRealtimeDataRegionSource.java  | 14 +++++++++++++-
 .../realtime/assigner/PipeDataRegionAssigner.java          | 10 ++--------
 .../iotdb/commons/pipe/event/ProgressReportEvent.java      | 12 +++---------
 .../commons/pipe/source/IoTDBNonDataRegionSource.java      |  3 +--
 5 files changed, 20 insertions(+), 27 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
index d4d44d8f591..e3278977647 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
@@ -561,13 +561,7 @@ public class PipeHistoricalDataRegionTsFileSource implements PipeHistoricalDataR
 
     if (!filteredTsFileResources.contains(resource)) {
       final ProgressReportEvent progressReportEvent =
-          new ProgressReportEvent(
-              pipeName,
-              creationTime,
-              pipeTaskMeta,
-              pipePattern,
-              historicalDataExtractionStartTime,
-              historicalDataExtractionEndTime);
+          new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
       progressReportEvent.bindProgressIndex(resource.getMaxProgressIndex());
       final boolean isReferenceCountIncreased =
           progressReportEvent.increaseReferenceCount(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index b41000a3554..4f30452fb8b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -295,7 +295,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor {
   public final void extract(final PipeRealtimeEvent event) {
     // The progress report event shall be directly extracted
     if (event.getEvent() instanceof ProgressReportEvent) {
-      extractDirectly(event);
+      extractProgressReportEvent(event);
       return;
     }
 
@@ -381,6 +381,18 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor {
     }
   }
 
+  protected void extractProgressReportEvent(final PipeRealtimeEvent event) {
+    if (pendingQueue.peekLast() instanceof ProgressReportEvent) {
+      final ProgressReportEvent oldEvent = (ProgressReportEvent) pendingQueue.peekLast();
+      oldEvent.bindProgressIndex(
+          oldEvent
+              .getProgressIndex()
+              .updateToMinimumEqualOrIsAfterProgressIndex(event.getProgressIndex()));
+      return;
+    }
+    extractDirectly(event);
+  }
+
   protected void extractDirectly(final PipeRealtimeEvent event) {
     if (!pendingQueue.waitedOffer(event)) {
       // This would not happen, but just in case.
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 2e478b6ff35..6a4f877bf47 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -127,10 +127,7 @@ public class PipeDataRegionAssigner implements Closeable {
                     new ProgressReportEvent(
                         extractor.getPipeName(),
                         extractor.getCreationTime(),
-                        extractor.getPipeTaskMeta(),
-                        extractor.getPipePattern(),
-                        extractor.getRealtimeDataExtractionStartTime(),
-                        extractor.getRealtimeDataExtractionEndTime());
+                        extractor.getPipeTaskMeta());
                 reportEvent.bindProgressIndex(event.getProgressIndex());
                 if (!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
                   LOGGER.warn(
@@ -182,10 +179,7 @@ public class PipeDataRegionAssigner implements Closeable {
                     new ProgressReportEvent(
                         extractor.getPipeName(),
                         extractor.getCreationTime(),
-                        extractor.getPipeTaskMeta(),
-                        extractor.getPipePattern(),
-                        extractor.getRealtimeDataExtractionStartTime(),
-                        extractor.getRealtimeDataExtractionEndTime());
+                        extractor.getPipeTaskMeta());
                 reportEvent.bindProgressIndex(event.getProgressIndex());
                 if (!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
                   LOGGER.warn(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
index d87a5196b72..2c89f88709a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
@@ -33,13 +33,8 @@ public class ProgressReportEvent extends EnrichedEvent {
   private ProgressIndex progressIndex;
 
   public ProgressReportEvent(
-      final String pipeName,
-      final long creationTime,
-      final PipeTaskMeta pipeTaskMeta,
-      final PipePattern pipePattern,
-      final long startTime,
-      final long endTime) {
-    super(pipeName, creationTime, pipeTaskMeta, pipePattern, startTime, endTime);
+      final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta) {
+    super(pipeName, creationTime, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE);
   }
 
   @Override
@@ -70,8 +65,7 @@ public class ProgressReportEvent extends EnrichedEvent {
       final PipePattern pattern,
       final long startTime,
       final long endTime) {
-    return new ProgressReportEvent(
-        pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
+    return new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
   }
 
   @Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
index 6c5b1a246d3..1639c9e759f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
@@ -189,8 +189,7 @@ public abstract class IoTDBNonDataRegionSource extends IoTDBSource {
         || !isTypeListened(realtimeEvent)
         || (!isForwardingPipeRequests && realtimeEvent.isGeneratedByPipe())) {
       final ProgressReportEvent event =
-          new ProgressReportEvent(
-              pipeName, creationTime, pipeTaskMeta, pipePattern, Long.MIN_VALUE, Long.MAX_VALUE);
+          new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
       event.bindProgressIndex(new MetaProgressIndex(iterator.getNextIndex() - 1));
       event.increaseReferenceCount(IoTDBNonDataRegionSource.class.getName());
       return event;

Reply via email to