This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new eb43b4909a7 Pipe: Report queue size in PipeHeartbeatEvent (#10997)
eb43b4909a7 is described below

commit eb43b4909a7df36299b2a9b9e4a95f2fd83ea5d6
Author: Caideyipi <[email protected]>
AuthorDate: Mon Sep 4 16:51:17 2023 +0800

    Pipe: Report queue size in PipeHeartbeatEvent (#10997)
---
 .../event/common/heartbeat/PipeHeartbeatEvent.java | 89 +++++++++++++++++++---
 .../PipeRealtimeDataRegionHybridExtractor.java     | 10 ++-
 .../PipeRealtimeDataRegionLogExtractor.java        | 10 ++-
 .../PipeRealtimeDataRegionTsFileExtractor.java     | 10 ++-
 .../realtime/assigner/DisruptorQueue.java          |  6 ++
 .../pipe/task/connection/PipeEventCollector.java   |  4 +
 .../connection/UnboundedBlockingPendingQueue.java  |  4 +
 7 files changed, 117 insertions(+), 16 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index 71d65a6b7cd..15f57aa94cf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -23,11 +23,17 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.db.utils.DateTimeUtils;
+import org.apache.iotdb.pipe.api.event.Event;
 
+import com.lmax.disruptor.RingBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Deque;
+
 public class PipeHeartbeatEvent extends EnrichedEvent {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeHeartbeatEvent.class);
@@ -40,6 +46,11 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
   private long timeProcessed;
   private long timeTransferred;
 
+  private int disruptorSize;
+  private int extractorQueueSize;
+  private int bufferQueueSize;
+  private int connectorQueueSize;
+
   private final boolean shouldPrintMessage;
 
   public PipeHeartbeatEvent(String dataRegionId, boolean shouldPrintMessage) {
@@ -86,40 +97,90 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
     return false;
   }
 
+  /////////////////////////////// Whether to print ///////////////////////////////
+
+  public boolean isShouldPrintMessage() {
+    return shouldPrintMessage;
+  }
+
   /////////////////////////////// Delay Reporting ///////////////////////////////
 
   public void bindPipeName(String pipeName) {
-    this.pipeName = pipeName;
+    if (shouldPrintMessage) {
+      this.pipeName = pipeName;
+    }
   }
 
   public void onPublished() {
-    timePublished = System.currentTimeMillis();
+    if (shouldPrintMessage) {
+      timePublished = System.currentTimeMillis();
+    }
   }
 
   public void onAssigned() {
-    timeAssigned = System.currentTimeMillis();
+    if (shouldPrintMessage) {
+      timeAssigned = System.currentTimeMillis();
+    }
   }
 
   public void onProcessed() {
-    timeProcessed = System.currentTimeMillis();
+    if (shouldPrintMessage) {
+      timeProcessed = System.currentTimeMillis();
+    }
   }
 
   public void onTransferred() {
-    timeTransferred = System.currentTimeMillis();
+    if (shouldPrintMessage) {
+      timeTransferred = System.currentTimeMillis();
+    }
+  }
+
+  /////////////////////////////// Queue size Reporting ///////////////////////////////
+
+  public void recordDisruptorSize(RingBuffer<?> ringBuffer) {
+    if (shouldPrintMessage) {
+      disruptorSize = ringBuffer.getBufferSize() - (int) ringBuffer.remainingCapacity();
+    }
+  }
+
+  public void recordExtractorQueueSize(UnboundedBlockingPendingQueue<Event> pendingQueue) {
+    if (shouldPrintMessage) {
+      extractorQueueSize = pendingQueue.size();
+    }
+  }
+
+  public void recordBufferQueueSize(Deque<Event> bufferQueue) {
+    if (shouldPrintMessage) {
+      bufferQueueSize = bufferQueue.size();
+    }
+  }
+
+  public void recordConnectorQueueSize(BoundedBlockingPendingQueue<Event> pendingQueue) {
+    if (shouldPrintMessage) {
+      connectorQueueSize = pendingQueue.size();
+    }
   }
 
   @Override
   public String toString() {
-    final String errorMessage = "error";
+    final String unknownMessage = "Unknown";
 
     final String publishedToAssignedMessage =
-        timeAssigned != 0 ? (timeAssigned - timePublished) + "ms" : errorMessage;
+        timeAssigned != 0 ? (timeAssigned - timePublished) + "ms" : unknownMessage;
     final String assignedToProcessedMessage =
-        timeProcessed != 0 ? (timeProcessed - timeAssigned) + "ms" : errorMessage;
+        timeProcessed != 0 ? (timeProcessed - timeAssigned) + "ms" : unknownMessage;
     final String processedToTransferredMessage =
-        timeTransferred != 0 ? (timeTransferred - timeProcessed) + "ms" : errorMessage;
+        timeTransferred != 0 ? (timeTransferred - timeProcessed) + "ms" : unknownMessage;
     final String totalTimeMessage =
-        timeTransferred != 0 ? (timeTransferred - timePublished) + "ms" : errorMessage;
+        timeTransferred != 0 ? (timeTransferred - timePublished) + "ms" : unknownMessage;
+
+    final String disruptorSizeMessage = Integer.toString(disruptorSize);
+    final String extractorQueueSizeMessage =
+        timeAssigned != 0 ? Integer.toString(extractorQueueSize) : unknownMessage;
+    final String bufferQueueSizeMessage =
+        timeProcessed != 0 ? Integer.toString(bufferQueueSize) : unknownMessage;
+    final String connectorQueueSizeMessage =
+        timeProcessed != 0 ? Integer.toString(connectorQueueSize) : unknownMessage;
 
     return "PipeHeartbeatEvent{"
         + "pipeName='"
@@ -136,6 +197,14 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
         + processedToTransferredMessage
         + ", totalTimeCost="
         + totalTimeMessage
+        + ", disruptorSize="
+        + disruptorSizeMessage
+        + ", extractorQueueSize="
+        + extractorQueueSizeMessage
+        + ", bufferQueueSize="
+        + bufferQueueSizeMessage
+        + ", connectorQueueSize="
+        + connectorQueueSizeMessage
         + "}";
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index cbd6d77f189..5728c30a0d6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -155,11 +155,17 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
   }
 
   private void extractHeartbeat(PipeRealtimeEvent event) {
+    // Record the pending queue size before trying to put heartbeatEvent into queue
+    ((PipeHeartbeatEvent) event.getEvent()).recordExtractorQueueSize(pendingQueue);
+
     Event lastEvent = pendingQueue.peekLast();
     if (lastEvent instanceof PipeRealtimeEvent
-        && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent) {
-      // if the last event in the pending queue is a heartbeat event, we should not extract any more
+        && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent
+        && (((PipeHeartbeatEvent) ((PipeRealtimeEvent) lastEvent).getEvent()).isShouldPrintMessage()
+            || !((PipeHeartbeatEvent) event.getEvent()).isShouldPrintMessage())) {
+      // If the last event in the pending queue is a heartbeat event, we should not extract any more
       // heartbeat events to avoid OOM when the pipe is stopped.
+      // Besides, the printable event has higher priority to stay in queue to enable metrics report.
       event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
       return;
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
index 52970f2957d..4b5057032ef 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -75,11 +75,17 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
   }
 
   private void extractHeartbeat(PipeRealtimeEvent event) {
+    // Record the pending queue size before trying to put heartbeatEvent into queue
+    ((PipeHeartbeatEvent) event.getEvent()).recordExtractorQueueSize(pendingQueue);
+
     Event lastEvent = pendingQueue.peekLast();
     if (lastEvent instanceof PipeRealtimeEvent
-        && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent) {
-      // if the last event in the pending queue is a heartbeat event, we should not extract any more
+        && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent
+        && (((PipeHeartbeatEvent) ((PipeRealtimeEvent) lastEvent).getEvent()).isShouldPrintMessage()
+            || !((PipeHeartbeatEvent) event.getEvent()).isShouldPrintMessage())) {
+      // If the last event in the pending queue is a heartbeat event, we should not extract any more
       // heartbeat events to avoid OOM when the pipe is stopped.
+      // Besides, the printable event has higher priority to stay in queue to enable metrics report.
       event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
       return;
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
index 816e0249b47..ee0067837b8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
@@ -75,11 +75,17 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
   }
 
   private void extractHeartbeat(PipeRealtimeEvent event) {
+    // Record the pending queue size before trying to put heartbeatEvent into queue
+    ((PipeHeartbeatEvent) event.getEvent()).recordExtractorQueueSize(pendingQueue);
+
     Event lastEvent = pendingQueue.peekLast();
     if (lastEvent instanceof PipeRealtimeEvent
-        && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent) {
-      // if the last event in the pending queue is a heartbeat event, we should not extract any more
+        && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent
+        && (((PipeHeartbeatEvent) ((PipeRealtimeEvent) lastEvent).getEvent()).isShouldPrintMessage()
+            || !((PipeHeartbeatEvent) event.getEvent()).isShouldPrintMessage())) {
+      // If the last event in the pending queue is a heartbeat event, we should not extract any more
       // heartbeat events to avoid OOM when the pipe is stopped.
+      // Besides, the printable event has higher priority to stay in queue to enable metrics report.
       event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
       return;
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
index b7823171da7..6113045ae04 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.pipe.extractor.realtime.assigner;
 
 import org.apache.iotdb.commons.concurrent.IoTDBDaemonThreadFactory;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 
 import com.lmax.disruptor.BlockingWaitStrategy;
@@ -56,6 +58,10 @@ public class DisruptorQueue {
   }
 
   public void publish(PipeRealtimeEvent event) {
+    EnrichedEvent internalEvent = event.getEvent();
+    if (internalEvent instanceof PipeHeartbeatEvent) {
+      ((PipeHeartbeatEvent) internalEvent).recordDisruptorSize(ringBuffer);
+    }
     ringBuffer.publishEvent((container, sequence, o) -> container.setEvent(event), event);
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index 772851305ee..360ee2b7423 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -43,6 +43,10 @@ public class PipeEventCollector implements EventCollector {
     if (event instanceof EnrichedEvent) {
       ((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName());
     }
+    if (event instanceof PipeHeartbeatEvent) {
+      ((PipeHeartbeatEvent) event).recordBufferQueueSize(bufferQueue);
+      ((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
+    }
 
     while (!bufferQueue.isEmpty()) {
       final Event bufferedEvent = bufferQueue.peek();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
index 343621bbb4a..b3bdd7b9324 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
@@ -36,4 +36,8 @@ public class UnboundedBlockingPendingQueue<E extends Event> extends BlockingPend
   public E peekLast() {
     return pendingDeque.peekLast();
   }
+
+  public E removeLast() {
+    return pendingDeque.removeLast();
+  }
 }

Reply via email to