This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 149b9f770a4 [fix][broker] Fix pulsar.replicated.subscription checks 
(#23782)
149b9f770a4 is described below

commit 149b9f770a44e693729e850689aa215d4d5af25c
Author: Zixuan Liu <[email protected]>
AuthorDate: Tue Dec 31 10:15:41 2024 +0800

    [fix][broker] Fix pulsar.replicated.subscription checks (#23782)
    
    Signed-off-by: Zixuan Liu <[email protected]>
---
 .../service/persistent/PersistentSubscription.java | 10 ++++--
 .../broker/service/persistent/PersistentTopic.java |  9 ++++--
 .../persistent/PersistentSubscriptionTest.java     | 37 ++++++++++++++++++++++
 3 files changed, 50 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 8cebbd52695..b5a1a9db5de 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -118,7 +118,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
     // for connected subscriptions, message expiry will be checked if the 
backlog is greater than this threshold
     private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
 
-    private static final String REPLICATED_SUBSCRIPTION_PROPERTY = 
"pulsar.replicated.subscription";
+    protected static final String REPLICATED_SUBSCRIPTION_PROPERTY = 
"pulsar.replicated.subscription";
 
     // Map of properties that is used to mark this subscription as 
"replicated".
     // Since this is the only field at this point, we can just keep a static
@@ -140,8 +140,12 @@ public class PersistentSubscription extends 
AbstractSubscription {
                 NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
     }
 
-    static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) {
-        return 
cursor.getProperties().containsKey(REPLICATED_SUBSCRIPTION_PROPERTY);
+    static Optional<Boolean> 
getReplicatedSubscriptionConfiguration(ManagedCursor cursor) {
+        Long v = cursor.getProperties().get(REPLICATED_SUBSCRIPTION_PROPERTY);
+        if (v == null || (v < 0L || v > 1L)) {
+            return Optional.empty();
+        }
+        return Optional.of(v == 1L);
     }
 
     public PersistentSubscription(PersistentTopic topic, String 
subscriptionName, ManagedCursor cursor,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 56aeb9b4a5e..08a481b7051 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -512,9 +512,12 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     // ignore it for now and let the message dedup logic to 
take care of it
                 } else {
                     final String subscriptionName = 
Codec.decode(cursor.getName());
-                    subscriptions.put(subscriptionName, 
createPersistentSubscription(subscriptionName, cursor,
-                            
PersistentSubscription.isCursorFromReplicatedSubscription(cursor) ? true : null,
-                            cursor.getCursorProperties()));
+                    Optional<Boolean> replicatedSubscriptionConfiguration =
+                            
PersistentSubscription.getReplicatedSubscriptionConfiguration(cursor);
+                    Boolean replicated = 
replicatedSubscriptionConfiguration.orElse(null);
+                    subscriptions.put(subscriptionName,
+                            createPersistentSubscription(subscriptionName, 
cursor, replicated,
+                                    cursor.getCursorProperties()));
                     // subscription-cursor gets activated by default: 
deactivate as there is no active subscription
                     // right now
                     subscriptions.get(subscriptionName).deactivateCursor();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index 9a46a2919d1..360be2e435a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doCallRealMethod;
@@ -30,11 +31,13 @@ import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.Position;
@@ -226,6 +229,40 @@ public class PersistentSubscriptionTest {
         assertTrue(persistentSubscription.cursor.getLastActive() > 
beforeAcknowledgeTimestamp);
     }
 
+    @Test
+    public void testGetReplicatedSubscriptionConfiguration() {
+        Map<String, Long> properties = 
PersistentSubscription.getBaseCursorProperties(true);
+        
assertThat(properties).containsEntry(PersistentSubscription.REPLICATED_SUBSCRIPTION_PROPERTY,
 1L);
+        ManagedCursor cursor = mock(ManagedCursor.class);
+        doReturn(properties).when(cursor).getProperties();
+        Optional<Boolean> replicatedSubscriptionConfiguration =
+                
PersistentSubscription.getReplicatedSubscriptionConfiguration(cursor);
+        
assertThat(replicatedSubscriptionConfiguration).isNotEmpty().get().isEqualTo(Boolean.TRUE);
+
+        properties = 
Map.of(PersistentSubscription.REPLICATED_SUBSCRIPTION_PROPERTY, 10L);
+        doReturn(properties).when(cursor).getProperties();
+        replicatedSubscriptionConfiguration =
+                
PersistentSubscription.getReplicatedSubscriptionConfiguration(cursor);
+        assertThat(replicatedSubscriptionConfiguration).isEmpty();
+        properties = 
Map.of(PersistentSubscription.REPLICATED_SUBSCRIPTION_PROPERTY, -1L);
+        doReturn(properties).when(cursor).getProperties();
+        replicatedSubscriptionConfiguration =
+                
PersistentSubscription.getReplicatedSubscriptionConfiguration(cursor);
+        assertThat(replicatedSubscriptionConfiguration).isEmpty();
+
+        properties = PersistentSubscription.getBaseCursorProperties(false);
+        doReturn(properties).when(cursor).getProperties();
+        replicatedSubscriptionConfiguration =
+                
PersistentSubscription.getReplicatedSubscriptionConfiguration(cursor);
+        assertThat(replicatedSubscriptionConfiguration).isEmpty();
+
+        properties = PersistentSubscription.getBaseCursorProperties(null);
+        doReturn(properties).when(cursor).getProperties();
+        replicatedSubscriptionConfiguration =
+                
PersistentSubscription.getReplicatedSubscriptionConfiguration(cursor);
+        assertThat(replicatedSubscriptionConfiguration).isEmpty();
+    }
+
     public static class CustomTransactionPendingAckStoreProvider implements 
TransactionPendingAckStoreProvider {
         @Override
         public CompletableFuture<PendingAckStore> 
newPendingAckStore(PersistentSubscription subscription) {

Reply via email to