Denovo1998 commented on code in PR #24927:
URL: https://github.com/apache/pulsar/pull/24927#discussion_r2484806334


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java:
##########
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
+import org.roaringbitmap.longlong.LongIterator;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+/**
+ * In-memory implementation of topic-level delayed delivery tracker manager.
+ * This manager maintains a single global delayed message index per topic that 
is shared by all
+ * subscriptions, significantly reducing memory usage in multi-subscription 
scenarios.
+ */
+@Slf4j
+public class InMemoryTopicDelayedDeliveryTrackerManager implements 
TopicDelayedDeliveryTrackerManager, TimerTask {
+
+    // Global delayed message index: timestamp -> ledgerId -> entryId bitmap
+    // Outer: sorted by timestamp for efficient finding of earliest bucket
+    // Inner: per-ledger bitmaps of entry-ids
+    private final ConcurrentSkipListMap<Long, Long2ObjectRBTreeMap<Roaring64Bitmap>> delayedMessageMap =
+            new ConcurrentSkipListMap<>();
+
+    // Subscription registry: subscription name -> subscription context
+    private final ConcurrentHashMap<String, SubContext> subscriptionContexts = new ConcurrentHashMap<>();
+
+    // Timer for delayed delivery
+    private final Timer timer;
+    // NOTE(review): presumably guarded by timerLock (unregister() mutates them while holding it) — confirm
+    private Timeout timeout;
+    private long currentTimeoutTarget = -1;
+    // Last time the TimerTask was triggered
+    private long lastTickRun = 0L;
+
+    // Configuration
+    // volatile: rewritten by onTickTimeUpdated() and read without holding a lock by bucketStart() and
+    // createOrGetTracker(); a plain long gives no cross-thread visibility guarantee for those reads.
+    private volatile long tickTimeMillis;
+    private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+    private final long fixedDelayDetectionLookahead;
+    private final Clock clock;
+
+    // Statistics
+    private final AtomicLong delayedMessagesCount = new AtomicLong(0);
+    private final AtomicLong bufferMemoryBytes = new AtomicLong(0);
+
+    // Prune throttling
+    // Last pruning time
+    private final AtomicLong lastPruneNanos = new AtomicLong(0);
+    // Minimum interval between prunes
+    private final long minPruneIntervalNanos;
+
+    // Ratio of eligible subscriptions required to opportunistically prune [0.0, 1.0]
+    private final double pruneEligibleRatio;
+
+    // Fixed-delay detection (parity with legacy behavior)
+    private final AtomicLong highestDeliveryTimeTracked = new AtomicLong(0);
+    private volatile boolean messagesHaveFixedDelay = true;
+
+    // Per-bucket locks (timestamp -> lock) for fine-grained concurrency
+    private final ConcurrentHashMap<Long, ReentrantLock> bucketLocks = new ConcurrentHashMap<>();
+
+    // Timer state guard
+    private final ReentrantLock timerLock = new ReentrantLock();
+
+    /**
+     * Per-subscription state kept by the topic-level manager: the owning dispatcher, the subscription
+     * name captured at creation, the delivery-timing settings and the last recorded mark-delete position.
+     */
+    @Getter
+    static class SubContext {
+        private final AbstractPersistentDispatcherMultipleConsumers dispatcher;
+        private final String subscriptionName;
+        // Overwritten directly by the manager whenever the tick time changes, hence volatile.
+        private volatile long tickTimeMillis;
+        private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+        private final long fixedDelayDetectionLookahead;
+        private final Clock clock;
+        private volatile Position markDeletePosition;
+
+        /**
+         * Builds the context for the subscription served by {@code dispatcher}.
+         *
+         * @param dispatcher dispatcher of the subscription; its subscription name is read once, here
+         * @param tickTimeMillis tick interval, in milliseconds, used for delayed-delivery checks
+         * @param isDelayedDeliveryDeliverAtTimeStrict when true the cutoff is the current time; when false
+         *                                            it extends one tick into the future
+         * @param fixedDelayDetectionLookahead lookahead window, in milliseconds, for fixed-delay detection
+         * @param clock time source used to compute the cutoff
+         */
+        SubContext(AbstractPersistentDispatcherMultipleConsumers dispatcher, long tickTimeMillis,
+                   boolean isDelayedDeliveryDeliverAtTimeStrict, long fixedDelayDetectionLookahead,
+                   Clock clock) {
+            this.dispatcher = dispatcher;
+            this.subscriptionName = dispatcher.getSubscription().getName();
+            this.clock = clock;
+            this.tickTimeMillis = tickTimeMillis;
+            this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+            this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
+        }
+
+        /** Stores the most recent mark-delete position reported for this subscription. */
+        void updateMarkDeletePosition(Position position) {
+            markDeletePosition = position;
+        }
+
+        /**
+         * Computes the delivery-time cutoff: the current clock time in strict mode, otherwise the
+         * current clock time plus one tick.
+         */
+        long getCutoffTime() {
+            final long nowMillis = clock.millis();
+            if (!isDelayedDeliveryDeliverAtTimeStrict) {
+                return nowMillis + tickTimeMillis;
+            }
+            return nowMillis;
+        }
+    }
+
+    // Optional hook run by unregister() after the last subscription has been removed and the shared
+    // index has been cleared; null when not supplied (the five-argument constructor passes null).
+    private final Runnable onEmptyCallback;
+
+    /**
+     * Creates a manager with default prune settings: an adaptive minimum prune interval (no positive
+     * override), an eligible-subscription ratio of 0.5, and no empty-registry callback.
+     */
+    public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock,
+                                                      boolean isDelayedDeliveryDeliverAtTimeStrict,
+                                                      long fixedDelayDetectionLookahead) {
+        this(timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead,
+                0L /* adaptive prune interval */,
+                0.5 /* prune eligible ratio */,
+                null /* no onEmptyCallback */);
+    }
+
+    public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long 
tickTimeMillis, Clock clock,
+                                                      boolean 
isDelayedDeliveryDeliverAtTimeStrict,
+                                                      long 
fixedDelayDetectionLookahead,
+                                                      long 
pruneMinIntervalMillis,
+                                                      double 
pruneEligibleRatio,
+                                                      Runnable 
onEmptyCallback) {
+        this.timer = timer;
+        this.tickTimeMillis = tickTimeMillis;
+        this.clock = clock;
+        this.isDelayedDeliveryDeliverAtTimeStrict = 
isDelayedDeliveryDeliverAtTimeStrict;
+        this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+        this.onEmptyCallback = onEmptyCallback;
+        // Prune throttle interval: use configured override if positive, 
otherwise adaptive clamp [5ms, 50ms]
+        long pruneMs = pruneMinIntervalMillis > 0
+                ? pruneMinIntervalMillis
+                : Math.max(5L, Math.min(50L, tickTimeMillis));
+        this.minPruneIntervalNanos = TimeUnit.MILLISECONDS.toNanos(pruneMs);
+        // Prune eligible ratio: clamp into [0.0, 1.0]
+        if (Double.isNaN(pruneEligibleRatio)) {
+            pruneEligibleRatio = 0.5;
+        }
+        this.pruneEligibleRatio = Math.max(0.0, Math.min(1.0, 
pruneEligibleRatio));
+    }
+
+    // We bucket messages by aligning the deliverAt timestamp down to the start of its logical tick window:
+    //   bucketStart = floor(deliverAt / tickTimeMillis) * tickTimeMillis
+    // Math.floorMod keeps this a true floor for negative timestamps too: with the plain % operator a
+    // negative timestamp is rounded toward zero, i.e. into a bucket that starts AFTER its deliverAt.
+    // For non-negative timestamps the result is identical to `deliverAt - (deliverAt % tickTimeMillis)`.
+    // If tickTimeMillis changes over time, the same message may land in different buckets when re-added
+    // by another subscription. Read paths dedup via TreeSet and counts include duplicates by design.
+    private long bucketStart(long timestamp) {
+        // Single read of the (possibly concurrently updated) tick so both uses below agree.
+        long t = this.tickTimeMillis;
+        if (t <= 0) {
+            // Non-positive tick: no alignment, every timestamp is its own bucket.
+            return timestamp;
+        }
+        return timestamp - Math.floorMod(timestamp, t);
+    }
+
+    /**
+     * Returns a tracker for the subscription served by {@code dispatcher}. The per-subscription context is
+     * keyed by subscription name and created only on first use; a new tracker wrapper is returned on every
+     * call.
+     *
+     * <p>NOTE(review): if a different dispatcher registers under a name that already has a context, the
+     * existing context (and the dispatcher it captured) is reused — confirm this is intended.
+     */
+    @Override
+    public DelayedDeliveryTracker createOrGetTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+        final String name = dispatcher.getSubscription().getName();
+        final SubContext context = subscriptionContexts.computeIfAbsent(name,
+                ignored -> new SubContext(dispatcher, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
+                        fixedDelayDetectionLookahead, clock));
+        final DelayedDeliveryTracker tracker = new InMemoryTopicDelayedDeliveryTracker(this, context);
+        return tracker;
+    }
+
+    /**
+     * Removes the subscription served by {@code dispatcher} from the registry. When no subscriptions remain,
+     * cancels any pending timeout, drops the shared index, bucket locks and statistics, and then runs the
+     * optional {@code onEmptyCallback}.
+     */
+    @Override
+    public void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+        String subscriptionName = dispatcher.getSubscription().getName();
+
+        subscriptionContexts.remove(subscriptionName);
+        // If no more subscriptions, proactively free index and release memory.
+        // NOTE(review): no lock spans this emptiness check and the clears below, so a subscription that
+        // registers concurrently via createOrGetTracker() could see the shared state cleared under it;
+        // closing that window needs coordination with the registration/add paths.
+        if (subscriptionContexts.isEmpty()) {
+            timerLock.lock();
+            try {
+                if (timeout != null) {
+                    timeout.cancel();
+                    timeout = null;
+                }
+                currentTimeoutTarget = -1;
+            } finally {
+                timerLock.unlock();
+            }
+            delayedMessageMap.clear();
+            bucketLocks.clear();
+            delayedMessagesCount.set(0);
+            bufferMemoryBytes.set(0);
+            if (onEmptyCallback != null) {
+                try {
+                    onEmptyCallback.run();
+                } catch (Throwable t) {
+                    // Best-effort hook: a failure must not break unregistration, but logging it only at
+                    // debug level hid it in production; its effects are not visible here, so surface it.
+                    log.warn("onEmptyCallback failed", t);
+                }
+            }
+        }
+    }
+
+    /**
+     * Reports whether at least one subscription is currently registered with this manager.
+     *
+     * @return {@code true} when the subscription registry is non-empty
+     */
+    public boolean hasActiveSubscriptions() {
+        final boolean noSubscriptions = subscriptionContexts.isEmpty();
+        return !noSubscriptions;
+    }
+
+    /**
+     * Applies a new tick time to the manager and to every registered subscription, then re-evaluates the
+     * timer schedule. The comparison, the writes and the rescheduling all run under {@code timerLock}.
+     * Two concurrent updates therefore cannot interleave, which would leave subscriptions on a different
+     * tick time than the manager.
+     *
+     * <p>Entries already in {@code delayedMessageMap} are not rebuilt here; only messages bucketed after this
+     * call use the new alignment.
+     *
+     * <p>NOTE(review): a SubContext created concurrently by createOrGetTracker() (which does not take
+     * timerLock) may still capture the previous tick time.
+     *
+     * @param newTickTimeMillis the new tick interval in milliseconds
+     */
+    @Override
+    public void onTickTimeUpdated(long newTickTimeMillis) {
+        timerLock.lock();
+        try {
+            if (this.tickTimeMillis == newTickTimeMillis) {
+                return;
+            }
+            this.tickTimeMillis = newTickTimeMillis;
+            // Propagate to all subscriptions
+            for (SubContext sc : subscriptionContexts.values()) {
+                sc.tickTimeMillis = newTickTimeMillis;
+            }
+            // Re-evaluate timer scheduling with new tick time
+            updateTimerLocked();
+        } finally {
+            timerLock.unlock();
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Updated tickTimeMillis for topic-level delayed delivery manager to {} ms",
+                    newTickTimeMillis);
+        }
+    }

Review Comment:
   @lhotari @codelipenghui @coderzc @Apurva007 @thetumbled @dao-jun 
@BewareMyPower 
   
   Currently, getScheduledMessagesForSub neither marks a returned position as 
"scheduled for this subscription" nor removes it from the topic-level index 
(removal can only happen through the shared pruning, once every subscription has 
mark-deleted the position). As a result, until the message is acked, the next 
getScheduledMessagesForSub call returns the same position again. This causes 
duplicate scheduling and duplicate reads, and in severe cases duplicate delivery.
   
   The original per-subscription InMemoryDelayedDeliveryTracker typically "takes 
out and removes" positions from its own structure in getScheduledMessages, which 
avoids duplicate scheduling. With the shared index, each subscription also needs 
a "pending" (scheduled but not yet acknowledged) collection or cursor. 
getScheduledMessagesForSub would use it to filter out positions that were already 
scheduled, and would not return them again until they are mark-deleted.
   
   **This needs optimization, but there doesn't seem to be a low-cost way to 
handle it.** One possible approach:
   
   1. Bucket the index with a fixed `indexGranularityMillis` (either the initial 
tick time or a fixed constant such as 10 ms or 1 ms) that no longer changes with 
resetTickTime; resetTickTime then only affects how often the Timer fires.
   
   2. Deduplicate on the read side. Each SubContext maintains a pending collection 
of ledgerId -> Roaring64Bitmap(entryId). getScheduledMessagesForSub records the 
positions it returns in pending, and subsequent calls filter them out. Entries are 
removed from pending when the mark-delete position advances. The Timer check also 
filters out pending positions, so it does not spin idly on messages that are 
already scheduled.
   
   3. Here in onTickTimeUpdated, block and rebuild delayedMessageMap.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to