This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new dce69e7f3d6 Subscription: register metrics to count the number of pipe events in sink queue and prefetching queue in subscription task (#13575)
dce69e7f3d6 is described below

commit dce69e7f3d661be787ca867f15d88d67750a7db8
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Sep 25 15:47:16 2024 +0800

    Subscription: register metrics to count the number of pipe events in sink queue and prefetching queue in subscription task (#13575)
---
 .../subscription/agent/SubscriptionBrokerAgent.java  | 10 ++++++++++
 .../db/subscription/broker/SubscriptionBroker.java   | 20 ++++++++++++++++++++
 .../broker/SubscriptionPrefetchingQueue.java         | 14 +++++++++++++-
 .../db/subscription/event/SubscriptionEvent.java     |  6 ++++++
 .../batch/SubscriptionPipeTabletEventBatch.java      | 10 +++++++++-
 .../batch/SubscriptionPipeTsFileEventBatch.java      | 12 +++++++++++-
 .../event/pipe/SubscriptionPipeEmptyEvent.java       |  9 +++++++++
 .../event/pipe/SubscriptionPipeEvents.java           |  4 ++++
 .../pipe/SubscriptionPipeTabletBatchEvents.java      |  9 +++++++++
 .../pipe/SubscriptionPipeTsFileBatchEvents.java      | 13 +++++++++++++
 .../event/pipe/SubscriptionPipeTsFilePlainEvent.java |  9 +++++++++
 .../metric/SubscriptionPrefetchingQueueMetrics.java  |  2 +-
 .../task/subtask/SubscriptionConnectorSubtask.java   | 14 ++++++++++++++
 .../subtask/SubscriptionConnectorSubtaskManager.java |  3 +++
 14 files changed, 131 insertions(+), 4 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index 7f4f187ebd7..13770fdbf7c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -203,4 +203,14 @@ public class SubscriptionBrokerAgent {
     }
     broker.executePrefetch(topicName);
   }
+
+  public int getPipeEventCount(final String consumerGroupId, final String 
topicName) {
+    final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
+    if (Objects.isNull(broker)) {
+      LOGGER.warn(
+          "Subscription: broker bound to consumer group [{}] does not exist", 
consumerGroupId);
+      return 0;
+    }
+    return broker.getPipeEventCount(topicName);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index f6fc757f5ce..420b571103a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -368,4 +368,24 @@ public class SubscriptionBroker {
     }
     prefetchingQueue.executePrefetch();
   }
+
+  public int getPipeEventCount(final String topicName) {
+    final SubscriptionPrefetchingQueue prefetchingQueue =
+        topicNameToPrefetchingQueue.get(topicName);
+    if (Objects.isNull(prefetchingQueue)) {
+      LOGGER.warn(
+          "Subscription: prefetching queue bound to topic [{}] for consumer 
group [{}] does not exist",
+          topicName,
+          brokerId);
+      return 0;
+    }
+    if (prefetchingQueue.isClosed()) {
+      LOGGER.warn(
+          "Subscription: prefetching queue bound to topic [{}] for consumer 
group [{}] is closed",
+          topicName,
+          brokerId);
+      return 0;
+    }
+    return prefetchingQueue.getPipeEventCount();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index e23f5fb0527..ac5b3fad279 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -531,7 +531,7 @@ public abstract class SubscriptionPrefetchingQueue {
     return consumerGroupId + "_" + topicName;
   }
 
-  public long getUncommittedEventCount() {
+  public long getSubscriptionUncommittedEventCount() {
     return inFlightEvents.size();
   }
 
@@ -539,6 +539,18 @@ public abstract class SubscriptionPrefetchingQueue {
     return commitIdGenerator.get();
   }
 
+  public int getPipeEventCount() {
+    return inputPendingQueue.size()
+        + prefetchingQueue.stream()
+            .map(SubscriptionEvent::getPipeEventCount)
+            .reduce(Integer::sum)
+            .orElse(0)
+        + +inFlightEvents.values().stream()
+            .map(SubscriptionEvent::getPipeEventCount)
+            .reduce(Integer::sum)
+            .orElse(0);
+  }
+
   /////////////////////////////// close & termination 
///////////////////////////////
 
   public boolean isClosed() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
index 2f7902f1c6b..ad274023f6a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
@@ -415,6 +415,12 @@ public class SubscriptionEvent {
     return pipeEvents.getTsFile().getName();
   }
 
+  /////////////////////////////// APIs provided for metric framework 
///////////////////////////////
+
+  public int getPipeEventCount() {
+    return pipeEvents.getPipeEventCount();
+  }
+
   /////////////////////////////// object ///////////////////////////////
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
index 54363ac8c7c..871b1c12587 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
@@ -70,7 +70,7 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
 
   @Override
   public synchronized boolean onEvent(final Consumer<SubscriptionEvent> 
consumer) {
-    if (shouldEmit()) {
+    if (shouldEmit() && !enrichedEvents.isEmpty()) {
       if (Objects.isNull(events)) {
         events = generateSubscriptionEvents();
       }
@@ -98,6 +98,8 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
     for (final EnrichedEvent enrichedEvent : enrichedEvents) {
       enrichedEvent.clearReferenceCount(this.getClass().getName());
     }
+    enrichedEvents.clear();
+    tablets.clear();
   }
 
   public synchronized void ack() {
@@ -221,4 +223,10 @@ public class SubscriptionPipeTabletEventBatch extends 
SubscriptionPipeEventBatch
     }
     return eventMessageList.toString();
   }
+
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
+  public int getPipeEventCount() {
+    return enrichedEvents.size();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 2b5e29f7f6c..e38887e19f2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -43,6 +43,7 @@ import java.util.function.Consumer;
 public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch {
 
   private final PipeTabletEventTsFileBatch batch;
+  private final List<EnrichedEvent> enrichedEvents;
 
   public SubscriptionPipeTsFileEventBatch(
       final int regionId,
@@ -51,11 +52,12 @@ public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch
       final long maxBatchSizeInBytes) {
     super(regionId, prefetchingQueue, maxDelayInMs, maxBatchSizeInBytes);
     this.batch = new PipeTabletEventTsFileBatch(maxDelayInMs, 
maxBatchSizeInBytes);
+    this.enrichedEvents = new ArrayList<>();
   }
 
   @Override
   public synchronized boolean onEvent(final Consumer<SubscriptionEvent> 
consumer) throws Exception {
-    if (batch.shouldEmit()) {
+    if (batch.shouldEmit() && !enrichedEvents.isEmpty()) {
       if (Objects.isNull(events)) {
         events = generateSubscriptionEvents();
       }
@@ -74,6 +76,7 @@ public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch
       throws Exception {
     if (event instanceof TabletInsertionEvent) {
       batch.onEvent((TabletInsertionEvent) event); // no exceptions will be 
thrown
+      enrichedEvents.add(event);
       event.decreaseReferenceCount(
           SubscriptionPipeTsFileEventBatch.class.getName(),
           false); // missing releaseLastEvent decreases reference count
@@ -85,6 +88,7 @@ public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch
   public synchronized void cleanUp() {
     // close batch, it includes clearing the reference count of events
     batch.close();
+    enrichedEvents.clear();
   }
 
   public synchronized void ack() {
@@ -128,4 +132,10 @@ public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch
     coreReportMessage.put("batch", batch.toString());
     return coreReportMessage;
   }
+
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
+  public int getPipeEventCount() {
+    return enrichedEvents.size();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
index d0556cd15b7..3e74e03d8f4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEmptyEvent.java
@@ -34,8 +34,17 @@ public class SubscriptionPipeEmptyEvent implements 
SubscriptionPipeEvents {
   @Override
   public void cleanUp() {}
 
+  /////////////////////////////// stringify ///////////////////////////////
+
   @Override
   public String toString() {
     return "SubscriptionEmptyPipeEvent";
   }
+
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
+  @Override
+  public int getPipeEventCount() {
+    return 0;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
index 813690dd00b..489c4cf8120 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeEvents.java
@@ -31,4 +31,8 @@ public interface SubscriptionPipeEvents {
   void ack();
 
   void cleanUp();
+
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
+  int getPipeEventCount();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
index fb9e31f77c2..226367405eb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
@@ -46,8 +46,17 @@ public class SubscriptionPipeTabletBatchEvents implements 
SubscriptionPipeEvents
     batch.cleanUp();
   }
 
+  /////////////////////////////// stringify ///////////////////////////////
+
   @Override
   public String toString() {
     return "SubscriptionPipeTabletBatchEvents{batch=" + batch + "}";
   }
+
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
+  @Override
+  public int getPipeEventCount() {
+    return batch.getPipeEventCount();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
index aa9a3593a93..5cae21f5ac8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFileBatchEvents.java
@@ -29,6 +29,7 @@ public class SubscriptionPipeTsFileBatchEvents implements 
SubscriptionPipeEvents
   private final SubscriptionPipeTsFileEventBatch batch;
   private final File tsFile;
   private final AtomicInteger referenceCount; // shared between the same batch
+  private final int count; // snapshot the initial reference count, used for 
event count calculation
 
   public SubscriptionPipeTsFileBatchEvents(
       final SubscriptionPipeTsFileEventBatch batch,
@@ -37,6 +38,7 @@ public class SubscriptionPipeTsFileBatchEvents implements 
SubscriptionPipeEvents
     this.batch = batch;
     this.tsFile = tsFile;
     this.referenceCount = referenceCount;
+    this.count = Math.max(1, referenceCount.get());
   }
 
   @Override
@@ -58,6 +60,8 @@ public class SubscriptionPipeTsFileBatchEvents implements 
SubscriptionPipeEvents
     }
   }
 
+  /////////////////////////////// stringify ///////////////////////////////
+
   @Override
   public String toString() {
     return "SubscriptionPipeTsFileBatchEvents{batch="
@@ -68,4 +72,13 @@ public class SubscriptionPipeTsFileBatchEvents implements 
SubscriptionPipeEvents
         + referenceCount
         + "}";
   }
+
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
+  @Override
+  public int getPipeEventCount() {
+    // Since multiple events will share the same batch, equal division is 
performed here.
+    // If it is not exact, round up to remain pessimistic.
+    return (batch.getPipeEventCount() + count - 1) / count;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
index c210635a061..111006fa6d3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTsFilePlainEvent.java
@@ -47,10 +47,19 @@ public class SubscriptionPipeTsFilePlainEvent implements 
SubscriptionPipeEvents
     tsFileInsertionEvent.clearReferenceCount(this.getClass().getName());
   }
 
+  /////////////////////////////// stringify ///////////////////////////////
+
   @Override
   public String toString() {
     return "SubscriptionPipeTsFilePlainEvent{tsFileInsertionEvent="
         + tsFileInsertionEvent.coreReportMessage()
         + "}";
   }
+
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
+  @Override
+  public int getPipeEventCount() {
+    return 1;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/metric/SubscriptionPrefetchingQueueMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/metric/SubscriptionPrefetchingQueueMetrics.java
index d38de0d709e..4e712fb2972 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/metric/SubscriptionPrefetchingQueueMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/metric/SubscriptionPrefetchingQueueMetrics.java
@@ -92,7 +92,7 @@ public class SubscriptionPrefetchingQueueMetrics implements 
IMetricSet {
         Metric.SUBSCRIPTION_UNCOMMITTED_EVENT_COUNT.toString(),
         MetricLevel.IMPORTANT,
         queue,
-        SubscriptionPrefetchingQueue::getUncommittedEventCount,
+        SubscriptionPrefetchingQueue::getSubscriptionUncommittedEventCount,
         Tag.NAME.toString(),
         queue.getPrefetchingQueueId());
     // current commit id
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
index 8524e77d040..3e540254d8e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
@@ -25,8 +25,13 @@ import 
org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class SubscriptionConnectorSubtask extends PipeConnectorSubtask {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SubscriptionConnectorSubtask.class);
+
   private final String topicName;
   private final String consumerGroupId;
 
@@ -72,4 +77,13 @@ public class SubscriptionConnectorSubtask extends 
PipeConnectorSubtask {
   public UnboundedBlockingPendingQueue<Event> getInputPendingQueue() {
     return inputPendingQueue;
   }
+
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
+  @Override
+  public int getEventCount(final String pipeName) {
+    // count the number of pipe events in sink queue and prefetching queue, 
note that can safely
+    // ignore lastEvent
+    return SubscriptionAgent.broker().getPipeEventCount(consumerGroupId, 
topicName);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
index dfce62faac8..e0707ebf547 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.pipe.agent.task.execution.PipeConnectorSubtaskExecuto
 import 
org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask;
 import 
org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtaskLifeCycle;
 import 
org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeRealtimePriorityBlockingQueue;
+import 
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
 import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -157,6 +158,8 @@ public class SubscriptionConnectorSubtaskManager {
     final PipeConnectorSubtaskLifeCycle lifeCycle =
         attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
     lifeCycle.register();
+    PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+        .register(lifeCycle.getSubtask(), environment.getPipeName(), 
environment.getCreationTime());
 
     return attributeSortedString;
   }

Reply via email to