This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 22127335c95 Pipe: Optimized the memory usage of progress report event
(#16197)
22127335c95 is described below
commit 22127335c9512e9a71bdfb55ced121a8c4c8603e
Author: Caideyipi <[email protected]>
AuthorDate: Mon Aug 18 20:00:27 2025 +0800
Pipe: Optimized the memory usage of progress report event (#16197)
* may_complete
* simplify
---
...istoricalDataRegionTsFileAndDeletionSource.java | 11 +-------
.../realtime/PipeRealtimeDataRegionSource.java | 14 ++++++++-
.../realtime/assigner/PipeDataRegionAssigner.java | 16 ++---------
.../commons/pipe/event/ProgressReportEvent.java | 33 ++++++----------------
.../pipe/source/IoTDBNonDataRegionSource.java | 11 +-------
5 files changed, 25 insertions(+), 60 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index 2678c18d51e..1f5ded9fb5f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -814,16 +814,7 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSource
private Event supplyTsFileEvent(final TsFileResource resource) {
if (!filteredTsFileResources.contains(resource)) {
final ProgressReportEvent progressReportEvent =
- new ProgressReportEvent(
- pipeName,
- creationTime,
- pipeTaskMeta,
- treePattern,
- tablePattern,
- userName,
- skipIfNoPrivileges,
- historicalDataExtractionStartTime,
- historicalDataExtractionEndTime);
+ new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
progressReportEvent.bindProgressIndex(resource.getMaxProgressIndex());
final boolean isReferenceCountIncreased =
progressReportEvent.increaseReferenceCount(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index a5e3284ef49..ce99a182e0b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -336,7 +336,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
public final void extract(final PipeRealtimeEvent event) {
// The progress report event shall be directly extracted
if (event.getEvent() instanceof ProgressReportEvent) {
- extractDirectly(event);
+ extractProgressReportEvent(event);
return;
}
@@ -422,6 +422,18 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
}
}
+ protected void extractProgressReportEvent(final PipeRealtimeEvent event) {
+ if (pendingQueue.peekLast() instanceof ProgressReportEvent) {
+ final ProgressReportEvent oldEvent = (ProgressReportEvent)
pendingQueue.peekLast();
+ oldEvent.bindProgressIndex(
+ oldEvent
+ .getProgressIndex()
+
.updateToMinimumEqualOrIsAfterProgressIndex(event.getProgressIndex()));
+ return;
+ }
+ extractDirectly(event);
+ }
+
protected void extractDirectly(final PipeRealtimeEvent event) {
if (!pendingQueue.waitedOffer(event)) {
// This would not happen, but just in case.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 33db2715a03..86f057639cc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -150,13 +150,7 @@ public class PipeDataRegionAssigner implements Closeable {
new ProgressReportEvent(
extractor.getPipeName(),
extractor.getCreationTime(),
- extractor.getPipeTaskMeta(),
- extractor.getTreePattern(),
- extractor.getTablePattern(),
- extractor.getUserName(),
- extractor.isSkipIfNoPrivileges(),
- extractor.getRealtimeDataExtractionStartTime(),
- extractor.getRealtimeDataExtractionEndTime());
+ extractor.getPipeTaskMeta());
reportEvent.bindProgressIndex(event.getProgressIndex());
if
(!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
LOGGER.warn(
@@ -239,13 +233,7 @@ public class PipeDataRegionAssigner implements Closeable {
new ProgressReportEvent(
extractor.getPipeName(),
extractor.getCreationTime(),
- extractor.getPipeTaskMeta(),
- extractor.getTreePattern(),
- extractor.getTablePattern(),
- extractor.getUserName(),
- extractor.isSkipIfNoPrivileges(),
- extractor.getRealtimeDataExtractionStartTime(),
- extractor.getRealtimeDataExtractionEndTime());
+ extractor.getPipeTaskMeta());
reportEvent.bindProgressIndex(event.getProgressIndex());
if
(!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
LOGGER.warn(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
index 536c22ae989..3b5d4060359 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
@@ -34,25 +34,17 @@ public class ProgressReportEvent extends EnrichedEvent {
private ProgressIndex progressIndex;
public ProgressReportEvent(
- final String pipeName,
- final long creationTime,
- final PipeTaskMeta pipeTaskMeta,
- final TreePattern treePattern,
- final TablePattern tablePattern,
- final String userName,
- final boolean skipIfNoPrivileges,
- final long startTime,
- final long endTime) {
+ final String pipeName, final long creationTime, final PipeTaskMeta
pipeTaskMeta) {
super(
pipeName,
creationTime,
pipeTaskMeta,
- treePattern,
- tablePattern,
- userName,
- skipIfNoPrivileges,
- startTime,
- endTime);
+ null,
+ null,
+ null,
+ false,
+ Long.MIN_VALUE,
+ Long.MAX_VALUE);
}
@Override
@@ -86,16 +78,7 @@ public class ProgressReportEvent extends EnrichedEvent {
final boolean skipIfNoPrivileges,
final long startTime,
final long endTime) {
- return new ProgressReportEvent(
- pipeName,
- creationTime,
- pipeTaskMeta,
- treePattern,
- tablePattern,
- userName,
- skipIfNoPrivileges,
- startTime,
- endTime);
+ return new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
index 497b461dced..c59cb998ca3 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
@@ -226,16 +226,7 @@ public abstract class IoTDBNonDataRegionSource extends
IoTDBSource {
|| !isTypeListened(realtimeEvent)
|| (!isForwardingPipeRequests && realtimeEvent.isGeneratedByPipe())) {
final ProgressReportEvent event =
- new ProgressReportEvent(
- pipeName,
- creationTime,
- pipeTaskMeta,
- treePattern,
- tablePattern,
- userName,
- skipIfNoPrivileges,
- Long.MIN_VALUE,
- Long.MAX_VALUE);
+ new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
if (shouldBindIndex) {
event.bindProgressIndex(new MetaProgressIndex(iterator.getNextIndex()
- 1));
}