This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 50cc5da0738 Subscription: intro SubscriptionLogManager for global
uniform downsampling of prefetched logs (#15175) (#15187)
50cc5da0738 is described below
commit 50cc5da0738845d10b8b80f6f3a07da1d5acda6f
Author: VGalaxies <[email protected]>
AuthorDate: Tue Mar 25 18:47:21 2025 +0800
Subscription: intro SubscriptionLogManager for global uniform downsampling
of prefetched logs (#15175) (#15187)
---
.../agent/SubscriptionBrokerAgent.java | 10 ++-
.../db/subscription/broker/SubscriptionBroker.java | 25 ++++---
.../broker/SubscriptionPrefetchingQueue.java | 11 ++-
.../SubscriptionDataNodeResourceManager.java | 43 ++++++++++++
.../resource/log/SubscriptionLogManager.java | 39 ++++++++++
.../resource/log/SubscriptionLogStatus.java | 82 ++++++++++++++++++++++
.../apache/iotdb/commons/conf/CommonConfig.java | 21 +++++-
.../iotdb/commons/conf/CommonDescriptor.java | 10 +++
.../subscription/config/SubscriptionConfig.java | 12 ++++
9 files changed, 235 insertions(+), 18 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index 6d70d5796ff..f1ee9f8867b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.subscription.agent;
import org.apache.iotdb.db.subscription.broker.SubscriptionBroker;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
+import
org.apache.iotdb.db.subscription.resource.SubscriptionDataNodeResourceManager;
import
org.apache.iotdb.db.subscription.task.subtask.SubscriptionConnectorSubtask;
import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
@@ -197,8 +198,13 @@ public class SubscriptionBrokerAgent {
public boolean executePrefetch(final String consumerGroupId, final String
topicName) {
final SubscriptionBroker broker =
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
if (Objects.isNull(broker)) {
- LOGGER.warn(
- "Subscription: broker bound to consumer group [{}] does not exist",
consumerGroupId);
+ SubscriptionDataNodeResourceManager.log()
+ .schedule(SubscriptionBrokerAgent.class, consumerGroupId, topicName)
+ .ifPresent(
+ l ->
+ l.warn(
+ "Subscription: broker bound to consumer group [{}] does
not exist",
+ consumerGroupId));
return false;
}
return broker.executePrefetch(topicName);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index df888ec1b03..1223672810d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import
org.apache.iotdb.db.subscription.metric.SubscriptionPrefetchingQueueMetrics;
+import
org.apache.iotdb.db.subscription.resource.SubscriptionDataNodeResourceManager;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
@@ -419,17 +420,25 @@ public class SubscriptionBroker {
final SubscriptionPrefetchingQueue prefetchingQueue =
topicNameToPrefetchingQueue.get(topicName);
if (Objects.isNull(prefetchingQueue)) {
- LOGGER.warn(
- "Subscription: prefetching queue bound to topic [{}] for consumer
group [{}] does not exist",
- topicName,
- brokerId);
+ SubscriptionDataNodeResourceManager.log()
+ .schedule(SubscriptionBroker.class, brokerId, topicName)
+ .ifPresent(
+ l ->
+ l.warn(
+ "Subscription: prefetching queue bound to topic [{}] for
consumer group [{}] does not exist",
+ topicName,
+ brokerId));
return false;
}
if (prefetchingQueue.isClosed()) {
- LOGGER.warn(
- "Subscription: prefetching queue bound to topic [{}] for consumer
group [{}] is closed",
- topicName,
- brokerId);
+ SubscriptionDataNodeResourceManager.log()
+ .schedule(SubscriptionBroker.class, brokerId, topicName)
+ .ifPresent(
+ l ->
+ l.warn(
+ "Subscription: prefetching queue bound to topic [{}] for
consumer group [{}] is closed",
+ topicName,
+ brokerId));
return false;
}
return prefetchingQueue.executePrefetch();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 0514ac0075e..abd3e1c300c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatches;
+import
org.apache.iotdb.db.subscription.resource.SubscriptionDataNodeResourceManager;
import
org.apache.iotdb.db.subscription.task.subtask.SubscriptionReceiverSubtask;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -97,9 +98,6 @@ public abstract class SubscriptionPrefetchingQueue {
private final SubscriptionPrefetchingQueueStates states;
- private static final long STATE_REPORT_INTERVAL_IN_MS = 10_000L;
- private long lastStateReportTimestamp = System.currentTimeMillis();
-
private volatile boolean isCompleted = false;
private volatile boolean isClosed = false;
@@ -280,10 +278,9 @@ public abstract class SubscriptionPrefetchingQueue {
}
private void reportStateIfNeeded() {
- if (System.currentTimeMillis() - lastStateReportTimestamp >
STATE_REPORT_INTERVAL_IN_MS) {
- LOGGER.info("Subscription: SubscriptionPrefetchingQueue state {}", this);
- lastStateReportTimestamp = System.currentTimeMillis();
- }
+ SubscriptionDataNodeResourceManager.log()
+ .schedule(SubscriptionPrefetchingQueue.class, brokerId, topicName)
+ .ifPresent(l -> l.info("Subscription: SubscriptionPrefetchingQueue
state {}", this));
}
@SafeVarargs
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/SubscriptionDataNodeResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/SubscriptionDataNodeResourceManager.java
new file mode 100644
index 00000000000..347299df10f
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/SubscriptionDataNodeResourceManager.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.resource;
+
+import org.apache.iotdb.db.subscription.resource.log.SubscriptionLogManager;
+
+/**
+ * Singleton entry point to the subscription resources of this node. At present it owns a single
+ * {@link SubscriptionLogManager}, reachable through {@link #log()}.
+ */
+public class SubscriptionDataNodeResourceManager {
+
+  private final SubscriptionLogManager subscriptionLogManager = new SubscriptionLogManager();
+
+  /**
+   * Returns the log manager owned by the singleton instance.
+   *
+   * @return the shared {@link SubscriptionLogManager}
+   */
+  public static SubscriptionLogManager log() {
+    return Holder.INSTANCE.subscriptionLogManager;
+  }
+
+  ///////////////////////////// SINGLETON /////////////////////////////
+
+  private SubscriptionDataNodeResourceManager() {
+    // Not instantiable from outside; the only instance lives in Holder.
+  }
+
+  /** Holds the only instance of the enclosing class. */
+  private static class Holder {
+
+    private static final SubscriptionDataNodeResourceManager INSTANCE =
+        new SubscriptionDataNodeResourceManager();
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogManager.java
new file mode 100644
index 00000000000..549f9344d7f
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogManager.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.resource.log;
+
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Keeps one {@link SubscriptionLogStatus} per logging class and delegates the decision whether a
+ * log statement may be emitted to it.
+ */
+public class SubscriptionLogManager {
+
+  private final ConcurrentMap<Class<?>, SubscriptionLogStatus> statusByLogClass =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Asks the status object of {@code logClass} (created on first use) whether a log statement for
+   * the given consumer group and topic may be emitted now.
+   *
+   * @param logClass the class on whose behalf the log would be written
+   * @param consumerGroupId first component of the key the decision is tracked under
+   * @param topicName second component of the key the decision is tracked under
+   * @return whatever {@link SubscriptionLogStatus#schedule(String, String)} returns for that key
+   */
+  public Optional<Logger> schedule(
+      final Class<?> logClass, final String consumerGroupId, final String topicName) {
+    final SubscriptionLogStatus status =
+        statusByLogClass.computeIfAbsent(logClass, SubscriptionLogStatus::new);
+    return status.schedule(consumerGroupId, topicName);
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogStatus.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogStatus.java
new file mode 100644
index 00000000000..0daae1dc937
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/resource/log/SubscriptionLogStatus.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.resource.log;
+
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Throttles the log statements written on behalf of one class.
+ *
+ * <p>For every (consumer group id, topic name) pair the timestamp of the last granted log is kept
+ * in a cache whose entries expire after the configured window (in seconds) without access. A log
+ * is granted only when at least {@code base interval * prefetching queue count} milliseconds have
+ * passed since the last granted one for the same pair.
+ */
+class SubscriptionLogStatus {
+
+  private static final long BASE_INTERVAL_IN_MS =
+      SubscriptionConfig.getInstance().getSubscriptionLogManagerBaseIntervalMs();
+
+  private final Logger logger;
+  private final Cache<Pair<String, String>, AtomicLong> lastReportTimestamps;
+
+  /**
+   * Creates the status for {@code logClass}.
+   *
+   * @param logClass the class whose SLF4J logger is handed out by {@link #schedule}
+   */
+  public SubscriptionLogStatus(final Class<?> logClass) {
+    this.logger = LoggerFactory.getLogger(logClass);
+    this.lastReportTimestamps =
+        Caffeine.newBuilder()
+            .expireAfterAccess(
+                SubscriptionConfig.getInstance().getSubscriptionLogManagerWindowSeconds(),
+                TimeUnit.SECONDS)
+            .build();
+  }
+
+  /**
+   * Decides whether a log statement for the given pair may be emitted now.
+   *
+   * @param consumerGroupId first component of the throttling key
+   * @param topicName second component of the throttling key
+   * @return the logger if the allowed interval has elapsed and this caller won the update of the
+   *     last-report timestamp, otherwise {@link Optional#empty()}
+   */
+  public Optional<Logger> schedule(final String consumerGroupId, final String topicName) {
+    final Pair<String, String> key = new Pair<>(consumerGroupId, topicName);
+    final long now = System.currentTimeMillis();
+    // Calculate the allowed logging interval based on the current prefetching queue count.
+    // The count is clamped to at least 1: schedule is also called when no broker exists for the
+    // consumer group, and a count of 0 would both disable throttling (interval 0) and make
+    // nextLong(1, 1) below throw IllegalArgumentException (bound must be greater than origin).
+    final int count = Math.max(1, SubscriptionAgent.broker().getPrefetchingQueueCount());
+    final long allowedInterval = BASE_INTERVAL_IN_MS * count;
+    // If the key does not exist, initialize an AtomicLong set to a random number (between 1 and
+    // count) of base intervals before now
+    final AtomicLong lastTime =
+        Objects.requireNonNull(
+            lastReportTimestamps.get(
+                key,
+                k ->
+                    new AtomicLong(
+                        now
+                            // introduce randomness
+                            - BASE_INTERVAL_IN_MS
+                                * ThreadLocalRandom.current().nextLong(1, count + 1L))));
+    final long last = lastTime.get();
+    if (now - last >= allowedInterval) {
+      // Use compareAndSet to ensure that only one thread updates at a time,
+      // so that only one log entry is printed per allowed interval
+      if (lastTime.compareAndSet(last, now)) {
+        return Optional.of(logger);
+      }
+    }
+    return Optional.empty();
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 4b814a26386..99e041cdfb4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -308,7 +308,7 @@ public class CommonConfig {
private int subscriptionPollMaxBlockingTimeMs = 500;
private int subscriptionDefaultTimeoutInMs = 10_000; // 10s
private long subscriptionLaunchRetryIntervalMs = 1000;
- private int subscriptionRecycleUncommittedEventIntervalMs = 600000; // 600s
+ private int subscriptionRecycleUncommittedEventIntervalMs = 600_000; // 600s
private long subscriptionReadFileBufferSize = 8 * MB;
private long subscriptionReadTabletBufferSize = 8 * MB;
private long subscriptionTsFileDeduplicationWindowSeconds = 120; // 120s
@@ -316,6 +316,8 @@ public class CommonConfig {
private long subscriptionEstimatedInsertNodeTabletInsertionEventSize = 64 *
KB;
private long subscriptionEstimatedRawTabletInsertionEventSize = 16 * KB;
private long subscriptionMaxAllowedEventCountInTabletBatch = 100;
+ private long subscriptionLogManagerWindowSeconds = 120; // 120s
+ private long subscriptionLogManagerBaseIntervalMs = 1_000; // 1s
private boolean subscriptionPrefetchEnabled = false;
private float subscriptionPrefetchMemoryThreshold = 0.5F;
@@ -1508,6 +1510,23 @@ public class CommonConfig {
subscriptionMaxAllowedEventCountInTabletBatch;
}
+ public long getSubscriptionLogManagerWindowSeconds() {
+ return subscriptionLogManagerWindowSeconds;
+ }
+
+ public void setSubscriptionLogManagerWindowSeconds(long
subscriptionLogManagerWindowSeconds) {
+ this.subscriptionLogManagerWindowSeconds =
subscriptionLogManagerWindowSeconds;
+ }
+
+ public long getSubscriptionLogManagerBaseIntervalMs() {
+ return subscriptionLogManagerBaseIntervalMs;
+ }
+
+ public void setSubscriptionLogManagerBaseIntervalMs(
+ final long subscriptionLogManagerBaseIntervalMs) {
+ this.subscriptionLogManagerBaseIntervalMs =
subscriptionLogManagerBaseIntervalMs;
+ }
+
public boolean getSubscriptionPrefetchEnabled() {
return subscriptionPrefetchEnabled;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index b3613bb6677..c6b7013b23e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -776,6 +776,16 @@ public class CommonDescriptor {
properties.getProperty(
"subscription_max_allowed_event_count_in_tablet_batch",
String.valueOf(config.getSubscriptionMaxAllowedEventCountInTabletBatch()))));
+ config.setSubscriptionLogManagerWindowSeconds(
+ Long.parseLong(
+ properties.getProperty(
+ "subscription_log_manager_window_seconds",
+
String.valueOf(config.getSubscriptionLogManagerWindowSeconds()))));
+ config.setSubscriptionLogManagerBaseIntervalMs(
+ Long.parseLong(
+ properties.getProperty(
+ "subscription_log_manager_base_interval_ms",
+
String.valueOf(config.getSubscriptionLogManagerBaseIntervalMs()))));
config.setSubscriptionPrefetchEnabled(
Boolean.parseBoolean(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
index 1ace6e71de8..a332b87f0ed 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
@@ -97,6 +97,14 @@ public class SubscriptionConfig {
return COMMON_CONFIG.getSubscriptionMaxAllowedEventCountInTabletBatch();
}
+ public long getSubscriptionLogManagerWindowSeconds() {
+ return COMMON_CONFIG.getSubscriptionLogManagerWindowSeconds();
+ }
+
+ public long getSubscriptionLogManagerBaseIntervalMs() {
+ return COMMON_CONFIG.getSubscriptionLogManagerBaseIntervalMs();
+ }
+
public boolean getSubscriptionPrefetchEnabled() {
return COMMON_CONFIG.getSubscriptionPrefetchEnabled();
}
@@ -171,6 +179,10 @@ public class SubscriptionConfig {
LOGGER.info(
"SubscriptionMaxAllowedEventCountInTabletBatch: {}",
getSubscriptionMaxAllowedEventCountInTabletBatch());
+ LOGGER.info(
+ "SubscriptionLogManagerWindowSeconds: {}",
getSubscriptionLogManagerWindowSeconds());
+ LOGGER.info(
+ "SubscriptionLogManagerBaseIntervalMs: {}",
getSubscriptionLogManagerBaseIntervalMs());
LOGGER.info("SubscriptionPrefetchEnabled: {}",
getSubscriptionPrefetchEnabled());
LOGGER.info(