Copilot commented on code in PR #24927:
URL: https://github.com/apache/pulsar/pull/24927#discussion_r2484219126


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -370,7 +370,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
     @FieldContext(category = CATEGORY_SERVER, doc = """
             Class name of the factory that implements the delayed deliver tracker.
             If value is "org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory", \
-            will create bucket based delayed message index tracker.
+            will create bucket based delayed message index tracker.\n
+            If value is "org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory", \
+            will create topic-level in-memory delayed message index tracker.\n
+            If value is "org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory", \
+            will create in-memory delayed delivery tracker (per existing implementation).

Review Comment:
   The documentation should state the key difference between the two in-memory
implementations: InMemoryTopicDelayedDeliveryTrackerFactory shares a single
index across all subscriptions on a topic (reducing memory usage in
multi-subscription scenarios), while InMemoryDelayedDeliveryTrackerFactory
maintains a separate index per subscription. Users need this distinction to
choose between the implementations.
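
   A minimal sketch of that difference, under stated assumptions:
`DelayedIndexSketch` and its methods are hypothetical and are not Pulsar
classes, and a `TreeSet<Long>` of delivery timestamps stands in for a real
delayed message index. It only counts how many index entries each layout holds
for the same set of delayed messages.

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeSet;

   // Hypothetical illustration only; not the implementation under review.
   public class DelayedIndexSketch {

       // Per-subscription layout: every subscription builds its own index.
       public static int perSubscriptionEntries(List<String> subscriptions, long[] deliverAtMillis) {
           Map<String, TreeSet<Long>> indexBySubscription = new HashMap<>();
           for (String subscription : subscriptions) {
               TreeSet<Long> index =
                       indexBySubscription.computeIfAbsent(subscription, k -> new TreeSet<>());
               for (long deliverAt : deliverAtMillis) {
                   index.add(deliverAt);
               }
           }
           return indexBySubscription.values().stream().mapToInt(TreeSet::size).sum();
       }

       // Topic-level layout: all subscriptions resolve to one index keyed by topic name.
       public static int sharedTopicEntries(String topic, List<String> subscriptions,
                                            long[] deliverAtMillis) {
           Map<String, TreeSet<Long>> indexByTopic = new HashMap<>();
           for (String subscription : subscriptions) {
               // Each subscription looks up the same topic key, so the index is created once.
               TreeSet<Long> index = indexByTopic.computeIfAbsent(topic, k -> new TreeSet<>());
               for (long deliverAt : deliverAtMillis) {
                   index.add(deliverAt); // re-adding an existing timestamp is a no-op
               }
           }
           return indexByTopic.values().stream().mapToInt(TreeSet::size).sum();
       }

       public static void main(String[] args) {
           List<String> subscriptions = List.of("sub-a", "sub-b", "sub-c");
           long[] deliverAtMillis = {1_000L, 2_000L};
           System.out.println(perSubscriptionEntries(subscriptions, deliverAtMillis));
           System.out.println(sharedTopicEntries("my-topic", subscriptions, deliverAtMillis));
       }
   }
   ```

   With three subscriptions and two delayed messages, the per-subscription
layout holds six entries and the shared layout holds two. A real shared index
would also need per-subscription delivery state, which this sketch leaves out.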



##########
conf/standalone.conf:
##########
@@ -1422,6 +1422,8 @@ delayedDeliveryEnabled=true
 # Class name of the factory that implements the delayed deliver tracker.
 # If value is "org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory",
 # will create bucket based delayed message index tracker.
+# If value is "org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory",
+# will use topic-level in-memory delayed message index tracker (alias of the default in-memory implementation).

Review Comment:
   The comment describes InMemoryTopicDelayedDeliveryTrackerFactory as an
'alias of the default in-memory implementation', which is misleading: it is a
distinct implementation that shares one index per topic across subscriptions.
The comment should say so and note the reduced memory usage in
multi-subscription scenarios.
   ```suggestion
   # will use topic-level in-memory delayed message index tracker (distinct from the default in-memory implementation; uses shared topic-level indexing for reduced memory usage in multi-subscription scenarios).
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerFactory.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.time.Clock;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InMemoryTopicDelayedDeliveryTrackerFactory implements DelayedDeliveryTrackerFactory {
+
+    private static final Logger log = LoggerFactory.getLogger(InMemoryTopicDelayedDeliveryTrackerFactory.class);
+
+    private Timer timer;
+
+    private long tickTimeMillis;
+
+    private boolean isDelayedDeliveryDeliverAtTimeStrict;
+
+    private long fixedDelayDetectionLookahead;
+
+    // New tuning knobs
+    private long pruneMinIntervalMillis;
+    private double pruneEligibleRatio;
+    private long topicManagerIdleMillis;
+
+    // Cache of topic-level managers: topic name -> manager instance
+    private final ConcurrentMap<String, TopicDelayedDeliveryTrackerManager> topicManagers = new ConcurrentHashMap<>();
+
+    @VisibleForTesting
+    InMemoryTopicDelayedDeliveryTrackerFactory(Timer timer, long tickTimeMillis,
+                                          boolean isDelayedDeliveryDeliverAtTimeStrict,
+                                          long fixedDelayDetectionLookahead) {
+        this.timer = timer;
+        this.tickTimeMillis = tickTimeMillis;
+        this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
+        this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+        this.pruneMinIntervalMillis = 0;
+        this.pruneEligibleRatio = 0.5;
+        this.topicManagerIdleMillis = 0;
+    }
+
+    @VisibleForTesting
+    int getCachedManagersSize() {
+        return topicManagers.size();
+    }
+
+    @VisibleForTesting
+    boolean hasManagerForTopic(String topicName) {
+        return topicManagers.containsKey(topicName);
+    }
+
+    @Override
+    public void initialize(PulsarService pulsarService) {
+        ServiceConfiguration config = pulsarService.getConfig();
+        this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-delayed-delivery"),
+                config.getDelayedDeliveryTickTimeMillis(), TimeUnit.MILLISECONDS);
+        this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis();
+        this.isDelayedDeliveryDeliverAtTimeStrict = config.isDelayedDeliveryDeliverAtTimeStrict();
+        this.fixedDelayDetectionLookahead = config.getDelayedDeliveryFixedDelayDetectionLookahead();
+        this.pruneMinIntervalMillis = config.getDelayedDeliveryPruneMinIntervalMillis();
+        this.pruneEligibleRatio = config.getDelayedDeliveryPruneEligibleRatio();
+        this.topicManagerIdleMillis = config.getDelayedDeliveryTopicManagerIdleMillis();
+    }
+
+    @Override
+    public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+        String topicName = dispatcher.getTopic().getName();
+        String subscriptionName = dispatcher.getSubscription().getName();
+        DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE;
+        try {
+            tracker = newTracker0(dispatcher);
+        } catch (Exception e) {
+            // it should never go here
+            log.warn("Failed to create InMemoryDelayedDeliveryTracker, topic {}, subscription {}",

Review Comment:
   The error message references 'InMemoryDelayedDeliveryTracker' but should 
reference 'InMemoryTopicDelayedDeliveryTracker' to accurately reflect the class 
being created in this factory.
   ```suggestion
               log.warn("Failed to create InMemoryTopicDelayedDeliveryTracker, topic {}, subscription {}",
   ```



##########
conf/broker.conf:
##########
@@ -654,6 +654,8 @@ delayedDeliveryEnabled=true
 # Class name of the factory that implements the delayed deliver tracker.
 # If value is "org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory",
 # will create bucket based delayed message index tracker.
+# If value is "org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory",
+# will use topic-level in-memory delayed message index tracker (alias of the default in-memory implementation).

Review Comment:
   The comment describes InMemoryTopicDelayedDeliveryTrackerFactory as an
'alias of the default in-memory implementation', which is misleading: it is a
distinct implementation that shares one index per topic across subscriptions.
The comment should say so and note the reduced memory usage in
multi-subscription scenarios.
   ```suggestion
   # will use a topic-level in-memory delayed message index tracker (a distinct implementation that shares the index across subscriptions to reduce memory usage in multi-subscription scenarios).
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.