Copilot commented on code in PR #24922:
URL: https://github.com/apache/pulsar/pull/24922#discussion_r2477926013
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java:
##########
@@ -66,13 +71,30 @@ public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleCon
}
@VisibleForTesting
- InMemoryDelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
- return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
- isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
+ DelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+ String topicName = dispatcher.getTopic().getName();
+
+ // Get or create topic-level manager for this topic
+ TopicDelayedDeliveryTrackerManager manager = topicManagers.computeIfAbsent(topicName,
+ k -> new InMemoryTopicDelayedDeliveryTrackerManager(timer, tickTimeMillis,
+ isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead));
Review Comment:
The `topicManagers` map could accumulate stale entries for topics that have
been unloaded. When a topic is unloaded and all subscriptions are closed, the
manager should be removed from the map. Consider adding cleanup logic in
`InMemoryTopicDelayedDeliveryTrackerManager.unregister()` that notifies the
factory to remove the manager from the cache, or add a callback mechanism for
the manager to remove itself when it closes.
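A minimal sketch of the callback idea, assuming simplified stand-in classes: `SketchFactory`, `SketchManager`, and the `onEmpty` callback are hypothetical names for illustration and are not part of the PR. The manager tells the factory when its last registration goes away, and the factory removes the entry only if it still maps to that same manager.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical stand-in for the topic-level manager; not the PR's class.
class SketchManager {
    private final Consumer<SketchManager> onEmpty; // callback supplied by the factory
    private int registered = 0;

    SketchManager(Consumer<SketchManager> onEmpty) {
        this.onEmpty = onEmpty;
    }

    synchronized void register() {
        registered++;
    }

    void unregister() {
        boolean empty;
        synchronized (this) {
            registered--;
            empty = registered == 0;
        }
        if (empty) {
            // Notify the factory outside the lock so it can drop the cache entry.
            onEmpty.accept(this);
        }
    }
}

// Hypothetical stand-in for the factory that caches one manager per topic.
class SketchFactory {
    final Map<String, SketchManager> topicManagers = new ConcurrentHashMap<>();

    SketchManager managerFor(String topicName) {
        SketchManager manager = topicManagers.computeIfAbsent(topicName,
                // remove(key, value) only removes the entry if it is still this manager
                k -> new SketchManager(m -> topicManagers.remove(k, m)));
        manager.register();
        return manager;
    }
}
```

This sketch ignores the race between a new registration and a concurrent removal; a real implementation would need to close that window, for example by doing both under the map's per-key lock.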
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerView.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import java.util.NavigableSet;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+
+/**
+ * View object for a subscription that implements DelayedDeliveryTracker interface.
+ * This view forwards all operations to the topic-level manager while maintaining
+ * compatibility with existing dispatcher logic.
+ */
+@Slf4j
+public class InMemoryTopicDelayedDeliveryTrackerView implements DelayedDeliveryTracker {
+
+ private final InMemoryTopicDelayedDeliveryTrackerManager manager;
+ private final InMemoryTopicDelayedDeliveryTrackerManager.SubContext subContext;
+ private boolean closed = false;
+
+ public InMemoryTopicDelayedDeliveryTrackerView(InMemoryTopicDelayedDeliveryTrackerManager manager,
+ InMemoryTopicDelayedDeliveryTrackerManager.SubContext subContext) {
+ this.manager = manager;
+ this.subContext = subContext;
+ }
+
+ @Override
+ public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {
+ checkClosed();
+ return manager.addMessageForSub(subContext, ledgerId, entryId, deliveryAt);
+ }
+
+ @Override
+ public boolean hasMessageAvailable() {
+ checkClosed();
+ return manager.hasMessageAvailableForSub(subContext);
+ }
+
+ @Override
+ public long getNumberOfDelayedMessages() {
+ checkClosed();
+ // Return an estimate of visible delayed messages for this subscription
+ // For now, return the total count - could be enhanced to count only visible messages
+ return manager.topicDelayedMessages();
+ }
+
+ @Override
+ public long getBufferMemoryUsage() {
+ checkClosed();
+ // Return the topic-level memory usage (shared by all subscriptions)
+ return manager.topicBufferMemoryBytes();
+ }
+
+ @Override
+ public NavigableSet<Position> getScheduledMessages(int maxMessages) {
+ checkClosed();
+ return manager.getScheduledMessagesForSub(subContext, maxMessages);
+ }
+
+ @Override
+ public boolean shouldPauseAllDeliveries() {
+ checkClosed();
+ return manager.shouldPauseAllDeliveriesForSub(subContext);
+ }
+
+ @Override
+ public void resetTickTime(long tickTime) {
+ checkClosed();
+ manager.onTickTimeUpdated(tickTime);
+ }
+
+ @Override
+ public CompletableFuture<Void> clear() {
+ checkClosed();
+ // For topic-level manager, clear is a no-op for individual subscriptions
+ manager.clearForSub();
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public void close() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ manager.unregister(subContext.getDispatcher());
+ }
+
+ /**
+ * Update the mark delete position for this subscription.
+ * This is called by the dispatcher when messages are acknowledged.
+ */
+ public void updateMarkDeletePosition(Position position) {
+ checkClosed();
+ manager.updateMarkDeletePosition(subContext, position);
+ }
Review Comment:
The `updateMarkDeletePosition` method is public but not part of the
`DelayedDeliveryTracker` interface. This creates an API inconsistency where
callers need to cast to the concrete type to call this method. Consider either
adding this method to the interface or finding an alternative design pattern
(such as having the dispatcher directly notify the manager) to avoid breaking
the interface abstraction.
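One way to read the first option, as a sketch under stated assumptions: `SketchTracker`, `SketchLegacyTracker`, and `SketchTopicView` are hypothetical, trimmed-down stand-ins, and the position is passed as two `long` values instead of a `Position` to keep the example self-contained. A default no-op method on the interface lets the dispatcher call the hook without casting, while existing implementations compile unchanged.

```java
// Hypothetical, trimmed-down stand-in for the tracker interface.
interface SketchTracker {
    boolean addMessage(long ledgerId, long entryId, long deliverAt);

    // Default no-op: trackers that do not need mark-delete updates ignore the call.
    default void updateMarkDeletePosition(long ledgerId, long entryId) {
    }
}

// Stand-in for an existing tracker that does not override the hook.
class SketchLegacyTracker implements SketchTracker {
    @Override
    public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
        return true;
    }
}

// Stand-in for the topic-level view, which records the latest position.
class SketchTopicView implements SketchTracker {
    long markDeleteLedgerId = -1;
    long markDeleteEntryId = -1;

    @Override
    public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
        return true;
    }

    @Override
    public void updateMarkDeletePosition(long ledgerId, long entryId) {
        this.markDeleteLedgerId = ledgerId;
        this.markDeleteEntryId = entryId;
    }
}
```

With this shape the caller holds only a `SketchTracker` reference and never needs to know which implementation it has.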
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java:
##########
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed;
+
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
+import java.time.Clock;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+/**
+ * In-memory implementation of topic-level delayed delivery tracker manager.
+ * This manager maintains a single global delayed message index per topic that is shared by all
+ * subscriptions, significantly reducing memory usage in multi-subscription scenarios.
+ */
+@Slf4j
+public class InMemoryTopicDelayedDeliveryTrackerManager implements TopicDelayedDeliveryTrackerManager, TimerTask {
+
+ // Global delayed message index: timestamp -> ledgerId -> entryId bitmap
+ private final Long2ObjectSortedMap<Long2ObjectSortedMap<Roaring64Bitmap>> delayedMessageMap =
+ new Long2ObjectRBTreeMap<>();
+
+ // Subscription registry: subscription name -> subscription context
+ private final Map<String, SubContext> subscriptionContexts = new HashMap<>();
+
+ // Timer for delayed delivery
+ private final Timer timer;
+ private Timeout timeout;
+ private long currentTimeoutTarget = -1;
+
+ // Configuration
+ private final long tickTimeMillis;
+ private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+ private final long fixedDelayDetectionLookahead;
+ private final Clock clock;
+
+ // Statistics
+ private final AtomicLong delayedMessagesCount = new AtomicLong(0);
+ private final AtomicLong bufferMemoryBytes = new AtomicLong(0);
+
+ // Timestamp precision for memory optimization
+ private final int timestampPrecisionBitCnt;
+
+ /**
+ * Subscription context that holds per-subscription state.
+ */
+ @Getter
+ static class SubContext {
+ private final AbstractPersistentDispatcherMultipleConsumers dispatcher;
+ private final String subscriptionName;
+ private final long tickTimeMillis;
+ private final boolean isDelayedDeliveryDeliverAtTimeStrict;
+ private final long fixedDelayDetectionLookahead;
+ private Position markDeletePosition;
+
+ SubContext(AbstractPersistentDispatcherMultipleConsumers dispatcher, long tickTimeMillis,
+ boolean isDelayedDeliveryDeliverAtTimeStrict, long fixedDelayDetectionLookahead) {
+ this.dispatcher = dispatcher;
+ this.subscriptionName = dispatcher.getSubscription().getName();
+ this.tickTimeMillis = tickTimeMillis;
+ this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
+ this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+ }
+
+ void updateMarkDeletePosition(Position position) {
+ this.markDeletePosition = position;
+ }
+
+ long getCutoffTime() {
+ return isDelayedDeliveryDeliverAtTimeStrict ? System.currentTimeMillis() :
+ System.currentTimeMillis() + tickTimeMillis;
+ }
+ }
+
+ public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis,
+ boolean isDelayedDeliveryDeliverAtTimeStrict,
+ long fixedDelayDetectionLookahead) {
+ this(timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
+ fixedDelayDetectionLookahead);
+ }
+
+ public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock,
+ boolean isDelayedDeliveryDeliverAtTimeStrict,
+ long fixedDelayDetectionLookahead) {
+ this.timer = timer;
+ this.tickTimeMillis = tickTimeMillis;
+ this.clock = clock;
+ this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
+ this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+ this.timestampPrecisionBitCnt = calculateTimestampPrecisionBitCnt(tickTimeMillis);
+ }
+
+ private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) {
+ int bitCnt = 0;
+ while (tickTimeMillis > 0) {
+ tickTimeMillis >>= 1;
+ bitCnt++;
+ }
+ return bitCnt > 0 ? bitCnt - 1 : 0;
+ }
+
+ private static long trimLowerBit(long timestamp, int bits) {
+ return timestamp & (-1L << bits);
+ }
+
+ @Override
+ public DelayedDeliveryTracker createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+ String subscriptionName = dispatcher.getSubscription().getName();
+
+ synchronized (this) {
+ SubContext subContext = subscriptionContexts.computeIfAbsent(subscriptionName,
+ k -> new SubContext(dispatcher, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
+ fixedDelayDetectionLookahead));
+
+ return new InMemoryTopicDelayedDeliveryTrackerView(this, subContext);
+ }
+ }
+
+ @Override
+ public void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+ String subscriptionName = dispatcher.getSubscription().getName();
+
+ synchronized (this) {
+ subscriptionContexts.remove(subscriptionName);
+
+ // If no more subscriptions, close the manager
+ if (subscriptionContexts.isEmpty() && delayedMessageMap.isEmpty()) {
+ close();
+ }
+ }
+ }
+
+ @Override
+ public void onTickTimeUpdated(long newTickTimeMillis) {
+ // For now, tick time updates are not supported after initialization
+ // This could be enhanced to update all subscription contexts
+ log.warn("Tick time updates are not currently supported for topic-level delayed delivery managers");
Review Comment:
Logging a warning every time `onTickTimeUpdated` is called could spam the logs
if this method is invoked frequently. Consider implementing the functionality,
using a rate-limited logger, or throwing an `UnsupportedOperationException` so
that callers see clearly that the operation is not supported, rather than
having the update dropped with only a log line.
```suggestion
throw new UnsupportedOperationException("Tick time updates are not currently supported for topic-level delayed delivery managers");
```
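For the rate-limited alternative, a minimal sketch assuming a hand-rolled limiter: `RateLimitedWarning` and `shouldLog` are hypothetical names, not an existing Pulsar or SLF4J API. The caller passes the current time in, which keeps the class easy to test.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: allows at most one log line per interval.
class RateLimitedWarning {
    private static final long NEVER = Long.MIN_VALUE;

    private final long minIntervalMillis;
    private final AtomicLong lastLoggedAt = new AtomicLong(NEVER);

    RateLimitedWarning(long minIntervalMillis) {
        this.minIntervalMillis = minIntervalMillis;
    }

    // Returns true if the caller should emit the warning now.
    boolean shouldLog(long nowMillis) {
        long last = lastLoggedAt.get();
        if (last != NEVER && nowMillis - last < minIntervalMillis) {
            return false; // still inside the quiet period
        }
        // Only one of several racing threads wins the right to log.
        return lastLoggedAt.compareAndSet(last, nowMillis);
    }
}
```

A caller would guard the existing statement, for example `if (limiter.shouldLog(System.currentTimeMillis())) { log.warn(...); }`, where `limiter` is a field holding one `RateLimitedWarning` per manager.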
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java:
##########
@@ -0,0 +1,473 @@
+ @Override
+ public void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+ String subscriptionName = dispatcher.getSubscription().getName();
+
+ synchronized (this) {
+ subscriptionContexts.remove(subscriptionName);
+
+ // If no more subscriptions, close the manager
+ if (subscriptionContexts.isEmpty() && delayedMessageMap.isEmpty()) {
+ close();
+ }
+ }
Review Comment:
Calling `close()` from `unregister()` while already holding the lock nests two
acquisitions of the same monitor, because `close()` also synchronizes on `this`
(line 180). Java monitors are reentrant, so this does not deadlock, but the
nesting makes the code harder to maintain. Consider either documenting this
self-closing behavior clearly or extracting the cleanup logic to avoid nested
locking patterns.
```suggestion
boolean shouldClose = false;
synchronized (this) {
subscriptionContexts.remove(subscriptionName);
// If no more subscriptions, close the manager
if (subscriptionContexts.isEmpty() && delayedMessageMap.isEmpty()) {
shouldClose = true;
}
}
if (shouldClose) {
close();
}
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java:
##########
@@ -0,0 +1,473 @@
+ @Override
+ public DelayedDeliveryTracker createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
+ String subscriptionName = dispatcher.getSubscription().getName();
+
+ synchronized (this) {
+ SubContext subContext = subscriptionContexts.computeIfAbsent(subscriptionName,
+ k -> new SubContext(dispatcher, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
+ fixedDelayDetectionLookahead));
+
+ return new InMemoryTopicDelayedDeliveryTrackerView(this, subContext);
Review Comment:
The `createOrGetView` method always creates a new
`InMemoryTopicDelayedDeliveryTrackerView` instance, even when the `SubContext`
already exists. Multiple view instances can therefore exist for the same
subscription, all holding a reference to the same `SubContext`. When any one of
those views is closed, `unregister` removes the shared subscription context,
which breaks the views that are still open. The method should either track and
return the existing view instance, or document that only one view per
subscription may be active at a time.
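One possible shape for the tracking option is a per-subscription count of open views, so the context is dropped only when the last view closes. This is a sketch under stated assumptions: `SketchViewRegistry` and its methods are hypothetical names, and it tracks counts only, not the actual `SubContext` objects.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry: counts open views per subscription name.
class SketchViewRegistry {
    private final Map<String, Integer> openViews = new HashMap<>();

    // Called whenever a view is handed out for the subscription.
    synchronized void open(String subscriptionName) {
        openViews.merge(subscriptionName, 1, Integer::sum);
    }

    // Returns true only when the last open view was closed and the entry removed.
    synchronized boolean close(String subscriptionName) {
        Integer count = openViews.get(subscriptionName);
        if (count == null) {
            return false; // nothing registered under this name
        }
        if (count == 1) {
            openViews.remove(subscriptionName);
            return true;
        }
        openViews.put(subscriptionName, count - 1);
        return false;
    }

    synchronized boolean isRegistered(String subscriptionName) {
        return openViews.containsKey(subscriptionName);
    }
}
```

In terms of the reviewed code, `unregister` would remove the subscription context only when a call like `close(...)` returns true.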
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java:
##########
@@ -223,11 +223,11 @@ public void testPublishDelayMessagesAndCreateBucketDelayDeliveryTrackerFailed()
});
Optional<DelayedDeliveryTracker> optional = reference.get();
- Assert.assertTrue(optional.get() instanceof InMemoryDelayedDeliveryTracker);
+ Assert.assertTrue(optional.get() instanceof InMemoryTopicDelayedDeliveryTrackerView);
// Mock DelayedDeliveryTracker and Count the number of addMessage() calls
AtomicInteger counter = new AtomicInteger(0);
- InMemoryDelayedDeliveryTracker tracker = (InMemoryDelayedDeliveryTracker) optional.get();
+ InMemoryTopicDelayedDeliveryTrackerView tracker = (InMemoryTopicDelayedDeliveryTrackerView) optional.get();
tracker =  Mockito.spy(tracker);
Review Comment:
Double space between `=` and `Mockito.spy(tracker)`. Should be a single
space for consistency with code formatting standards.
```suggestion
tracker = Mockito.spy(tracker);
```