This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new 100804989b7 Pipe: Report queue size in PipeHeartbeatEvent (#10997) (#11035)
100804989b7 is described below
commit 100804989b7f25380d3aa62e4d1e73aa2c4f517a
Author: Caideyipi <[email protected]>
AuthorDate: Mon Sep 4 18:02:58 2023 +0800
Pipe: Report queue size in PipeHeartbeatEvent (#10997) (#11035)
(cherry picked from commit eb43b4909a7df36299b2a9b9e4a95f2fd83ea5d6)
---
.../event/common/heartbeat/PipeHeartbeatEvent.java | 89 +++++++++++++++++++---
.../PipeRealtimeDataRegionHybridExtractor.java | 10 ++-
.../PipeRealtimeDataRegionLogExtractor.java | 10 ++-
.../PipeRealtimeDataRegionTsFileExtractor.java | 10 ++-
.../realtime/assigner/DisruptorQueue.java | 6 ++
.../pipe/task/connection/PipeEventCollector.java | 4 +
.../connection/UnboundedBlockingPendingQueue.java | 4 +
7 files changed, 117 insertions(+), 16 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index 71d65a6b7cd..15f57aa94cf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -23,11 +23,17 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.db.utils.DateTimeUtils;
+import org.apache.iotdb.pipe.api.event.Event;
+import com.lmax.disruptor.RingBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Deque;
+
public class PipeHeartbeatEvent extends EnrichedEvent {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeHeartbeatEvent.class);
@@ -40,6 +46,11 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
private long timeProcessed;
private long timeTransferred;
+ private int disruptorSize;
+ private int extractorQueueSize;
+ private int bufferQueueSize;
+ private int connectorQueueSize;
+
private final boolean shouldPrintMessage;
public PipeHeartbeatEvent(String dataRegionId, boolean shouldPrintMessage) {
@@ -86,40 +97,90 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
return false;
}
+ /////////////////////////////// Whether to print ///////////////////////////////
+
+ public boolean isShouldPrintMessage() {
+ return shouldPrintMessage;
+ }
+
/////////////////////////////// Delay Reporting ///////////////////////////////
public void bindPipeName(String pipeName) {
- this.pipeName = pipeName;
+ if (shouldPrintMessage) {
+ this.pipeName = pipeName;
+ }
}
public void onPublished() {
- timePublished = System.currentTimeMillis();
+ if (shouldPrintMessage) {
+ timePublished = System.currentTimeMillis();
+ }
}
public void onAssigned() {
- timeAssigned = System.currentTimeMillis();
+ if (shouldPrintMessage) {
+ timeAssigned = System.currentTimeMillis();
+ }
}
public void onProcessed() {
- timeProcessed = System.currentTimeMillis();
+ if (shouldPrintMessage) {
+ timeProcessed = System.currentTimeMillis();
+ }
}
public void onTransferred() {
- timeTransferred = System.currentTimeMillis();
+ if (shouldPrintMessage) {
+ timeTransferred = System.currentTimeMillis();
+ }
+ }
+
+ /////////////////////////////// Queue size Reporting ///////////////////////////////
+
+ public void recordDisruptorSize(RingBuffer<?> ringBuffer) {
+ if (shouldPrintMessage) {
+ disruptorSize = ringBuffer.getBufferSize() - (int)
ringBuffer.remainingCapacity();
+ }
+ }
+
+ public void recordExtractorQueueSize(UnboundedBlockingPendingQueue<Event>
pendingQueue) {
+ if (shouldPrintMessage) {
+ extractorQueueSize = pendingQueue.size();
+ }
+ }
+
+ public void recordBufferQueueSize(Deque<Event> bufferQueue) {
+ if (shouldPrintMessage) {
+ bufferQueueSize = bufferQueue.size();
+ }
+ }
+
+ public void recordConnectorQueueSize(BoundedBlockingPendingQueue<Event>
pendingQueue) {
+ if (shouldPrintMessage) {
+ connectorQueueSize = pendingQueue.size();
+ }
}
@Override
public String toString() {
- final String errorMessage = "error";
+ final String unknownMessage = "Unknown";
final String publishedToAssignedMessage =
- timeAssigned != 0 ? (timeAssigned - timePublished) + "ms" :
errorMessage;
+ timeAssigned != 0 ? (timeAssigned - timePublished) + "ms" :
unknownMessage;
final String assignedToProcessedMessage =
- timeProcessed != 0 ? (timeProcessed - timeAssigned) + "ms" :
errorMessage;
+ timeProcessed != 0 ? (timeProcessed - timeAssigned) + "ms" :
unknownMessage;
final String processedToTransferredMessage =
- timeTransferred != 0 ? (timeTransferred - timeProcessed) + "ms" :
errorMessage;
+ timeTransferred != 0 ? (timeTransferred - timeProcessed) + "ms" :
unknownMessage;
final String totalTimeMessage =
- timeTransferred != 0 ? (timeTransferred - timePublished) + "ms" :
errorMessage;
+ timeTransferred != 0 ? (timeTransferred - timePublished) + "ms" :
unknownMessage;
+
+ final String disruptorSizeMessage = Integer.toString(disruptorSize);
+ final String extractorQueueSizeMessage =
+ timeAssigned != 0 ? Integer.toString(extractorQueueSize) :
unknownMessage;
+ final String bufferQueueSizeMessage =
+ timeProcessed != 0 ? Integer.toString(bufferQueueSize) :
unknownMessage;
+ final String connectorQueueSizeMessage =
+ timeProcessed != 0 ? Integer.toString(connectorQueueSize) :
unknownMessage;
return "PipeHeartbeatEvent{"
+ "pipeName='"
@@ -136,6 +197,14 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
+ processedToTransferredMessage
+ ", totalTimeCost="
+ totalTimeMessage
+ + ", disruptorSize="
+ + disruptorSizeMessage
+ + ", extractorQueueSize="
+ + extractorQueueSizeMessage
+ + ", bufferQueueSize="
+ + bufferQueueSizeMessage
+ + ", connectorQueueSize="
+ + connectorQueueSizeMessage
+ "}";
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index cbd6d77f189..5728c30a0d6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -155,11 +155,17 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
}
private void extractHeartbeat(PipeRealtimeEvent event) {
+ // Record the pending queue size before trying to put heartbeatEvent into queue
+ ((PipeHeartbeatEvent)
event.getEvent()).recordExtractorQueueSize(pendingQueue);
+
Event lastEvent = pendingQueue.peekLast();
if (lastEvent instanceof PipeRealtimeEvent
- && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof
PipeHeartbeatEvent) {
- // if the last event in the pending queue is a heartbeat event, we should not extract any more
+ && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof
PipeHeartbeatEvent
+ && (((PipeHeartbeatEvent) ((PipeRealtimeEvent)
lastEvent).getEvent()).isShouldPrintMessage()
+ || !((PipeHeartbeatEvent)
event.getEvent()).isShouldPrintMessage())) {
+ // If the last event in the pending queue is a heartbeat event, we should not extract any more
// heartbeat events to avoid OOM when the pipe is stopped.
+ // Besides, the printable event has higher priority to stay in queue to enable metrics report.
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
return;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
index 52970f2957d..4b5057032ef 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -75,11 +75,17 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
}
private void extractHeartbeat(PipeRealtimeEvent event) {
+ // Record the pending queue size before trying to put heartbeatEvent into queue
+ ((PipeHeartbeatEvent)
event.getEvent()).recordExtractorQueueSize(pendingQueue);
+
Event lastEvent = pendingQueue.peekLast();
if (lastEvent instanceof PipeRealtimeEvent
- && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof
PipeHeartbeatEvent) {
- // if the last event in the pending queue is a heartbeat event, we should not extract any more
+ && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof
PipeHeartbeatEvent
+ && (((PipeHeartbeatEvent) ((PipeRealtimeEvent)
lastEvent).getEvent()).isShouldPrintMessage()
+ || !((PipeHeartbeatEvent)
event.getEvent()).isShouldPrintMessage())) {
+ // If the last event in the pending queue is a heartbeat event, we should not extract any more
// heartbeat events to avoid OOM when the pipe is stopped.
+ // Besides, the printable event has higher priority to stay in queue to enable metrics report.
event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
return;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
index 816e0249b47..ee0067837b8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
@@ -75,11 +75,17 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
}
private void extractHeartbeat(PipeRealtimeEvent event) {
+ // Record the pending queue size before trying to put heartbeatEvent into queue
+ ((PipeHeartbeatEvent)
event.getEvent()).recordExtractorQueueSize(pendingQueue);
+
Event lastEvent = pendingQueue.peekLast();
if (lastEvent instanceof PipeRealtimeEvent
- && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof
PipeHeartbeatEvent) {
- // if the last event in the pending queue is a heartbeat event, we should not extract any more
+ && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof
PipeHeartbeatEvent
+ && (((PipeHeartbeatEvent) ((PipeRealtimeEvent)
lastEvent).getEvent()).isShouldPrintMessage()
+ || !((PipeHeartbeatEvent)
event.getEvent()).isShouldPrintMessage())) {
+ // If the last event in the pending queue is a heartbeat event, we should not extract any more
// heartbeat events to avoid OOM when the pipe is stopped.
+ // Besides, the printable event has higher priority to stay in queue to enable metrics report.
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
return;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
index b7823171da7..6113045ae04 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.pipe.extractor.realtime.assigner;
import org.apache.iotdb.commons.concurrent.IoTDBDaemonThreadFactory;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import com.lmax.disruptor.BlockingWaitStrategy;
@@ -56,6 +58,10 @@ public class DisruptorQueue {
}
public void publish(PipeRealtimeEvent event) {
+ EnrichedEvent internalEvent = event.getEvent();
+ if (internalEvent instanceof PipeHeartbeatEvent) {
+ ((PipeHeartbeatEvent) internalEvent).recordDisruptorSize(ringBuffer);
+ }
ringBuffer.publishEvent((container, sequence, o) ->
container.setEvent(event), event);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index 772851305ee..360ee2b7423 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -43,6 +43,10 @@ public class PipeEventCollector implements EventCollector {
if (event instanceof EnrichedEvent) {
((EnrichedEvent)
event).increaseReferenceCount(PipeEventCollector.class.getName());
}
+ if (event instanceof PipeHeartbeatEvent) {
+ ((PipeHeartbeatEvent) event).recordBufferQueueSize(bufferQueue);
+ ((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
+ }
while (!bufferQueue.isEmpty()) {
final Event bufferedEvent = bufferQueue.peek();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
index 343621bbb4a..b3bdd7b9324 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
@@ -36,4 +36,8 @@ public class UnboundedBlockingPendingQueue<E extends Event> extends BlockingPend
public E peekLast() {
return pendingDeque.peekLast();
}
+
+ public E removeLast() {
+ return pendingDeque.removeLast();
+ }
}