This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c58680b6a6d Subscription: optimize prefetching queue report state
overhead (#15664)
c58680b6a6d is described below
commit c58680b6a6d53fd9cf155ff0d846f6c45bead49c
Author: VGalaxies <[email protected]>
AuthorDate: Mon Jun 9 15:12:27 2025 +0800
Subscription: optimize prefetching queue report state overhead (#15664)
* setup
* add comments
---
.../agent/SubscriptionBrokerAgent.java | 55 ++++++++++++++++++++++
.../broker/SubscriptionPrefetchingQueue.java | 12 +++--
2 files changed, 63 insertions(+), 4 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index 2ea4f0a731e..a799470f6d2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -37,6 +37,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
public class SubscriptionBrokerAgent {
@@ -45,6 +46,9 @@ public class SubscriptionBrokerAgent {
private final Map<String, SubscriptionBroker>
consumerGroupIdToSubscriptionBroker =
new ConcurrentHashMap<>();
+ private final Cache<Integer> prefetchingQueueCount =
+ new Cache<>(this::getPrefetchingQueueCountInternal);
+
//////////////////////////// provided for subscription agent
////////////////////////////
public List<SubscriptionEvent> poll(
@@ -193,6 +197,7 @@ public class SubscriptionBrokerAgent {
return broker;
})
.bindPrefetchingQueue(subtask.getTopicName(),
subtask.getInputPendingQueue());
+ prefetchingQueueCount.invalidate();
}
public void updateCompletedTopicNames(final String consumerGroupId, final
String topicName) {
@@ -213,6 +218,7 @@ public class SubscriptionBrokerAgent {
return;
}
broker.unbindPrefetchingQueue(topicName);
+ prefetchingQueueCount.invalidate();
}
public void removePrefetchingQueue(final String consumerGroupId, final
String topicName) {
@@ -223,6 +229,7 @@ public class SubscriptionBrokerAgent {
return;
}
broker.removePrefetchingQueue(topicName);
+ prefetchingQueueCount.invalidate();
}
public boolean executePrefetch(final String consumerGroupId, final String
topicName) {
@@ -251,8 +258,56 @@ public class SubscriptionBrokerAgent {
}
public int getPrefetchingQueueCount() {
+ return prefetchingQueueCount.get();
+ }
+
+ private int getPrefetchingQueueCountInternal() {
return consumerGroupIdToSubscriptionBroker.values().stream()
.map(SubscriptionBroker::getPrefetchingQueueCount)
.reduce(0, Integer::sum);
}
+
+ /////////////////////////////// Cache ///////////////////////////////
+
+ /**
+ * A simple generic cache that computes and stores a value on demand.
+ *
+ * <p>Note that get() and invalidate() are not synchronized, so the value
+ * obtained may not be entirely accurate.
+ *
+ * @param <T> the type of the cached value
+ */
+ private static class Cache<T> {
+
+ private T value;
+ private volatile boolean valid = false;
+ private final Supplier<T> supplier;
+
+ /**
+ * Construct a cache with a supplier that knows how to compute the value.
+ *
+ * @param supplier a Supplier that computes the value when needed
+ */
+ private Cache(final Supplier<T> supplier) {
+ this.supplier = supplier;
+ }
+
+ /** Invalidate the cache. The next call to get() will recompute the value.
*/
+ private void invalidate() {
+ valid = false;
+ }
+
+ /**
+ * Return the cached value, recomputing it if the cache is invalid.
+ *
+ * @return the current value, recomputed if necessary
+ */
+ private T get() {
+ if (!valid) {
+ value = supplier.get();
+ valid = true;
+ }
+ return value;
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index f0aed45e727..fa443eb50f9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -30,7 +30,6 @@ import
org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatches;
-import
org.apache.iotdb.db.subscription.resource.SubscriptionDataNodeResourceManager;
import
org.apache.iotdb.db.subscription.task.subtask.SubscriptionReceiverSubtask;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -101,6 +100,8 @@ public abstract class SubscriptionPrefetchingQueue {
private final SubscriptionPrefetchingQueueStates states;
+ private long lastStateReportTimestamp = System.currentTimeMillis();
+
private volatile boolean isCompleted = false;
private volatile boolean isClosed = false;
@@ -283,9 +284,12 @@ public abstract class SubscriptionPrefetchingQueue {
}
private void reportStateIfNeeded() {
- SubscriptionDataNodeResourceManager.log()
- .schedule(SubscriptionPrefetchingQueue.class, brokerId, topicName)
- .ifPresent(l -> l.info("Subscription: SubscriptionPrefetchingQueue
state {}", this));
+ if (System.currentTimeMillis() - lastStateReportTimestamp
+ >
SubscriptionConfig.getInstance().getSubscriptionLogManagerBaseIntervalMs()
+ * SubscriptionAgent.broker().getPrefetchingQueueCount()) {
+ LOGGER.info("Subscription: SubscriptionPrefetchingQueue state {}", this);
+ lastStateReportTimestamp = System.currentTimeMillis();
+ }
}
@SafeVarargs