This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new dce69e7f3d6 Subscription: register metrics to count the number of pipe
events in sink queue and prefetching queue in subscription task (#13575)
dce69e7f3d6 is described below
commit dce69e7f3d661be787ca867f15d88d67750a7db8
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Sep 25 15:47:16 2024 +0800
Subscription: register metrics to count the number of pipe events in sink
queue and prefetching queue in subscription task (#13575)
---
.../subscription/agent/SubscriptionBrokerAgent.java | 10 ++++++++++
.../db/subscription/broker/SubscriptionBroker.java | 20 ++++++++++++++++++++
.../broker/SubscriptionPrefetchingQueue.java | 14 +++++++++++++-
.../db/subscription/event/SubscriptionEvent.java | 6 ++++++
.../batch/SubscriptionPipeTabletEventBatch.java | 10 +++++++++-
.../batch/SubscriptionPipeTsFileEventBatch.java | 12 +++++++++++-
.../event/pipe/SubscriptionPipeEmptyEvent.java | 9 +++++++++
.../event/pipe/SubscriptionPipeEvents.java | 4 ++++
.../pipe/SubscriptionPipeTabletBatchEvents.java | 9 +++++++++
.../pipe/SubscriptionPipeTsFileBatchEvents.java | 13 +++++++++++++
.../event/pipe/SubscriptionPipeTsFilePlainEvent.java | 9 +++++++++
.../metric/SubscriptionPrefetchingQueueMetrics.java | 2 +-
.../task/subtask/SubscriptionConnectorSubtask.java | 14 ++++++++++++++
.../subtask/SubscriptionConnectorSubtaskManager.java | 3 +++
14 files changed, 131 insertions(+), 4 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index 7f4f187ebd7..13770fdbf7c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -203,4 +203,14 @@ public class SubscriptionBrokerAgent {
}
broker.executePrefetch(topicName);
}
+
+  /**
+   * Reports the pipe event count for a topic as seen by the broker of the given consumer group.
+   *
+   * @param consumerGroupId key used to look up the broker in
+   *     {@code consumerGroupIdToSubscriptionBroker}
+   * @param topicName topic forwarded to the broker's own {@code getPipeEventCount}
+   * @return the broker's count, or 0 (after logging a warning) when no broker is registered
+   */
+  public int getPipeEventCount(final String consumerGroupId, final String topicName) {
+    final SubscriptionBroker targetBroker =
+        consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
+    if (Objects.nonNull(targetBroker)) {
+      return targetBroker.getPipeEventCount(topicName);
+    }
+    // No broker is registered for this consumer group: warn and report zero.
+    LOGGER.warn(
+        "Subscription: broker bound to consumer group [{}] does not exist", consumerGroupId);
+    return 0;
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index f6fc757f5ce..420b571103a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -368,4 +368,24 @@ public class SubscriptionBroker {
}
prefetchingQueue.executePrefetch();
}
+
+  /**
+   * Reports the pipe event count of the prefetching queue bound to the given topic.
+   *
+   * @param topicName key used to look up the queue in {@code topicNameToPrefetchingQueue}
+   * @return the queue's pipe event count, or 0 (after logging a warning) when the queue does not
+   *     exist or is closed
+   */
+  public int getPipeEventCount(final String topicName) {
+    final SubscriptionPrefetchingQueue queue = topicNameToPrefetchingQueue.get(topicName);
+    if (Objects.isNull(queue)) {
+      LOGGER.warn(
+          "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] does not exist",
+          topicName,
+          brokerId);
+      return 0;
+    }
+    if (!queue.isClosed()) {
+      return queue.getPipeEventCount();
+    }
+    // The queue is still registered but already closed: warn and report zero.
+    LOGGER.warn(
+        "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed",
+        topicName,
+        brokerId);
+    return 0;
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index e23f5fb0527..ac5b3fad279 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -531,7 +531,7 @@ public abstract class SubscriptionPrefetchingQueue {
return consumerGroupId + "_" + topicName;
}
- public long getUncommittedEventCount() {
+ public long getSubscriptionUncommittedEventCount() {
return inFlightEvents.size();
}
@@ -539,6 +539,18 @@ public abstract class SubscriptionPrefetchingQueue {
return commitIdGenerator.get();
}
+  /**
+   * Counts the pipe events currently held by this prefetching queue.
+   *
+   * <p>The result is the size of {@code inputPendingQueue} plus the sum of {@link
+   * SubscriptionEvent#getPipeEventCount()} over every event in {@code prefetchingQueue} and over
+   * every value of {@code inFlightEvents}.
+   *
+   * @return the total number of pipe events held; 0 when all three collections are empty
+   */
+  public int getPipeEventCount() {
+    // Primitive int streams avoid boxing each per-event count; sum() yields 0 for an empty
+    // stream, so no Optional fallback is needed.
+    return inputPendingQueue.size()
+        + prefetchingQueue.stream().mapToInt(SubscriptionEvent::getPipeEventCount).sum()
+        + inFlightEvents.values().stream().mapToInt(SubscriptionEvent::getPipeEventCount).sum();
+  }
+
/////////////////////////////// close & termination
///////////////////////////////
public boolean isClosed() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
index 2f7902f1c6b..ad274023f6a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
@@ -415,6 +415,12 @@ public class SubscriptionEvent {
return pipeEvents.getTsFile().getName();
}
+ /////////////////////////////// APIs provided for metric framework
///////////////////////////////
+
+  /**
+   * Returns the pipe event count reported by the underlying {@code pipeEvents}.
+   *
+   * @return the value of {@code pipeEvents.getPipeEventCount()}
+   */
+  public int getPipeEventCount() {
+    final int pipeEventCount = pipeEvents.getPipeEventCount();
+    return pipeEventCount;
+  }
+
/////////////////////////////// object ///////////////////////////////
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
index 54363ac8c7c..871b1c12587 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
@@ -70,7 +70,7 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
@Override
public synchronized boolean onEvent(final Consumer<SubscriptionEvent>
consumer) {
- if (shouldEmit()) {
+ if (shouldEmit() && !enrichedEvents.isEmpty()) {
if (Objects.isNull(events)) {
events = generateSubscriptionEvents();
}
@@ -98,6 +98,8 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
for (final EnrichedEvent enrichedEvent : enrichedEvents) {
enrichedEvent.clearReferenceCount(this.getClass().getName());
}
+ enrichedEvents.clear();
+ tablets.clear();
}
public synchronized void ack() {
@@ -221,4 +223,10 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
}
return eventMessageList.toString();
}
+
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
+
+  /**
+   * Returns how many enriched events this tablet batch currently holds.
+   *
+   * @return the current size of {@code enrichedEvents}
+   */
+  public int getPipeEventCount() {
+    final int bufferedEventCount = enrichedEvents.size();
+    return bufferedEventCount;
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 2b5e29f7f6c..e38887e19f2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -43,6 +43,7 @@ import java.util.function.Consumer;
public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch {
private final PipeTabletEventTsFileBatch batch;
+ private final List<EnrichedEvent> enrichedEvents;
public SubscriptionPipeTsFileEventBatch(
final int regionId,
@@ -51,11 +52,12 @@ public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch
final long maxBatchSizeInBytes) {
super(regionId, prefetchingQueue, maxDelayInMs, maxBatchSizeInBytes);
this.batch = new PipeTabletEventTsFileBatch(maxDelayInMs,
maxBatchSizeInBytes);
+ this.enrichedEvents = new ArrayList<>();
}
@Override
public synchronized boolean onEvent(final Consumer<SubscriptionEvent>
consumer) throws Exception {
- if (batch.shouldEmit()) {
+ if (batch.shouldEmit() && !enrichedEvents.isEmpty()) {
if (Objects.isNull(events)) {
events = generateSubscriptionEvents();
}
@@ -74,6 +76,7 @@ public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch
throws Exception {
if (event instanceof TabletInsertionEvent) {
batch.onEvent((TabletInsertionEvent) event); // no exceptions will be
thrown
+ enrichedEvents.add(event);
event.decreaseReferenceCount(
SubscriptionPipeTsFileEventBatch.class.getName(),
false); // missing releaseLastEvent decreases reference count
@@ -85,6 +88,7 @@ public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch
public synchronized void cleanUp() {
// close batch, it includes clearing the reference count of events
batch.close();
+ enrichedEvents.clear();
}
public synchronized void ack() {
@@ -128,4 +132,10 @@ public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch
coreReportMessage.put("batch", batch.toString());
return coreReportMessage;
}
+
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
+
+  /**
+   * Returns how many enriched events this TsFile batch currently tracks.
+   *
+   * @return the current size of {@code enrichedEvents}
+   */
+  public int getPipeEventCount() {
+    final int trackedEventCount = enrichedEvents.size();
+    return trackedEventCount;
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
index d0556cd15b7..3e74e03d8f4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
@@ -34,8 +34,17 @@ public class SubscriptionPipeEmptyEvent implements
SubscriptionPipeEvents {
@Override
public void cleanUp() {}
+ /////////////////////////////// stringify ///////////////////////////////
+
@Override
public String toString() {
return "SubscriptionEmptyPipeEvent";
}
+
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
+
+ @Override
+ public int getPipeEventCount() { // metric hook declared by SubscriptionPipeEvents
+ return 0; // this implementation always reports zero pipe events
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
index 813690dd00b..489c4cf8120 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
@@ -31,4 +31,8 @@ public interface SubscriptionPipeEvents {
void ack();
void cleanUp();
+
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
+
+ int getPipeEventCount();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
index fb9e31f77c2..226367405eb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
@@ -46,8 +46,17 @@ public class SubscriptionPipeTabletBatchEvents implements
SubscriptionPipeEvents
batch.cleanUp();
}
+ /////////////////////////////// stringify ///////////////////////////////
+
@Override
public String toString() {
return "SubscriptionPipeTabletBatchEvents{batch=" + batch + "}";
}
+
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
+
+  /**
+   * Returns the pipe event count of the tablet batch backing these events.
+   *
+   * @return the value of {@code batch.getPipeEventCount()}
+   */
+  @Override
+  public int getPipeEventCount() {
+    final int batchEventCount = batch.getPipeEventCount();
+    return batchEventCount;
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
index aa9a3593a93..5cae21f5ac8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
@@ -29,6 +29,7 @@ public class SubscriptionPipeTsFileBatchEvents implements
SubscriptionPipeEvents
private final SubscriptionPipeTsFileEventBatch batch;
private final File tsFile;
private final AtomicInteger referenceCount; // shared between the same batch
+ private final int count; // snapshot the initial reference count, used for
event count calculation
public SubscriptionPipeTsFileBatchEvents(
final SubscriptionPipeTsFileEventBatch batch,
@@ -37,6 +38,7 @@ public class SubscriptionPipeTsFileBatchEvents implements
SubscriptionPipeEvents
this.batch = batch;
this.tsFile = tsFile;
this.referenceCount = referenceCount;
+ this.count = Math.max(1, referenceCount.get());
}
@Override
@@ -58,6 +60,8 @@ public class SubscriptionPipeTsFileBatchEvents implements
SubscriptionPipeEvents
}
}
+ /////////////////////////////// stringify ///////////////////////////////
+
@Override
public String toString() {
return "SubscriptionPipeTsFileBatchEvents{batch="
@@ -68,4 +72,13 @@ public class SubscriptionPipeTsFileBatchEvents implements
SubscriptionPipeEvents
+ referenceCount
+ "}";
}
+
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
+
+  /**
+   * Returns this event's share of the pipe events held by the shared batch.
+   *
+   * @return {@code batch.getPipeEventCount()} divided by {@code count}, rounded up
+   */
+  @Override
+  public int getPipeEventCount() {
+    final int batchEventCount = batch.getPipeEventCount();
+    // Several events share one batch, so the batch total is split evenly among them; an inexact
+    // split is rounded up (ceiling division) so the reported figure stays pessimistic.
+    return (batchEventCount + count - 1) / count;
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
index c210635a061..111006fa6d3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
@@ -47,10 +47,19 @@ public class SubscriptionPipeTsFilePlainEvent implements
SubscriptionPipeEvents
tsFileInsertionEvent.clearReferenceCount(this.getClass().getName());
}
+ /////////////////////////////// stringify ///////////////////////////////
+
@Override
public String toString() {
return "SubscriptionPipeTsFilePlainEvent{tsFileInsertionEvent="
+ tsFileInsertionEvent.coreReportMessage()
+ "}";
}
+
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
+
+ @Override
+ public int getPipeEventCount() { // metric hook declared by SubscriptionPipeEvents
+ return 1; // this implementation always reports exactly one pipe event
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/metric/SubscriptionPrefetchingQueueMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/metric/SubscriptionPrefetchingQueueMetrics.java
index d38de0d709e..4e712fb2972 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/metric/SubscriptionPrefetchingQueueMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/metric/SubscriptionPrefetchingQueueMetrics.java
@@ -92,7 +92,7 @@ public class SubscriptionPrefetchingQueueMetrics implements
IMetricSet {
Metric.SUBSCRIPTION_UNCOMMITTED_EVENT_COUNT.toString(),
MetricLevel.IMPORTANT,
queue,
- SubscriptionPrefetchingQueue::getUncommittedEventCount,
+ SubscriptionPrefetchingQueue::getSubscriptionUncommittedEventCount,
Tag.NAME.toString(),
queue.getPrefetchingQueueId());
// current commit id
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
index 8524e77d040..3e540254d8e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
@@ -25,8 +25,13 @@ import
org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class SubscriptionConnectorSubtask extends PipeConnectorSubtask {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionConnectorSubtask.class);
+
private final String topicName;
private final String consumerGroupId;
@@ -72,4 +77,13 @@ public class SubscriptionConnectorSubtask extends
PipeConnectorSubtask {
public UnboundedBlockingPendingQueue<Event> getInputPendingQueue() {
return inputPendingQueue;
}
+
+ //////////////////////////// APIs provided for metric framework
////////////////////////////
+
+  /**
+   * Reports the pipe event count for this subtask's consumer group and topic.
+   *
+   * @param pipeName not read by this implementation; the lookup uses {@code consumerGroupId} and
+   *     {@code topicName} instead
+   * @return the value of {@code SubscriptionAgent.broker().getPipeEventCount(consumerGroupId,
+   *     topicName)}
+   */
+  @Override
+  public int getEventCount(final String pipeName) {
+    // Counts the pipe events in the sink queue and the prefetching queue; note that lastEvent
+    // can safely be ignored here.
+    final int pipeEventCount =
+        SubscriptionAgent.broker().getPipeEventCount(consumerGroupId, topicName);
+    return pipeEventCount;
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
index dfce62faac8..e0707ebf547 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.db.pipe.agent.task.execution.PipeConnectorSubtaskExecuto
import
org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask;
import
org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtaskLifeCycle;
import
org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeRealtimePriorityBlockingQueue;
+import
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -157,6 +158,8 @@ public class SubscriptionConnectorSubtaskManager {
final PipeConnectorSubtaskLifeCycle lifeCycle =
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
lifeCycle.register();
+ PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ .register(lifeCycle.getSubtask(), environment.getPipeName(),
environment.getCreationTime());
return attributeSortedString;
}