Copilot commented on code in PR #24922:
URL: https://github.com/apache/pulsar/pull/24922#discussion_r2477926013


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java:
##########
@@ -66,13 +71,30 @@ public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleCon
     }
 
     @VisibleForTesting
-    InMemoryDelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
-        return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
-                isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
+    DelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+        String topicName = dispatcher.getTopic().getName();
+
+        // Get or create topic-level manager for this topic
+        TopicDelayedDeliveryTrackerManager manager = topicManagers.computeIfAbsent(topicName,
+            k -> new InMemoryTopicDelayedDeliveryTrackerManager(timer, tickTimeMillis,
+                    isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead));

Review Comment:
   The `topicManagers` map could accumulate stale entries for topics that have 
been unloaded. When a topic is unloaded and all subscriptions are closed, the 
manager should be removed from the map. Consider adding cleanup logic in 
`InMemoryTopicDelayedDeliveryTrackerManager.unregister()` that notifies the 
factory to remove the manager from the cache, or add a callback mechanism for 
the manager to remove itself when it closes.
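
A minimal sketch of the callback approach, assuming a simplified model in which the factory hands each manager a removal callback. `SketchTopicManager`, `SketchTrackerFactory`, `managerFor`, and `onManagerClosed` are hypothetical names for illustration, not classes or methods from the PR, and the sketch ignores the race in which a new subscription registers while the manager is closing.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical stand-in for the topic-level manager; not the PR's actual class.
final class SketchTopicManager {
    private final String topicName;
    private final Consumer<SketchTopicManager> onClose; // callback supplied by the factory
    private int activeSubscriptions;
    private boolean closed;

    SketchTopicManager(String topicName, Consumer<SketchTopicManager> onClose) {
        this.topicName = topicName;
        this.onClose = onClose;
    }

    String topicName() {
        return topicName;
    }

    synchronized void register() {
        activeSubscriptions++;
    }

    void unregister() {
        boolean shouldClose;
        synchronized (this) {
            activeSubscriptions--;
            shouldClose = activeSubscriptions == 0 && !closed;
            if (shouldClose) {
                closed = true;
            }
        }
        // Notify the factory outside the lock so the cache entry is dropped.
        if (shouldClose) {
            onClose.accept(this);
        }
    }
}

// Hypothetical stand-in for the factory that owns the topicManagers cache.
final class SketchTrackerFactory {
    private final Map<String, SketchTopicManager> topicManagers = new ConcurrentHashMap<>();

    SketchTopicManager managerFor(String topicName) {
        SketchTopicManager manager = topicManagers.computeIfAbsent(topicName,
                k -> new SketchTopicManager(k, this::onManagerClosed));
        manager.register();
        return manager;
    }

    private void onManagerClosed(SketchTopicManager manager) {
        // remove(key, value) only drops the entry if it still maps to this instance.
        topicManagers.remove(manager.topicName(), manager);
    }

    int cachedManagers() {
        return topicManagers.size();
    }
}
```

The two-argument `remove` is used so that a manager closing late cannot evict a newer manager created for the same topic.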



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerView.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import java.util.NavigableSet;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+
+/**
+ * View object for a subscription that implements DelayedDeliveryTracker interface.
+ * This view forwards all operations to the topic-level manager while maintaining
+ * compatibility with existing dispatcher logic.
+ */
+@Slf4j
+public class InMemoryTopicDelayedDeliveryTrackerView implements DelayedDeliveryTracker {
+
+    private final InMemoryTopicDelayedDeliveryTrackerManager manager;
+    private final InMemoryTopicDelayedDeliveryTrackerManager.SubContext subContext;
+    private boolean closed = false;
+
+    public InMemoryTopicDelayedDeliveryTrackerView(InMemoryTopicDelayedDeliveryTrackerManager manager,
+                                                   InMemoryTopicDelayedDeliveryTrackerManager.SubContext subContext) {
+        this.manager = manager;
+        this.subContext = subContext;
+    }
+
+    @Override
+    public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {
+        checkClosed();
+        return manager.addMessageForSub(subContext, ledgerId, entryId, deliveryAt);
+    }
+
+    @Override
+    public boolean hasMessageAvailable() {
+        checkClosed();
+        return manager.hasMessageAvailableForSub(subContext);
+    }
+
+    @Override
+    public long getNumberOfDelayedMessages() {
+        checkClosed();
+        // Return an estimate of visible delayed messages for this subscription
+        // For now, return the total count - could be enhanced to count only visible messages
+        return manager.topicDelayedMessages();
+    }
+
+    @Override
+    public long getBufferMemoryUsage() {
+        checkClosed();
+        // Return the topic-level memory usage (shared by all subscriptions)
+        return manager.topicBufferMemoryBytes();
+    }
+
+    @Override
+    public NavigableSet<Position> getScheduledMessages(int maxMessages) {
+        checkClosed();
+        return manager.getScheduledMessagesForSub(subContext, maxMessages);
+    }
+
+    @Override
+    public boolean shouldPauseAllDeliveries() {
+        checkClosed();
+        return manager.shouldPauseAllDeliveriesForSub(subContext);
+    }
+
+    @Override
+    public void resetTickTime(long tickTime) {
+        checkClosed();
+        manager.onTickTimeUpdated(tickTime);
+    }
+
+    @Override
+    public CompletableFuture<Void> clear() {
+        checkClosed();
+        // For topic-level manager, clear is a no-op for individual subscriptions
+        manager.clearForSub();
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public void close() {
+        if (closed) {
+            return;
+        }
+        closed = true;
+        manager.unregister(subContext.getDispatcher());
+    }
+
+    /**
+     * Update the mark delete position for this subscription.
+     * This is called by the dispatcher when messages are acknowledged.
+     */
+    public void updateMarkDeletePosition(Position position) {
+        checkClosed();
+        manager.updateMarkDeletePosition(subContext, position);
+    }

Review Comment:
   The `updateMarkDeletePosition` method is public but not part of the 
`DelayedDeliveryTracker` interface. This creates an API inconsistency where 
callers need to cast to the concrete type to call this method. Consider either 
adding this method to the interface or finding an alternative design pattern 
(such as having the dispatcher directly notify the manager) to avoid breaking 
the interface abstraction.
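
One way to read the first option is a no-op default method on the interface, sketched below. `SketchTracker`, `SketchPerSubscriptionTracker`, and `SketchTopicView` are hypothetical, trimmed-down stand-ins (the real `DelayedDeliveryTracker` has more methods), and the position is passed as two `long` values only to keep the example free of the `Position` type.

```java
// Hypothetical, trimmed-down stand-in for DelayedDeliveryTracker.
interface SketchTracker {
    boolean hasMessageAvailable();

    // Hypothetical addition: a no-op default, so trackers that keep no
    // topic-level state do not need to override it.
    default void updateMarkDeletePosition(long ledgerId, long entryId) {
    }
}

// Stand-in for a tracker that does not care about mark-delete updates.
final class SketchPerSubscriptionTracker implements SketchTracker {
    @Override
    public boolean hasMessageAvailable() {
        return false;
    }
}

// Stand-in for the topic-level view, which records the latest position.
final class SketchTopicView implements SketchTracker {
    private long markDeleteLedgerId = -1;
    private long markDeleteEntryId = -1;

    @Override
    public boolean hasMessageAvailable() {
        return false;
    }

    @Override
    public void updateMarkDeletePosition(long ledgerId, long entryId) {
        this.markDeleteLedgerId = ledgerId;
        this.markDeleteEntryId = entryId;
    }

    long markDeleteLedgerId() {
        return markDeleteLedgerId;
    }

    long markDeleteEntryId() {
        return markDeleteEntryId;
    }
}
```

With this shape the dispatcher can call `updateMarkDeletePosition` through the interface type and never needs to cast to the view class.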



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java:
##########
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
+import java.time.Clock;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+/**
+ * In-memory implementation of topic-level delayed delivery tracker manager.
+ * This manager maintains a single global delayed message index per topic that is shared by all
+ * subscriptions, significantly reducing memory usage in multi-subscription scenarios.
+ */
+@Slf4j
+public class InMemoryTopicDelayedDeliveryTrackerManager implements TopicDelayedDeliveryTrackerManager, TimerTask {
+
+    // Global delayed message index: timestamp -> ledgerId -> entryId bitmap
+    private final Long2ObjectSortedMap<Long2ObjectSortedMap<Roaring64Bitmap>> delayedMessageMap =
+            new Long2ObjectRBTreeMap<>();
+
+    // Subscription registry: subscription name -> subscription context
+    private final Map<String, SubContext> subscriptionContexts = new HashMap<>();
+
+    // Timer for delayed delivery
+    private final Timer timer;
+    private Timeout timeout;
+    private long currentTimeoutTarget = -1;
+
+    // Configuration
+    private final long tickTimeMillis;
+    private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+    private final long fixedDelayDetectionLookahead;
+    private final Clock clock;
+
+    // Statistics
+    private final AtomicLong delayedMessagesCount = new AtomicLong(0);
+    private final AtomicLong bufferMemoryBytes = new AtomicLong(0);
+
+    // Timestamp precision for memory optimization
+    private final int timestampPrecisionBitCnt;
+
+    /**
+     * Subscription context that holds per-subscription state.
+     */
+    @Getter
+    static class SubContext {
+        private final AbstractPersistentDispatcherMultipleConsumers dispatcher;
+        private final String subscriptionName;
+        private final long tickTimeMillis;
+        private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+        private final long fixedDelayDetectionLookahead;
+        private Position markDeletePosition;
+
+        SubContext(AbstractPersistentDispatcherMultipleConsumers dispatcher, long tickTimeMillis,
+                   boolean isDelayedDeliveryDeliverAtTimeStrict, long fixedDelayDetectionLookahead) {
+            this.dispatcher = dispatcher;
+            this.subscriptionName = dispatcher.getSubscription().getName();
+            this.tickTimeMillis = tickTimeMillis;
+            this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
+            this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+        }
+
+        void updateMarkDeletePosition(Position position) {
+            this.markDeletePosition = position;
+        }
+
+        long getCutoffTime() {
+            return isDelayedDeliveryDeliverAtTimeStrict ? System.currentTimeMillis() :
+                   System.currentTimeMillis() + tickTimeMillis;
+        }
+    }
+
+    public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis,
+                                                      boolean isDelayedDeliveryDeliverAtTimeStrict,
+                                                      long fixedDelayDetectionLookahead) {
+        this(timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
+             fixedDelayDetectionLookahead);
+    }
+
+    public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock,
+                                                      boolean isDelayedDeliveryDeliverAtTimeStrict,
+                                                      long fixedDelayDetectionLookahead) {
+        this.timer = timer;
+        this.tickTimeMillis = tickTimeMillis;
+        this.clock = clock;
+        this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
+        this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+        this.timestampPrecisionBitCnt = calculateTimestampPrecisionBitCnt(tickTimeMillis);
+    }
+
+    private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) {
+        int bitCnt = 0;
+        while (tickTimeMillis > 0) {
+            tickTimeMillis >>= 1;
+            bitCnt++;
+        }
+        return bitCnt > 0 ? bitCnt - 1 : 0;
+    }
+
+    private static long trimLowerBit(long timestamp, int bits) {
+        return timestamp & (-1L << bits);
+    }
+
+    @Override
+    public DelayedDeliveryTracker createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+        String subscriptionName = dispatcher.getSubscription().getName();
+
+        synchronized (this) {
+            SubContext subContext = subscriptionContexts.computeIfAbsent(subscriptionName,
+                k -> new SubContext(dispatcher, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
+                                   fixedDelayDetectionLookahead));
+
+            return new InMemoryTopicDelayedDeliveryTrackerView(this, subContext);
+        }
+    }
+
+    @Override
+    public void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+        String subscriptionName = dispatcher.getSubscription().getName();
+
+        synchronized (this) {
+            subscriptionContexts.remove(subscriptionName);
+
+            // If no more subscriptions, close the manager
+            if (subscriptionContexts.isEmpty() && delayedMessageMap.isEmpty()) {
+                close();
+            }
+        }
+    }
+
+    @Override
+    public void onTickTimeUpdated(long newTickTimeMillis) {
+        // For now, tick time updates are not supported after initialization
+        // This could be enhanced to update all subscription contexts
+        log.warn("Tick time updates are not currently supported for topic-level delayed delivery managers");

Review Comment:
   Logging a warning every time `onTickTimeUpdated` is called could spam the logs
if this method is invoked frequently. Consider implementing the functionality,
using a rate-limited logger, or throwing an `UnsupportedOperationException` so
that callers see clearly that the operation is not supported, rather than only
logging a warning.
   ```suggestion
           throw new UnsupportedOperationException("Tick time updates are not currently supported for topic-level delayed delivery managers");
   ```
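
For the rate-limited option, a minimal sketch of a limiter that could guard the `log.warn` call is shown below. `WarnRateLimiter` and `tryAcquire` are hypothetical names, not an existing Pulsar or SLF4J API; the clock is injected so the behavior can be checked without sleeping.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical helper: allows at most one emission per interval.
final class WarnRateLimiter {
    private static final long NEVER = Long.MIN_VALUE; // sentinel for "nothing emitted yet"

    private final long minIntervalNanos;
    private final LongSupplier nanoClock;
    private final AtomicLong lastEmitNanos = new AtomicLong(NEVER);

    WarnRateLimiter(long minIntervalNanos, LongSupplier nanoClock) {
        this.minIntervalNanos = minIntervalNanos;
        this.nanoClock = nanoClock;
    }

    // Returns true if the caller may log now, false if it should stay quiet.
    boolean tryAcquire() {
        long now = nanoClock.getAsLong();
        long last = lastEmitNanos.get();
        if (last != NEVER && now - last < minIntervalNanos) {
            return false;
        }
        // Only one of several racing threads wins the compare-and-set.
        return lastEmitNanos.compareAndSet(last, now);
    }
}
```

A caller would hold one limiter per manager, for example `new WarnRateLimiter(TimeUnit.MINUTES.toNanos(1), System::nanoTime)`, and wrap the warning in `if (limiter.tryAcquire()) { ... }`.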



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java:
##########
@@ -0,0 +1,473 @@
+    @Override
+    public void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+        String subscriptionName = dispatcher.getSubscription().getName();
+
+        synchronized (this) {
+            subscriptionContexts.remove(subscriptionName);
+
+            // If no more subscriptions, close the manager
+            if (subscriptionContexts.isEmpty() && delayedMessageMap.isEmpty()) {
+                close();
+            }
+        }

Review Comment:
   Calling `close()` within `unregister()` while already holding the lock makes
the code harder to maintain. `close()` acquires the same lock (line 180);
because Java monitors are reentrant this does not deadlock, but the nested
acquisition adds complexity. Consider either documenting this self-closing
behavior clearly or extracting the cleanup logic to avoid nested locking
patterns.
   ```suggestion
           boolean shouldClose = false;
           synchronized (this) {
               subscriptionContexts.remove(subscriptionName);
   
               // If no more subscriptions, close the manager
               if (subscriptionContexts.isEmpty() && delayedMessageMap.isEmpty()) {
                   shouldClose = true;
               }
           }
           if (shouldClose) {
               close();
           }
   ```
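
The reentrancy this comment relies on can be seen in a minimal sketch. `ReentrantCloseSketch` is a hypothetical class that mirrors only the locking shape of the original `unregister()` and `close()`, with a plain counter standing in for the subscription map.

```java
// Hypothetical class mirroring the nested-lock shape of unregister() -> close().
final class ReentrantCloseSketch {
    private int subscriptions;
    private boolean closed;

    ReentrantCloseSketch(int subscriptions) {
        this.subscriptions = subscriptions;
    }

    void unregister() {
        synchronized (this) {
            subscriptions--;
            if (subscriptions == 0) {
                // The current thread already owns this monitor; re-entering it
                // in close() succeeds immediately instead of deadlocking.
                close();
            }
        }
    }

    void close() {
        synchronized (this) {
            closed = true;
        }
    }

    synchronized boolean isClosed() {
        return closed;
    }
}
```

Keeping the check and the close under one lock, as here, makes the pair atomic. Moving `close()` outside the `synchronized` block, as in the suggestion, leaves a window between the emptiness check and the close in which another thread could register a new subscription.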



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java:
##########
@@ -0,0 +1,473 @@
+    @Override
+    public DelayedDeliveryTracker createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+        String subscriptionName = dispatcher.getSubscription().getName();
+
+        synchronized (this) {
+            SubContext subContext = subscriptionContexts.computeIfAbsent(subscriptionName,
+                k -> new SubContext(dispatcher, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
+                                   fixedDelayDetectionLookahead));
+
+            return new InMemoryTopicDelayedDeliveryTrackerView(this, subContext);

Review Comment:
   The `createOrGetView` method always creates a new
`InMemoryTopicDelayedDeliveryTrackerView` instance even when the `SubContext`
already exists. This means multiple view instances could be created for the
same subscription, each holding a reference to the same `SubContext`. As soon
as any one of those views is closed, `unregister` removes the shared
subscription context, breaking the views that are still open. The method should
track and return existing view instances, or document that only one view per
subscription may be active at a time.
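
A minimal sketch of the "track and return existing view instances" option, assuming views are keyed by subscription name. `SketchViewManager` and `SketchView` are hypothetical stand-ins for the manager and view classes in the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the topic-level manager.
final class SketchViewManager {
    private final Map<String, SketchView> views = new HashMap<>();

    // Returns the existing view for the subscription, creating it only once.
    synchronized SketchView createOrGetView(String subscriptionName) {
        return views.computeIfAbsent(subscriptionName, k -> new SketchView(this, k));
    }

    synchronized void unregister(String subscriptionName) {
        views.remove(subscriptionName);
    }

    synchronized int registeredSubscriptions() {
        return views.size();
    }
}

// Hypothetical stand-in for the per-subscription view.
final class SketchView {
    private final SketchViewManager manager;
    private final String subscriptionName;
    private boolean closed;

    SketchView(SketchViewManager manager, String subscriptionName) {
        this.manager = manager;
        this.subscriptionName = subscriptionName;
    }

    synchronized void close() {
        if (closed) {
            return; // idempotent: a second close does not unregister again
        }
        closed = true;
        manager.unregister(subscriptionName);
    }
}
```

Sharing one view means that whichever caller closes it closes it for everyone. If several independent holders per subscription are expected, a reference count on the view would be the alternative.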



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java:
##########
@@ -223,11 +223,11 @@ public void testPublishDelayMessagesAndCreateBucketDelayDeliveryTrackerFailed()
         });
 
         Optional<DelayedDeliveryTracker> optional = reference.get();
-        Assert.assertTrue(optional.get() instanceof InMemoryDelayedDeliveryTracker);
+        Assert.assertTrue(optional.get() instanceof InMemoryTopicDelayedDeliveryTrackerView);
 
         // Mock DelayedDeliveryTracker and Count the number of addMessage() calls
         AtomicInteger counter = new AtomicInteger(0);
-        InMemoryDelayedDeliveryTracker tracker = (InMemoryDelayedDeliveryTracker) optional.get();
+        InMemoryTopicDelayedDeliveryTrackerView tracker = (InMemoryTopicDelayedDeliveryTrackerView) optional.get();
         tracker =  Mockito.spy(tracker);

Review Comment:
   Double space between `=` and `Mockito.spy(tracker)`. Should be a single 
space for consistency with code formatting standards.
   ```suggestion
           tracker = Mockito.spy(tracker);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
