Copilot commented on code in PR #24927: URL: https://github.com/apache/pulsar/pull/24927#discussion_r2483194517
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerView.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import java.util.NavigableSet; +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; + +/** + * View object for a subscription that implements DelayedDeliveryTracker interface. + * This view forwards all operations to the topic-level manager while maintaining + * compatibility with existing dispatcher logic. + */ +@Slf4j +public class InMemoryTopicDelayedDeliveryTrackerView implements DelayedDeliveryTracker { + + private final InMemoryTopicDelayedDeliveryTrackerManager manager; + private final InMemoryTopicDelayedDeliveryTrackerManager.SubContext subContext; + private boolean closed = false; + + public InMemoryTopicDelayedDeliveryTrackerView(InMemoryTopicDelayedDeliveryTrackerManager manager, + InMemoryTopicDelayedDeliveryTrackerManager.SubContext subContext) { + this.manager = manager; + this.subContext = subContext; + } + + @Override + public boolean addMessage(long ledgerId, long entryId, long deliveryAt) { + checkClosed(); + return manager.addMessageForSub(subContext, ledgerId, entryId, deliveryAt); + } + + @Override + public boolean hasMessageAvailable() { + checkClosed(); + return manager.hasMessageAvailableForSub(subContext); + } + + @Override + public long getNumberOfDelayedMessages() { + checkClosed(); + // Return an estimate of visible delayed messages for this subscription + // For now, return the total count - could be enhanced to count only visible messages Review Comment: The comment indicates this is returning an estimate and acknowledges the limitation, but the method name `getNumberOfDelayedMessages()` suggests an exact count per subscription. This discrepancy could mislead API consumers. Consider updating the documentation or logging a warning about this behavior difference from the legacy implementation. ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java: ########## @@ -0,0 +1,577 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import java.time.Clock; +import java.util.ArrayList; +import java.util.Map; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; +import org.roaringbitmap.longlong.Roaring64Bitmap; + +/** + * In-memory implementation of topic-level delayed delivery tracker manager. + * This manager maintains a single global delayed message index per topic that is shared by all + * subscriptions, significantly reducing memory usage in multi-subscription scenarios. + */ +@Slf4j +public class InMemoryTopicDelayedDeliveryTrackerManager implements TopicDelayedDeliveryTrackerManager, TimerTask { + + // Global delayed message index: timestamp -> ledgerId -> entryId bitmap + // Outer: sorted by timestamp for efficient finding of earliest bucket + // Inner: per-ledger bitmaps of entry-ids + private final ConcurrentSkipListMap<Long, ConcurrentHashMap<Long, Roaring64Bitmap>> delayedMessageMap = + new ConcurrentSkipListMap<>(); + + // Subscription registry: subscription name -> subscription context + private final ConcurrentHashMap<String, SubContext> subscriptionContexts = new ConcurrentHashMap<>(); + + // Timer for delayed delivery + private final Timer timer; + private Timeout timeout; + private long currentTimeoutTarget = -1; + // Last time the TimerTask was triggered + private long lastTickRun = 0L; + + // Configuration + private long tickTimeMillis; + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + private final long fixedDelayDetectionLookahead; + private final Clock clock; + + // Statistics + private final AtomicLong delayedMessagesCount = new AtomicLong(0); + private final AtomicLong bufferMemoryBytes = new AtomicLong(0); + + // Fixed-delay detection (parity with legacy behavior) + private volatile long highestDeliveryTimeTracked = 0; + private volatile boolean messagesHaveFixedDelay = true; + + // Timestamp precision for memory optimization + private int timestampPrecisionBitCnt; + + // Per-bucket locks (timestamp -> lock) for fine-grained concurrency + private final ConcurrentHashMap<Long, ReentrantLock> bucketLocks = new ConcurrentHashMap<>(); + + // Timer state guard + private final ReentrantLock timerLock = new ReentrantLock(); + + + /** + * Subscription context that holds per-subscription state. + */ + @Getter + static class SubContext { + private final AbstractPersistentDispatcherMultipleConsumers dispatcher; + private final String subscriptionName; + private long tickTimeMillis; + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + private final long fixedDelayDetectionLookahead; + private final Clock clock; + private Position markDeletePosition; + + SubContext(AbstractPersistentDispatcherMultipleConsumers dispatcher, long tickTimeMillis, + boolean isDelayedDeliveryDeliverAtTimeStrict, long fixedDelayDetectionLookahead, + Clock clock) { + this.dispatcher = dispatcher; + this.subscriptionName = dispatcher.getSubscription().getName(); + this.tickTimeMillis = tickTimeMillis; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.clock = clock; + } + + void updateMarkDeletePosition(Position position) { + this.markDeletePosition = position; + } + + long getCutoffTime() { + long now = clock.millis(); + return isDelayedDeliveryDeliverAtTimeStrict ? now : now + tickTimeMillis; + } + } + + private final Runnable onEmptyCallback; + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead) { + this(timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict, + fixedDelayDetectionLookahead, null); + } + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead) { + this(timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead, null); + } + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead, + Runnable onEmptyCallback) { + this.timer = timer; + this.tickTimeMillis = tickTimeMillis; + this.clock = clock; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.timestampPrecisionBitCnt = calculateTimestampPrecisionBitCnt(tickTimeMillis); + this.onEmptyCallback = onEmptyCallback; + } + + private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) { + int bitCnt = 0; + while (tickTimeMillis > 0) { + tickTimeMillis >>= 1; + bitCnt++; + } + return bitCnt > 0 ? bitCnt - 1 : 0; + } + + private static long trimLowerBit(long timestamp, int bits) { + return timestamp & (-1L << bits); + } + + @Override + public DelayedDeliveryTracker createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String subscriptionName = dispatcher.getSubscription().getName(); + + SubContext subContext = subscriptionContexts.computeIfAbsent(subscriptionName, + k -> new SubContext(dispatcher, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict, + fixedDelayDetectionLookahead, clock)); + return new InMemoryTopicDelayedDeliveryTrackerView(this, subContext); + } + + @Override + public void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String subscriptionName = dispatcher.getSubscription().getName(); + + subscriptionContexts.remove(subscriptionName); + // If no more subscriptions, proactively free index and release memory + if (subscriptionContexts.isEmpty()) { + timerLock.lock(); + try { + if (timeout != null) { + timeout.cancel(); + timeout = null; + } + currentTimeoutTarget = -1; + } finally { + timerLock.unlock(); + } + delayedMessageMap.clear(); + bucketLocks.clear(); + delayedMessagesCount.set(0); + bufferMemoryBytes.set(0); + if (onEmptyCallback != null) { + try { + onEmptyCallback.run(); + } catch (Throwable t) { + if (log.isDebugEnabled()) { + log.debug("onEmptyCallback failed", t); + } + } + } + } + } + + @Override + public void onTickTimeUpdated(long newTickTimeMillis) { + if (this.tickTimeMillis == newTickTimeMillis) { + return; + } + this.tickTimeMillis = newTickTimeMillis; + // Update precision bits for new tick time (accept old/new buckets co-exist) + this.timestampPrecisionBitCnt = calculateTimestampPrecisionBitCnt(newTickTimeMillis); + // Propagate to all subscriptions + for (SubContext sc : subscriptionContexts.values()) { + sc.tickTimeMillis = newTickTimeMillis; + } + // Re-evaluate timer scheduling with new tick time + timerLock.lock(); + try { + updateTimerLocked(); + } finally { + timerLock.unlock(); + } + if (log.isDebugEnabled()) { + log.debug("Updated tickTimeMillis for topic-level delayed delivery manager to {} ms", newTickTimeMillis); + } + } + + @Override + public long topicBufferMemoryBytes() { + return bufferMemoryBytes.get(); + } + + @Override + public long topicDelayedMessages() { + return delayedMessagesCount.get(); + } + + @Override + public void close() { + timerLock.lock(); + try { + if (timeout != null) { + timeout.cancel(); + timeout = null; + } + currentTimeoutTarget = -1; + } finally { + timerLock.unlock(); + } + delayedMessageMap.clear(); + bucketLocks.clear(); + subscriptionContexts.clear(); + delayedMessagesCount.set(0); + bufferMemoryBytes.set(0); + } + + // Internal methods for subscription views + + /** + * Add a message to the global delayed message index. + */ + boolean addMessageForSub(SubContext subContext, long ledgerId, long entryId, long deliverAt) { + if (deliverAt < 0 || deliverAt <= subContext.getCutoffTime()) { + return false; + } + + long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt); + ReentrantLock bLock = bucketLocks.computeIfAbsent(timestamp, k -> new ReentrantLock()); + bLock.lock(); + try { + ConcurrentHashMap<Long, Roaring64Bitmap> ledgerMap = + delayedMessageMap.computeIfAbsent(timestamp, k -> new ConcurrentHashMap<>()); + Roaring64Bitmap entryIds = ledgerMap.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()); + long before = entryIds.getLongSizeInBytes(); + if (!entryIds.contains(entryId)) { + entryIds.add(entryId); + delayedMessagesCount.incrementAndGet(); + long after = entryIds.getLongSizeInBytes(); + bufferMemoryBytes.addAndGet(after - before); + } + } finally { + bLock.unlock(); + } + + // Timer update and fixed delay detection + timerLock.lock(); + try { + updateTimerLocked(); + } finally { + timerLock.unlock(); + } + checkAndUpdateHighest(deliverAt); + return true; + } + + private void checkAndUpdateHighest(long deliverAt) { + if (deliverAt < (highestDeliveryTimeTracked - tickTimeMillis)) { + messagesHaveFixedDelay = false; + } + highestDeliveryTimeTracked = Math.max(highestDeliveryTimeTracked, deliverAt); + } + + /** + * Check if there are messages available for a subscription. + */ + boolean hasMessageAvailableForSub(SubContext subContext) { + if (delayedMessageMap.isEmpty()) { + return false; + } + Long firstKey = delayedMessageMap.firstKey(); + if (firstKey == null) { + return false; + } + long cutoffTime = subContext.getCutoffTime(); + return firstKey <= cutoffTime; + } + + /** + * Get scheduled messages for a subscription. + */ + NavigableSet<Position> getScheduledMessagesForSub(SubContext subContext, int maxMessages) { + NavigableSet<Position> positions = new TreeSet<>(); + int remaining = maxMessages; + + // Refresh mark-delete once outside of any bucket lock + refreshMarkDeletePosition(subContext); + long cutoffTime = subContext.getCutoffTime(); + Position markDelete = subContext.getMarkDeletePosition(); + + // Snapshot of buckets up to cutoff and iterate per-bucket with bucket locks + java.util.List<Long> tsList = new java.util.ArrayList<>(delayedMessageMap.headMap(cutoffTime, true).keySet()); + for (Long ts : tsList) { + if (remaining <= 0) { + break; + } + ReentrantLock bLock = bucketLocks.get(ts); + if (bLock == null) { + continue; + } + bLock.lock(); + try { + ConcurrentHashMap<Long, Roaring64Bitmap> ledgerMap = delayedMessageMap.get(ts); + if (ledgerMap == null) { + continue; + } + for (Map.Entry<Long, Roaring64Bitmap> ledgerEntry : ledgerMap.entrySet()) { + if (remaining <= 0) { + break; + } + long ledgerId = ledgerEntry.getKey(); + Roaring64Bitmap entryIds = ledgerEntry.getValue(); + if (markDelete != null && ledgerId < markDelete.getLedgerId()) { + continue; + } + org.roaringbitmap.longlong.LongIterator it = entryIds.getLongIterator(); + while (it.hasNext() && remaining > 0) { + long entryId = it.next(); + if (markDelete != null && ledgerId == markDelete.getLedgerId() + && entryId <= markDelete.getEntryId()) { + continue; + } + positions.add(PositionFactory.create(ledgerId, entryId)); + remaining--; + } + } + } finally { + bLock.unlock(); + } + } + + // Prune global index based on min mark-delete across all subscriptions (write path) + if (!positions.isEmpty()) { + pruneByMinMarkDelete(); + } + + return positions; + } + + /** + * Check if deliveries should be paused for a subscription. + */ + boolean shouldPauseAllDeliveriesForSub(SubContext subContext) { + // Parity with legacy: pause if all observed delays are fixed and backlog is large enough + return subContext.getFixedDelayDetectionLookahead() > 0 + && messagesHaveFixedDelay + && getNumberOfVisibleDelayedMessagesForSub(subContext) >= subContext.getFixedDelayDetectionLookahead() + && !hasMessageAvailableForSub(subContext); + } + + /** + * Clear delayed messages for a subscription (no-op for topic-level manager). + */ + void clearForSub() { + // No-op: we don't clear global index for individual subscriptions + } + + /** + * Update mark delete position for a subscription. + */ + void updateMarkDeletePosition(SubContext subContext, Position position) { + subContext.updateMarkDeletePosition(position); + pruneByMinMarkDelete(); + } + + // Private helper methods + + private void updateTimerLocked() { + if (delayedMessageMap.isEmpty()) { + if (timeout != null) { + currentTimeoutTarget = -1; + timeout.cancel(); + timeout = null; + } + return; + } + Long nextKey = delayedMessageMap.firstKey(); + if (nextKey == null) { + return; + } + long nextDeliveryTime = nextKey; + if (nextDeliveryTime == currentTimeoutTarget) { + return; + } + if (timeout != null) { + timeout.cancel(); + } + long now = clock.millis(); + long delayMillis = nextDeliveryTime - now; + if (delayMillis < 0) { + return; + } + long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now; + long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis); + currentTimeoutTarget = nextDeliveryTime; + timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS); + } + + private void updateBufferMemoryEstimate() { + // No-op in incremental mode (kept for compatibility) + } + + private long getNumberOfVisibleDelayedMessagesForSub(SubContext subContext) { + // Simplified implementation - returns total count + // Could be enhanced to count only messages visible to this subscription + return delayedMessagesCount.get(); + } + + private void pruneByMinMarkDelete() { + // Find the minimum mark delete position across all subscriptions + Position minMarkDelete = null; + for (SubContext subContext : subscriptionContexts.values()) { + Position markDelete = subContext.getMarkDeletePosition(); + if (markDelete != null) { + if (minMarkDelete == null || markDelete.compareTo(minMarkDelete) < 0) { + minMarkDelete = markDelete; + } + } + } + + if (minMarkDelete == null) { + return; + } + + // No idempotency set to clean (Option A): rely on per-bitmap removal below + + // Prune per bucket under bucket lock + for (Long ts : new ArrayList<>(delayedMessageMap.keySet())) { + ReentrantLock bLock = bucketLocks.get(ts); + if (bLock == null) { + continue; + } + bLock.lock(); + try { + ConcurrentHashMap<Long, Roaring64Bitmap> ledgerMap = delayedMessageMap.get(ts); + if (ledgerMap == null) { + continue; + } + ArrayList<Long> ledgersToRemove = new ArrayList<>(); + for (Map.Entry<Long, Roaring64Bitmap> ledgerEntry : ledgerMap.entrySet()) { + long ledgerId = ledgerEntry.getKey(); + Roaring64Bitmap entryIds = ledgerEntry.getValue(); + if (ledgerId < minMarkDelete.getLedgerId()) { + long bytes = entryIds.getLongSizeInBytes(); + delayedMessagesCount.addAndGet(-entryIds.getLongCardinality()); + bufferMemoryBytes.addAndGet(-bytes); + ledgersToRemove.add(ledgerId); + } else if (ledgerId == minMarkDelete.getLedgerId()) { + long before = entryIds.getLongSizeInBytes(); + long removedCount = 0; + org.roaringbitmap.longlong.LongIterator it = entryIds.getLongIterator(); + java.util.ArrayList<Long> toRemove = new java.util.ArrayList<>(); + while (it.hasNext()) { + long e = it.next(); + if (e <= minMarkDelete.getEntryId()) { + toRemove.add(e); + } + } + for (Long e : toRemove) { + entryIds.removeLong(e); + removedCount++; + } + long after = entryIds.getLongSizeInBytes(); + delayedMessagesCount.addAndGet(-removedCount); + bufferMemoryBytes.addAndGet(after - before); + if (entryIds.isEmpty()) { + ledgersToRemove.add(ledgerId); + } + } + } + for (Long ledgerId : ledgersToRemove) { + ledgerMap.remove(ledgerId); + } + if (ledgerMap.isEmpty()) { + delayedMessageMap.remove(ts); + bucketLocks.remove(ts); + } + } finally { + bLock.unlock(); + } + } + } + + // idempotency set removed per Option A + Review Comment: This orphaned comment references 'Option A' without context, making it unclear to future maintainers. Either remove the comment or expand it to explain what 'Option A' was and why the idempotency set was removed. ```suggestion ``` ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java: ########## @@ -0,0 +1,577 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import java.time.Clock; +import java.util.ArrayList; +import java.util.Map; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; +import org.roaringbitmap.longlong.Roaring64Bitmap; + +/** + * In-memory implementation of topic-level delayed delivery tracker manager. + * This manager maintains a single global delayed message index per topic that is shared by all + * subscriptions, significantly reducing memory usage in multi-subscription scenarios. + */ +@Slf4j +public class InMemoryTopicDelayedDeliveryTrackerManager implements TopicDelayedDeliveryTrackerManager, TimerTask { + + // Global delayed message index: timestamp -> ledgerId -> entryId bitmap + // Outer: sorted by timestamp for efficient finding of earliest bucket + // Inner: per-ledger bitmaps of entry-ids + private final ConcurrentSkipListMap<Long, ConcurrentHashMap<Long, Roaring64Bitmap>> delayedMessageMap = + new ConcurrentSkipListMap<>(); + + // Subscription registry: subscription name -> subscription context + private final ConcurrentHashMap<String, SubContext> subscriptionContexts = new ConcurrentHashMap<>(); + + // Timer for delayed delivery + private final Timer timer; + private Timeout timeout; + private long currentTimeoutTarget = -1; + // Last time the TimerTask was triggered + private long lastTickRun = 0L; + + // Configuration + private long tickTimeMillis; + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + private final long fixedDelayDetectionLookahead; + private final Clock clock; + + // Statistics + private final AtomicLong delayedMessagesCount = new AtomicLong(0); + private final AtomicLong bufferMemoryBytes = new AtomicLong(0); + + // Fixed-delay detection (parity with legacy behavior) + private volatile long highestDeliveryTimeTracked = 0; + private volatile boolean messagesHaveFixedDelay = true; + + // Timestamp precision for memory optimization + private int timestampPrecisionBitCnt; + + // Per-bucket locks (timestamp -> lock) for fine-grained concurrency + private final ConcurrentHashMap<Long, ReentrantLock> bucketLocks = new ConcurrentHashMap<>(); Review Comment: The `bucketLocks` map grows unbounded as new timestamp buckets are created but is only cleaned up during pruning. If timestamps are diverse or pruning is delayed, this could lead to memory leaks. Consider cleaning up bucket locks when the corresponding timestamp bucket is removed from `delayedMessageMap` (e.g., in `pruneByMinMarkDelete` after line 518). ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java: ########## @@ -66,13 +72,35 @@ public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleCon } @VisibleForTesting - InMemoryDelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) { - return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis, - isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead); + DelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String topicName = dispatcher.getTopic().getName(); + + // Get or create topic-level manager for this topic with onEmpty callback to remove from cache + final TopicDelayedDeliveryTrackerManager[] holder = new TopicDelayedDeliveryTrackerManager[1]; + TopicDelayedDeliveryTrackerManager manager = topicManagers.computeIfAbsent(topicName, k -> { + InMemoryTopicDelayedDeliveryTrackerManager m = new InMemoryTopicDelayedDeliveryTrackerManager( + timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict, + fixedDelayDetectionLookahead, () -> topicManagers.remove(topicName, holder[0])); + holder[0] = m; + return m; + }); Review Comment: This pattern using a holder array to capture the manager reference for the onEmpty callback has a race condition. If `computeIfAbsent` returns an existing manager instead of creating a new one, `holder[0]` remains null, and the callback in the existing manager will reference the wrong object. Consider passing the topic name to the callback instead: `() -> topicManagers.remove(topicName)`. ```suggestion TopicDelayedDeliveryTrackerManager manager = topicManagers.computeIfAbsent(topicName, k -> new InMemoryTopicDelayedDeliveryTrackerManager( timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead, () -> topicManagers.remove(topicName) ) ); ``` ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java: ########## @@ -0,0 +1,577 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import java.time.Clock; +import java.util.ArrayList; +import java.util.Map; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; +import org.roaringbitmap.longlong.Roaring64Bitmap; + +/** + * In-memory implementation of topic-level delayed delivery tracker manager. + * This manager maintains a single global delayed message index per topic that is shared by all + * subscriptions, significantly reducing memory usage in multi-subscription scenarios. + */ +@Slf4j +public class InMemoryTopicDelayedDeliveryTrackerManager implements TopicDelayedDeliveryTrackerManager, TimerTask { + + // Global delayed message index: timestamp -> ledgerId -> entryId bitmap + // Outer: sorted by timestamp for efficient finding of earliest bucket + // Inner: per-ledger bitmaps of entry-ids + private final ConcurrentSkipListMap<Long, ConcurrentHashMap<Long, Roaring64Bitmap>> delayedMessageMap = + new ConcurrentSkipListMap<>(); + + // Subscription registry: subscription name -> subscription context + private final ConcurrentHashMap<String, SubContext> subscriptionContexts = new ConcurrentHashMap<>(); + + // Timer for delayed delivery + private final Timer timer; + private Timeout timeout; + private long currentTimeoutTarget = -1; + // Last time the TimerTask was triggered + private long lastTickRun = 0L; + + // Configuration + private long tickTimeMillis; + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + private final long fixedDelayDetectionLookahead; + private final Clock clock; + + // Statistics + private final AtomicLong delayedMessagesCount = new AtomicLong(0); + private final AtomicLong bufferMemoryBytes = new AtomicLong(0); + + // Fixed-delay detection (parity with legacy behavior) + private volatile long highestDeliveryTimeTracked = 0; + private volatile boolean messagesHaveFixedDelay = true; + + // Timestamp precision for memory optimization + private int timestampPrecisionBitCnt; + + // Per-bucket locks (timestamp -> lock) for fine-grained concurrency + private final ConcurrentHashMap<Long, ReentrantLock> bucketLocks = new ConcurrentHashMap<>(); + + // Timer state guard + private final ReentrantLock timerLock = new ReentrantLock(); + + + /** + * Subscription context that holds per-subscription state. + */ + @Getter + static class SubContext { + private final AbstractPersistentDispatcherMultipleConsumers dispatcher; + private final String subscriptionName; + private long tickTimeMillis; + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + private final long fixedDelayDetectionLookahead; + private final Clock clock; + private Position markDeletePosition; + + SubContext(AbstractPersistentDispatcherMultipleConsumers dispatcher, long tickTimeMillis, + boolean isDelayedDeliveryDeliverAtTimeStrict, long fixedDelayDetectionLookahead, + Clock clock) { + this.dispatcher = dispatcher; + this.subscriptionName = dispatcher.getSubscription().getName(); + this.tickTimeMillis = tickTimeMillis; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.clock = clock; + } + + void updateMarkDeletePosition(Position position) { + this.markDeletePosition = position; + } + + long getCutoffTime() { + long now = clock.millis(); + return isDelayedDeliveryDeliverAtTimeStrict ? now : now + tickTimeMillis; + } + } + + private final Runnable onEmptyCallback; + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead) { + this(timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict, + fixedDelayDetectionLookahead, null); + } + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead) { + this(timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead, null); + } + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead, + Runnable onEmptyCallback) { + this.timer = timer; + this.tickTimeMillis = tickTimeMillis; + this.clock = clock; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.timestampPrecisionBitCnt = calculateTimestampPrecisionBitCnt(tickTimeMillis); + this.onEmptyCallback = onEmptyCallback; + } + + private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) { + int bitCnt = 0; + while (tickTimeMillis > 0) { + tickTimeMillis >>= 1; + bitCnt++; + } + return bitCnt > 0 ? bitCnt - 1 : 0; + } + + private static long trimLowerBit(long timestamp, int bits) { + return timestamp & (-1L << bits); + } + + @Override + public DelayedDeliveryTracker createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String subscriptionName = dispatcher.getSubscription().getName(); + + SubContext subContext = subscriptionContexts.computeIfAbsent(subscriptionName, + k -> new SubContext(dispatcher, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict, + fixedDelayDetectionLookahead, clock)); + return new InMemoryTopicDelayedDeliveryTrackerView(this, subContext); + } + + @Override + public void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String subscriptionName = dispatcher.getSubscription().getName(); + + subscriptionContexts.remove(subscriptionName); + // If no more subscriptions, proactively free index and release memory + if (subscriptionContexts.isEmpty()) { + timerLock.lock(); + try { + if (timeout != null) { + timeout.cancel(); + timeout = null; + } + currentTimeoutTarget = -1; + } finally { + timerLock.unlock(); + } + delayedMessageMap.clear(); + bucketLocks.clear(); + delayedMessagesCount.set(0); + bufferMemoryBytes.set(0); + if (onEmptyCallback != null) { + try { + onEmptyCallback.run(); + } catch (Throwable t) { + if (log.isDebugEnabled()) { + log.debug("onEmptyCallback failed", t); + } + } + } + } + } + + @Override + public void onTickTimeUpdated(long newTickTimeMillis) { + if (this.tickTimeMillis == newTickTimeMillis) { + return; + } + this.tickTimeMillis = newTickTimeMillis; + // Update precision bits for new tick time (accept old/new buckets co-exist) + this.timestampPrecisionBitCnt = calculateTimestampPrecisionBitCnt(newTickTimeMillis); + // Propagate to all subscriptions + for (SubContext sc : subscriptionContexts.values()) { + sc.tickTimeMillis = newTickTimeMillis; + } + // Re-evaluate timer scheduling with new tick time + timerLock.lock(); + try { + updateTimerLocked(); + } finally { + timerLock.unlock(); + } + if (log.isDebugEnabled()) { + log.debug("Updated tickTimeMillis for topic-level delayed delivery manager to {} ms", newTickTimeMillis); + } + } + + @Override + public long topicBufferMemoryBytes() { + return bufferMemoryBytes.get(); + } + + @Override + public long topicDelayedMessages() { + return delayedMessagesCount.get(); + } + + @Override + public void close() { + timerLock.lock(); + try { + if (timeout != null) { + timeout.cancel(); + timeout = null; + } + currentTimeoutTarget = -1; + } finally { + timerLock.unlock(); + } + delayedMessageMap.clear(); + bucketLocks.clear(); + subscriptionContexts.clear(); + delayedMessagesCount.set(0); + bufferMemoryBytes.set(0); + } + + // Internal methods for subscription views + + /** + * Add a message to the global delayed message index. + */ + boolean addMessageForSub(SubContext subContext, long ledgerId, long entryId, long deliverAt) { + if (deliverAt < 0 || deliverAt <= subContext.getCutoffTime()) { + return false; + } + + long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt); + ReentrantLock bLock = bucketLocks.computeIfAbsent(timestamp, k -> new ReentrantLock()); + bLock.lock(); + try { + ConcurrentHashMap<Long, Roaring64Bitmap> ledgerMap = + delayedMessageMap.computeIfAbsent(timestamp, k -> new ConcurrentHashMap<>()); + Roaring64Bitmap entryIds = ledgerMap.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()); + long before = entryIds.getLongSizeInBytes(); + if (!entryIds.contains(entryId)) { + entryIds.add(entryId); + delayedMessagesCount.incrementAndGet(); + long after = entryIds.getLongSizeInBytes(); + bufferMemoryBytes.addAndGet(after - before); + } + } finally { + bLock.unlock(); + } + + // Timer update and fixed delay detection + timerLock.lock(); + try { + updateTimerLocked(); + } finally { + timerLock.unlock(); + } + checkAndUpdateHighest(deliverAt); + return true; + } + + private void checkAndUpdateHighest(long deliverAt) { + if (deliverAt < (highestDeliveryTimeTracked - tickTimeMillis)) { + messagesHaveFixedDelay = false; + } + highestDeliveryTimeTracked = Math.max(highestDeliveryTimeTracked, deliverAt); + } + + /** + * Check if there are messages available for a subscription. + */ + boolean hasMessageAvailableForSub(SubContext subContext) { + if (delayedMessageMap.isEmpty()) { + return false; + } + Long firstKey = delayedMessageMap.firstKey(); + if (firstKey == null) { + return false; + } + long cutoffTime = subContext.getCutoffTime(); + return firstKey <= cutoffTime; + } + + /** + * Get scheduled messages for a subscription. + */ + NavigableSet<Position> getScheduledMessagesForSub(SubContext subContext, int maxMessages) { + NavigableSet<Position> positions = new TreeSet<>(); + int remaining = maxMessages; + + // Refresh mark-delete once outside of any bucket lock + refreshMarkDeletePosition(subContext); + long cutoffTime = subContext.getCutoffTime(); + Position markDelete = subContext.getMarkDeletePosition(); + + // Snapshot of buckets up to cutoff and iterate per-bucket with bucket locks + java.util.List<Long> tsList = new java.util.ArrayList<>(delayedMessageMap.headMap(cutoffTime, true).keySet()); + for (Long ts : tsList) { + if (remaining <= 0) { + break; + } + ReentrantLock bLock = bucketLocks.get(ts); + if (bLock == null) { + continue; + } + bLock.lock(); + try { + ConcurrentHashMap<Long, Roaring64Bitmap> ledgerMap = delayedMessageMap.get(ts); + if (ledgerMap == null) { + continue; + } + for (Map.Entry<Long, Roaring64Bitmap> ledgerEntry : ledgerMap.entrySet()) { + if (remaining <= 0) { + break; + } + long ledgerId = ledgerEntry.getKey(); + Roaring64Bitmap entryIds = ledgerEntry.getValue(); + if (markDelete != null && ledgerId < markDelete.getLedgerId()) { + continue; + } + org.roaringbitmap.longlong.LongIterator it = entryIds.getLongIterator(); + while (it.hasNext() && remaining > 0) { + long entryId = it.next(); + if (markDelete != null && ledgerId == markDelete.getLedgerId() + && entryId <= markDelete.getEntryId()) { + continue; + } + positions.add(PositionFactory.create(ledgerId, entryId)); + remaining--; + } + } + } finally { + bLock.unlock(); + } + } + + // Prune global index based on min mark-delete across all subscriptions (write path) + if (!positions.isEmpty()) { + pruneByMinMarkDelete(); + } + + return positions; + } + + /** + * Check if deliveries should be paused for a subscription. + */ + boolean shouldPauseAllDeliveriesForSub(SubContext subContext) { + // Parity with legacy: pause if all observed delays are fixed and backlog is large enough + return subContext.getFixedDelayDetectionLookahead() > 0 + && messagesHaveFixedDelay + && getNumberOfVisibleDelayedMessagesForSub(subContext) >= subContext.getFixedDelayDetectionLookahead() + && !hasMessageAvailableForSub(subContext); + } + + /** + * Clear delayed messages for a subscription (no-op for topic-level manager). + */ + void clearForSub() { + // No-op: we don't clear global index for individual subscriptions + } + + /** + * Update mark delete position for a subscription. + */ + void updateMarkDeletePosition(SubContext subContext, Position position) { + subContext.updateMarkDeletePosition(position); + pruneByMinMarkDelete(); + } + + // Private helper methods + + private void updateTimerLocked() { + if (delayedMessageMap.isEmpty()) { + if (timeout != null) { + currentTimeoutTarget = -1; + timeout.cancel(); + timeout = null; + } + return; + } + Long nextKey = delayedMessageMap.firstKey(); + if (nextKey == null) { + return; + } + long nextDeliveryTime = nextKey; + if (nextDeliveryTime == currentTimeoutTarget) { + return; + } + if (timeout != null) { + timeout.cancel(); + } + long now = clock.millis(); + long delayMillis = nextDeliveryTime - now; + if (delayMillis < 0) { + return; + } + long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now; + long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis); + currentTimeoutTarget = nextDeliveryTime; + timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS); + } + + private void updateBufferMemoryEstimate() { + // No-op in incremental mode (kept for compatibility) + } + Review Comment: This empty method serves no purpose and adds confusion. Since it's never called and the comment indicates it's kept for compatibility, it should be removed to improve code clarity. ```suggestion ``` ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java: ########## @@ -0,0 +1,577 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import java.time.Clock; +import java.util.ArrayList; +import java.util.Map; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; +import org.roaringbitmap.longlong.Roaring64Bitmap; + +/** + * In-memory implementation of topic-level delayed delivery tracker manager. + * This manager maintains a single global delayed message index per topic that is shared by all + * subscriptions, significantly reducing memory usage in multi-subscription scenarios. + */ +@Slf4j +public class InMemoryTopicDelayedDeliveryTrackerManager implements TopicDelayedDeliveryTrackerManager, TimerTask { + + // Global delayed message index: timestamp -> ledgerId -> entryId bitmap + // Outer: sorted by timestamp for efficient finding of earliest bucket + // Inner: per-ledger bitmaps of entry-ids + private final ConcurrentSkipListMap<Long, ConcurrentHashMap<Long, Roaring64Bitmap>> delayedMessageMap = + new ConcurrentSkipListMap<>(); + + // Subscription registry: subscription name -> subscription context + private final ConcurrentHashMap<String, SubContext> subscriptionContexts = new ConcurrentHashMap<>(); + + // Timer for delayed delivery + private final Timer timer; + private Timeout timeout; + private long currentTimeoutTarget = -1; + // Last time the TimerTask was triggered + private long lastTickRun = 0L; + + // Configuration + private long tickTimeMillis; + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + private final long fixedDelayDetectionLookahead; + private final Clock clock; + + // Statistics + private final AtomicLong delayedMessagesCount = new AtomicLong(0); + private final AtomicLong bufferMemoryBytes = new AtomicLong(0); + + // Fixed-delay detection (parity with legacy behavior) + private volatile long highestDeliveryTimeTracked = 0; + private volatile boolean messagesHaveFixedDelay = true; + + // Timestamp precision for memory optimization + private int timestampPrecisionBitCnt; + + // Per-bucket locks (timestamp -> lock) for fine-grained concurrency + private final ConcurrentHashMap<Long, ReentrantLock> bucketLocks = new ConcurrentHashMap<>(); + + // Timer state guard + private final ReentrantLock timerLock = new ReentrantLock(); + + + /** + * Subscription context that holds per-subscription state. + */ + @Getter + static class SubContext { + private final AbstractPersistentDispatcherMultipleConsumers dispatcher; + private final String subscriptionName; + private long tickTimeMillis; + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + private final long fixedDelayDetectionLookahead; + private final Clock clock; + private Position markDeletePosition; + + SubContext(AbstractPersistentDispatcherMultipleConsumers dispatcher, long tickTimeMillis, + boolean isDelayedDeliveryDeliverAtTimeStrict, long fixedDelayDetectionLookahead, + Clock clock) { + this.dispatcher = dispatcher; + this.subscriptionName = dispatcher.getSubscription().getName(); + this.tickTimeMillis = tickTimeMillis; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.clock = clock; + } + + void updateMarkDeletePosition(Position position) { + this.markDeletePosition = position; + } + + long getCutoffTime() { + long now = clock.millis(); + return isDelayedDeliveryDeliverAtTimeStrict ? now : now + tickTimeMillis; + } + } + + private final Runnable onEmptyCallback; + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead) { + this(timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict, + fixedDelayDetectionLookahead, null); + } + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead) { + this(timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead, null); + } + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead, + Runnable onEmptyCallback) { + this.timer = timer; + this.tickTimeMillis = tickTimeMillis; + this.clock = clock; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.timestampPrecisionBitCnt = calculateTimestampPrecisionBitCnt(tickTimeMillis); + this.onEmptyCallback = onEmptyCallback; + } + + private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) { + int bitCnt = 0; + while (tickTimeMillis > 0) { + tickTimeMillis >>= 1; + bitCnt++; + } + return bitCnt > 0 ? bitCnt - 1 : 0; + } + + private static long trimLowerBit(long timestamp, int bits) { + return timestamp & (-1L << bits); + } + + @Override + public DelayedDeliveryTracker createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String subscriptionName = dispatcher.getSubscription().getName(); + + SubContext subContext = subscriptionContexts.computeIfAbsent(subscriptionName, + k -> new SubContext(dispatcher, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict, + fixedDelayDetectionLookahead, clock)); + return new InMemoryTopicDelayedDeliveryTrackerView(this, subContext); + } + + @Override + public void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String subscriptionName = dispatcher.getSubscription().getName(); + + subscriptionContexts.remove(subscriptionName); + // If no more subscriptions, proactively free index and release memory + if (subscriptionContexts.isEmpty()) { + timerLock.lock(); + try { + if (timeout != null) { + timeout.cancel(); + timeout = null; + } + currentTimeoutTarget = -1; + } finally { + timerLock.unlock(); + } + delayedMessageMap.clear(); + bucketLocks.clear(); + delayedMessagesCount.set(0); + bufferMemoryBytes.set(0); + if (onEmptyCallback != null) { + try { + onEmptyCallback.run(); + } catch (Throwable t) { + if (log.isDebugEnabled()) { + log.debug("onEmptyCallback failed", t); + } + } + } + } + } + + @Override + public void onTickTimeUpdated(long newTickTimeMillis) { + if (this.tickTimeMillis == newTickTimeMillis) { + return; + } + this.tickTimeMillis = newTickTimeMillis; + // Update precision bits for new tick time (accept old/new buckets co-exist) + this.timestampPrecisionBitCnt = calculateTimestampPrecisionBitCnt(newTickTimeMillis); + // Propagate to all subscriptions + for (SubContext sc : subscriptionContexts.values()) { + sc.tickTimeMillis = newTickTimeMillis; + } + // Re-evaluate timer scheduling with new tick time + timerLock.lock(); + try { + updateTimerLocked(); + } finally { + timerLock.unlock(); + } + if (log.isDebugEnabled()) { + log.debug("Updated tickTimeMillis for topic-level delayed delivery manager to {} ms", newTickTimeMillis); + } + } + + @Override + public long topicBufferMemoryBytes() { + return bufferMemoryBytes.get(); + } + + @Override + public long topicDelayedMessages() { + return delayedMessagesCount.get(); + } + + @Override + public void close() { + timerLock.lock(); + try { + if (timeout != null) { + timeout.cancel(); + timeout = null; + } + currentTimeoutTarget = -1; + } finally { + timerLock.unlock(); + } + delayedMessageMap.clear(); + bucketLocks.clear(); + subscriptionContexts.clear(); + delayedMessagesCount.set(0); + bufferMemoryBytes.set(0); + } + + // Internal methods for subscription views + + /** + * Add a message to the global delayed message index. + */ + boolean addMessageForSub(SubContext subContext, long ledgerId, long entryId, long deliverAt) { + if (deliverAt < 0 || deliverAt <= subContext.getCutoffTime()) { + return false; + } + + long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt); + ReentrantLock bLock = bucketLocks.computeIfAbsent(timestamp, k -> new ReentrantLock()); + bLock.lock(); + try { + ConcurrentHashMap<Long, Roaring64Bitmap> ledgerMap = + delayedMessageMap.computeIfAbsent(timestamp, k -> new ConcurrentHashMap<>()); + Roaring64Bitmap entryIds = ledgerMap.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()); + long before = entryIds.getLongSizeInBytes(); + if (!entryIds.contains(entryId)) { + entryIds.add(entryId); + delayedMessagesCount.incrementAndGet(); + long after = entryIds.getLongSizeInBytes(); + bufferMemoryBytes.addAndGet(after - before); + } + } finally { + bLock.unlock(); + } + + // Timer update and fixed delay detection + timerLock.lock(); + try { + updateTimerLocked(); + } finally { + timerLock.unlock(); + } + checkAndUpdateHighest(deliverAt); + return true; + } + + private void checkAndUpdateHighest(long deliverAt) { + if (deliverAt < (highestDeliveryTimeTracked - tickTimeMillis)) { + messagesHaveFixedDelay = false; + } + highestDeliveryTimeTracked = Math.max(highestDeliveryTimeTracked, deliverAt); + } + + /** + * Check if there are messages available for a subscription. + */ + boolean hasMessageAvailableForSub(SubContext subContext) { + if (delayedMessageMap.isEmpty()) { + return false; + } + Long firstKey = delayedMessageMap.firstKey(); + if (firstKey == null) { + return false; + } + long cutoffTime = subContext.getCutoffTime(); + return firstKey <= cutoffTime; + } + + /** + * Get scheduled messages for a subscription. + */ + NavigableSet<Position> getScheduledMessagesForSub(SubContext subContext, int maxMessages) { + NavigableSet<Position> positions = new TreeSet<>(); + int remaining = maxMessages; + + // Refresh mark-delete once outside of any bucket lock + refreshMarkDeletePosition(subContext); + long cutoffTime = subContext.getCutoffTime(); + Position markDelete = subContext.getMarkDeletePosition(); + + // Snapshot of buckets up to cutoff and iterate per-bucket with bucket locks + java.util.List<Long> tsList = new java.util.ArrayList<>(delayedMessageMap.headMap(cutoffTime, true).keySet()); + for (Long ts : tsList) { + if (remaining <= 0) { + break; + } + ReentrantLock bLock = bucketLocks.get(ts); + if (bLock == null) { + continue; + } + bLock.lock(); + try { + ConcurrentHashMap<Long, Roaring64Bitmap> ledgerMap = delayedMessageMap.get(ts); + if (ledgerMap == null) { + continue; + } + for (Map.Entry<Long, Roaring64Bitmap> ledgerEntry : ledgerMap.entrySet()) { + if (remaining <= 0) { + break; + } + long ledgerId = ledgerEntry.getKey(); + Roaring64Bitmap entryIds = ledgerEntry.getValue(); + if (markDelete != null && ledgerId < markDelete.getLedgerId()) { + continue; + } + org.roaringbitmap.longlong.LongIterator it = entryIds.getLongIterator(); + while (it.hasNext() && remaining > 0) { + long entryId = it.next(); + if (markDelete != null && ledgerId == markDelete.getLedgerId() + && entryId <= markDelete.getEntryId()) { + continue; + } + positions.add(PositionFactory.create(ledgerId, entryId)); + remaining--; + } + } + } finally { + bLock.unlock(); + } + } + + // Prune global index based on min mark-delete across all subscriptions (write path) + if (!positions.isEmpty()) { + pruneByMinMarkDelete(); + } + + return positions; + } + + /** + * Check if deliveries should be paused for a subscription. + */ + boolean shouldPauseAllDeliveriesForSub(SubContext subContext) { + // Parity with legacy: pause if all observed delays are fixed and backlog is large enough + return subContext.getFixedDelayDetectionLookahead() > 0 + && messagesHaveFixedDelay + && getNumberOfVisibleDelayedMessagesForSub(subContext) >= subContext.getFixedDelayDetectionLookahead() + && !hasMessageAvailableForSub(subContext); + } + + /** + * Clear delayed messages for a subscription (no-op for topic-level manager). + */ + void clearForSub() { + // No-op: we don't clear global index for individual subscriptions + } + + /** + * Update mark delete position for a subscription. + */ + void updateMarkDeletePosition(SubContext subContext, Position position) { + subContext.updateMarkDeletePosition(position); + pruneByMinMarkDelete(); + } + + // Private helper methods + + private void updateTimerLocked() { + if (delayedMessageMap.isEmpty()) { + if (timeout != null) { + currentTimeoutTarget = -1; + timeout.cancel(); + timeout = null; + } + return; + } + Long nextKey = delayedMessageMap.firstKey(); + if (nextKey == null) { + return; + } + long nextDeliveryTime = nextKey; + if (nextDeliveryTime == currentTimeoutTarget) { + return; + } + if (timeout != null) { + timeout.cancel(); + } + long now = clock.millis(); + long delayMillis = nextDeliveryTime - now; + if (delayMillis < 0) { + return; + } + long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now; + long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis); + currentTimeoutTarget = nextDeliveryTime; + timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS); + } + + private void updateBufferMemoryEstimate() { + // No-op in incremental mode (kept for compatibility) + } + + private long getNumberOfVisibleDelayedMessagesForSub(SubContext subContext) { + // Simplified implementation - returns total count + // Could be enhanced to count only messages visible to this subscription + return delayedMessagesCount.get(); + } + + private void pruneByMinMarkDelete() { + // Find the minimum mark delete position across all subscriptions + Position minMarkDelete = null; + for (SubContext subContext : subscriptionContexts.values()) { + Position markDelete = subContext.getMarkDeletePosition(); + if (markDelete != null) { + if (minMarkDelete == null || markDelete.compareTo(minMarkDelete) < 0) { + minMarkDelete = markDelete; + } + } + } + + if (minMarkDelete == null) { + return; + } + + // No idempotency set to clean (Option A): rely on per-bitmap removal below + + // Prune per bucket under bucket lock + for (Long ts : new ArrayList<>(delayedMessageMap.keySet())) { + ReentrantLock bLock = bucketLocks.get(ts); + if (bLock == null) { + continue; + } + bLock.lock(); + try { + ConcurrentHashMap<Long, Roaring64Bitmap> ledgerMap = delayedMessageMap.get(ts); + if (ledgerMap == null) { + continue; + } + ArrayList<Long> ledgersToRemove = new ArrayList<>(); + for (Map.Entry<Long, Roaring64Bitmap> ledgerEntry : ledgerMap.entrySet()) { + long ledgerId = ledgerEntry.getKey(); + Roaring64Bitmap entryIds = ledgerEntry.getValue(); + if (ledgerId < minMarkDelete.getLedgerId()) { + long bytes = entryIds.getLongSizeInBytes(); + delayedMessagesCount.addAndGet(-entryIds.getLongCardinality()); + bufferMemoryBytes.addAndGet(-bytes); + ledgersToRemove.add(ledgerId); + } else if (ledgerId == minMarkDelete.getLedgerId()) { + long before = entryIds.getLongSizeInBytes(); + long removedCount = 0; + org.roaringbitmap.longlong.LongIterator it = entryIds.getLongIterator(); + java.util.ArrayList<Long> toRemove = new java.util.ArrayList<>(); Review Comment: Using fully qualified class names (`java.util.ArrayList`) instead of imports reduces code readability. Since `ArrayList` is already imported at line 25, use the short form here. ```suggestion ArrayList<Long> toRemove = new ArrayList<>(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
