This is an automated email from the ASF dual-hosted git repository.

thetumbled pushed a commit to branch DocForRetryLetterProducerConfig
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git
commit b10ac8efc267e2378fa14311b11c28f24b4329b8
Author: thetumbled <843221...@qq.com>
AuthorDate: Tue Mar 18 17:41:50 2025 +0800

    add doc.
---
 docs/concepts-messaging.md | 85 +++++++++++++++++++++++++++-------------------
 1 file changed, 50 insertions(+), 35 deletions(-)

diff --git a/docs/concepts-messaging.md b/docs/concepts-messaging.md
index bd80f146a74..0bfde776269 100644
--- a/docs/concepts-messaging.md
+++ b/docs/concepts-messaging.md
@@ -63,11 +63,11 @@ For batch messages, you can enable batch index acknowledgment to avoid dispatchi
 Messages can be acknowledged in one of the following two ways:

 - Being acknowledged individually
-  
+
   With individual acknowledgment, the consumer acknowledges each message and sends an acknowledgment request to the broker.

 - Being acknowledged cumulatively
-  
+
   With cumulative acknowledgment, the consumer **only** acknowledges the last message it received. All messages in the stream up to (and including) the provided message are not redelivered to that consumer.

 If you want to acknowledge messages individually, you can use the following API.
@@ -257,9 +257,15 @@ The default retry letter topic uses this format:

 :::

-Use the Java client to specify the name of the retry letter topic.
+Use the Java client to specify the name of the retry letter topic and configure the producer of the retry letter topic.

 ```java
+// enable batch and disable chunking for the retry letter topic producer
+// by default, the batch feature is disabled and the chunking feature is enabled
+DeadLetterProducerBuilderCustomizer producerBuilderCustomizer = (context, producerBuilder) -> {
+    producerBuilder.enableBatching(true);
+    producerBuilder.enableChunking(false);
+};
 Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                 .topic("my-topic")
                 .subscriptionName("my-subscription")
@@ -268,6 +274,7 @@ Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                 .deadLetterPolicy(DeadLetterPolicy.builder()
                         .maxRedeliverCount(maxRedeliveryCount)
                         .retryLetterTopic("my-retry-letter-topic-name")
+                        .retryLetterProducerBuilderCustomizer(producerBuilderCustomizer)
                         .build())
                 .subscribe();
 ```
@@ -307,7 +314,7 @@ consumer.reconsumeLater(msg, customProperties, 3, TimeUnit.SECONDS);

 :::note

-* Currently, retry letter topic is enabled in Shared subscription types.
+* Currently, retry letter topic is enabled in Shared subscription types, because it relies on the delay message feature, which is only available in Shared subscription types.
 * Compared with negative acknowledgment, retry letter topic is more suitable for messages that require a large number of retries with a configurable retry interval. Because messages in the retry letter topic are persisted to BookKeeper, while messages that need to be retried due to negative acknowledgment are cached on the client side.

 :::
@@ -347,9 +354,15 @@ The dead letter producerName uses this format:
 - From Pulsar 2.3.x to 2.10.x, Java SDK dead letter policy will set a 30 seconds acknowledgment timeout when there is no user defined acknowledgment timeout. This default timeout policy has been removed since 3.0.x.
 :::

-Use the Java client to specify the name of the dead letter topic.
+Use the Java client to specify the name of the dead letter topic and configure the producer of the retry/dead letter topic.

 ```java
+// enable batch and disable chunking for the dead letter topic producer
+// by default, the batch feature is disabled and the chunking feature is enabled
+DeadLetterProducerBuilderCustomizer producerBuilderCustomizer = (context, producerBuilder) -> {
+    producerBuilder.enableBatching(true);
+    producerBuilder.enableChunking(false);
+};
 Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                 .topic("my-topic")
                 .subscriptionName("my-subscription")
@@ -357,6 +370,8 @@ Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                 .deadLetterPolicy(DeadLetterPolicy.builder()
                         .maxRedeliverCount(maxRedeliveryCount)
                         .deadLetterTopic("my-dead-letter-topic-name")
+                        .deadLetterProducerBuilderCustomizer(producerBuilderCustomizer)
+                        .retryLetterProducerBuilderCustomizer(producerBuilderCustomizer)
                         .build())
                 .subscribe();
 ```
@@ -516,7 +531,7 @@ A Pulsar subscription is a named configuration rule that determines how messages
 - [exclusive](#exclusive)
 - [shared](#shared)
 - [failover](#failover)
-- [key_shared](#key_shared) 
+- [key_shared](#key_shared)

 These types are illustrated in the figure below.

@@ -552,9 +567,9 @@ Exclusive is the default subscription type.

 #### Failover

-The failover type is a subscription type that multiple consumers can attach to the same subscription. 
+The failover type is a subscription type that multiple consumers can attach to the same subscription.

-A master consumer is picked for a non-partitioned topic or each partition of a partitioned topic and receives messages. 
+A master consumer is picked for a non-partitioned topic or each partition of a partitioned topic and receives messages.

 When the master consumer disconnects, all (non-acknowledged and subsequent) messages are delivered to the next consumer in line.
@@ -566,60 +581,60 @@ In some cases, a partition may have an older active consumer processing messages

 ##### Failover | Partitioned topics

-For partitioned topics, the broker sorts consumers by priority and lexicographical order of consumer name. 
+For partitioned topics, the broker sorts consumers by priority and lexicographical order of consumer name.

-The broker tries to evenly assign partitions to consumers with the highest priority. 
+The broker tries to evenly assign partitions to consumers with the highest priority.

 A consumer is selected by running a module operation `mod (partition index, consumer index)`.

 - If the number of partitions in a partitioned topic is **less** than the number of consumers:
-  
-  For example, in the diagram below, this partitioned topic has 2 partitions and there are 4 consumers. 
-  
-  Each partition has 1 active consumer and 3 stand-by consumers. 
-  
+
+  For example, in the diagram below, this partitioned topic has 2 partitions and there are 4 consumers.
+
+  Each partition has 1 active consumer and 3 stand-by consumers.
+
   - For P0, Consumer A is the master consumer, while Consumer B, Consumer C, and Consumer D would be the next consumer in line to receive messages if consumer A is disconnected.

   - For P1, Consumer B is the master consumer, while Consumer A, Consumer C, and Consumer D would be the next consumer in line to receive messages if consumer B is disconnected.

-  - Moreover, if Consumer A and consumer B are disconnected, then 
-  
+  - Moreover, if Consumer A and consumer B are disconnected, then
+
     - for P0: Consumer C is the active consumer and Consumer D is the stand-by consumer.
-  
+
     - for P1: Consumer D is the active consumer and Consumer C is the stand-by consumer.

 - If the number of partitions in a partitioned topic is **greater** than the number of consumers:
-  
-  For example, in the diagram below, this partitioned topic has 9 partitions and 3 consumers. 
-  
+
+  For example, in the diagram below, this partitioned topic has 9 partitions and 3 consumers.
+
   - P0, P3, and P6 are assigned to Consumer A. Consumer A is their active consumer. Consumer B and Consumer C are their stand-by consumers.
-  
+
   - P1, P4, and P7 are assigned to Consumer B. Consumer B is their active consumer. Consumer A and Consumer C are their stand-by consumers.
-  
+
   - P2, P5, and P8 are assigned to Consumer C. Consumer C is their active consumer. Consumer A and Consumer B are their stand-by consumers.
-  
+

 ##### Failover | Non-partitioned topics

-- If there is one non-partitioned topic. The broker picks consumers in the order they subscribe to non-partitioned topics. 
+- If there is one non-partitioned topic. The broker picks consumers in the order they subscribe to non-partitioned topics.
+
+  For example, in the diagram below, this non-partitioned topic has 1 topic and there are 2 consumers.
+
+  The topic has 1 active consumer and 1 stand-by consumer.

-  For example, in the diagram below, this non-partitioned topic has 1 topic and there are 2 consumers. 
-  
-  The topic has 1 active consumer and 1 stand-by consumer. 
-  
   - Consumer A is the master consumer, while consumer B would be the next consumer in line to receive messages if consumer A is disconnected.

 - If there are multiple non-partitioned topics, a consumer is selected based on **consumer name hash** and **topic name hash**. The client uses the same consumer name to subscribe to all the topics.

-  For example, in the diagram below, there are 4 non-partitioned topics and 2 consumers. 
-  
+  For example, in the diagram below, there are 4 non-partitioned topics and 2 consumers.
+
   - The non-partitioned topic 1 and non-partitioned topic 4 are assigned to consumer B. Consumer A is their stand-by consumer.
-  
+
   - The non-partitioned topic 2 and non-partitioned topic 3 are assigned to consumer A. Consumer B is their stand-by consumer.
@@ -648,7 +663,7 @@ The Key_Shared subscription type in Pulsar allows multiple consumers to attach t

 If there is a newly switched over active consumer, it will start reading messages from the position where messages are acked by the old inactive consumer.

-For example, if P0 is assigned to Consumer A. Consumer A is the active consumer and Consumer B is the stand-by consumer. 
+For example, if P0 is assigned to Consumer A. Consumer A is the active consumer and Consumer B is the stand-by consumer.

 - If Consumer A gets disconnected without reading any messages from P0, when Consumer C is added and becomes the new active consumer, then Consumer C will start reading messages directly from P0.

@@ -859,7 +874,7 @@ The field `keyHashRanges` contained the information as a list of string values,
 Example of the consumer stats part of the topic stats for a subscription:

 ```json
-{ 
+{
   "consumers" : [ {
     "msgRateOut" : 0.0,
     "msgThroughputOut" : 0.0,
@@ -1194,7 +1209,7 @@ If there is a key attached to message, the messages will be routed to correspond

 [Hashing Scheme](/api/client/org/apache/pulsar/client/api/HashingScheme) is an enum that represents sets of standard hashing functions available when choosing the partition to use for a particular message.

-There are 2 types of standard hashing functions available: 
+There are 2 types of standard hashing functions available:

 - JavaStringHash
 - Murmur3_32Hash