liudezhi2098 opened a new pull request #6449: Support Consumers Set Custom Retry Delay
URL: https://github.com/apache/pulsar/pull/6449

Master Issue: #6448

### Motivation

In many online business systems, exceptions occur during business-logic processing, so a message has to be re-consumed, and users want flexible control over the delay before it is redelivered. The usual workaround today is to send the message to a special retry topic, because a producer can specify an arbitrary delivery delay, and then have the consumer subscribe to both the business topic and the retry topic.

I think this logic can be supported by Pulsar itself, which would make it easier to use. It looks like a very common requirement.
### Modifications

This change can be supported on the client side. It adds a set of methods to `org.apache.pulsar.client.api.Consumer`:

```java
void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException;

CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit);

CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, int delayLevel);
```

`DeadLetterPolicy` gains a retry topic:

```java
public class DeadLetterPolicy {

    /**
     * Maximum number of times that a message will be redelivered before being sent to the dead letter queue.
     */
    private int maxRedeliverCount;

    /**
     * Name of the retry topic where the failing messages will be sent.
     */
    private String retryLetterTopic;

    /**
     * Name of the dead letter topic where the failing messages will be sent.
     */
    private String deadLetterTopic;
}
```

`org.apache.pulsar.client.impl.ConsumerImpl` gains a retry producer:

```java
private volatile Producer<T> deadLetterProducer;

private volatile Producer<T> retryLetterProducer;
```

Retry can be enabled when creating a consumer. It is disabled by default:

```java
@Override
public ConsumerBuilder<T> enableRetry(boolean retryEnable) {
    conf.setRetryEnable(retryEnable);
    return this;
}
```
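To make the intended flow easier to review, here is a minimal, stdlib-only sketch of the routing decision that `reconsumeLater` might make on the client side. It is not code from this pull request. `RetryRoutingSketch`, `Policy`, `Route`, and `route` are hypothetical names, and the rule itself (republish to the retry topic with the requested delay until `maxRedeliverCount` retries are used up, then send to the dead letter topic) is an assumption inferred from the three `DeadLetterPolicy` fields listed above.

```java
import java.util.concurrent.TimeUnit;

// Illustrative only: a stdlib model of the routing decision, not Pulsar code.
class RetryRoutingSketch {

    // Hypothetical stand-in mirroring the three DeadLetterPolicy fields.
    static final class Policy {
        final int maxRedeliverCount;
        final String retryLetterTopic;
        final String deadLetterTopic;

        Policy(int maxRedeliverCount, String retryLetterTopic, String deadLetterTopic) {
            this.maxRedeliverCount = maxRedeliverCount;
            this.retryLetterTopic = retryLetterTopic;
            this.deadLetterTopic = deadLetterTopic;
        }
    }

    // Hypothetical result: the target topic and the time the message becomes visible.
    static final class Route {
        final String topic;
        final long deliverAtMillis;

        Route(String topic, long deliverAtMillis) {
            this.topic = topic;
            this.deliverAtMillis = deliverAtMillis;
        }
    }

    // Assumed rule: use the retry topic with the requested delay while fewer than
    // maxRedeliverCount retries have happened, otherwise use the dead letter topic.
    static Route route(Policy policy, int previousRetries,
                       long delayTime, TimeUnit unit, long nowMillis) {
        if (previousRetries >= policy.maxRedeliverCount) {
            return new Route(policy.deadLetterTopic, nowMillis);
        }
        return new Route(policy.retryLetterTopic, nowMillis + unit.toMillis(delayTime));
    }

    public static void main(String[] args) {
        Policy policy = new Policy(2, "orders-retry", "orders-dlq");
        for (int retries = 0; retries <= 2; retries++) {
            Route r = route(policy, retries, 10, TimeUnit.SECONDS, 0L);
            System.out.println(retries + " -> " + r.topic + " @ " + r.deliverAtMillis);
        }
    }
}
```

With `maxRedeliverCount = 2`, the first two calls route to the retry topic with a 10000 ms delay and the third goes to the dead letter topic. Whether the count check belongs before or after the increment is a design choice for the actual implementation to settle.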
