tjiuming opened a new issue, #19448: URL: https://github.com/apache/pulsar/issues/19448
### Motivation

Currently, Pulsar uses a single configuration, `subscriptionExpirationTimeMinutes`, to control subscription expiration for both `PersistentTopic` and `NonPersistentTopic`. When it is set to a value greater than 0, it affects both topic types: their inactive subscriptions expire and are cleaned up automatically.

For `NonPersistentTopic`, cleaning up subscriptions is acceptable because we don't guarantee its data integrity. For `PersistentTopic`, however, cleaning up subscriptions automatically may lead to data loss. Because both are managed by the same configuration (`subscriptionExpirationTimeMinutes`), we can't control their subscription expiration independently.

So I want to introduce a new configuration named `nonPersistentSubscriptionExpirationTimeMinutes` to manage `NonPersistentTopic`'s subscription expiration.

### Goal

Provide a new configuration to manage `NonPersistentTopic`'s subscription expiration independently.

### API Changes

Add a new field named `non_persistent_subscription_expiration_time_minutes` to [`Policies`](https://github.com/apache/pulsar/blob/master/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java). It will change the request/response of the following APIs:

1. [Namespaces#getPolicies(String namespace)](https://github.com/apache/pulsar/blob/master/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java#L277)
2. [Namespaces#getPoliciesAsync(String namespace)](https://github.com/apache/pulsar/blob/master/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java#L309)
3. [Namespaces#createNamespace(String namespace, Policies policies)](https://github.com/apache/pulsar/blob/master/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java#L462)
4. [Namespaces#createNamespaceAsync(String namespace, Policies policies)](https://github.com/apache/pulsar/blob/master/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java#L474)

Add the following REST APIs to manage the value of `non_persistent_subscription_expiration_time_minutes`:

1. getNonPersistentSubscriptionExpirationTime

```
@GET
@Path("/{tenant}/{namespace}/nonPersistentSubscriptionExpirationTime")
@ApiOperation(value = "Get the non-persistent subscription expiration time for the namespace")
@ApiResponses(value = {
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")
})
public void getNonPersistentSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse,
                                                       @PathParam("tenant") String tenant,
                                                       @PathParam("namespace") String namespace) {
    // ignore...
}
```

2. setNonPersistentSubscriptionExpirationTime

```
@POST
@Path("/{tenant}/{namespace}/nonPersistentSubscriptionExpirationTime")
@ApiOperation(value = "Set non-persistent subscription expiration time in minutes for namespace")
@ApiResponses(value = {
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
        @ApiResponse(code = 412, message = "Invalid expiration time")
})
public void setNonPersistentSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse,
                                                       @PathParam("tenant") String tenant,
                                                       @PathParam("namespace") String namespace,
                                                       @ApiParam(value = "Expiration time in minutes for the specified namespace",
                                                               required = true) int expirationTime) {
    // ignore...
}
```

3. removeNonPersistentSubscriptionExpirationTime

```
@DELETE
@Path("/{tenant}/{namespace}/nonPersistentSubscriptionExpirationTime")
@ApiOperation(value = "Remove non-persistent subscription expiration time for namespace")
@ApiResponses(value = {
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")
})
public void removeNonPersistentSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse,
                                                          @PathParam("tenant") String tenant,
                                                          @PathParam("namespace") String namespace) {
    // ignore...
}
```

### Implementation

Modify [`NonPersistentTopic#checkInactiveSubscriptions`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java#L1022) as follows:

```
TopicName name = TopicName.get(topic);
try {
    Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources()
            .getPolicies(name.getNamespaceObject())
            .orElseThrow(MetadataStoreException.NotFoundException::new);
    // Broker-level default, used when the namespace does not set its own value.
    final int defaultExpirationTime = brokerService.pulsar().getConfiguration()
            .getNonPersistentSubscriptionExpirationTimeMinutes();
    // Namespace-level value; null means "not set".
    final Integer nsExpirationTime = policies.non_persistent_subscription_expiration_time_minutes;
    final long expirationTimeMillis = TimeUnit.MINUTES
            .toMillis(nsExpirationTime == null ? defaultExpirationTime : nsExpirationTime);
    // A value of 0 or less disables expiration.
    if (expirationTimeMillis > 0) {
        subscriptions.forEach((subName, sub) -> {
            // Skip subscriptions that have a connected consumer or are replicated.
            if ((sub.getDispatcher() != null && sub.getDispatcher().isConsumerConnected())
                    || sub.isReplicated()) {
                return;
            }
            if (System.currentTimeMillis() - sub.getLastActive() > expirationTimeMillis) {
                sub.delete().thenAccept(v -> log.info("[{}][{}] The subscription was deleted due to expiration "
                        + "with last active [{}]", topic, subName, sub.getLastActive()));
            }
        });
    }
} catch (Exception e) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] Error getting policies", topic);
    }
}
```

With this change, the behavior is no longer compatible with the previous implementation.

### Alternatives

_No response_

### Anything else?

_No response_

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
