codelipenghui commented on code in PR #21246:
URL: https://github.com/apache/pulsar/pull/21246#discussion_r1374167744


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java:
##########
@@ -18,14 +18,37 @@
  */
 package org.apache.pulsar.broker.service;
 
+import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
+
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.ToLongFunction;
 
 public abstract class AbstractSubscription implements Subscription {
+    protected static final String SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY = 
"pulsar.subscription.isolation.level";

Review Comment:
   And for all the system properties, please add `__` as a prefix.



##########
pulsar-common/src/main/proto/PulsarApi.proto:
##########
@@ -399,6 +399,14 @@ message CommandSubscribe {
 
     // The consumer epoch, when exclusive and failover consumer redeliver 
unack message will increase the epoch
     optional uint64 consumer_epoch = 19;
+
+    enum IsolationLevel {

Review Comment:
   Yes. The broker should only read the isolation type from a client version 
that has isolation-level support. Otherwise, the broker will get an 
incompatibility issue.
   
   We should also add an integration test to 
https://github.com/apache/pulsar/tree/master/tests/integration
   And we should also add a compatibility test like 
https://github.com/apache/pulsar/tree/master/tests/bc_2_6_0



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java:
##########
@@ -831,6 +831,16 @@ public interface ConsumerBuilder<T> extends Cloneable {
      */
     ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled);
 
+    /**
+     * Sets the {@link SubscriptionIsolationLevel} for the consumer.
+     *
+     * @param subscriptionIsolationLevel If READ_COMMITTED is selected, the 
Consumer can only consume all transactional messages which have been committed,
+     *                                   else if READ_UNCOMMITTED is selected, 
the Consumer can consume all messages, even transactional messages which have 
been aborted.
+     *                                   Note that this is a subscription 
dimension configuration, and all consumers under the same subscription need to 
be configured with the same IsolationLevel.

Review Comment:
   The 
[doc](https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html)
 of Kafka also can be referred to.
   
   <img width="1506" alt="image" 
src="https://github.com/apache/pulsar/assets/12592133/23728537-db01-44a9-828e-a5f9adc6cbe5";>
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java:
##########
@@ -18,14 +18,37 @@
  */
 package org.apache.pulsar.broker.service;
 
+import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
+
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.ToLongFunction;
 
 public abstract class AbstractSubscription implements Subscription {
+    protected static final String SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY = 
"pulsar.subscription.isolation.level";
     protected final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
     protected final LongAdder msgOutFromRemovedConsumer = new LongAdder();
 
+    public static void wrapIsolationLevelToProperties(Map<String, String> 
properties, IsolationLevel isolationLevel) {
+        if (properties != null) {
+            properties.put(SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY, 
String.valueOf(isolationLevel.getValue()));

Review Comment:
   Sorry, why do we need to persist the isolation level to properties?
   It should follow the subscription type. The subscription will use the 
isolation type that the first consumer provided.
   After all consumer disconnected, a new consumer will get a chance to change 
the isolation level.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to