asafm commented on code in PR #21114:
URL: https://github.com/apache/pulsar/pull/21114#discussion_r1343716486


##########
pip/pip-298.md:
##########
@@ -0,0 +1,105 @@
+## Background
+
+In the implementation of the Pulsar Transaction, each topic is configured with 
a `Transaction Buffer` to prevent consumers from reading uncommitted messages, 
which are invisible until the transaction is committed. Transaction Buffer 
works with Position (maxReadPosition) and `TxnID` Set (aborts). The broker only 
dispatches messages, before the maxReadPosition, to the consumers. When the 
broker dispatches the messages before maxReadPosition to the consumer, the 
messages sent by aborted transactions will get filtered by the Transaction 
Buffer.
+
+## Motivation
+
+Currently, Pulsar transactions do not have configurable isolation levels. By 
introducing isolation level configuration for consumers, we can enhance the 
flexibility of Pulsar transactions.
+
+## Goal
+
+### In Scope
+
+Implement Read Committed and Read Uncommitted isolation levels for Pulsar 
transactions.Allow consumers to configure isolation levels during the building 
process.
+
+### Out of Scope
+
+None.
+
+## API Changes
+
+Update the PulsarConsumer builder process to include isolation level 
configurations for Read Committed and Read Uncommitted.
+
+### Before the Change
+
+The PulsarConsumer builder process currently does not include isolation level 
configurations. The consumer creation process might look like this:
+
+```
+PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
+
+Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        .topic("persistent://my-tenant/my-namespace/my-topic")
+        .subscriptionName("my-subscription")
+        .subscriptionType(SubscriptionType.Shared)
+        .subscribe();
+```
+
+### After the Change
+
+Update the PulsarConsumer builder process to include isolation level 
configurations for Read Committed and Read Uncommitted. Introduce a new method 
isolationLevel() in the consumer builder, which accepts an enumeration value 
representing the isolation level:
+
+```
+public enum IsolationLevel {
+    READ_COMMITTED,
+    READ_UNCOMMITTED
+}
+```
+
+Then, modify the consumer creation process to include the new isolation level 
configuration:
+
+```
+PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
+
+Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        .topic("persistent://my-tenant/my-namespace/my-topic")
+        .subscriptionName("my-subscription")
+        .subscriptionType(SubscriptionType.Shared)
+        .subscriptionIsolationLevel(IsolationLevel.READ_COMMITTED) // Adding the isolation level configuration
+        .subscribe();
+```
+
+With this change, users can now choose between Read Committed and Read 
Uncommitted isolation levels when creating a new
+consumer. If the isolationLevel() method is not called during the builder 
process, the default isolation level will be
+Read Committed.
+Note that this is a subscription dimension configuration, and all consumers 
under the same subscription need to be
+configured with the same IsolationLevel.
+
+## Implementation
+
+### Client Changes

Review Comment:
   Just a very small note. The PIP template has a "## Public-facing Changes" 
section with several h3 headings, so users can clearly see what is changing in 
any public-facing API. In your case, I presume that includes the binary 
protocol changes.



##########
pip/pip-298.md:
##########
@@ -0,0 +1,105 @@
+## Background
+
+In the implementation of the Pulsar Transaction, each topic is configured with 
a `Transaction Buffer` to prevent consumers from reading uncommitted messages, 
which are invisible until the transaction is committed. Transaction Buffer 
works with Position (maxReadPosition) and `TxnID` Set (aborts). The broker only 
dispatches messages, before the maxReadPosition, to the consumers. When the 
broker dispatches the messages before maxReadPosition to the consumer, the 
messages sent by aborted transactions will get filtered by the Transaction 
Buffer.
+
+## Motivation
+
+Currently, Pulsar transactions do not have configurable isolation levels. By 
introducing isolation level configuration for consumers, we can enhance the 
flexibility of Pulsar transactions.
+
+## Goal
+
+### In Scope
+
+Implement Read Committed and Read Uncommitted isolation levels for Pulsar 
transactions.Allow consumers to configure isolation levels during the building 
process.
+
+### Out of Scope
+
+None.
+
+## API Changes
+
+Update the PulsarConsumer builder process to include isolation level 
configurations for Read Committed and Read Uncommitted.
+
+### Before the Change
+
+The PulsarConsumer builder process currently does not include isolation level 
configurations. The consumer creation process might look like this:
+
+```
+PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
+
+Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        .topic("persistent://my-tenant/my-namespace/my-topic")
+        .subscriptionName("my-subscription")
+        .subscriptionType(SubscriptionType.Shared)
+        .subscribe();
+```
+
+### After the Change
+
+Update the PulsarConsumer builder process to include isolation level 
configurations for Read Committed and Read Uncommitted. Introduce a new method 
isolationLevel() in the consumer builder, which accepts an enumeration value 
representing the isolation level:
+
+```
+public enum IsolationLevel {
+    READ_COMMITTED,
+    READ_UNCOMMITTED
+}
+```
+
+Then, modify the consumer creation process to include the new isolation level 
configuration:
+
+```
+PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
+
+Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        .topic("persistent://my-tenant/my-namespace/my-topic")
+        .subscriptionName("my-subscription")
+        .subscriptionType(SubscriptionType.Shared)
+        .subscriptionIsolationLevel(IsolationLevel.READ_COMMITTED) // Adding the isolation level configuration
+        .subscribe();
+```
+
+With this change, users can now choose between Read Committed and Read 
Uncommitted isolation levels when creating a new
+consumer. If the isolationLevel() method is not called during the builder 
process, the default isolation level will be
+Read Committed.
+Note that this is a subscription dimension configuration, and all consumers 
under the same subscription need to be
+configured with the same IsolationLevel.
+
+## Implementation
+
+### Client Changes
+
+Update the PulsarConsumer builder to accept isolation level configurations for 
Read Committed and Read Uncommitted levels.
+
+In order to achieve the above goals, the following modifications need to be 
made:
+
+- Added `IsolationLevel` related fields and methods in 
`ConsumerConfigurationData` and `ConsumerBuilderImpl` and `ConsumerImpl`
+
+- Modify PulsarApi.CommandSubscribe, add field -- IsolationLevel
+
+```
+message CommandSubscribe {
+
+    enum IsolationLevel {
+    READ_COMMITTED = 0;
+    READ_UNCOMMITTED = 1;
+    }
+    optional IsolationLevel isolation_level = 20 [default = READ_COMMITTED];
+}
+```
+
+### Broker changes
+
+Modify the transaction buffer and dispatching mechanisms to handle messages 
based on the chosen isolation level.
+
+In order to achieve the above goals, the following modifications need to be 
made:
+
+- Determine in the `readMoreEntries` method of Dispatchers such as 
`PersistentDispatcherSingleActiveConsumer` and 
`PersistentDispatcherMultipleConsumers`:
+
+    - If Subscription.isolationLevel == ReadCommitted, then MaxReadPosition = 
topic.getMaxReadPosition(), that is, transactionBuffer.getMaxReadPosition()
+
+    - If Subscription.isolationLevel == ReadUnCommitted, then MaxReadPosition 
= PositionImpl.LATEST
+
+
+## Reject Alternatives
+
+None

Review Comment:
   The original template has a "# Monitoring" section. 
   One aspect of monitoring, IMO, is knowing the isolation level of each 
subscription. Can you please describe how users will be able to find that out? 
What is changing in "## Public-facing Changes" that allows me to know that?
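
   For reference, the broker-side rule described under "Broker changes" can be 
sketched roughly as below. This is only an illustration under stated 
assumptions: `Position`, `effectiveMaxReadPosition`, and the class name are 
hypothetical stand-ins, not Pulsar's `PositionImpl` or dispatcher code. Only 
the `IsolationLevel` values are taken from the proposal.

```java
public class MaxReadPositionSketch {

    // Mirrors the enum values proposed in the PIP.
    enum IsolationLevel { READ_COMMITTED, READ_UNCOMMITTED }

    // Hypothetical stand-in for a ledger position; NOT Pulsar's PositionImpl.
    static final class Position {
        // Assumed sentinel meaning "no upper bound on reads".
        static final Position LATEST = new Position(Long.MAX_VALUE, Long.MAX_VALUE);

        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    // READ_COMMITTED caps reads at the transaction buffer's max read position;
    // READ_UNCOMMITTED lifts the cap by returning the LATEST sentinel.
    static Position effectiveMaxReadPosition(IsolationLevel level,
                                             Position txnBufferMaxReadPosition) {
        return level == IsolationLevel.READ_UNCOMMITTED
                ? Position.LATEST
                : txnBufferMaxReadPosition;
    }

    public static void main(String[] args) {
        Position committedUpTo = new Position(12, 40);
        System.out.println("READ_COMMITTED   -> "
                + effectiveMaxReadPosition(IsolationLevel.READ_COMMITTED, committedUpTo));
        System.out.println("READ_UNCOMMITTED -> "
                + effectiveMaxReadPosition(IsolationLevel.READ_UNCOMMITTED, committedUpTo));
    }
}
```

   Since the proposal describes the level as a subscription-level setting, the 
value this rule depends on lives with the subscription on the broker, which is 
the state I would like users to be able to inspect.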



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
