poorbarcode commented on code in PR #21246:
URL: https://github.com/apache/pulsar/pull/21246#discussion_r1373363223


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -160,6 +162,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
             this.pendingAckHandle = new PendingAckHandleDisabled();
         }
         IS_FENCED_UPDATER.set(this, FALSE);
+        this.isolationLevel = fetchIsolationLevelFromProperties(subscriptionProperties);

Review Comment:
   Do we allow a message that was read from a `READ_UNCOMMITTED` subscription to be used in a new transaction?
   
   - If not, we should initialize `pendingAckHandle` as a `PendingAckHandleDisabled` when the `isolationLevel` is `READ_UNCOMMITTED`, right?
   - If yes, how can we skip the `recover task` for this pending ack?
   
   And please add a test for this case.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java:
##########
@@ -18,14 +18,37 @@
  */
 package org.apache.pulsar.broker.service;
 
+import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
+
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.ToLongFunction;
 
 public abstract class AbstractSubscription implements Subscription {
+    protected static final String SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY = "pulsar.subscription.isolation.level";
     protected final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
     protected final LongAdder msgOutFromRemovedConsumer = new LongAdder();
 
+    public static void wrapIsolationLevelToProperties(Map<String, String> properties, IsolationLevel isolationLevel) {
+        if (properties != null) {
+            properties.put(SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY, String.valueOf(isolationLevel.getValue()));
+        }
+    }
+
+    public IsolationLevel fetchIsolationLevelFromProperties(Map<String, String> properties) {
+        if (properties == null) {
+            return IsolationLevel.READ_COMMITTED;
+        }
+
+        if (properties.containsKey(SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY)) {
+            IsolationLevel isolationLevel = IsolationLevel.valueOf(Integer.parseInt(properties.get(SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY)));

Review Comment:
   Could we catch the `NumberFormatException` here? And please add a test for 
this case.
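
   A minimal sketch of what the defensive parsing could look like. The `IsolationLevel` enum here is a simplified stand-in for the generated `CommandSubscribe.IsolationLevel`, the class name `IsolationLevelParser` is hypothetical, and falling back to `READ_COMMITTED` on a malformed value is an assumption rather than a decision made in this PR.

```java
import java.util.Map;

// Simplified stand-in for the generated CommandSubscribe.IsolationLevel (assumption).
enum IsolationLevel {
    READ_COMMITTED(0),
    READ_UNCOMMITTED(1);

    private final int value;

    IsolationLevel(int value) {
        this.value = value;
    }

    int getValue() {
        return value;
    }

    // Returns null for an unknown number, as protobuf-generated enums do.
    static IsolationLevel valueOf(int value) {
        for (IsolationLevel level : values()) {
            if (level.value == value) {
                return level;
            }
        }
        return null;
    }
}

// Hypothetical helper; the name is for illustration only.
final class IsolationLevelParser {
    static final String KEY = "pulsar.subscription.isolation.level";

    static IsolationLevel fetch(Map<String, String> properties) {
        if (properties == null || !properties.containsKey(KEY)) {
            return IsolationLevel.READ_COMMITTED;
        }
        try {
            // Integer.parseInt throws NumberFormatException for null or non-numeric text.
            IsolationLevel level = IsolationLevel.valueOf(Integer.parseInt(properties.get(KEY)));
            // An unknown number also falls back to the default (assumed policy).
            return level != null ? level : IsolationLevel.READ_COMMITTED;
        } catch (NumberFormatException e) {
            return IsolationLevel.READ_COMMITTED;
        }
    }
}
```

   A test for this case could then store a non-numeric string under the key and assert that the fallback level is returned.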



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java:
##########
@@ -18,14 +18,37 @@
  */
 package org.apache.pulsar.broker.service;
 
+import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
+
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.ToLongFunction;
 
 public abstract class AbstractSubscription implements Subscription {
+    protected static final String SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY = "pulsar.subscription.isolation.level";
     protected final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
     protected final LongAdder msgOutFromRemovedConsumer = new LongAdder();
 
+    public static void wrapIsolationLevelToProperties(Map<String, String> properties, IsolationLevel isolationLevel) {
+        if (properties != null) {
+            properties.put(SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY, String.valueOf(isolationLevel.getValue()));

Review Comment:
   Once the subscription has been created, the property `SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY` cannot be modified, right?
   
   If so, we should prevent it from being modified through the API `pulsar-admin topics update-subscription-properties`. And please add a test for this case.
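
   One way such a guard could be sketched, assuming the update path has access to both the current and the requested property maps. The class and method names are hypothetical, and rejecting the request (rather than silently carrying the existing value over) is only one of the possible policies.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical guard; not part of the PR.
final class SubscriptionPropertiesGuard {
    static final String ISOLATION_LEVEL_KEY = "pulsar.subscription.isolation.level";

    // Throws if the update would add, remove, or change the isolation-level entry.
    static void checkIsolationLevelUnchanged(Map<String, String> current, Map<String, String> updated) {
        String before = current == null ? null : current.get(ISOLATION_LEVEL_KEY);
        String after = updated == null ? null : updated.get(ISOLATION_LEVEL_KEY);
        if (!Objects.equals(before, after)) {
            throw new IllegalArgumentException(
                    "Property " + ISOLATION_LEVEL_KEY + " cannot be modified after the subscription is created");
        }
    }
}
```

   Because this sketch also rejects an update that omits the key, callers would have to resend the existing value; copying the stored value into the new map before persisting would be the more lenient alternative.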



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java:
##########
@@ -1331,6 +1332,25 @@ public void testGetPartitionedStatsContainSubscriptionType() throws Exception {
         assertEquals(topicStats.getSubscriptions().get(subName).getType(), SubscriptionType.Exclusive.toString());
     }
 
+    @Test
+    public void testGetPartitionedStatsContainSubscriptionIsolationLevel() throws Exception {
+        final String topic = "persistent://prop-xyz/ns1/my-topic" + UUID.randomUUID();
+        final int numPartitions = 4;
+        admin.topics().createPartitionedTopic(topic, numPartitions);
+
+        // create consumer and subscription
+        final String subName = "my-sub";
+        @Cleanup Consumer<byte[]> exclusiveConsumer = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionIsolationLevel(SubscriptionIsolationLevel.READ_UNCOMMITTED)
+                .subscribe();
+
+        TopicStats topicStats = admin.topics().getPartitionedStats(topic, false);
+        assertEquals(topicStats.getSubscriptions().size(), 1);
+        assertEquals(topicStats.getSubscriptions().get(subName).getSubscriptionIsolationLevel(), SubscriptionIsolationLevel.READ_UNCOMMITTED.toString());

Review Comment:
   Could we also add another case here, along these lines:
   - close the consumer,
   - unload the topic,
   - check the `isolationLevel`.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractSubscriptionTest.java:
##########
@@ -54,4 +60,13 @@ public void testGetBytesOutCounter() {
         when(consumer.getBytesOutCounter()).thenReturn(2L);
         assertEquals(subscription.getBytesOutCounter(), 3L);
     }
+
+    @Test
+    public void testWrapAndFetchIsolationLevelInProperties() {
+        Map<String, String> properties = new HashMap<>(1);
+        AbstractSubscription.wrapIsolationLevelToProperties(properties, IsolationLevel.READ_UNCOMMITTED);

Review Comment:
   Could we add other cases?
   - `null`
   - `READ_COMMITTED`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java:
##########
@@ -18,14 +18,37 @@
  */
 package org.apache.pulsar.broker.service;
 
+import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
+
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.ToLongFunction;
 
 public abstract class AbstractSubscription implements Subscription {
+    protected static final String SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY = "pulsar.subscription.isolation.level";
     protected final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
     protected final LongAdder msgOutFromRemovedConsumer = new LongAdder();
 
+    public static void wrapIsolationLevelToProperties(Map<String, String> properties, IsolationLevel isolationLevel) {
+        if (properties != null) {

Review Comment:
   If `properties` is null, the `isolationLevel` is silently discarded, which is wrong.
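
   A minimal sketch of a null-safe variant, assuming the method is allowed to return a map instead of mutating its argument. The class name, the nested `Level` enum (a stand-in for `CommandSubscribe.IsolationLevel`), and the changed signature are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; not the PR's actual API.
final class IsolationLevelProperties {
    static final String KEY = "pulsar.subscription.isolation.level";

    // Stand-in for CommandSubscribe.IsolationLevel (assumption).
    enum Level {
        READ_COMMITTED(0),
        READ_UNCOMMITTED(1);

        final int value;

        Level(int value) {
            this.value = value;
        }
    }

    // Always returns a map containing the isolation level, even when the input is null.
    static Map<String, String> withIsolationLevel(Map<String, String> properties, Level level) {
        Map<String, String> result = properties == null ? new HashMap<>() : new HashMap<>(properties);
        result.put(KEY, String.valueOf(level.value));
        return result;
    }
}
```

   Returning a copy also avoids failing on an immutable input map, at the cost of one extra allocation per subscribe call.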



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java:
##########
@@ -18,14 +18,37 @@
  */
 package org.apache.pulsar.broker.service;
 
+import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
+
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.ToLongFunction;
 
 public abstract class AbstractSubscription implements Subscription {
+    protected static final String SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY = "pulsar.subscription.isolation.level";

Review Comment:
   Since this property is for transactions, its name should contain the keyword `transaction`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java:
##########
@@ -45,6 +45,7 @@ public class SubscriptionOption {
     private Map<String, String> metadata;
     private boolean readCompacted;
     private CommandSubscribe.InitialPosition initialPosition;
+    private CommandSubscribe.IsolationLevel isolationLevel;

Review Comment:
   Could you declare this field `final`?
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java:
##########
@@ -236,6 +240,11 @@ public String getTypeString() {
         return "Null";
     }
 
+    @Override
+    public IsolationLevel getIsolationLevel() {

Review Comment:
   Maybe using `@lombok.Getter` instead of this method would be better?



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+/**
+ * When creating a consumer, if the subscription does not exist, a new subscription will be created.
+ * The default isolation level for Subscription is 'READ_COMMITTED'.
+ * See {@link #subscriptionIsolationLevel(SubscriptionIsolationLevel)} to configure the isolation level behavior.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public enum SubscriptionIsolationLevel {
+    // Consumer can only consume all transactional messages which have been committed.
+    READ_COMMITTED(0),
+
+    // Consumer can consume all messages, even transactional messages which have been aborted.
+    READ_UNCOMMITTED(1);
+
+    private final int value;

Review Comment:
   You store the `isolationLevel` as an `int` in the metadata store but return it as a `String` in `topics stats`. How about using `toString()` instead of this field?
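
   A sketch of storing the level by name, so the persisted form matches what `topics stats` reports. The enum below mirrors the one in this file without the `value` field, `IsolationLevelCodec` is a hypothetical name, and defaulting to `READ_COMMITTED` on unknown input is an assumption. It uses `name()`, which is what the default `Enum.toString()` returns.

```java
// Mirrors the enum under review, minus the int field (assumption).
enum SubscriptionIsolationLevel {
    READ_COMMITTED,
    READ_UNCOMMITTED
}

// Hypothetical codec; not part of the PR.
final class IsolationLevelCodec {
    // The constant name is what gets persisted.
    static String encode(SubscriptionIsolationLevel level) {
        return level.name();
    }

    // Falls back to READ_COMMITTED for a missing or unrecognized value (assumed policy).
    static SubscriptionIsolationLevel decode(String stored) {
        if (stored == null) {
            return SubscriptionIsolationLevel.READ_COMMITTED;
        }
        try {
            return SubscriptionIsolationLevel.valueOf(stored);
        } catch (IllegalArgumentException e) {
            return SubscriptionIsolationLevel.READ_COMMITTED;
        }
    }
}
```

   The trade-off is that renaming a constant later would change the persisted format, which the numeric field avoids.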



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java:
##########
@@ -228,6 +229,89 @@ public void sortedTest() throws Exception {
         log.info("TransactionConsumeTest sortedTest finish.");
     }
 
+    @Test
+    public void testConsumeMessageWithDifferentIsolationLevel() throws Exception {
+        int messageCntBeforeTxn = 10;
+        int transactionMessageCnt = 10;
+        int messageCntAfterTxn = 10;
+        int totalMsgCnt = messageCntBeforeTxn + transactionMessageCnt + messageCntAfterTxn;
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(CONSUME_TOPIC)
+                .create();
+
+        @Cleanup
+        Consumer<byte[]> consumerWithReadCommittedIsolationLevel = pulsarClient.newConsumer()
+                .topic(CONSUME_TOPIC)
+                .subscriptionName("read-committed-isolation-test")
+                .subscribe();
+
+        @Cleanup
+        Consumer<byte[]> consumerWithReadUnCommittedIsolationLevel = pulsarClient.newConsumer()
+                .topic(CONSUME_TOPIC)
+                .subscriptionName("read-unCommitted-isolation-test")
+                .subscriptionIsolationLevel(SubscriptionIsolationLevel.READ_UNCOMMITTED)
+                .subscribe();
+
+        Awaitility.await().until(consumerWithReadUnCommittedIsolationLevel::isConnected);
+
+        long mostSigBits = 2L;
+        long leastSigBits = 5L;
+        TxnID txnID = new TxnID(mostSigBits, leastSigBits);
+
+        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
+                .getTopic(CONSUME_TOPIC, false).get().get();
+        log.info("transactionBuffer init finish.");
+
+        List<String> sendMessageList = new ArrayList<>();
+        sendNormalMessages(producer, 0, messageCntBeforeTxn, sendMessageList);
+        appendTransactionMessages(txnID, persistentTopic, transactionMessageCnt, sendMessageList);
+        sendNormalMessages(producer, messageCntBeforeTxn, messageCntAfterTxn, sendMessageList);
+
+        Message<byte[]> message;
+
+        for (int i = 0; i < totalMsgCnt; i++) {
+            // 1. for consumer 'consumerWithReadUnCommittedIsolationLevel', Because the transaction isolation level is ReadUncommitted, all messages can be read
+            message = consumerWithReadUnCommittedIsolationLevel.receive(500, TimeUnit.MILLISECONDS);
+            Assert.assertNotNull(message);
+            if (i < messageCntBeforeTxn) {
+                log.info("Consumer with ReadUnCommittedIsolationLevel Receive normal msg: {}" + new String(message.getData(), UTF_8));
+            } else {
+                if (i < messageCntBeforeTxn + transactionMessageCnt) {
+                    log.info("Consumer with ReadUnCommittedIsolationLevel Receive txn id: {}, msg: {}", message.getMessageId(), new String(message.getData()));
+                } else {
+                    log.info("Consumer with ReadUnCommittedIsolationLevel Receive normal msg: {}" + new String(message.getData(), UTF_8));
+                }
+            }
+
+            // 2. for consumer 'consumerWithReadCommittedIsolationLevel', Because the transaction isolation level is ReadUnCommitted,
+            // it can only read 'messageCntBeforeTxn' messages before the transaction is committed
+            if (i < messageCntBeforeTxn) {
+                message = consumerWithReadCommittedIsolationLevel.receive(500, TimeUnit.MILLISECONDS);
+                Assert.assertNotNull(message);
+                log.info("Consumer with ReadCommittedIsolationLevel Receive normal msg: {}" + new String(message.getData(), UTF_8));
+            } else {
+                message = consumerWithReadCommittedIsolationLevel.receive(500, TimeUnit.MILLISECONDS);
+                Assert.assertNull(message);
+                log.info("Consumer with ReadCommittedIsolationLevel can't receive message before commit.");
+            }
+        }
+
+        // Now commit the transaction
+        persistentTopic.endTxn(txnID, TxnAction.COMMIT_VALUE, 0L).get();

Review Comment:
   Could you also add another test case: `abort` this transaction before receiving messages?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java:
##########
@@ -2295,6 +2296,33 @@ public void testSubscribeCommand() throws Exception {
         channel.finish();
     }
 
+    @Test(timeOut = 30000)
+    public void testSubscribeCommandWithIsolationLevel() throws Exception {
+        resetChannel();
+        setChannelConnected();
+        svcConfig.setAuthenticationEnabled(false);
+        svcConfig.setAuthorizationEnabled(false);
+        // test SUBSCRIBE on topic and cursor creation success with 'ReadUnCommitted' isolation level.
+        ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
+                successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
+                "test" /* consumer name */, 0 /* avoid reseting cursor */, IsolationLevel.READ_UNCOMMITTED);

Review Comment:
   Should we add other cases:
   - `READ_COMMITTED`
   - a null value for the attribute `isolationLevel`



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java:
##########
@@ -228,6 +229,89 @@ public void sortedTest() throws Exception {
         log.info("TransactionConsumeTest sortedTest finish.");
     }
 
+    @Test
+    public void testConsumeMessageWithDifferentIsolationLevel() throws Exception {

Review Comment:
   Could you also add an E2E test (do not `start & commit` the transaction through `PersistentTopic`; just use a producer to do it), plus two more test cases:
   - `abort` this transaction before receiving messages
   - `unload topic` before receiving messages


