This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c58680b6a6d Subscription: optimize prefetching queue report state 
overhead (#15664)
c58680b6a6d is described below

commit c58680b6a6d53fd9cf155ff0d846f6c45bead49c
Author: VGalaxies <[email protected]>
AuthorDate: Mon Jun 9 15:12:27 2025 +0800

    Subscription: optimize prefetching queue report state overhead (#15664)
    
    * setup
    
    * add comments
---
 .../agent/SubscriptionBrokerAgent.java             | 55 ++++++++++++++++++++++
 .../broker/SubscriptionPrefetchingQueue.java       | 12 +++--
 2 files changed, 63 insertions(+), 4 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index 2ea4f0a731e..a799470f6d2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -37,6 +37,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 public class SubscriptionBrokerAgent {
 
@@ -45,6 +46,9 @@ public class SubscriptionBrokerAgent {
   private final Map<String, SubscriptionBroker> 
consumerGroupIdToSubscriptionBroker =
       new ConcurrentHashMap<>();
 
+  private final Cache<Integer> prefetchingQueueCount =
+      new Cache<>(this::getPrefetchingQueueCountInternal);
+
   //////////////////////////// provided for subscription agent 
////////////////////////////
 
   public List<SubscriptionEvent> poll(
@@ -193,6 +197,7 @@ public class SubscriptionBrokerAgent {
               return broker;
             })
         .bindPrefetchingQueue(subtask.getTopicName(), 
subtask.getInputPendingQueue());
+    prefetchingQueueCount.invalidate();
   }
 
   public void updateCompletedTopicNames(final String consumerGroupId, final 
String topicName) {
@@ -213,6 +218,7 @@ public class SubscriptionBrokerAgent {
       return;
     }
     broker.unbindPrefetchingQueue(topicName);
+    prefetchingQueueCount.invalidate();
   }
 
   public void removePrefetchingQueue(final String consumerGroupId, final 
String topicName) {
@@ -223,6 +229,7 @@ public class SubscriptionBrokerAgent {
       return;
     }
     broker.removePrefetchingQueue(topicName);
+    prefetchingQueueCount.invalidate();
   }
 
   public boolean executePrefetch(final String consumerGroupId, final String 
topicName) {
@@ -251,8 +258,56 @@ public class SubscriptionBrokerAgent {
   }
 
   public int getPrefetchingQueueCount() {
+    return prefetchingQueueCount.get();
+  }
+
+  private int getPrefetchingQueueCountInternal() {
     return consumerGroupIdToSubscriptionBroker.values().stream()
         .map(SubscriptionBroker::getPrefetchingQueueCount)
         .reduce(0, Integer::sum);
   }
+
+  /////////////////////////////// Cache ///////////////////////////////
+
+  /**
+   * A simple generic cache that computes and stores a value on demand.
+   *
+   * <p>Note that since the get() and invalidate() methods are not modified 
with synchronized, the
+   * value obtained may not be entirely accurate.
+   *
+   * @param <T> the type of the cached value
+   */
+  private static class Cache<T> {
+
+    private T value;
+    private volatile boolean valid = false;
+    private final Supplier<T> supplier;
+
+    /**
+     * Construct a cache with a supplier that knows how to compute the value.
+     *
+     * @param supplier a Supplier that computes the value when needed
+     */
+    private Cache(final Supplier<T> supplier) {
+      this.supplier = supplier;
+    }
+
+    /** Invalidate the cache. The next call to get() will recompute the value. 
*/
+    private void invalidate() {
+      valid = false;
+    }
+
+    /**
+     * Return the cached value, recomputing it if the cache is invalid.
+     *
+     * @return the current value, recomputed if necessary
+     */
+    private T get() {
+      if (!valid) {
+        value = supplier.get();
+        valid = true;
+      }
+      return value;
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index f0aed45e727..fa443eb50f9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -30,7 +30,6 @@ import 
org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatches;
-import 
org.apache.iotdb.db.subscription.resource.SubscriptionDataNodeResourceManager;
 import 
org.apache.iotdb.db.subscription.task.subtask.SubscriptionReceiverSubtask;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -101,6 +100,8 @@ public abstract class SubscriptionPrefetchingQueue {
 
   private final SubscriptionPrefetchingQueueStates states;
 
+  private long lastStateReportTimestamp = System.currentTimeMillis();
+
   private volatile boolean isCompleted = false;
   private volatile boolean isClosed = false;
 
@@ -283,9 +284,12 @@ public abstract class SubscriptionPrefetchingQueue {
   }
 
   private void reportStateIfNeeded() {
-    SubscriptionDataNodeResourceManager.log()
-        .schedule(SubscriptionPrefetchingQueue.class, brokerId, topicName)
-        .ifPresent(l -> l.info("Subscription: SubscriptionPrefetchingQueue 
state {}", this));
+    if (System.currentTimeMillis() - lastStateReportTimestamp
+        > 
SubscriptionConfig.getInstance().getSubscriptionLogManagerBaseIntervalMs()
+            * SubscriptionAgent.broker().getPrefetchingQueueCount()) {
+      LOGGER.info("Subscription: SubscriptionPrefetchingQueue state {}", this);
+      lastStateReportTimestamp = System.currentTimeMillis();
+    }
   }
 
   @SafeVarargs

Reply via email to