tjiuming opened a new issue, #19448:
URL: https://github.com/apache/pulsar/issues/19448

   ### Motivation
   
   Currently, in Pulsar, we have a configuration named 
`subscriptionExpirationTimeMinutes` to control the subscription expiration of 
`PersistentTopic` and `NonPersistentTopic`.
   
   When we set a value which is greater than 0 to 
`subscriptionExpirationTimeMinutes`, it will affect both `PersistentTopic` and 
`NonPersistentTopic`. Their inactive subscriptions will get expired and will 
clean automatically.
   
   For `NonPersistentTopic`, its subscriptions can be clean because we don't 
guarantee its data integrity. But for `PersistentTopic`, if we clean its 
subscriptions automatically may lead to data loss.
   
   However, their subscription expiration is managed by the same 
configuration(`subscriptionExpirationTimeMinutes`), we can't control their 
subscription expiration independently.
   
   So I want to introduce a new configuration named 
`nonPersistentSubscriptionExpirationTimeMinutes` to manage 
`NonPersistentTopic`'s subscription expiration.
   
   ### Goal
   
   Provide a new configuration to manage the `NonPersistentTopic`'s 
subscription expiration independently.
   
   ### API Changes
   
   Add a new field which named 
`non_persistent_subscription_expiration_time_minutes` into 
[`Policies`](https://github.com/apache/pulsar/blob/master/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java).
   It will change the Request/Response of the following APIs:
   1. [Namespaces#getPolicies(String 
namespace)](https://github.com/apache/pulsar/blob/master/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java#L277)
   2. [Namespaces#getPoliciesAsync(String 
namespace)](https://github.com/apache/pulsar/blob/master/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java#L309)
   3. [Namespaces#createNamespace(String namespace, Policies 
policies)](https://github.com/apache/pulsar/blob/master/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java#L462)
   4. [Namespaces#createNamespaceAsync(String namespace, Policies 
policies)](https://github.com/apache/pulsar/blob/master/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java#L474)
   
   Add the following rest-APIs to manage the value of 
`non_persistent_subscription_expiration_time_minutes`.
   1. getNonPersistentSubscriptionExpirationTime
   ```
       @GET
       @Path("/{tenant}/{namespace}/nonPersistentSubscriptionExpirationTime")
       @ApiOperation(value = "Get the non-persistent subscription expiration 
time for the namespace")
       @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
               @ApiResponse(code = 404, message = "Tenant or cluster or 
namespace doesn't exist") })
       public void getNonPersistentSubscriptionExpirationTime(@Suspended 
AsyncResponse asyncResponse,
                                                    @PathParam("tenant") String 
tenant,
                                                    @PathParam("namespace") 
String namespace) {
           // ignore...
       }
   ```
   2. setNonPersistentSubscriptionExpirationTime
   ```
       @POST
       @Path("/{tenant}/{namespace}/nonPersistentSubscriptionExpirationTime")
       @ApiOperation(value = "Set non-persistent subscription expiration time 
in minutes for namespace")
       @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
               @ApiResponse(code = 404, message = "Tenant or cluster or 
namespace doesn't exist"),
               @ApiResponse(code = 412, message = "Invalid expiration time")})
       public void setNonPersistentSubscriptionExpirationTime(@Suspended 
AsyncResponse asyncResponse,
                                                 @PathParam("tenant") String 
tenant,
                                                 @PathParam("namespace") String 
namespace,
                                                 @ApiParam(value =
                                                         "Expiration time in 
minutes for the specified namespace",
                                                         required = true) int 
expirationTime) {
           // ignore...
       }
   ```
   3. removeNonPersistentSubscriptionExpirationTime
   ```
       @DELETE
       @Path("/{tenant}/{namespace}/nonPersistentSubscriptionExpirationTime")
       @ApiOperation(value = "Remove non-persistent subscription expiration 
time for namespace")
       @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
               @ApiResponse(code = 404, message = "Tenant or cluster or 
namespace doesn't exist")})
       public void removeNonPersistentSubscriptionExpirationTime(@Suspended 
AsyncResponse asyncResponse,
                                                    @PathParam("tenant") String 
tenant,
                                                    @PathParam("namespace") 
String namespace) {
           // ignore...
       }
   ```
   
   ### Implementation
   
   Modify 
[`NonPersistentTopic#checkInactiveSubscriptions`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java#L1022)
 as follows:
   ```
           TopicName name = TopicName.get(topic);
           try {
               Policies policies = 
brokerService.pulsar().getPulsarResources().getNamespaceResources()
                       .getPolicies(name.getNamespaceObject())
                       
.orElseThrow(MetadataStoreException.NotFoundException::new);
               final int defaultExpirationTime = 
brokerService.pulsar().getConfiguration()
                       .getNonPersistentSubscriptionExpirationTimeMinutes();
               final Integer nsExpirationTime = 
policies.non_persistent_subscription_expiration_time_minutes;
               final long expirationTimeMillis = TimeUnit.MINUTES
                       .toMillis(nsExpirationTime == null ? 
defaultExpirationTime : nsExpirationTime);
               if (expirationTimeMillis > 0) {
                   subscriptions.forEach((subName, sub) -> {
                       if (sub.getDispatcher() != null
                               && sub.getDispatcher().isConsumerConnected() || 
sub.isReplicated()) {
                           return;
                       }
                       if (System.currentTimeMillis() - sub.getLastActive() > 
expirationTimeMillis) {
                           sub.delete().thenAccept(v -> log.info("[{}][{}] The 
subscription was deleted due to expiration "
                                   + "with last active [{}]", topic, subName, 
sub.getLastActive()));
                       }
                   });
               }
           } catch (Exception e) {
               if (log.isDebugEnabled()) {
                   log.debug("[{}] Error getting policies", topic);
               }
           }
   ```
   
   This is no longer compatible with the previous implementation.
   
   ### Alternatives
   
   _No response_
   
   ### Anything else?
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to