lhotari commented on code in PR #24625:
URL: https://github.com/apache/pulsar/pull/24625#discussion_r2275625104


##########
pip/pip-437.md:
##########
@@ -0,0 +1,167 @@
+# PIP-437: Granular Control for Message Delivery Delays
+
+# Background knowledge
+Pulsar currently allows producers to specify deliverAt timestamps for delayed 
message delivery.
+However, there's no mechanism to constrain how far into the future a message 
can be scheduled. With each producer
+scheduling various delivery time it makes the system less predictable and 
bring challenges for support.
+
+# Motivation
+The current system lacks administrative controls on message delivery delays, 
which introduces risks to cluster stability and data integrity.
+
+## Risks in the current system:
+Unbounded Resource Pressure: Excessively delayed messages must be retained in 
storage for extended periods.
+More importantly, they create a long-lived state in the broker's memory that 
cannot be cleared,
+leading to unpredictable resource consumption and potential garbage collection 
pressure
+
+### The Limitation:
+To protect broker memory, this tracker is capped at a configurable fixed 
number of entries (e.g., 10,000).
+A large volume of messages with widely varying delivery times can easily 
exhaust this capacity.
+Once the tracker is full, it stops persisting the index for new delayed 
messages, retaining this state only in memory.
+If the broker restarts for any reason (e.g., crash, rolling upgrade), this 
volatile in-memory state is completely lost.
+
+### The Impact: Upon restart, the broker's only source of truth is the last 
persisted cursor position.
+This position does not account for the lost tracking information, forcing the 
broker to re-dispatch messages that were already processed.
+This results in significant and difficult-to-predict message duplication for 
downstream consumers.
+
+The lack of administrative controls on message delivery delays introduces 
critical risks to cluster stability and data integrity.
+This proposal aims to provide granular control at the topic and namespace 
levels to provide fixed delay delivery configuration and to add a maximum delay 
delivery restriction.
+
+# Goals
+## In Scope
+- Add a new config that limits the maximum allowed delivery delay for a topic
+- Add a fixed delivery delay config for a topic
+- Add a new config that limits the maximum allowed delivery delay for a 
namespace
+- Add a fixed delivery delay config for a namespace
+- Provide safe defaults and observability.
+
+# High Level Design
+- Add configurations `maxDelayedDeliveryTimeMs`  `fixedDelayedDeliveryTimeMs` 
for Topic-level and Namespace-level configurations
+- A three-tiered policy hierarchy (Topic > Namespace > Broker) for setting the 
maximum allowed delivery delay
+- A new, overriding policy (Topic > Namespace) for enforcing a fixed delivery 
delay
+- If both fixed-delivery-delay and max-delivery-delay policies are configured, 
the fixed-delivery-delay will always take precedence, overriding any 
client-side settings and ignoring the max-delivery-delay policy.
+- Default Behavior: If no policies are configured, the broker will honor the 
client's requested deliverAt time, constrained only by the broker-wide 
delayedDeliveryMaxDelayInMillis setting. If a policy is set at the namespace 
level but not the topic level, the topic will inherit the namespace's policy.
+
+# Detailed Design
+## Design & Implementation Details
+In the broker dispatch path (Producer.sendMessage), reject the messages that 
are not satisfied
+if a fixed value was set for future delivery, then ignore the client's 
configuration
+```
+if (messageMetadata.hasDeliverAtTime()) {
+long delay = messageMetadata.getDeliverAtTime() - System.currentTimeMillis();
+long maxDelay = getTopicMaxDeliverAtTime(topic);
+    if (delay > maxDelay) {
+        // Reject the message
+        ctx.writeAndFlush(new CommandSendError(...));
+        return;
+    }
+}
+```
+1.  **Data Model Changes**:
+    *   Add `maxDelayedDeliveryTimeInSeconds` and 
`fixedDelayedDeliveryTimeInSeconds` fields to `TopicPolicies.java` and 
`Policies.java` to store the configuration values at the topic and namespace 
levels, respectively.
+    *   Update `HierarchyTopicPolicies.java` to include the new fields in the 
effective policy resolution.
+2.  **Enforcement Logic**:
+    *   Modify the `publishMessage` method in `Producer.java` to:
+        *   Enforce the `maxDelayedDeliveryTimeInSeconds` policy by rejecting 
messages with a delivery delay exceeding the configured maximum.
+        *   Override the message's `deliverAtTime` with the 
`fixedDelayedDeliveryTimeInSeconds` if the fixed delay policy is enabled.
+3.  **Admin API**:
+    *   Add new subcommands to `CmdTopicPolicies.java` and 
`CmdNamespaces.java` to manage the new policies through the admin CLI.
+    *   Implement new REST API endpoints for setting, getting, and removing 
the policies.
+
+## Public-facing Changes
+
+### Public API
+#### Topic-Level
+| Method   | Endpoint                                                          
            | Description                                                       
                   |
+| :------- | 
:---------------------------------------------------------------------------- | 
:-----------------------------------------------------------------------------------
 |
+| `PUT`    | 
`/admin/v2/topicPolicies/{tenant}/{namespace}/{topic}/maxDelayedDeliveryTime` | 
Sets the maximum allowed delivery delay for a topic.                            
     |
+| `PUT`    | 
`/admin/v2/topicPolicies/{tenant}/{namespace}/{topic}/fixedDelayedDeliveryTime` 
| Sets the fixed delivery delay for a topic.                                    
       |
+| `GET`    | 
`/admin/v2/topicPolicies/{tenant}/{namespace}/{topic}/maxDelayedDeliveryTime` | 
Gets the maximum allowed delivery delay for a topic.                            
     |
+| `GET`    | 
`/admin/v2/topicPolicies/{tenant}/{namespace}/{topic}/fixedDelayedDeliveryTime` 
| Gets the fixed delivery delay for a topic.                                    
       |
+| `DELETE` | 
`/admin/v2/topicPolicies/{tenant}/{namespace}/{topic}/maxDelayedDeliveryTime` | 
Removes the maximum allowed delivery delay for a topic, reverting to namespace 
default. |
+| `DELETE` | 
`/admin/v2/topicPolicies/{tenant}/{namespace}/{topic}/fixedDelayedDeliveryTime` 
| Removes the fixed delivery delay for a topic, reverting to the namespace 
default.     |
+
+#### Namespace-Level
+| Method   | Endpoint                                                          
    | Description                                                               
               |
+| :------- | 
:-------------------------------------------------------------------- | 
:---------------------------------------------------------------------------------------
 |
+| `PUT`    | 
`/admin/v2/namespaces/{tenant}/{namespace}/maxDelayedDeliveryTime`    | Sets 
the maximum allowed delivery delay for a namespace.                             
    |
+| `PUT`    | 
`/admin/v2/namespaces/{tenant}/{namespace}/fixedDelayedDeliveryTime`    | Sets 
the fixed delivery delay for a namespace.                                       
    |
+| `GET`    | 
`/admin/v2/namespaces/{tenant}/{namespace}/maxDelayedDeliveryTime`    | Gets 
the maximum allowed delivery delay for a namespace.                             
    |
+| `GET`    | 
`/admin/v2/namespaces/{tenant}/{namespace}/fixedDelayedDeliveryTime`    | Gets 
the fixed delivery delay for a namespace.                                       
    |
+| `DELETE` | 
`/admin/v2/namespaces/{tenant}/{namespace}/maxDelayedDeliveryTime`    | Removes 
the maximum allowed delivery delay for a namespace, reverting to the broker 
default. |
+| `DELETE` | 
`/admin/v2/namespaces/{tenant}/{namespace}/fixedDelayedDeliveryTime`    | 
Removes the fixed delivery delay for a namespace.                               
         |
+### Binary protocol
+
+### Configuration
+
+### CLI
+New subcommands added to `pulsar-admin topics`:
+*   `set-max-delivery-delay`
+*   `get-max-delivery-delay`
+*   `remove-max-delivery-delay`
+*   `set-fixed-delivery-delay`
+*   `get-fixed-delivery-delay`
+*   `remove-fixed-delivery-delay`
+
+New subcommands added to `pulsar-admin namespace`:
+*   `set-max-delivery-delay`
+*   `get-max-delivery-delay`
+*   `remove-max-delivery-delay`
+*   `set-fixed-delivery-delay`
+*   `get-fixed-delivery-delay`
+*   `remove-fixed-delivery-delay`

Review Comment:
   There's a related "PIP-315: Configurable max delay limit for delayed 
delivery", https://github.com/apache/pulsar/blob/master/pip/pip-315.md, 
implemented by https://github.com/apache/pulsar/pull/21798 . It would be great 
that the new proposal would follow a similar solution for the API.



##########
pip/pip-437.md:
##########
@@ -0,0 +1,167 @@
+# PIP-437: Granular Control for Message Delivery Delays
+
+# Background knowledge
+Pulsar currently allows producers to specify deliverAt timestamps for delayed 
message delivery.
+However, there's no mechanism to constrain how far into the future a message 
can be scheduled. With each producer
+scheduling various delivery time it makes the system less predictable and 
bring challenges for support.
+
+# Motivation
+The current system lacks administrative controls on message delivery delays, 
which introduces risks to cluster stability and data integrity.
+
+## Risks in the current system:
+Unbounded Resource Pressure: Excessively delayed messages must be retained in 
storage for extended periods.
+More importantly, they create a long-lived state in the broker's memory that 
cannot be cleared,
+leading to unpredictable resource consumption and potential garbage collection 
pressure
+
+### The Limitation:
+To protect broker memory, this tracker is capped at a configurable fixed 
number of entries (e.g., 10,000).
+A large volume of messages with widely varying delivery times can easily 
exhaust this capacity.
+Once the tracker is full, it stops persisting the index for new delayed 
messages, retaining this state only in memory.
+If the broker restarts for any reason (e.g., crash, rolling upgrade), this 
volatile in-memory state is completely lost.
+
+### The Impact: Upon restart, the broker's only source of truth is the last 
persisted cursor position.
+This position does not account for the lost tracking information, forcing the 
broker to re-dispatch messages that were already processed.
+This results in significant and difficult-to-predict message duplication for 
downstream consumers.
+
+The lack of administrative controls on message delivery delays introduces 
critical risks to cluster stability and data integrity.
+This proposal aims to provide granular control at the topic and namespace 
levels to provide fixed delay delivery configuration and to add a maximum delay 
delivery restriction.
+
+# Goals
+## In Scope
+- Add a new config that limits the maximum allowed delivery delay for a topic
+- Add a fixed delivery delay config for a topic
+- Add a new config that limits the maximum allowed delivery delay for a 
namespace

Review Comment:
   I think that the maximum allowed delivery delay is already covered by 
["PIP-315: Configurable max delay limit for delayed 
delivery"](https://github.com/apache/pulsar/blob/master/pip/pip-315.md)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to