This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 93de59e4db0 Pipe: add connector's pending tsfile event count to help 
isTsFileEventCountInQueueExceededLimit judgement when hybrid mode is enabled 
(#11476)
93de59e4db0 is described below

commit 93de59e4db01681a49ff0c8e57e9de92c0707816
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat Nov 4 14:21:48 2023 +0800

    Pipe: add connector's pending tsfile event count to help 
isTsFileEventCountInQueueExceededLimit judgement when hybrid mode is enabled 
(#11476)
---
 .../pipe/event/common/heartbeat/PipeHeartbeatEvent.java   |  7 ++++++-
 .../realtime/PipeRealtimeDataRegionHybridExtractor.java   | 15 +++++++++++----
 2 files changed, 17 insertions(+), 5 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index b5e92f69c55..2c54d93401a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -185,7 +185,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
 
     if (extractor instanceof PipeRealtimeDataRegionHybridExtractor) {
       ((PipeRealtimeDataRegionHybridExtractor) extractor)
-          
.informEventCollectorQueueTsFileSize(bufferQueue.getTsFileInsertionEventCount());
+          
.informProcessorEventCollectorQueueTsFileSize(bufferQueue.getTsFileInsertionEventCount());
     }
   }
 
@@ -195,6 +195,11 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
       connectorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
       connectorQueueSize = pendingQueue.size();
     }
+
+    if (extractor instanceof PipeRealtimeDataRegionHybridExtractor) {
+      ((PipeRealtimeDataRegionHybridExtractor) extractor)
+          
.informConnectorInputPendingQueueTsFileSize(pendingQueue.getTsFileInsertionEventCount());
+    }
   }
 
   /////////////////////////////// For Hybrid extractor 
///////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 7b6e5e42dcf..3ba5a4c496f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -42,7 +42,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
       LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);
 
   private volatile boolean isStartedToSupply = false;
-  private final AtomicInteger eventCollectorQueueTsFileSize = new 
AtomicInteger(0);
+  private final AtomicInteger processorEventCollectorQueueTsFileSize = new 
AtomicInteger(0);
+  private final AtomicInteger connectorInputPendingQueueTsFileSize = new 
AtomicInteger(0);
 
   @Override
   protected void doExtract(PipeRealtimeEvent event) {
@@ -228,12 +229,18 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
   }
 
   private boolean isTsFileEventCountInQueueExceededLimit() {
-    return pendingQueue.getTsFileInsertionEventCount() + 
eventCollectorQueueTsFileSize.get()
+    return pendingQueue.getTsFileInsertionEventCount()
+            + processorEventCollectorQueueTsFileSize.get()
+            + connectorInputPendingQueueTsFileSize.get()
         >= 
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
   }
 
-  public void informEventCollectorQueueTsFileSize(int queueSize) {
-    eventCollectorQueueTsFileSize.set(queueSize);
+  public void informProcessorEventCollectorQueueTsFileSize(int queueSize) {
+    processorEventCollectorQueueTsFileSize.set(queueSize);
+  }
+
+  public void informConnectorInputPendingQueueTsFileSize(int queueSize) {
+    connectorInputPendingQueueTsFileSize.set(queueSize);
   }
 
   @Override

Reply via email to