This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 50cc5da0738 Subscription: intro SubscriptionLogManager for global 
uniform downsampling of prefetched logs (#15175) (#15187)
50cc5da0738 is described below

commit 50cc5da0738845d10b8b80f6f3a07da1d5acda6f
Author: VGalaxies <[email protected]>
AuthorDate: Tue Mar 25 18:47:21 2025 +0800

    Subscription: intro SubscriptionLogManager for global uniform downsampling 
of prefetched logs (#15175) (#15187)
---
 .../agent/SubscriptionBrokerAgent.java             | 10 ++-
 .../db/subscription/broker/SubscriptionBroker.java | 25 ++++---
 .../broker/SubscriptionPrefetchingQueue.java       | 11 ++-
 .../SubscriptionDataNodeResourceManager.java       | 43 ++++++++++++
 .../resource/log/SubscriptionLogManager.java       | 39 ++++++++++
 .../resource/log/SubscriptionLogStatus.java        | 82 ++++++++++++++++++++++
 .../apache/iotdb/commons/conf/CommonConfig.java    | 21 +++++-
 .../iotdb/commons/conf/CommonDescriptor.java       | 10 +++
 .../subscription/config/SubscriptionConfig.java    | 12 ++++
 9 files changed, 235 insertions(+), 18 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index 6d70d5796ff..f1ee9f8867b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.subscription.agent;
 
 import org.apache.iotdb.db.subscription.broker.SubscriptionBroker;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
+import 
org.apache.iotdb.db.subscription.resource.SubscriptionDataNodeResourceManager;
 import 
org.apache.iotdb.db.subscription.task.subtask.SubscriptionConnectorSubtask;
 import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
 import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
@@ -197,8 +198,13 @@ public class SubscriptionBrokerAgent {
   public boolean executePrefetch(final String consumerGroupId, final String 
topicName) {
     final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
     if (Objects.isNull(broker)) {
-      LOGGER.warn(
-          "Subscription: broker bound to consumer group [{}] does not exist", 
consumerGroupId);
+      SubscriptionDataNodeResourceManager.log()
+          .schedule(SubscriptionBrokerAgent.class, consumerGroupId, topicName)
+          .ifPresent(
+              l ->
+                  l.warn(
+                      "Subscription: broker bound to consumer group [{}] does 
not exist",
+                      consumerGroupId));
       return false;
     }
     return broker.executePrefetch(topicName);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index df888ec1b03..1223672810d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import 
org.apache.iotdb.db.subscription.metric.SubscriptionPrefetchingQueueMetrics;
+import 
org.apache.iotdb.db.subscription.resource.SubscriptionDataNodeResourceManager;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.rpc.subscription.config.TopicConstant;
 import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
@@ -419,17 +420,25 @@ public class SubscriptionBroker {
     final SubscriptionPrefetchingQueue prefetchingQueue =
         topicNameToPrefetchingQueue.get(topicName);
     if (Objects.isNull(prefetchingQueue)) {
-      LOGGER.warn(
-          "Subscription: prefetching queue bound to topic [{}] for consumer 
group [{}] does not exist",
-          topicName,
-          brokerId);
+      SubscriptionDataNodeResourceManager.log()
+          .schedule(SubscriptionBroker.class, brokerId, topicName)
+          .ifPresent(
+              l ->
+                  l.warn(
+                      "Subscription: prefetching queue bound to topic [{}] for 
consumer group [{}] does not exist",
+                      topicName,
+                      brokerId));
       return false;
     }
     if (prefetchingQueue.isClosed()) {
-      LOGGER.warn(
-          "Subscription: prefetching queue bound to topic [{}] for consumer 
group [{}] is closed",
-          topicName,
-          brokerId);
+      SubscriptionDataNodeResourceManager.log()
+          .schedule(SubscriptionBroker.class, brokerId, topicName)
+          .ifPresent(
+              l ->
+                  l.warn(
+                      "Subscription: prefetching queue bound to topic [{}] for 
consumer group [{}] is closed",
+                      topicName,
+                      brokerId));
       return false;
     }
     return prefetchingQueue.executePrefetch();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 0514ac0075e..abd3e1c300c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatches;
+import 
org.apache.iotdb.db.subscription.resource.SubscriptionDataNodeResourceManager;
 import 
org.apache.iotdb.db.subscription.task.subtask.SubscriptionReceiverSubtask;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -97,9 +98,6 @@ public abstract class SubscriptionPrefetchingQueue {
 
   private final SubscriptionPrefetchingQueueStates states;
 
-  private static final long STATE_REPORT_INTERVAL_IN_MS = 10_000L;
-  private long lastStateReportTimestamp = System.currentTimeMillis();
-
   private volatile boolean isCompleted = false;
   private volatile boolean isClosed = false;
 
@@ -280,10 +278,9 @@ public abstract class SubscriptionPrefetchingQueue {
   }
 
   private void reportStateIfNeeded() {
-    if (System.currentTimeMillis() - lastStateReportTimestamp > 
STATE_REPORT_INTERVAL_IN_MS) {
-      LOGGER.info("Subscription: SubscriptionPrefetchingQueue state {}", this);
-      lastStateReportTimestamp = System.currentTimeMillis();
-    }
+    SubscriptionDataNodeResourceManager.log()
+        .schedule(SubscriptionPrefetchingQueue.class, brokerId, topicName)
+        .ifPresent(l -> l.info("Subscription: SubscriptionPrefetchingQueue 
state {}", this));
   }
 
   @SafeVarargs
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/SubscriptionDataNodeResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/SubscriptionDataNodeResourceManager.java
new file mode 100644
index 00000000000..347299df10f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/SubscriptionDataNodeResourceManager.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.resource;
+
+import org.apache.iotdb.db.subscription.resource.log.SubscriptionLogManager;
+
+/**
+ * Process-wide holder of subscription resources on the data node.
+ *
+ * <p>It currently owns a single {@link SubscriptionLogManager}, exposed through {@link #log()}.
+ */
+public class SubscriptionDataNodeResourceManager {
+
+  private final SubscriptionLogManager subscriptionLogManager = new SubscriptionLogManager();
+
+  /**
+   * Returns the log manager owned by the singleton instance.
+   *
+   * @return the shared {@link SubscriptionLogManager}
+   */
+  public static SubscriptionLogManager log() {
+    final SubscriptionDataNodeResourceManager manager =
+        SubscriptionDataNodeResourceManagerHolder.INSTANCE;
+    return manager.subscriptionLogManager;
+  }
+
+  ///////////////////////////// SINGLETON /////////////////////////////
+
+  private SubscriptionDataNodeResourceManager() {
+    // Private: the only instance is the one created by the holder class below.
+  }
+
+  private static class SubscriptionDataNodeResourceManagerHolder {
+
+    // Created once, when the holder class is first initialized.
+    private static final SubscriptionDataNodeResourceManager INSTANCE =
+        new SubscriptionDataNodeResourceManager();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogManager.java
new file mode 100644
index 00000000000..549f9344d7f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogManager.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.resource.log;
+
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Hands out loggers on a throttled basis.
+ *
+ * <p>One {@link SubscriptionLogStatus} is kept per logging class; it is created the first time
+ * that class asks for a logger.
+ */
+public class SubscriptionLogManager {
+
+  private final ConcurrentMap<Class<?>, SubscriptionLogStatus> logClass2LogStatusMap =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Asks the status object of {@code logClass} whether a log may be emitted for the given key.
+   *
+   * @param logClass the class on whose behalf the log would be written
+   * @param consumerGroupId consumer group id part of the throttling key
+   * @param topicName topic name part of the throttling key
+   * @return whatever {@link SubscriptionLogStatus#schedule(String, String)} returns for this key
+   */
+  public Optional<Logger> schedule(
+      final Class<?> logClass, final String consumerGroupId, final String topicName) {
+    final SubscriptionLogStatus status =
+        logClass2LogStatusMap.computeIfAbsent(logClass, SubscriptionLogStatus::new);
+    return status.schedule(consumerGroupId, topicName);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogStatus.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogStatus.java
new file mode 100644
index 00000000000..0daae1dc937
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogStatus.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.resource.log;
+
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Throttling state for the logs of one class.
+ *
+ * <p>For every (consumer group id, topic name) pair it remembers when a log was last granted and
+ * hands out the class logger only after the allowed interval has elapsed. The allowed interval is
+ * the configured base interval multiplied by the current number of prefetching queues.
+ */
+class SubscriptionLogStatus {
+
+  // Read once when this class is initialized.
+  private static final long BASE_INTERVAL_IN_MS =
+      SubscriptionConfig.getInstance().getSubscriptionLogManagerBaseIntervalMs();
+
+  private final Logger logger;
+  // Timestamp (ms) of the last granted log per (consumer group id, topic name).
+  private final Cache<Pair<String, String>, AtomicLong> lastReportTimestamps;
+
+  /**
+   * Creates the throttling state for {@code logClass}.
+   *
+   * <p>Entries that are not accessed for the configured window (in seconds) expire from the cache.
+   *
+   * @param logClass the class whose logger is handed out by {@link #schedule(String, String)}
+   */
+  public SubscriptionLogStatus(final Class<?> logClass) {
+    this.logger = LoggerFactory.getLogger(logClass);
+    this.lastReportTimestamps =
+        Caffeine.newBuilder()
+            .expireAfterAccess(
+                SubscriptionConfig.getInstance().getSubscriptionLogManagerWindowSeconds(),
+                TimeUnit.SECONDS)
+            .build();
+  }
+
+  /**
+   * Decides whether a log for the given consumer group and topic may be emitted now.
+   *
+   * @param consumerGroupId consumer group id, first half of the throttling key
+   * @param topicName topic name, second half of the throttling key
+   * @return the logger of the owning class if the allowed interval has elapsed since the last
+   *     granted log for this key and this thread won the update; otherwise an empty Optional
+   */
+  public Optional<Logger> schedule(final String consumerGroupId, final String topicName) {
+    final Pair<String, String> key = new Pair<>(consumerGroupId, topicName);
+    final long now = System.currentTimeMillis();
+    // Calculate the allowed logging interval based on the current prefetching queue count.
+    // The count is clamped to at least 1: callers log exactly when a broker or queue is
+    // missing, so the count may be 0. A zero count would disable throttling and would make
+    // nextLong(1, 1) below throw IllegalArgumentException.
+    final int count = Math.max(1, SubscriptionAgent.broker().getPrefetchingQueueCount());
+    final long allowedInterval = BASE_INTERVAL_IN_MS * count;
+    // If the key does not exist, initialize an AtomicLong set to a random number (1..count) of
+    // base intervals before now, so that new keys do not all become eligible at the same time
+    final AtomicLong lastTime =
+        Objects.requireNonNull(
+            lastReportTimestamps.get(
+                key,
+                k ->
+                    new AtomicLong(
+                        now
+                            // introduce randomness
+                            - BASE_INTERVAL_IN_MS
+                                * ThreadLocalRandom.current().nextLong(1, count + 1L))));
+    final long last = lastTime.get();
+    if (now - last >= allowedInterval) {
+      // Use compareAndSet to ensure that only one thread updates at a time,
+      // so that only one log entry is printed per allowed interval
+      if (lastTime.compareAndSet(last, now)) {
+        return Optional.of(logger);
+      }
+    }
+    return Optional.empty();
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 4b814a26386..99e041cdfb4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -308,7 +308,7 @@ public class CommonConfig {
   private int subscriptionPollMaxBlockingTimeMs = 500;
   private int subscriptionDefaultTimeoutInMs = 10_000; // 10s
   private long subscriptionLaunchRetryIntervalMs = 1000;
-  private int subscriptionRecycleUncommittedEventIntervalMs = 600000; // 600s
+  private int subscriptionRecycleUncommittedEventIntervalMs = 600_000; // 600s
   private long subscriptionReadFileBufferSize = 8 * MB;
   private long subscriptionReadTabletBufferSize = 8 * MB;
   private long subscriptionTsFileDeduplicationWindowSeconds = 120; // 120s
@@ -316,6 +316,8 @@ public class CommonConfig {
   private long subscriptionEstimatedInsertNodeTabletInsertionEventSize = 64 * 
KB;
   private long subscriptionEstimatedRawTabletInsertionEventSize = 16 * KB;
   private long subscriptionMaxAllowedEventCountInTabletBatch = 100;
+  private long subscriptionLogManagerWindowSeconds = 120; // 120s
+  private long subscriptionLogManagerBaseIntervalMs = 1_000; // 1s
 
   private boolean subscriptionPrefetchEnabled = false;
   private float subscriptionPrefetchMemoryThreshold = 0.5F;
@@ -1508,6 +1510,23 @@ public class CommonConfig {
         subscriptionMaxAllowedEventCountInTabletBatch;
   }
 
+  /** Returns the subscription log manager window, in seconds. */
+  public long getSubscriptionLogManagerWindowSeconds() {
+    return this.subscriptionLogManagerWindowSeconds;
+  }
+
+  public void setSubscriptionLogManagerWindowSeconds(long 
subscriptionLogManagerWindowSeconds) {
+    this.subscriptionLogManagerWindowSeconds = 
subscriptionLogManagerWindowSeconds;
+  }
+
+  /** Returns the subscription log manager base interval, in milliseconds. */
+  public long getSubscriptionLogManagerBaseIntervalMs() {
+    return this.subscriptionLogManagerBaseIntervalMs;
+  }
+
+  /**
+   * Sets the subscription log manager base interval.
+   *
+   * @param subscriptionLogManagerBaseIntervalMs the new base interval, in milliseconds
+   */
+  public void setSubscriptionLogManagerBaseIntervalMs(
+      final long subscriptionLogManagerBaseIntervalMs) {
+    this.subscriptionLogManagerBaseIntervalMs = subscriptionLogManagerBaseIntervalMs;
+  }
+
   public boolean getSubscriptionPrefetchEnabled() {
     return subscriptionPrefetchEnabled;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index b3613bb6677..c6b7013b23e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -776,6 +776,16 @@ public class CommonDescriptor {
             properties.getProperty(
                 "subscription_max_allowed_event_count_in_tablet_batch",
                 
String.valueOf(config.getSubscriptionMaxAllowedEventCountInTabletBatch()))));
+    config.setSubscriptionLogManagerWindowSeconds(
+        Long.parseLong(
+            properties.getProperty(
+                "subscription_log_manager_window_seconds",
+                
String.valueOf(config.getSubscriptionLogManagerWindowSeconds()))));
+    config.setSubscriptionLogManagerBaseIntervalMs(
+        Long.parseLong(
+            properties.getProperty(
+                "subscription_log_manager_base_interval_ms",
+                
String.valueOf(config.getSubscriptionLogManagerBaseIntervalMs()))));
 
     config.setSubscriptionPrefetchEnabled(
         Boolean.parseBoolean(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
index 1ace6e71de8..a332b87f0ed 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
@@ -97,6 +97,14 @@ public class SubscriptionConfig {
     return COMMON_CONFIG.getSubscriptionMaxAllowedEventCountInTabletBatch();
   }
 
+  /** Returns the subscription log manager window, in seconds, as held by the common config. */
+  public long getSubscriptionLogManagerWindowSeconds() {
+    return COMMON_CONFIG.getSubscriptionLogManagerWindowSeconds();
+  }
+
+  /** Returns the subscription log manager base interval, in ms, as held by the common config. */
+  public long getSubscriptionLogManagerBaseIntervalMs() {
+    return COMMON_CONFIG.getSubscriptionLogManagerBaseIntervalMs();
+  }
+
   public boolean getSubscriptionPrefetchEnabled() {
     return COMMON_CONFIG.getSubscriptionPrefetchEnabled();
   }
@@ -171,6 +179,10 @@ public class SubscriptionConfig {
     LOGGER.info(
         "SubscriptionMaxAllowedEventCountInTabletBatch: {}",
         getSubscriptionMaxAllowedEventCountInTabletBatch());
+    LOGGER.info(
+        "SubscriptionLogManagerWindowSeconds: {}", 
getSubscriptionLogManagerWindowSeconds());
+    LOGGER.info(
+        "SubscriptionLogManagerBaseIntervalMs: {}", 
getSubscriptionLogManagerBaseIntervalMs());
 
     LOGGER.info("SubscriptionPrefetchEnabled: {}", 
getSubscriptionPrefetchEnabled());
     LOGGER.info(

Reply via email to