This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new d2071b5eed7 [IOTDB-6135] Pipe: Fix a bug which keeps generating
PipeHeartbeatEvent unnecessarily (#11008) (#11028)
d2071b5eed7 is described below
commit d2071b5eed76b966b632d5e300b217ffcbec4928
Author: 马子坤 <[email protected]>
AuthorDate: Mon Sep 4 12:09:19 2023 +0800
[IOTDB-6135] Pipe: Fix a bug which keeps generating PipeHeartbeatEvent
unnecessarily (#11008) (#11028)
(cherry picked from commit 35ffb5128db06d10ed1715199b217c217c60a42c)
---
.../extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java | 4 +++-
.../pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java | 4 +++-
.../extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java | 4 +++-
3 files changed, 9 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index edbc14e26d4..cbd6d77f189 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -155,7 +155,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
}
private void extractHeartbeat(PipeRealtimeEvent event) {
- if (pendingQueue.peekLast() instanceof PipeHeartbeatEvent) {
+ Event lastEvent = pendingQueue.peekLast();
+ if (lastEvent instanceof PipeRealtimeEvent
+ && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof
PipeHeartbeatEvent) {
// if the last event in the pending queue is a heartbeat event, we
should not extract any more
// heartbeat events to avoid OOM when the pipe is stopped.
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
index fb3783dc224..52970f2957d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -75,7 +75,9 @@ public class PipeRealtimeDataRegionLogExtractor extends
PipeRealtimeDataRegionEx
}
private void extractHeartbeat(PipeRealtimeEvent event) {
- if (pendingQueue.peekLast() instanceof PipeHeartbeatEvent) {
+ Event lastEvent = pendingQueue.peekLast();
+ if (lastEvent instanceof PipeRealtimeEvent
+ && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof
PipeHeartbeatEvent) {
// if the last event in the pending queue is a heartbeat event, we
should not extract any more
// heartbeat events to avoid OOM when the pipe is stopped.
event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
index 5967b1bc87b..816e0249b47 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
@@ -75,7 +75,9 @@ public class PipeRealtimeDataRegionTsFileExtractor extends
PipeRealtimeDataRegio
}
private void extractHeartbeat(PipeRealtimeEvent event) {
- if (pendingQueue.peekLast() instanceof PipeHeartbeatEvent) {
+ Event lastEvent = pendingQueue.peekLast();
+ if (lastEvent instanceof PipeRealtimeEvent
+ && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof
PipeHeartbeatEvent) {
// if the last event in the pending queue is a heartbeat event, we
should not extract any more
// heartbeat events to avoid OOM when the pipe is stopped.
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());