lhotari commented on code in PR #24625:
URL: https://github.com/apache/pulsar/pull/24625#discussion_r2275625104
##########
pip/pip-437.md:
##########
@@ -0,0 +1,167 @@
+# PIP-437: Granular Control for Message Delivery Delays
+
+# Background knowledge
+Pulsar currently allows producers to specify deliverAt timestamps for delayed message delivery.
+However, there's no mechanism to constrain how far into the future a message can be scheduled. With each producer
+scheduling various delivery time it makes the system less predictable and bring challenges for support.
+
+# Motivation
+The current system lacks administrative controls on message delivery delays, which introduces risks to cluster stability and data integrity.
+
+## Risks in the current system:
+Unbounded Resource Pressure: Excessively delayed messages must be retained in storage for extended periods.
+More importantly, they create a long-lived state in the broker's memory that cannot be cleared,
+leading to unpredictable resource consumption and potential garbage collection pressure
+
+### The Limitation:
+To protect broker memory, this tracker is capped at a configurable fixed number of entries (e.g., 10,000).
+A large volume of messages with widely varying delivery times can easily exhaust this capacity.
+Once the tracker is full, it stops persisting the index for new delayed messages, retaining this state only in memory.
+If the broker restarts for any reason (e.g., crash, rolling upgrade), this volatile in-memory state is completely lost.
+
+### The Impact: Upon restart, the broker's only source of truth is the last persisted cursor position.
+This position does not account for the lost tracking information, forcing the broker to re-dispatch messages that were already processed.
+This results in significant and difficult-to-predict message duplication for downstream consumers.
+
+The lack of administrative controls on message delivery delays introduces critical risks to cluster stability and data integrity.
+This proposal aims to provide granular control at the topic and namespace levels to provide fixed delay delivery configuration and to add a maximum delay delivery restriction.
+
+# Goals
+## In Scope
+- Add a new config that limits the maximum allowed delivery delay for a topic
+- Add a fixed delivery delay config for a topic
+- Add a new config that limits the maximum allowed delivery delay for a namespace
+- Add a fixed delivery delay config for a namespace
+- Provide safe defaults and observability.
+
+# High Level Design
+- Add configurations `maxDelayedDeliveryTimeMs` `fixedDelayedDeliveryTimeMs` for Topic-level and Namespace-level configurations
+- A three-tiered policy hierarchy (Topic > Namespace > Broker) for setting the maximum allowed delivery delay
+- A new, overriding policy (Topic > Namespace) for enforcing a fixed delivery delay
+- If both fixed-delivery-delay and max-delivery-delay policies are configured, the fixed-delivery-delay will always take precedence, overriding any client-side settings and ignoring the max-delivery-delay policy.
+- Default Behavior: If no policies are configured, the broker will honor the client's requested deliverAt time, constrained only by the broker-wide delayedDeliveryMaxDelayInMillis setting. If a policy is set at the namespace level but not the topic level, the topic will inherit the namespace's policy.
+
+# Detailed Design
+## Design & Implementation Details
+In the broker dispatch path (Producer.sendMessage), reject the messages that are not satisfied
+if a fixed value was set for future delivery, then ignore the client's configuration
+```
+if (messageMetadata.hasDeliverAtTime()) {
+long delay = messageMetadata.getDeliverAtTime() - System.currentTimeMillis();
+long maxDelay = getTopicMaxDeliverAtTime(topic);
+    if (delay > maxDelay) {
+        // Reject the message
+        ctx.writeAndFlush(new CommandSendError(...));
+        return;
+    }
+}
+```
+1. **Data Model Changes**:
+    * Add `maxDelayedDeliveryTimeInSeconds` and `fixedDelayedDeliveryTimeInSeconds` fields to `TopicPolicies.java` and `Policies.java` to store the configuration values at the topic and namespace levels, respectively.
+    * Update `HierarchyTopicPolicies.java` to include the new fields in the effective policy resolution.
+2. **Enforcement Logic**:
+    * Modify the `publishMessage` method in `Producer.java` to:
+        * Enforce the `maxDelayedDeliveryTimeInSeconds` policy by rejecting messages with a delivery delay exceeding the configured maximum.
+        * Override the message's `deliverAtTime` with the `fixedDelayedDeliveryTimeInSeconds` if the fixed delay policy is enabled.
+3. **Admin API**:
+    * Add new subcommands to `CmdTopicPolicies.java` and `CmdNamespaces.java` to manage the new policies through the admin CLI.
+    * Implement new REST API endpoints for setting, getting, and removing the policies.
+
+## Public-facing Changes
+
+### Public API
+#### Topic-Level
+| Method | Endpoint | Description |
+| :------- | :---------------------------------------------------------------------------- | :----------------------------------------------------------------------------------- |
+| `PUT` | `/admin/v2/topicPolicies/{tenant}/{namespace}/{topic}/maxDelayedDeliveryTime` | Sets the maximum allowed delivery delay for a topic. |
+| `PUT` | `/admin/v2/topicPolicies/{tenant}/{namespace}/{topic}/fixedDelayedDeliveryTime` | Sets the fixed delivery delay for a topic. |
+| `GET` | `/admin/v2/topicPolicies/{tenant}/{namespace}/{topic}/maxDelayedDeliveryTime` | Gets the maximum allowed delivery delay for a topic. |
+| `GET` | `/admin/v2/topicPolicies/{tenant}/{namespace}/{topic}/fixedDelayedDeliveryTime` | Gets the fixed delivery delay for a topic. |
+| `DELETE` | `/admin/v2/topicPolicies/{tenant}/{namespace}/{topic}/maxDelayedDeliveryTime` | Removes the maximum allowed delivery delay for a topic, reverting to namespace default. |
+| `DELETE` | `/admin/v2/topicPolicies/{tenant}/{namespace}/{topic}/fixedDelayedDeliveryTime` | Removes the fixed delivery delay for a topic, reverting to the namespace default. |
+
+#### Namespace-Level
+| Method | Endpoint | Description |
+| :------- | :-------------------------------------------------------------------- | :--------------------------------------------------------------------------------------- |
+| `PUT` | `/admin/v2/namespaces/{tenant}/{namespace}/maxDelayedDeliveryTime` | Sets the maximum allowed delivery delay for a namespace. |
+| `PUT` | `/admin/v2/namespaces/{tenant}/{namespace}/fixedDelayedDeliveryTime` | Sets the fixed delivery delay for a namespace. |
+| `GET` | `/admin/v2/namespaces/{tenant}/{namespace}/maxDelayedDeliveryTime` | Gets the maximum allowed delivery delay for a namespace. |
+| `GET` | `/admin/v2/namespaces/{tenant}/{namespace}/fixedDelayedDeliveryTime` | Gets the fixed delivery delay for a namespace. |
+| `DELETE` | `/admin/v2/namespaces/{tenant}/{namespace}/maxDelayedDeliveryTime` | Removes the maximum allowed delivery delay for a namespace, reverting to the broker default. |
+| `DELETE` | `/admin/v2/namespaces/{tenant}/{namespace}/fixedDelayedDeliveryTime` | Removes the fixed delivery delay for a namespace. |
+### Binary protocol
+
+### Configuration
+
+### CLI
+New subcommands added to `pulsar-admin topics`:
+* `set-max-delivery-delay`
+* `get-max-delivery-delay`
+* `remove-max-delivery-delay`
+* `set-fixed-delivery-delay`
+* `get-fixed-delivery-delay`
+* `remove-fixed-delivery-delay`
+
+New subcommands added to `pulsar-admin namespace`:
+* `set-max-delivery-delay`
+* `get-max-delivery-delay`
+* `remove-max-delivery-delay`
+* `set-fixed-delivery-delay`
+* `get-fixed-delivery-delay`
+* `remove-fixed-delivery-delay`

Review Comment:
   There's a related "PIP-315: Configurable max delay limit for delayed delivery", https://github.com/apache/pulsar/blob/master/pip/pip-315.md, implemented by https://github.com/apache/pulsar/pull/21798 . It would be great if the new proposal followed a similar approach for the API.


##########
pip/pip-437.md:
##########
@@ -0,0 +1,167 @@
+# PIP-437: Granular Control for Message Delivery Delays
+
+# Background knowledge
+Pulsar currently allows producers to specify deliverAt timestamps for delayed message delivery.
+However, there's no mechanism to constrain how far into the future a message can be scheduled. With each producer
+scheduling various delivery time it makes the system less predictable and bring challenges for support.
+
+# Motivation
+The current system lacks administrative controls on message delivery delays, which introduces risks to cluster stability and data integrity.
+
+## Risks in the current system:
+Unbounded Resource Pressure: Excessively delayed messages must be retained in storage for extended periods.
+More importantly, they create a long-lived state in the broker's memory that cannot be cleared,
+leading to unpredictable resource consumption and potential garbage collection pressure
+
+### The Limitation:
+To protect broker memory, this tracker is capped at a configurable fixed number of entries (e.g., 10,000).
+A large volume of messages with widely varying delivery times can easily exhaust this capacity.
+Once the tracker is full, it stops persisting the index for new delayed messages, retaining this state only in memory.
+If the broker restarts for any reason (e.g., crash, rolling upgrade), this volatile in-memory state is completely lost.
+
+### The Impact: Upon restart, the broker's only source of truth is the last persisted cursor position.
+This position does not account for the lost tracking information, forcing the broker to re-dispatch messages that were already processed.
+This results in significant and difficult-to-predict message duplication for downstream consumers.
+
+The lack of administrative controls on message delivery delays introduces critical risks to cluster stability and data integrity.
+This proposal aims to provide granular control at the topic and namespace levels to provide fixed delay delivery configuration and to add a maximum delay delivery restriction.
+
+# Goals
+## In Scope
+- Add a new config that limits the maximum allowed delivery delay for a topic
+- Add a fixed delivery delay config for a topic
+- Add a new config that limits the maximum allowed delivery delay for a namespace

Review Comment:
   I think that the maximum allowed delivery delay is already covered by ["PIP-315: Configurable max delay limit for delayed delivery"](https://github.com/apache/pulsar/blob/master/pip/pip-315.md)
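
For readers following the thread, the precedence rules in the quoted "High Level Design" section (fixed delay overrides everything; otherwise the maximum is resolved Topic > Namespace > Broker) could be sketched roughly as below. This is only an illustration, not code from the PR or from Pulsar: the class `DelayPolicySketch`, both method names, and all parameter names are hypothetical, and two behaviors are assumptions of the sketch rather than statements in the proposal: a non-positive maximum is treated as "no limit", and a fixed delay is applied even when the client requested no delay.

```java
import java.util.OptionalLong;

// Hypothetical sketch of the policy precedence described in the quoted PIP text.
// None of these names exist in Pulsar; they are for illustration only.
final class DelayPolicySketch {

    private DelayPolicySketch() {
    }

    // Returns the first configured value in the order topic, namespace, fallback.
    static OptionalLong firstPresent(OptionalLong topic, OptionalLong namespace, OptionalLong fallback) {
        if (topic.isPresent()) {
            return topic;
        }
        if (namespace.isPresent()) {
            return namespace;
        }
        return fallback;
    }

    // Computes the effective deliverAt timestamp (ms) for a message, or throws
    // IllegalArgumentException if the requested delay exceeds the effective maximum.
    static long resolveDeliverAt(long nowMs, long requestedDeliverAtMs,
                                 OptionalLong topicFixedMs, OptionalLong namespaceFixedMs,
                                 OptionalLong topicMaxMs, OptionalLong namespaceMaxMs,
                                 long brokerMaxMs) {
        // A fixed delay (Topic > Namespace) overrides the client's value and skips the max check.
        OptionalLong fixed = firstPresent(topicFixedMs, namespaceFixedMs, OptionalLong.empty());
        if (fixed.isPresent()) {
            return nowMs + fixed.getAsLong();
        }
        // Otherwise resolve the maximum as Topic > Namespace > Broker.
        long maxMs = firstPresent(topicMaxMs, namespaceMaxMs, OptionalLong.of(brokerMaxMs)).getAsLong();
        long delayMs = requestedDeliverAtMs - nowMs;
        // Assumption of this sketch: maxMs <= 0 means no limit is enforced.
        if (maxMs > 0 && delayMs > maxMs) {
            throw new IllegalArgumentException(
                    "Requested delay " + delayMs + " ms exceeds the maximum of " + maxMs + " ms");
        }
        return requestedDeliverAtMs;
    }
}
```

Keeping the resolution in one pure function like this makes the overlap with PIP-315 easy to see: only the fixed-delay branch is new behavior, while the max-delay branch is the part the existing limit already covers at some level.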