This is an automated email from the ASF dual-hosted git repository.
liuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 52c382a7e1e [improve][doc] Added docs for key shared subscription
hashing schemes (#18878)
52c382a7e1e is described below
commit 52c382a7e1e6f44ec52ba3280d84d67bb4f70e1d
Author: Asaf Mesika <[email protected]>
AuthorDate: Wed Dec 14 09:07:43 2022 +0200
[improve][doc] Added docs for key shared subscription hashing schemes
(#18878)
---
site2/docs/concepts-messaging.md | 185 +++++++++++++++++++++++++++++++++++----
1 file changed, 166 insertions(+), 19 deletions(-)
diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md
index 92db99e33ca..24782cac3d0 100644
--- a/site2/docs/concepts-messaging.md
+++ b/site2/docs/concepts-messaging.md
@@ -22,18 +22,18 @@ If the consumption of a message fails and you want this
message to be consumed a
Messages are the basic "unit" of Pulsar. The following table lists the
components of messages.
-Component | Description
-:---------|:-------
-Value / data payload | The data carried by the message. All Pulsar messages
contain raw bytes, although message data can also conform to data
[schemas](schema-get-started.md).
-Key | The key (string type) of the message. It is a short name of message key
or partition key. Messages are optionally tagged with keys, which is useful for
features like [topic compaction](concepts-topic-compaction.md).
-Properties | An optional key/value map of user-defined properties.
-Producer name | The name of the producer who produces the message. If you do
not specify a producer name, the default name is used.
-Topic name | The name of the topic that the message is published to.
-Schema version | The version number of the schema that the message is produced
with.
-Sequence ID | Each Pulsar message belongs to an ordered sequence on its topic.
The sequence ID of a message is initially assigned by its producer, indicating
its order in that sequence, and can also be customized.<br />Sequence ID can be
used for message deduplication. If `brokerDeduplicationEnabled` is set to
`true`, the sequence ID of each message is unique within a producer of a topic
(non-partitioned) or a partition.
-Message ID | The message ID of a message is assigned by bookies as soon as the
message is persistently stored. Message ID indicates a message’s specific
position in a ledger and is unique within a Pulsar cluster.
-Publish time | The timestamp of when the message is published. The timestamp
is automatically applied by the producer.
-Event time | An optional timestamp attached to a message by applications. For
example, applications attach a timestamp on when the message is processed. If
nothing is set to event time, the value is `0`.
+| Component | Description
|
+|:---------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Value / data payload | The data carried by the message. All Pulsar messages
contain raw bytes, although message data can also conform to data
[schemas](schema-get-started.md).
|
+| Key | The key (string type) of the message. It is a short
name of message key or partition key. Messages are optionally tagged with keys,
which is useful for features like [topic
compaction](concepts-topic-compaction.md).
|
+| Properties | An optional key/value map of user-defined properties.
|
+| Producer name | The name of the producer who produces the message. If
you do not specify a producer name, the default name is used.
|
+| Topic name | The name of the topic that the message is published
to.
|
+| Schema version | The version number of the schema that the message is
produced with.
|
+| Sequence ID | Each Pulsar message belongs to an ordered sequence on
its topic. The sequence ID of a message is initially assigned by its producer,
indicating its order in that sequence, and can also be customized.<br
/>Sequence ID can be used for message deduplication. If
`brokerDeduplicationEnabled` is set to `true`, the sequence ID of each message
is unique within a producer of a topic (non-partitioned) or a partition. |
+| Message ID | The message ID of a message is assigned by bookies as
soon as the message is persistently stored. Message ID indicates a message’s
specific position in a ledger and is unique within a Pulsar cluster.
|
+| Publish time | The timestamp of when the message is published. The
timestamp is automatically applied by the producer.
|
+| Event time | An optional timestamp attached to a message by
applications. For example, applications attach a timestamp on when the message
is processed. If nothing is set to event time, the value is `0`.
|
The default size of a message is 5 MB. You can configure the max size of a
message with the following configurations.
@@ -70,12 +70,12 @@ Producers send messages to brokers synchronously (sync) or
asynchronously (async
You can have different types of access modes on topics for producers.
-Access mode | Description
-:-----------|------------
-`Shared` | Multiple producers can publish on a topic. <br /><br
/>This is the **default** setting.
-`Exclusive` | Only one producer can publish on a topic. <br /><br />If
there is already a producer connected, other producers trying to publish on
this topic get errors immediately.<br /><br />The "old" producer is evicted and
a "new" producer is selected to be the next exclusive producer if the "old"
producer experiences a network partition with the broker.
-`ExclusiveWithFencing`|Only one producer can publish on a topic. <br /><br
/>If there is already a producer connected, it will be removed and invalidated
immediately.
-`WaitForExclusive` | If there is already a producer connected, the producer
creation is pending (rather than timing out) until the producer gets the
`Exclusive` access.<br /><br />The producer that succeeds in becoming the
exclusive one is treated as the leader. Consequently, if you want to implement
a leader election scheme for your application, you can use this access mode.
Note that the leader pattern scheme mentioned refers to using Pulsar as a
Write-Ahead Log (WAL) which means the l [...]
+| Access mode | Description
[...]
+|:-----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| `Shared` | Multiple producers can publish on a topic. <br
/><br />This is the **default** setting.
[...]
+| `Exclusive` | Only one producer can publish on a topic. <br /><br
/>If there is already a producer connected, other producers trying to publish
on this topic get errors immediately.<br /><br />The "old" producer is evicted
and a "new" producer is selected to be the next exclusive producer if the "old"
producer experiences a network partition with the broker.
[...]
+| `ExclusiveWithFencing` | Only one producer can publish on a topic. <br /><br
/>If there is already a producer connected, it will be removed and invalidated
immediately.
[...]
+| `WaitForExclusive` | If there is already a producer connected, the
producer creation is pending (rather than timing out) until the producer gets
the `Exclusive` access.<br /><br />The producer that succeeds in becoming the
exclusive one is treated as the leader. Consequently, if you want to implement
a leader election scheme for your application, you can use this access mode.
Note that the leader pattern scheme mentioned refers to using Pulsar as a
Write-Ahead Log (WAL) which means [...]
:::note
@@ -598,10 +598,157 @@ In the diagram below, **Consumer A**, **Consumer B** and
**Consumer C** are all
#### Key_Shared
-In the *Key_Shared* type, multiple consumers can attach to the same
subscription. Messages are delivered in distribution across consumers and
messages with the same key or same ordering key are delivered to only one
consumer. No matter how many times the message is re-delivered, it is delivered
to the same consumer. When a consumer connects or disconnects, it causes the
served consumer to change some message keys.
+In the *Key_Shared* type, multiple consumers can attach to the same
subscription. Messages are delivered in distribution across consumers and
messages with the same key or same ordering key are delivered to only one
consumer. No matter how many times the message is re-delivered, it is delivered
to the same consumer.

+There are three types of mapping algorithms dictating how to select a consumer
for a given message key (or ordering key): Sticky, Auto-split Hash Range, and
Auto-split Consistent Hashing. The steps for all algorithms are:
+1. The message key (or ordering key) is passed to a hash function (e.g.,
Murmur3 32-bit), yielding a 32-bit integer hash.
+2. That hash number is fed to the algorithm to select a consumer from the
existing connected consumers.
+
+```
+ +--------------+
+-----------+
+Message Key -----> / Hash Function / ----- hash (32-bit) -------> / Algorithm
/ ----> Consumer
+ +---------------+ +----------+
+```
+
+
+When a new consumer is connected and thus added to the list of connected
consumers, the algorithm re-adjusts the mapping such that some keys currently
mapped to existing consumers will be mapped to the newly added consumer. When a
consumer is disconnected, thus removed from the list of connected consumers,
keys mapped to it will be mapped to other consumers. The sections below will
explain how a consumer is selected given the message hash and how the mapping
is adjusted given a new consu [...]
+
+##### Auto-split Hash Range
+
+The algorithm assumes there is a range of numbers between 0 to 2^16 (65,536).
Each consumer is mapped into a single region in this range, so all mapped
regions cover the entire range, and no regions overlap. A consumer is selected
for a given key by running a modulo operation on the message hash by the range
size (65,536). The number received ( 0 <= i < 65,536) is contained within a
single region. The consumer mapped to that region is the one selected.
+
+Example:
+
+Suppose we have 4 consumers (C1, C2, C3 and C4), then:
+
+```
+ 0 16,384 32,768 49,152 65,536
+ |------- C3 ------|------- C2 ------|------- C1 ------|------- C4 ------|
+```
+
+Given a message key `Order-3459134`, its hash would be
`murmur32("Order-3459134") = 3112179635`, and its index in the range would be
`3112179635 mod 65536 = 6067`. That index is contained within region `[0,
16384)` thus consumer C1 will be mapped to this message key.
+
+When a new consumer is connected, the largest region is chosen and is then
split in half - the lower half will be mapped to the newly added consumer and
upper half will be mapped to the consumer owning that region. Here is how it
looks like from 1 to 4 consumers:
+
+```
+C1 connected:
+|---------------------------------- C1 ---------------------------------|
+
+C2 connected:
+|--------------- C2 ----------------|---------------- C1 ---------------|
+
+C3 connected:
+|------- C3 ------|------- C2 ------|---------------- C1 ---------------|
+
+C4 connected:
+|------- C3 ------|------- C2 ------|------- C1 ------|------- C4 ------|
+```
+
+When a consumer is disconnected its region will be merged into the region on
its right. Examples:
+
+C4 is disconnected:
+
+```
+|------- C3 ------|------- C2 ------|---------------- C1 ---------------|
+```
+
+C1 is disconnected:
+
+```
+|------- C3 ------|-------------------------- C2 -----------------------|
+```
+
+The advantages of this algorithm is that it affects only a single existing
consumer upon add/delete consumer, at the expense of regions not evenly sized.
Thi means some consumers gets more keys that others. The next algorithm does
the other way around.
+
+##### Auto-split Consistent Hashing
+
+This algorithm uses a Hash Ring. It's a range of number from 0 to MAX_INT
(32-bit) in which if you traverse the range, when reaching MAX_INT, the next
number would be zero. It is as if you took a line starting from 0 ending at
MAX_INT and bent into a circle such that the end glues to the start:
+
+```
+ MAX_INT -----++--------- 0
+ ||
+ , - ~ ~ ~ - ,
+ , ' ' ,
+ , ,
+ , ,
+ , ,
+ , ,
+ , ,
+ , ,
+ , ,
+ , , '
+ ' - , _ _ _ , '
+```
+
+When adding a consumer, we mark 100 points on that circle and associate them
to the newly added consumer. For each number between 1 and 100, we concatenate
the consumer name to that number and run the hash function on it to get the
location of the point on the circle that will be marked. For Example, if the
consumer name is "orders-aggregator-pod-2345-consumer" then we would mark 100
points on that circle:
+```
+ murmur32("orders-aggregator-pod-2345-consumer1") = 1003084738
+ murmur32("orders-aggregator-pod-2345-consumer2") = 373317202
+ ...
+ murmur32("orders-aggregator-pod-2345-consumer100") = 320276078
+```
+
+Since the hash function has the uniform distribution attribute, those points
would be uniformly distributed across the circle.
+
+```
+ C1-100
+ , - ~ ~ ~ - , C1-1
+ , ' ' ,
+ , ,
+ , , C1-2
+ , ,
+ , ,
+ , ,
+ , , C1-3
+ , ,
+ , , '
+ ' - , _ _ _ , ' ...
+
+```
+
+A consumer is selected for a given message key by putting its hash on the
circle then continue clock-wise on the circle until you reach a marking point.
The point might have more than one consumer on it (hash function might have
collisions) there for, we run the following operation to get a position within
the list of consumers for that point, then we take the consumer in that
position: `hash % consumer_list_size = index`.
+
+When a consumer is added, we add 100 marking points to the circle as explained
before. Due to the uniform distribution of the hash function, those 100 points
act as if the new consumer takes a small slice of keys out of each existing
consumer. It maintains the even distribution, on the trade-off that it impacts
all existing consumers. [This
video](https://www.youtube.com/watch?v=zaRkONvyGr8) explains the concept of
Consistent Hashing quite well (the only difference is that in Pulsar's ca [...]
+
+##### Sticky
+
+The algorithm assumes there is a range of numbers between 0 to 2^16 (65,536).
Each consumer is mapped to a multiple regions in this range and there is no
overlap between regions. The consumer is selected by running a modulo operation
on the message hash by the range size (65,536), the number received (0 <= i <
65,536), is contained within a single region. The consumer mapped to the region
is the one selected.
+In this algorithm you have full control. Every newly added consumer specifies
the ranges it wishes to be mapped to by using Consumer API. When the consumer
object is constructed, you can specify the list of ranges. It's your
responsibility to make sure there are no overlaps and all the range is covered
by regions.
+
+Example:
+
+Suppose we have 2 consumers (C1 and C2) each specified their ranges, then:
+
+```
+C1 = [0, 16384), (32768, 49152]
+C2 = [16384, 32768), (49,152, 65536]
+
+ 0 16,384 32,768 49,152 65,536
+ |------- C1 ------|------- C2 ------|------- C1 ------|------- C2 ------|
+```
+
+Given a message key `Order-3459134`, it's hash would be
`murmur32("Order-3459134") = 3112179635`, and it's index in the range would be
`3112179635 mod 65536 = 6067`. That index is contained within `[0, 16384)` thus
consumer C1 will map to this message key.
+
+If the newly connected consumer didn't supply their ranges, or they overlap
with existing consumer ranges, it's disconnected, removed from the consumers
list and reverted as if it never tried to connect.
+
+##### How to use them?
+
+When building the consumer, you can specify the Key Shared Mode:
+* AUTO_SPLIT - Auto-split Hash Range
+* STICKY - Sticky
+
+Consistent Hashing will be used instead of Hash Range for Auto-split if the
broker configuration `subscriptionKeySharedUseConsistentHashing` is enabled.
+
+##### Preserving order of processing
+
+Key Shared Subscription type guarantees a key will be processed by a *single*
consumer at any given time. When a new consumer a connected, some key will be
mapped to it from existing consumers. The broker will not deliver messages to
the new consumer until all messages delivered up until connection time will be
acknowledged. This will guarantee a certain key is processed by a single
consumer at any given time. The trade-off is that if one of the existing
consumers is stuck and no time-ou [...]
+
+That requirement can be relaxed by enabling `allowOutOfOrderDelivery` via the
Consumer API. If set on the new consumer, then when it is connected, the broker
will allow it to receive messages knowing some messages of that key may be
still be processing in other consumers at the time, thus order may be affected
for that short period of adding a new consumer.
+
+##### Batching for Key Shared Subscriptions
+
:::note
When the consumers are using the Key_Shared subscription type, you need to
**disable batching** or **use key-based batching** for the producers.