coderzc commented on code in PR #24928:
URL: https://github.com/apache/pulsar/pull/24928#discussion_r2596926442
##########
pip/pip-448.md:
##########
@@ -0,0 +1,166 @@
+# PIP-448: Topic-level Delayed Message Tracker for Memory Optimization
+
+# Background knowledge
+
+In Apache Pulsar, **Delayed Message Delivery** allows producers to specify a
delay for a message, ensuring it is not delivered to any consumer until the
specified time has passed. This is a useful feature for implementing tasks like
scheduled reminders or retry mechanisms with backoff.
+
+The legacy default mechanism for handling delayed messages is the
`InMemoryDelayedDeliveryTracker`. This tracker is instantiated on a
*per-subscription* basis within the broker. When a topic has multiple
subscriptions, each subscription gets its own independent
`InMemoryDelayedDeliveryTracker` instance.
+
+The consequence of this per-subscription design is that if a delayed message
is published to a topic with 'N' subscriptions, that message's metadata (its
position) is stored 'N' times in the broker's memory. This leads to significant
memory overhead, especially for topics with a large number of subscriptions, as
the memory usage scales linearly with the number of subscriptions.
+
+# Motivation
+
+The primary motivation for this proposal is to address the high memory
consumption caused by the legacy per-subscription delayed message tracking
mechanism. For topics with hundreds or thousands of subscriptions, the memory
footprint for delayed messages can become prohibitively large. Each delayed
message's position is duplicated across every subscription's tracker, leading
to a memory usage pattern of `O(num_delayed_messages * num_subscriptions)`.
+
+This excessive memory usage can cause:
+* Increased memory pressure on Pulsar brokers.
+* More frequent and longer Garbage Collection (GC) pauses, impacting broker
performance.
+* Potential OutOfMemoryErrors, leading to broker instability.
+* Limited scalability for use cases that rely on many subscriptions per
topic, such as IoT or large-scale microservices with shared subscriptions.
+
+By introducing an alternative, topic-level tracking mechanism, we can provide
a memory-efficient solution to enhance broker stability and scalability for
these critical use cases.
+
+# Goals
+
+## In Scope
+* Introduce a new, optional, topic-level delayed message tracker that is
shared across all subscriptions of a single topic. This will store each delayed
message's position only once.
+* Significantly reduce the memory footprint for delayed message handling
when this new tracker is enabled, changing the memory complexity from
`O(num_delayed_messages * num_subscriptions)` to `O(num_delayed_messages)`.
+* Provide new configuration options to allow operators to tune the behavior
of the new tracker, such as pruning intervals and cleanup delays.
+* Maintain the existing `DelayedDeliveryTracker` interface to ensure
seamless integration with the dispatcher logic.
+* Preserve the existing per-subscription
`InMemoryDelayedDeliveryTrackerFactory` as the default for backward
compatibility, requiring operators to opt-in to use the new topic-level tracker.
+
+## Out of Scope
+* This proposal does not modify the persistent, bucket-based delayed
delivery tracker (`BucketDelayedDeliveryTracker`).
+* No changes will be made to the public-facing client APIs, REST APIs, or
the wire protocol. This is a broker-internal optimization.
+* The semantic behavior of delayed messages from a user's perspective will
remain identical.
+
+# High Level Design
+
+The core idea is to introduce a new, opt-in `DelayedDeliveryTrackerFactory`
that implements a shared, topic-level tracking strategy. Besides the factory
itself, this is achieved with two new components: a shared, topic-level
`TopicDelayedDeliveryTrackerManager` and a subscription-scoped
`InMemoryTopicDelayedDeliveryTracker`.
+
+1. **New Factory (`InMemoryTopicDelayedDeliveryTrackerFactory`)**: A new
factory class is introduced. To enable the feature, operators must set
`delayedDeliveryTrackerFactoryClassName` to
`org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory`.
This factory manages the lifecycle of topic-level managers.
+
+2. **Shared Topic-Level Manager
(`InMemoryTopicDelayedDeliveryTrackerManager`)**: For each topic, a single
instance of this manager is created by the new factory. This manager owns a
global index of all delayed messages for that topic, storing each message's
position only once.
+
+3. **Per-Subscription Tracker (`InMemoryTopicDelayedDeliveryTracker`)**: The
dispatcher for each subscription receives an instance of this class. It
implements the standard `DelayedDeliveryTracker` interface but acts as a
lightweight proxy to the shared `TopicDelayedDeliveryTrackerManager`. It
maintains per-subscription state (like the `markDeletePosition`) while
delegating core operations to the shared manager.
+
+4. **Lifecycle and Caching**: The new factory maintains a cache of managers
keyed by topic name. When a tracker is requested:
+ * It gets or creates a manager for the topic.
+ * The manager then creates a new `InMemoryTopicDelayedDeliveryTracker`
for the subscription.
+ * When a subscription closes, it unregisters from the manager. When the
last subscription unregisters, the manager's cleanup is scheduled based on the
`delayedDeliveryTopicManagerIdleMillis` configuration, allowing it to be reused
if a new subscription appears quickly.
+
+This architectural change can be described as follows:
+
+* **Default/Legacy Behavior:** For a single topic, each subscription (e.g.,
Sub1, Sub2) maintains its own complete `InMemoryDelayedDeliveryTracker`.
Message metadata is duplicated in each tracker.
+
+* **New Topic-Level Behavior:** For a single topic, there is only one
central `InMemoryTopicDelayedDeliveryTrackerManager` holding a shared index.
Each subscription (Sub1, Sub2) receives a lightweight
`InMemoryTopicDelayedDeliveryTracker` that acts as a view/proxy, pointing to
the single, shared manager, thus eliminating data duplication.
+
+The manager handles pruning of acknowledged messages from the shared index by
tracking the `markDeletePosition` of all active subscriptions and only removing
messages that have been acknowledged by *all* of them.
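The pruning rule above can be sketched as follows. This is a minimal illustration under stated assumptions, not the PIP's implementation: `PositionSketch` is a hypothetical `(ledgerId, entryId)` stand-in for a Pulsar position, the shared index is a plain `TreeSet`, and `pruneAcknowledged` is a hypothetical helper. A position can only be dropped once it is at or below the smallest `markDeletePosition` across all active subscriptions.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical stand-in for a Pulsar position, ordered by (ledgerId, entryId).
record PositionSketch(long ledgerId, long entryId) implements Comparable<PositionSketch> {
    public int compareTo(PositionSketch o) {
        int c = Long.compare(ledgerId, o.ledgerId);
        return c != 0 ? c : Long.compare(entryId, o.entryId);
    }
}

final class SharedIndexPruner {
    // Removes every indexed position acknowledged by all subscriptions, i.e. every
    // position <= the minimum markDeletePosition. Returns how many entries were removed.
    static int pruneAcknowledged(NavigableSet<PositionSketch> sharedIndex,
                                 Collection<PositionSketch> markDeletePositions) {
        if (markDeletePositions.isEmpty()) {
            return 0; // no active subscriptions in this sketch: nothing is provably acked by "all"
        }
        PositionSketch min = Collections.min(markDeletePositions);
        NavigableSet<PositionSketch> acked = sharedIndex.headSet(min, true); // live view of <= min
        int removed = acked.size();
        acked.clear(); // clearing the view removes those entries from sharedIndex itself
        return removed;
    }
}
```

Taking the minimum is what makes a single shared copy safe: a slow subscription holds back pruning for the whole topic, but never loses a message it has not yet acknowledged.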
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### `InMemoryTopicDelayedDeliveryTrackerFactory` (New Class)
+This is the new factory that must be explicitly configured to enable the
topic-level tracking feature.
+* **Role**: Manages the lifecycle of
`InMemoryTopicDelayedDeliveryTrackerManager` instances.
+* **Cache**: It maintains a `ConcurrentMap<String,
TopicDelayedDeliveryTrackerManager>` to cache managers per topic.
+* **Lifecycle Management**: When the last subscription for a topic is
closed, the factory uses the `delayedDeliveryTopicManagerIdleMillis` setting to
determine when to clean up the manager. A value of `0` removes it immediately,
while a positive value schedules a delayed removal, preventing churn if
subscriptions are volatile.
+* **Configuration**: It reads the new tuning parameters
(`delayedDeliveryPruneMinIntervalMillis`, `delayedDeliveryPruneEligibleRatio`)
and passes them to the manager instances it creates.
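A minimal sketch of the factory's per-topic cache and idle cleanup, assuming the factory keeps a count of registered subscriptions per manager. `ManagerCacheSketch`, its nested `Manager`, `acquire`, and `release` are hypothetical names; the real factory caches `TopicDelayedDeliveryTrackerManager` instances and its bookkeeping may differ. The constructor argument plays the role of `delayedDeliveryTopicManagerIdleMillis`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class ManagerCacheSketch implements AutoCloseable {
    // Hypothetical stand-in for a topic-level manager; only counts its subscriptions.
    static final class Manager {
        final String topic;
        int registered; // only read/written inside compute* calls for this key
        Manager(String topic) { this.topic = topic; }
    }

    private final ConcurrentMap<String, Manager> managers = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long idleMillis; // analogous to delayedDeliveryTopicManagerIdleMillis

    ManagerCacheSketch(long idleMillis) { this.idleMillis = idleMillis; }

    // Get or create the topic's manager and register one more subscription with it.
    Manager acquire(String topic) {
        return managers.compute(topic, (t, m) -> {
            Manager mgr = (m != null) ? m : new Manager(t);
            mgr.registered++;
            return mgr;
        });
    }

    // Unregister a subscription; after the last one, remove now (0) or after the idle delay.
    void release(String topic) {
        managers.computeIfPresent(topic, (t, m) -> {
            m.registered--;
            if (m.registered > 0) return m;
            if (idleMillis <= 0) return null; // returning null removes the mapping immediately
            scheduler.schedule(() -> removeIfStillIdle(t, m), idleMillis, TimeUnit.MILLISECONDS);
            return m; // stay cached so a quickly returning subscription can reuse it
        });
    }

    private void removeIfStillIdle(String topic, Manager expected) {
        // Remove only the same instance, and only if nobody re-registered in the meantime.
        managers.computeIfPresent(topic, (t, m) -> (m == expected && m.registered == 0) ? null : m);
    }

    boolean isCached(String topic) { return managers.containsKey(topic); }

    @Override
    public void close() { scheduler.shutdownNow(); } // cancels pending idle removals
}
```

The identity and count check in `removeIfStillIdle` is what prevents churn: a delayed removal that fires after a subscription has come back is simply ignored.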
+
+### `TopicDelayedDeliveryTrackerManager` (New Interface)
+This interface defines the contract for a topic-level manager.
+```java
+public interface TopicDelayedDeliveryTrackerManager extends AutoCloseable {
+    DelayedDeliveryTracker createOrGetTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher);
+    void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher);
+    // ... other methods
+}
+```
+
+### `InMemoryTopicDelayedDeliveryTrackerManager` (New Class)
+This is the implementation of the topic-level manager.
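+* **Data Structure**: Uses a `ConcurrentSkipListMap<Long,
Long2ObjectRBTreeMap<Roaring64Bitmap>>` as its core index, mapping each delivery
timestamp to ledger IDs and, per ledger, to a compressed bitmap of entry IDs, so
each message position is stored only once for the whole topic.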
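The nesting of the index can be illustrated with JDK types only. In this sketch, assuming the inner key is the ledger ID and the bitmap holds entry IDs, `TreeMap` stands in for fastutil's `Long2ObjectRBTreeMap` and `TreeSet<Long>` for `Roaring64Bitmap`; `DelayedIndexSketch`, `add`, and `due` are hypothetical names, and the sketch is single-threaded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;

final class DelayedIndexSketch {
    // deliverAt timestamp -> ledgerId -> entryIds. TreeMap/TreeSet stand in for
    // Long2ObjectRBTreeMap and Roaring64Bitmap. Inner maps are NOT thread-safe:
    // this is a single-threaded illustration of the layout only.
    private final ConcurrentSkipListMap<Long, TreeMap<Long, TreeSet<Long>>> index =
            new ConcurrentSkipListMap<>();

    // Each (ledgerId, entryId) is stored once per topic, whatever the subscription count.
    void add(long deliverAt, long ledgerId, long entryId) {
        index.computeIfAbsent(deliverAt, k -> new TreeMap<>())
             .computeIfAbsent(ledgerId, k -> new TreeSet<>())
             .add(entryId);
    }

    // Lists "ledgerId:entryId" for every message due at or before 'now', in timestamp order.
    List<String> due(long now) {
        List<String> out = new ArrayList<>();
        NavigableMap<Long, TreeMap<Long, TreeSet<Long>>> ready = index.headMap(now, true);
        for (TreeMap<Long, TreeSet<Long>> byLedger : ready.values()) {
            for (Map.Entry<Long, TreeSet<Long>> e : byLedger.entrySet()) {
                for (long entryId : e.getValue()) {
                    out.add(e.getKey() + ":" + entryId);
                }
            }
        }
        return out;
    }
}
```

Sorting by timestamp first makes "what is due now" a single `headMap` range query, while grouping entry IDs per ledger lets a bitmap compress runs of consecutive entries.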
+* **Subscription Context (`SubContext`)**: Holds per-subscription state,
including the `markDeletePosition`.
+* **Pruning Logic**: Pruning is throttled and tunable:
+ * The minimum time between pruning attempts is controlled by
`delayedDeliveryPruneMinIntervalMillis`. If not set, it uses an adaptive
interval based on the tick time.
+ * Opportunistic pruning is triggered during the delivery check if the
ratio of subscriptions ready for delivery exceeds
`delayedDeliveryPruneEligibleRatio`.
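The throttling rule can be expressed as a pure decision function. Everything here is an assumption for illustration: `PruneThrottleSketch` and `shouldPrune` are hypothetical, a `delayedDeliveryPruneMinIntervalMillis` value of `0` or less is treated as "not set", the adaptive tick-based interval is passed in rather than derived, and "exceeds" is read as strictly greater than.

```java
final class PruneThrottleSketch {
    /**
     * Decides whether to attempt an opportunistic prune during a delivery check.
     *
     * @param minIntervalMillis      delayedDeliveryPruneMinIntervalMillis; <= 0 means "not set" (assumed)
     * @param adaptiveIntervalMillis fallback interval based on the tick time (derivation not modeled)
     * @param eligibleRatio          delayedDeliveryPruneEligibleRatio
     */
    static boolean shouldPrune(long nowMillis, long lastPruneMillis,
                               long minIntervalMillis, long adaptiveIntervalMillis,
                               int eligibleSubscriptions, int totalSubscriptions,
                               double eligibleRatio) {
        long interval = minIntervalMillis > 0 ? minIntervalMillis : adaptiveIntervalMillis;
        if (nowMillis - lastPruneMillis < interval) {
            return false; // throttled: too soon since the last attempt
        }
        if (totalSubscriptions == 0) {
            return false; // no subscriptions, no ratio to evaluate
        }
        double ratio = (double) eligibleSubscriptions / totalSubscriptions;
        return ratio > eligibleRatio; // prune only when enough subscriptions are ready
    }
}
```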
+* **`createOrGetTracker`**: This method creates the per-subscription
`InMemoryTopicDelayedDeliveryTracker` instance.
+
+### `InMemoryTopicDelayedDeliveryTracker` (New Class)
+This class implements the `DelayedDeliveryTracker` interface for a single
subscription.
+* **Role**: Acts as a lightweight proxy, forwarding all operations (e.g.,
`addMessage`, `getScheduledMessages`) to the shared
`InMemoryTopicDelayedDeliveryTrackerManager`.
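A minimal sketch of the proxy relationship, assuming each per-subscription view keeps only its own `markDeletePosition` and filters the shared index with it. `SharedManagerSketch` and `SubscriptionViewSketch` are hypothetical, simplify positions to a single `long`, and do not implement the real `DelayedDeliveryTracker` interface.

```java
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Shared, topic-level index: position -> deliverAt. Each position is stored once.
final class SharedManagerSketch {
    final NavigableMap<Long, Long> deliverAtByPosition = new TreeMap<>();

    SubscriptionViewSketch register() { return new SubscriptionViewSketch(this); }
}

// Lightweight per-subscription view: owns only its markDeletePosition, delegates storage.
final class SubscriptionViewSketch {
    private final SharedManagerSketch manager;
    private long markDeletePosition = -1; // nothing acknowledged yet

    SubscriptionViewSketch(SharedManagerSketch manager) { this.manager = manager; }

    void addMessage(long position, long deliverAt) {
        // putIfAbsent: the same message added by several subscriptions is indexed once.
        manager.deliverAtByPosition.putIfAbsent(position, deliverAt);
    }

    void markDelete(long position) { markDeletePosition = Math.max(markDeletePosition, position); }

    // Positions due at 'now' that this subscription has not yet acknowledged.
    List<Long> scheduled(long now) {
        return manager.deliverAtByPosition.tailMap(markDeletePosition, false).entrySet().stream()
                .filter(e -> e.getValue() <= now)
                .map(e -> e.getKey())
                .collect(Collectors.toList());
    }
}
```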
Review Comment:
Would it be possible to use a dedicated cursor that replays delayed messages
to build the shared delayed message tracker, instead of relying on each
subscription-level DelayedDeliveryTracker to forward messages to it?
We could then check and poll the scheduled message IDs from the shared delayed
message tracker and add them to the replay queue (MessageRedeliveryController)
of every subscription.
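A rough sketch of the suggested flow, under stated assumptions: a single topic-level index is filled once (by the hypothetical dedicated cursor) and, on each check, due positions are moved into every subscription's replay queue. `SharedDelayedFanoutSketch` and its methods are hypothetical, a `TreeSet<Long>` stands in for each `MessageRedeliveryController`, and positions are simplified to `long`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.TreeSet;

final class SharedDelayedFanoutSketch {
    // deliverAt -> positions, filled once per topic by the (hypothetical) dedicated cursor.
    private final NavigableMap<Long, List<Long>> shared = new TreeMap<>();
    // subscription name -> replay queue; stands in for each MessageRedeliveryController.
    private final Map<String, TreeSet<Long>> replayQueues = new TreeMap<>();

    void addSubscription(String name) { replayQueues.put(name, new TreeSet<>()); }

    void onDelayedEntryRead(long position, long deliverAt) {
        shared.computeIfAbsent(deliverAt, k -> new ArrayList<>()).add(position);
    }

    // Polls everything due at 'now' from the shared index and adds it to every replay queue.
    void pollAndFanOut(long now) {
        NavigableMap<Long, List<Long>> due = shared.headMap(now, true);
        for (List<Long> positions : due.values()) {
            for (TreeSet<Long> queue : replayQueues.values()) {
                queue.addAll(positions);
            }
        }
        due.clear(); // clearing the view also removes the polled entries from 'shared'
    }

    TreeSet<Long> replayQueue(String name) { return replayQueues.get(name); }
}
```

A fuller version would still need to skip positions a given subscription has already acknowledged and decide what a subscription created after a poll should receive; the sketch ignores both.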