This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new d2071b5eed7 [IOTDB-6135] Pipe: Fix a bug which keeps generating PipeHeartbeatEvent unnecessarily (#11008) (#11028)
d2071b5eed7 is described below

commit d2071b5eed76b966b632d5e300b217ffcbec4928
Author: 马子坤 <[email protected]>
AuthorDate: Mon Sep 4 12:09:19 2023 +0800

    [IOTDB-6135] Pipe: Fix a bug which keeps generating PipeHeartbeatEvent unnecessarily (#11008) (#11028)
    
    (cherry picked from commit 35ffb5128db06d10ed1715199b217c217c60a42c)
---
 .../extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java     | 4 +++-
 .../pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java   | 4 +++-
 .../extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java     | 4 +++-
 3 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index edbc14e26d4..cbd6d77f189 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -155,7 +155,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
   }
 
   private void extractHeartbeat(PipeRealtimeEvent event) {
-    if (pendingQueue.peekLast() instanceof PipeHeartbeatEvent) {
+    Event lastEvent = pendingQueue.peekLast();
+    if (lastEvent instanceof PipeRealtimeEvent
+        && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent) {
       // if the last event in the pending queue is a heartbeat event, we should not extract any more
       // heartbeat events to avoid OOM when the pipe is stopped.
       event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
index fb3783dc224..52970f2957d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -75,7 +75,9 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
   }
 
   private void extractHeartbeat(PipeRealtimeEvent event) {
-    if (pendingQueue.peekLast() instanceof PipeHeartbeatEvent) {
+    Event lastEvent = pendingQueue.peekLast();
+    if (lastEvent instanceof PipeRealtimeEvent
+        && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent) {
       // if the last event in the pending queue is a heartbeat event, we should not extract any more
       // heartbeat events to avoid OOM when the pipe is stopped.
       event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
index 5967b1bc87b..816e0249b47 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
@@ -75,7 +75,9 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
   }
 
   private void extractHeartbeat(PipeRealtimeEvent event) {
-    if (pendingQueue.peekLast() instanceof PipeHeartbeatEvent) {
+    Event lastEvent = pendingQueue.peekLast();
+    if (lastEvent instanceof PipeRealtimeEvent
+        && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent) {
       // if the last event in the pending queue is a heartbeat event, we should not extract any more
       // heartbeat events to avoid OOM when the pipe is stopped.
       event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());

Reply via email to