This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 93de59e4db0 Pipe: add connector's pending tsfile event count to help
isTsFileEventCountInQueueExceededLimit judgement when hybrid mode is enabled
(#11476)
93de59e4db0 is described below
commit 93de59e4db01681a49ff0c8e57e9de92c0707816
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat Nov 4 14:21:48 2023 +0800
Pipe: add connector's pending tsfile event count to help
isTsFileEventCountInQueueExceededLimit judgement when hybrid mode is enabled
(#11476)
---
.../pipe/event/common/heartbeat/PipeHeartbeatEvent.java | 7 ++++++-
.../realtime/PipeRealtimeDataRegionHybridExtractor.java | 15 +++++++++++----
2 files changed, 17 insertions(+), 5 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index b5e92f69c55..2c54d93401a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -185,7 +185,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
if (extractor instanceof PipeRealtimeDataRegionHybridExtractor) {
((PipeRealtimeDataRegionHybridExtractor) extractor)
- .informEventCollectorQueueTsFileSize(bufferQueue.getTsFileInsertionEventCount());
+ .informProcessorEventCollectorQueueTsFileSize(bufferQueue.getTsFileInsertionEventCount());
}
}
@@ -195,6 +195,11 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
connectorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
connectorQueueSize = pendingQueue.size();
}
+
+ if (extractor instanceof PipeRealtimeDataRegionHybridExtractor) {
+ ((PipeRealtimeDataRegionHybridExtractor) extractor)
+ .informConnectorInputPendingQueueTsFileSize(pendingQueue.getTsFileInsertionEventCount());
+ }
}
/////////////////////////////// For Hybrid extractor ///////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 7b6e5e42dcf..3ba5a4c496f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -42,7 +42,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);
private volatile boolean isStartedToSupply = false;
- private final AtomicInteger eventCollectorQueueTsFileSize = new AtomicInteger(0);
+ private final AtomicInteger processorEventCollectorQueueTsFileSize = new AtomicInteger(0);
+ private final AtomicInteger connectorInputPendingQueueTsFileSize = new AtomicInteger(0);
@Override
protected void doExtract(PipeRealtimeEvent event) {
@@ -228,12 +229,18 @@ public class PipeRealtimeDataRegionHybridExtractor
extends PipeRealtimeDataRegio
}
private boolean isTsFileEventCountInQueueExceededLimit() {
- return pendingQueue.getTsFileInsertionEventCount() + eventCollectorQueueTsFileSize.get()
+ return pendingQueue.getTsFileInsertionEventCount()
+ + processorEventCollectorQueueTsFileSize.get()
+ + connectorInputPendingQueueTsFileSize.get()
>= PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
}
- public void informEventCollectorQueueTsFileSize(int queueSize) {
- eventCollectorQueueTsFileSize.set(queueSize);
+ public void informProcessorEventCollectorQueueTsFileSize(int queueSize) {
+ processorEventCollectorQueueTsFileSize.set(queueSize);
+ }
+
+ public void informConnectorInputPendingQueueTsFileSize(int queueSize) {
+ connectorInputPendingQueueTsFileSize.set(queueSize);
}
@Override