This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 618aedea76c [improve] [pip] PIP-298 Consumer supports specifying consumption isolation level (#21114)
618aedea76c is described below
commit 618aedea76c7cb3013db3baed1660cf4c23022e5
Author: hzh0425 <[email protected]>
AuthorDate: Tue Oct 24 13:32:08 2023 +0800
[improve] [pip] PIP-298 Consumer supports specifying consumption isolation level (#21114)
### Motivation
This PIP implements Read Committed and Read Uncommitted isolation levels for Pulsar transactions
and allows consumers to configure the isolation level when they are built.
For more details, please refer to `pip-298.md`.
---
pip/pip-298.md | 197 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 197 insertions(+)
diff --git a/pip/pip-298.md b/pip/pip-298.md
new file mode 100644
index 00000000000..a0953ad03a3
--- /dev/null
+++ b/pip/pip-298.md
@@ -0,0 +1,197 @@
+# Background
+
+In the implementation of Pulsar transactions, each topic is configured with a `Transaction Buffer` to prevent
+consumers from reading uncommitted messages, which remain invisible until the transaction is committed. The Transaction
+Buffer works with a position (`maxReadPosition`) and a set of aborted `TxnID`s. The broker only dispatches messages
+before `maxReadPosition` to consumers, and while it dispatches them, messages sent by aborted transactions are filtered
+out by the Transaction Buffer.
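The visibility rule above can be illustrated with a simplified model. This is a minimal sketch, not the broker's implementation: the `TxnBufferSketch` class, its `Entry` type, and the `dispatchable` helper are hypothetical, positions are modeled as plain `long` values, and `maxReadPosition` is treated as an exclusive bound, following the wording above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of Transaction Buffer visibility (not Pulsar's actual classes).
final class TxnBufferSketch {

    // One entry in the topic's ledger; txnId is null for non-transactional messages.
    static final class Entry {
        final long position;
        final Long txnId;
        final String payload;

        Entry(long position, Long txnId, String payload) {
            this.position = position;
            this.txnId = txnId;
            this.payload = payload;
        }
    }

    // Returns the payloads a read-committed consumer may receive:
    // entries before maxReadPosition, minus entries written by aborted transactions.
    // Assumes the ledger list is ordered by ascending position.
    static List<String> dispatchable(List<Entry> ledger, long maxReadPosition, Set<Long> abortedTxns) {
        List<String> out = new ArrayList<>();
        for (Entry e : ledger) {
            if (e.position >= maxReadPosition) {
                break; // entries at or after maxReadPosition are held back
            }
            if (e.txnId != null && abortedTxns.contains(e.txnId)) {
                continue; // filtered out as part of an aborted transaction
            }
            out.add(e.payload);
        }
        return out;
    }
}
```

With an open transaction whose first message sits at position 2, `maxReadPosition` cannot move past 2, so a non-transactional deposit at position 3 is held back as well; this is the blocking effect described in the Motivation section.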
+
+# Motivation
+
+Currently, Pulsar transactions do not have configurable isolation levels. By introducing isolation level configuration
+for consumers, we can enhance the flexibility of Pulsar transactions.
+
+Let's consider an example:
+
+**System**: Financial Transaction System
+
+**Operations**: A large volume of deposit and withdrawal operations and
+a small number of transfer operations.
+
+**Roles**:
+
+- **Client A1**
+- **Client A2**
+- **User Account B1**
+- **User Account B2**
+- **Request Topic C**
+- **Real-time Monitoring System D**
+- **Business Processing System E**
+
+**Client Operations**:
+
+- **Withdrawal**: Client A1 decreases the deposit amount in User
+  Account B1 or B2.
+- **Deposit**: Client A1 increases the deposit amount in User Account B1 or B2.
+- **Transfer**: Client A2 decreases the deposit amount in User
+  Account B1 and increases it in User Account B2, or vice versa.
+
+**Real-time Monitoring System D**: Obtains the latest data from
+Request Topic C as quickly as possible to monitor transaction data and
+changes in bank reserves in real time. This is necessary for the
+timely detection of anomalies and real-time decision-making.
+
+**Business Processing System E**: Reads data from Request Topic C,
+then performs the actual operations on User Accounts B1 and B2.
+
+**User Scenario**: Client A1 sends a large number of deposit and
+withdrawal requests to Request Topic C. Client A2 writes a small
+number of transfer requests to Request Topic C.
+
+In this case, Business Processing System E needs the Read Committed
+isolation level to ensure operational consistency and exactly-once
+semantics. Real-time Monitoring System D does not care if a small
+number of transfer requests are incomplete (dirty data). What it
+cannot tolerate is a large number of deposit and withdrawal requests
+failing to appear in real time because of a small number of transfer
+requests (currently, uncommitted transactional messages can block the
+reading of committed transactional messages).
+
+In this case, it is necessary to set different isolation levels for
+different consumers/subscriptions.
+Uncommitted transactions do not affect users' actual bank accounts:
+Business Processing System E only reads committed transactional
+messages before operating on users' accounts, and it needs exactly-once semantics.
+Real-time Monitoring System D reads uncommitted transactional
+messages and does not need exactly-once semantics.
+
+The two systems use different subscriptions and choose different isolation
+levels: one needs transactional guarantees, the other does not.
+In general, not every subscription of a topic requires transactional
+guarantees. Some want low latency without the exactly-once guarantee,
+while others require the exactly-once guarantee.
+This PIP provides a new per-subscription option to choose between them.
+
+# Goal
+
+## In Scope
+
+Implement Read Committed and Read Uncommitted isolation levels for Pulsar transactions, and allow consumers to configure
+the isolation level when they are built.
+
+## Out of Scope
+
+None.
+
+# High Level Design
+
+Add a configuration `subscriptionIsolationLevel` to the consumer builder so that users can choose between transaction
+isolation levels.
+
+# Detailed Design
+
+## Public-facing Changes
+
+Update the PulsarConsumer builder process to include isolation level configurations for Read Committed and Read
+Uncommitted.
+
+### Before the Change
+
+The PulsarConsumer builder process currently does not include isolation level configurations. The consumer creation
+process might look like this:
+
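+```
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
+
+Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        .topic("persistent://my-tenant/my-namespace/my-topic")
+        .subscriptionName("my-subscription")
+        .subscriptionType(SubscriptionType.Shared)
+        .subscribe();
+```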
+
+### After the Change
+
+Update the PulsarConsumer builder process to include isolation level configurations for Read Committed and Read
+Uncommitted. Introduce a new method, `subscriptionIsolationLevel()`, in the consumer builder, which accepts an enumeration
+value representing the isolation level:
+
+```
+public enum SubscriptionIsolationLevel {
+    // Consumer can only consume transactional messages that have been committed.
+    READ_COMMITTED,
+
+    // Consumer can consume all messages, including transactional messages that have not been committed
+    // or have been aborted.
+    READ_UNCOMMITTED;
+}
+```
+
+Then, modify the consumer creation process to include the new isolation level configuration:
+
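+```
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionIsolationLevel;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
+
+Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        .topic("persistent://my-tenant/my-namespace/my-topic")
+        .subscriptionName("my-subscription")
+        .subscriptionType(SubscriptionType.Shared)
+        .subscriptionIsolationLevel(SubscriptionIsolationLevel.READ_COMMITTED) // Adding the isolation level configuration
+        .subscribe();
+```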
+
+With this change, users can choose between the Read Committed and Read Uncommitted isolation levels when creating a new
+consumer. If the `subscriptionIsolationLevel()` method is not called during the builder process, the default isolation
+level is Read Committed.
+Note that this is a subscription-level configuration: all consumers under the same subscription need to be
+configured with the same isolation level.
+
+## Design & Implementation Details
+
+### Client Changes
+
+Update the PulsarConsumer builder to accept isolation level configurations for the Read Committed and Read Uncommitted
+levels.
+
+To achieve this, the following modifications are needed:
+
+- Add isolation level fields and methods to `ConsumerConfigurationData`, `ConsumerBuilderImpl`, and `ConsumerImpl`.
+
+- Modify `PulsarApi.CommandSubscribe` to add an `isolation_level` field of the enum type `IsolationLevel`:
+
+```
+message CommandSubscribe {
+    // ... existing fields omitted ...
+
+    enum IsolationLevel {
+        READ_COMMITTED = 0;
+        READ_UNCOMMITTED = 1;
+    }
+    optional IsolationLevel isolation_level = 20 [default = READ_COMMITTED];
+}
+```
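The client-side changes above can be sketched as follows. This is a minimal illustration, assuming a hypothetical `ConsumerConfigSketch` class that stands in for the isolation-level part of `ConsumerConfigurationData`; its `toWireValue()` helper is also hypothetical and only shows how the public enum could map onto the `CommandSubscribe.IsolationLevel` numbers (0 and 1) declared in the protobuf change. It is not the Pulsar client's actual code.

```java
// Mirrors the public enum proposed above.
enum SubscriptionIsolationLevel {
    READ_COMMITTED,
    READ_UNCOMMITTED
}

// Hypothetical stand-in for the isolation-level part of ConsumerConfigurationData.
final class ConsumerConfigSketch {
    // Read Committed is the default when the builder method is never called.
    private SubscriptionIsolationLevel isolationLevel = SubscriptionIsolationLevel.READ_COMMITTED;

    // Builder-style setter, analogous to the proposed subscriptionIsolationLevel() builder method.
    ConsumerConfigSketch subscriptionIsolationLevel(SubscriptionIsolationLevel level) {
        if (level == null) {
            throw new IllegalArgumentException("isolation level must not be null");
        }
        this.isolationLevel = level;
        return this;
    }

    SubscriptionIsolationLevel isolationLevel() {
        return isolationLevel;
    }

    // Maps the client enum to the CommandSubscribe.IsolationLevel numbers from the protobuf change.
    int toWireValue() {
        switch (isolationLevel) {
            case READ_UNCOMMITTED:
                return 1;
            case READ_COMMITTED:
            default:
                return 0;
        }
    }
}
```

An explicit mapping, rather than relying on `ordinal()`, keeps the wire values stable even if the Java enum's constants are later reordered.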
+
+### Broker changes
+
+Modify the transaction buffer and dispatching mechanisms to handle messages based on the chosen isolation level.
+
+To achieve this, the following modifications are needed:
+
+- In the `readMoreEntries` method of dispatchers such as `PersistentDispatcherSingleActiveConsumer`
+  and `PersistentDispatcherMultipleConsumers`, determine the max read position from the subscription's isolation level:
+
+    - If the subscription's isolation level is `READ_COMMITTED`, then `maxReadPosition = topic.getMaxReadPosition()`,
+      that is, `transactionBuffer.getMaxReadPosition()`.
+
+    - If the subscription's isolation level is `READ_UNCOMMITTED`, then `maxReadPosition = PositionImpl.LATEST`.
+
+- Add a new metric `subscriptionIsolationLevel` to `SubscriptionStatsImpl`.
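The dispatcher rule above reduces to choosing an upper read bound per subscription. A minimal sketch, assuming positions are modeled as `long` values and `PositionImpl.LATEST` as `Long.MAX_VALUE`; the `IsolationLevel` enum, the `MaxReadPositionSketch` class, and its `maxReadPosition` helper are hypothetical and do not reproduce the broker's dispatcher code.

```java
// Hypothetical broker-side view of a subscription's isolation level.
enum IsolationLevel {
    READ_COMMITTED,
    READ_UNCOMMITTED
}

final class MaxReadPositionSketch {
    // Stand-in for PositionImpl.LATEST: no upper bound on what may be read.
    static final long LATEST = Long.MAX_VALUE;

    // Chooses the upper read bound that readMoreEntries would use for a subscription.
    // transactionBufferMaxReadPosition stands in for topic.getMaxReadPosition().
    static long maxReadPosition(IsolationLevel level, long transactionBufferMaxReadPosition) {
        if (level == IsolationLevel.READ_UNCOMMITTED) {
            // Ignore the Transaction Buffer bound so open transactions do not block reads.
            return LATEST;
        }
        // READ_COMMITTED: stop at the Transaction Buffer's maxReadPosition.
        return transactionBufferMaxReadPosition;
    }
}
```

In the Motivation scenario, Real-time Monitoring System D would subscribe with `READ_UNCOMMITTED` and keep reading past a pending transfer, while Business Processing System E would subscribe with `READ_COMMITTED` and stop at the Transaction Buffer's bound.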
+
+# Monitoring
+
+After this PIP, users can query a topic's subscription stats through the admin tool and check the `subscriptionIsolationLevel` field to determine each subscription's isolation level.
+
+# Links
+
+* Mailing List discussion thread: https://lists.apache.org/thread/8ny0qtp7m9qcdbvnfjdvpnkc4c5ssyld
+* Mailing List voting thread: https://lists.apache.org/thread/4q1hrv466h8w9ccpf4moxt6jv1jxp1mr
+* Document link: https://github.com/apache/pulsar-site/pull/712