This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6639d5742b7 Pipe: Improved hybrid algorithm and avoid the calculations 
of tsfiles in connector pending queue tsfiles to block the forwarding of 
realtime requests (#12668)
6639d5742b7 is described below

commit 6639d5742b796ec49fbc3d09e4527b2546e12938
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 5 20:54:02 2024 +0800

    Pipe: Improved hybrid algorithm and avoid the calculations of tsfiles in 
connector pending queue tsfiles to block the forwarding of realtime requests 
(#12668)
---
 .../event/common/heartbeat/PipeHeartbeatEvent.java | 48 ++++++++--------------
 .../realtime/PipeRealtimeDataRegionExtractor.java  |  3 --
 .../PipeRealtimeDataRegionHybridExtractor.java     |  9 +---
 3 files changed, 18 insertions(+), 42 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index c3d0798c7e9..9738f86a5dd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -25,8 +25,6 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
-import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
 import org.apache.iotdb.db.pipe.metric.PipeHeartbeatEventMetrics;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -41,7 +39,6 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
 
   private final String dataRegionId;
   private String pipeName;
-  private PipeRealtimeDataRegionExtractor extractor = null;
 
   private long timePublished;
   private long timeAssigned;
@@ -62,18 +59,18 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
 
   private final boolean shouldPrintMessage;
 
-  public PipeHeartbeatEvent(String dataRegionId, boolean shouldPrintMessage) {
+  public PipeHeartbeatEvent(final String dataRegionId, final boolean 
shouldPrintMessage) {
     super(null, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
     this.dataRegionId = dataRegionId;
     this.shouldPrintMessage = shouldPrintMessage;
   }
 
   public PipeHeartbeatEvent(
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      String dataRegionId,
-      long timePublished,
-      boolean shouldPrintMessage) {
+      final String pipeName,
+      final PipeTaskMeta pipeTaskMeta,
+      final String dataRegionId,
+      final long timePublished,
+      final boolean shouldPrintMessage) {
     super(pipeName, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE);
     this.dataRegionId = dataRegionId;
     this.timePublished = timePublished;
@@ -81,12 +78,12 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
   }
 
   @Override
-  public boolean internallyIncreaseResourceReferenceCount(String 
holderMessage) {
+  public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
     return true;
   }
 
   @Override
-  public boolean internallyDecreaseResourceReferenceCount(String 
holderMessage) {
+  public boolean internallyDecreaseResourceReferenceCount(final String 
holderMessage) {
     // PipeName == null indicates that the event is the raw event at disruptor,
     // not the event copied and passed to the extractor
     if (shouldPrintMessage && pipeName != null && LOGGER.isDebugEnabled()) {
@@ -102,11 +99,11 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
 
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      PipePattern pattern,
-      long startTime,
-      long endTime) {
+      final String pipeName,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime) {
     // Should record PipeTaskMeta, for sometimes HeartbeatEvents should report 
exceptions.
     // Here we ignore parameters `pattern`, `startTime`, and `endTime`.
     return new PipeHeartbeatEvent(
@@ -131,7 +128,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
 
   /////////////////////////////// Delay Reporting 
///////////////////////////////
 
-  public void bindPipeName(String pipeName) {
+  public void bindPipeName(final String pipeName) {
     if (shouldPrintMessage) {
       this.pipeName = pipeName;
     }
@@ -175,13 +172,13 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
 
   /////////////////////////////// Queue size Reporting 
///////////////////////////////
 
-  public void recordDisruptorSize(RingBuffer<?> ringBuffer) {
+  public void recordDisruptorSize(final RingBuffer<?> ringBuffer) {
     if (shouldPrintMessage) {
       disruptorSize = ringBuffer.getBufferSize() - (int) 
ringBuffer.remainingCapacity();
     }
   }
 
-  public void recordExtractorQueueSize(UnboundedBlockingPendingQueue<Event> 
pendingQueue) {
+  public void recordExtractorQueueSize(final 
UnboundedBlockingPendingQueue<Event> pendingQueue) {
     if (shouldPrintMessage) {
       extractorQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
       extractorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
@@ -189,23 +186,12 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
     }
   }
 
-  public void recordConnectorQueueSize(UnboundedBlockingPendingQueue<Event> 
pendingQueue) {
+  public void recordConnectorQueueSize(final 
UnboundedBlockingPendingQueue<Event> pendingQueue) {
     if (shouldPrintMessage) {
       connectorQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
       connectorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
       connectorQueueSize = pendingQueue.size();
     }
-
-    if (extractor instanceof PipeRealtimeDataRegionHybridExtractor) {
-      ((PipeRealtimeDataRegionHybridExtractor) extractor)
-          
.informConnectorInputPendingQueueTsFileSize(pendingQueue.getTsFileInsertionEventCount());
-    }
-  }
-
-  /////////////////////////////// For Hybrid extractor 
///////////////////////////////
-
-  public void bindExtractor(PipeRealtimeDataRegionExtractor extractor) {
-    this.extractor = extractor;
   }
 
   /////////////////////////////// For Commit Ordering 
///////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index d809bd72a81..5415cb182e4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -296,9 +296,6 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
   protected abstract void doExtract(final PipeRealtimeEvent event);
 
   protected void extractHeartbeat(final PipeRealtimeEvent event) {
-    // Bind extractor so that the heartbeat event can later inform the 
extractor of queue size
-    ((PipeHeartbeatEvent) event.getEvent()).bindExtractor(this);
-
     // Record the pending queue size before trying to put heartbeatEvent into 
queue
     ((PipeHeartbeatEvent) 
event.getEvent()).recordExtractorQueueSize(pendingQueue);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 381b8d73858..a0f075298a5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -40,15 +40,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
 
 public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegionExtractor {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);
 
-  private final AtomicInteger connectorInputPendingQueueTsFileSize = new 
AtomicInteger(0);
-
   @Override
   protected void doExtract(final PipeRealtimeEvent event) {
     final Event eventToExtract = event.getEvent();
@@ -238,7 +235,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
   }
 
   private boolean isRealtimeTsFileEventCountExceededLimit() {
-    return pendingQueue.getTsFileInsertionEventCount() + 
connectorInputPendingQueueTsFileSize.get()
+    return pendingQueue.getTsFileInsertionEventCount()
         >= 
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
   }
 
@@ -247,10 +244,6 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
         >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
   }
 
-  public void informConnectorInputPendingQueueTsFileSize(final int queueSize) {
-    connectorInputPendingQueueTsFileSize.set(queueSize);
-  }
-
   @Override
   public Event supply() {
     PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent) 
pendingQueue.directPoll();

Reply via email to