This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 22127335c95 Pipe: Optimized the memory usage of progress report event (#16197)
22127335c95 is described below

commit 22127335c9512e9a71bdfb55ced121a8c4c8603e
Author: Caideyipi <[email protected]>
AuthorDate: Mon Aug 18 20:00:27 2025 +0800

    Pipe: Optimized the memory usage of progress report event (#16197)
    
    * may_complete
    
    * simplify
---
 ...istoricalDataRegionTsFileAndDeletionSource.java | 11 +-------
 .../realtime/PipeRealtimeDataRegionSource.java     | 14 ++++++++-
 .../realtime/assigner/PipeDataRegionAssigner.java  | 16 ++---------
 .../commons/pipe/event/ProgressReportEvent.java    | 33 ++++++----------------
 .../pipe/source/IoTDBNonDataRegionSource.java      | 11 +-------
 5 files changed, 25 insertions(+), 60 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index 2678c18d51e..1f5ded9fb5f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -814,16 +814,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
   private Event supplyTsFileEvent(final TsFileResource resource) {
     if (!filteredTsFileResources.contains(resource)) {
       final ProgressReportEvent progressReportEvent =
-          new ProgressReportEvent(
-              pipeName,
-              creationTime,
-              pipeTaskMeta,
-              treePattern,
-              tablePattern,
-              userName,
-              skipIfNoPrivileges,
-              historicalDataExtractionStartTime,
-              historicalDataExtractionEndTime);
+          new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
       progressReportEvent.bindProgressIndex(resource.getMaxProgressIndex());
       final boolean isReferenceCountIncreased =
           progressReportEvent.increaseReferenceCount(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index a5e3284ef49..ce99a182e0b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -336,7 +336,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor {
   public final void extract(final PipeRealtimeEvent event) {
     // The progress report event shall be directly extracted
     if (event.getEvent() instanceof ProgressReportEvent) {
-      extractDirectly(event);
+      extractProgressReportEvent(event);
       return;
     }
 
@@ -422,6 +422,18 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor {
     }
   }
 
+  protected void extractProgressReportEvent(final PipeRealtimeEvent event) {
+    if (pendingQueue.peekLast() instanceof ProgressReportEvent) {
+      final ProgressReportEvent oldEvent = (ProgressReportEvent) pendingQueue.peekLast();
+      oldEvent.bindProgressIndex(
+          oldEvent
+              .getProgressIndex()
+              .updateToMinimumEqualOrIsAfterProgressIndex(event.getProgressIndex()));
+      return;
+    }
+    extractDirectly(event);
+  }
+
   protected void extractDirectly(final PipeRealtimeEvent event) {
     if (!pendingQueue.waitedOffer(event)) {
       // This would not happen, but just in case.
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 33db2715a03..86f057639cc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -150,13 +150,7 @@ public class PipeDataRegionAssigner implements Closeable {
                     new ProgressReportEvent(
                         extractor.getPipeName(),
                         extractor.getCreationTime(),
-                        extractor.getPipeTaskMeta(),
-                        extractor.getTreePattern(),
-                        extractor.getTablePattern(),
-                        extractor.getUserName(),
-                        extractor.isSkipIfNoPrivileges(),
-                        extractor.getRealtimeDataExtractionStartTime(),
-                        extractor.getRealtimeDataExtractionEndTime());
+                        extractor.getPipeTaskMeta());
                 reportEvent.bindProgressIndex(event.getProgressIndex());
                 if (!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
                   LOGGER.warn(
@@ -239,13 +233,7 @@ public class PipeDataRegionAssigner implements Closeable {
                     new ProgressReportEvent(
                         extractor.getPipeName(),
                         extractor.getCreationTime(),
-                        extractor.getPipeTaskMeta(),
-                        extractor.getTreePattern(),
-                        extractor.getTablePattern(),
-                        extractor.getUserName(),
-                        extractor.isSkipIfNoPrivileges(),
-                        extractor.getRealtimeDataExtractionStartTime(),
-                        extractor.getRealtimeDataExtractionEndTime());
+                        extractor.getPipeTaskMeta());
                 reportEvent.bindProgressIndex(event.getProgressIndex());
                 if (!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
                   LOGGER.warn(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
index 536c22ae989..3b5d4060359 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
@@ -34,25 +34,17 @@ public class ProgressReportEvent extends EnrichedEvent {
   private ProgressIndex progressIndex;
 
   public ProgressReportEvent(
-      final String pipeName,
-      final long creationTime,
-      final PipeTaskMeta pipeTaskMeta,
-      final TreePattern treePattern,
-      final TablePattern tablePattern,
-      final String userName,
-      final boolean skipIfNoPrivileges,
-      final long startTime,
-      final long endTime) {
+      final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta) {
     super(
         pipeName,
         creationTime,
         pipeTaskMeta,
-        treePattern,
-        tablePattern,
-        userName,
-        skipIfNoPrivileges,
-        startTime,
-        endTime);
+        null,
+        null,
+        null,
+        false,
+        Long.MIN_VALUE,
+        Long.MAX_VALUE);
   }
 
   @Override
@@ -86,16 +78,7 @@ public class ProgressReportEvent extends EnrichedEvent {
       final boolean skipIfNoPrivileges,
       final long startTime,
       final long endTime) {
-    return new ProgressReportEvent(
-        pipeName,
-        creationTime,
-        pipeTaskMeta,
-        treePattern,
-        tablePattern,
-        userName,
-        skipIfNoPrivileges,
-        startTime,
-        endTime);
+    return new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
   }
 
   @Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
index 497b461dced..c59cb998ca3 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
@@ -226,16 +226,7 @@ public abstract class IoTDBNonDataRegionSource extends IoTDBSource {
         || !isTypeListened(realtimeEvent)
         || (!isForwardingPipeRequests && realtimeEvent.isGeneratedByPipe())) {
       final ProgressReportEvent event =
-          new ProgressReportEvent(
-              pipeName,
-              creationTime,
-              pipeTaskMeta,
-              treePattern,
-              tablePattern,
-              userName,
-              skipIfNoPrivileges,
-              Long.MIN_VALUE,
-              Long.MAX_VALUE);
+          new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
       if (shouldBindIndex) {
         event.bindProgressIndex(new MetaProgressIndex(iterator.getNextIndex() - 1));
       }

Reply via email to