This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 95b7040d584 [enh][broker] Support properties on NonDurable subscriptions (PIP-105) (#15345)
95b7040d584 is described below

commit 95b7040d58492b385fb9cf54c5611908424488e3
Author: Enrico Olivelli <[email protected]>
AuthorDate: Wed Apr 27 15:42:47 2022 +0200

    [enh][broker] Support properties on NonDurable subscriptions (PIP-105) (#15345)
---
 .../broker/service/persistent/PersistentTopic.java | 15 +++-
 .../broker/admin/CreateSubscriptionTest.java       | 98 ++++++++++++++++++----
 2 files changed, 91 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index bfe0b644109..7b19181fcd1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -779,7 +779,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     getDurableSubscription(subscriptionName, initialPosition, 
startMessageRollbackDurationSec,
                             replicatedSubscriptionState, 
subscriptionProperties)
                     : getNonDurableSubscription(subscriptionName, 
startMessageId, initialPosition,
-                    startMessageRollbackDurationSec, readCompacted);
+                    startMessageRollbackDurationSec, readCompacted, 
subscriptionProperties);
 
             CompletableFuture<Consumer> future = 
subscriptionFuture.thenCompose(subscription -> {
                 Consumer consumer = new Consumer(subscription, subType, topic, 
consumerId, priorityLevel,
@@ -925,8 +925,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     private CompletableFuture<? extends Subscription> 
getNonDurableSubscription(String subscriptionName,
             MessageId startMessageId, InitialPosition initialPosition, long 
startMessageRollbackDurationSec,
-            boolean isReadCompacted) {
-        log.info("[{}][{}] Creating non-durable subscription at msg id {}", 
topic, subscriptionName, startMessageId);
+            boolean isReadCompacted, Map<String, String> 
subscriptionProperties) {
+        log.info("[{}][{}] Creating non-durable subscription at msg id {} - 
{}",
+                topic, subscriptionName, startMessageId, 
subscriptionProperties);
 
         CompletableFuture<Subscription> subscriptionFuture = new 
CompletableFuture<>();
         if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
@@ -964,7 +965,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     return FutureUtil.failedFuture(e);
                 }
 
-                subscription = new PersistentSubscription(this, 
subscriptionName, cursor, false);
+                subscription = new PersistentSubscription(this, 
subscriptionName, cursor, false,
+                        subscriptionProperties);
                 subscriptions.put(subscriptionName, subscription);
             } else {
                 // if subscription exists, check if it's a durable subscription
@@ -972,6 +974,11 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     return FutureUtil.failedFuture(
                             new NotAllowedException("Durable subscription with 
the same name already exists."));
                 }
+
+                if (MapUtils.isEmpty(subscription.getSubscriptionProperties())
+                        && MapUtils.isNotEmpty(subscriptionProperties)) {
+                    
subscription.getSubscriptionProperties().putAll(subscriptionProperties);
+                }
             }
 
             if (startMessageRollbackDurationSec > 0) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
index 5b43419462a..e0f5e0a77a9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.TopicName;
@@ -58,6 +59,7 @@ import org.awaitility.Awaitility;
 import org.eclipse.jetty.http.HttpStatus;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-admin")
@@ -156,15 +158,20 @@ public class CreateSubscriptionTest extends 
ProducerConsumerBase {
         }
     }
 
-    @Test
-    public void testSubscriptionPropertiesStats() throws Exception {
+    @DataProvider(name = "subscriptionMode")
+    public Object[][] subscriptionModeProvider() {
+        return new Object[][] { { SubscriptionMode.Durable }, { 
SubscriptionMode.NonDurable } };
+    }
+
+    @Test(dataProvider = "subscriptionMode")
+    public void testSubscriptionPropertiesStats(SubscriptionMode 
subscriptionMode) throws Exception {
         // test non-partitioned topic
         final String topic = "persistent://my-property/my-ns/topic" + 
UUID.randomUUID();
         admin.topics().createNonPartitionedTopic(topic);
         Map<String, String> map = new HashMap<>();
         map.put("test-topic", "tag1");
         String subName = "my-sub";
-        pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+        
pulsarClient.newConsumer().subscriptionMode(subscriptionMode).topic(topic).receiverQueueSize(1)
                 
.subscriptionProperties(map).subscriptionName(subName).subscribe();
         TopicStats stats = admin.topics().getStats(topic);
         Map<String, String> subProperties = 
stats.getSubscriptions().get(subName).getSubscriptionProperties();
@@ -178,7 +185,7 @@ public class CreateSubscriptionTest extends 
ProducerConsumerBase {
         pMap.put("topic2", "tag2");
         pMap.put("topic3", "tag3");
         String pSubName = "my-sub-1";
-        pulsarClient.newConsumer().topic(partitionedTopic).receiverQueueSize(1)
+        
pulsarClient.newConsumer().subscriptionMode(subscriptionMode).topic(partitionedTopic).receiverQueueSize(1)
                 
.subscriptionProperties(pMap).subscriptionName(pSubName).subscribe();
 
         PartitionedTopicStats pStats = 
admin.topics().getPartitionedStats(partitionedTopic, false);
@@ -192,16 +199,17 @@ public class CreateSubscriptionTest extends 
ProducerConsumerBase {
         assertEquals(pSubPropForPerPartition, pMap);
     }
 
-    @Test
-    public void addSubscriptionPropertiesTest() throws Exception {
+    @Test(dataProvider = "subscriptionMode")
+    public void addSubscriptionPropertiesTest(SubscriptionMode 
subscriptionMode) throws Exception {
         String topic = "persistent://my-property/my-ns/topic" + 
UUID.randomUUID();
         admin.topics().createNonPartitionedTopic(topic);
         Map<String, String> map = new HashMap<>();
         map.put("1", "1");
         map.put("2", "2");
         String subName = "my-sub";
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
-                
.subscriptionProperties(map).subscriptionName(subName).subscribe();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .subscriptionMode(subscriptionMode)
+                
.topic(topic).receiverQueueSize(1).subscriptionProperties(map).subscriptionName(subName).subscribe();
         PersistentSubscription subscription = (PersistentSubscription) 
pulsar.getBrokerService()
                 .getTopicReference(topic).get().getSubscription(subName);
         Map<String, String> properties = 
subscription.getSubscriptionProperties();
@@ -234,7 +242,8 @@ public class CreateSubscriptionTest extends 
ProducerConsumerBase {
         // restart broker, consumer use old properties
         restartBroker();
 
-        Consumer<byte[]> consumer2 = 
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic)
+                .subscriptionMode(subscriptionMode).receiverQueueSize(1)
                 
.subscriptionProperties(map).subscriptionName(subName).subscribe();
         PersistentSubscription subscription2 = (PersistentSubscription) 
pulsar.getBrokerService()
                 .getTopicReference(topic).get().getSubscription(subName);
@@ -251,7 +260,8 @@ public class CreateSubscriptionTest extends 
ProducerConsumerBase {
         Map<String, String> map3 = new HashMap<>();
         map3.put("3", "3");
         map3.put("4", "4");
-        Consumer<byte[]> consumer3 = 
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+        Consumer<byte[]> consumer3 = 
pulsarClient.newConsumer().topic(topic).subscriptionMode(subscriptionMode)
+                .receiverQueueSize(1)
                 
.subscriptionProperties(map3).subscriptionName(subName).subscribe();
         Map<String, String> properties3 = 
subscription.getSubscriptionProperties();
         assertTrue(properties3.containsKey("1"));
@@ -262,7 +272,8 @@ public class CreateSubscriptionTest extends 
ProducerConsumerBase {
 
         //restart and create a new consumer with new properties, the new 
properties should be updated
         restartBroker();
-        Consumer<byte[]> consumer4 = 
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+        Consumer<byte[]> consumer4 = 
pulsarClient.newConsumer().subscriptionMode(subscriptionMode)
+                .topic(topic).receiverQueueSize(1)
                 
.subscriptionProperties(map3).subscriptionName(subName).subscribe();
         PersistentSubscription subscription4 = (PersistentSubscription) 
pulsar.getBrokerService()
                 .getTopicReference(topic).get().getSubscription(subName);
@@ -273,21 +284,29 @@ public class CreateSubscriptionTest extends 
ProducerConsumerBase {
         assertEquals(properties4.get("4"), "4");
         consumer4.close();
 
-        //consumer subscribe without subscriptionProperties set, it will get 
the old one
-        consumer4 = 
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+
+        //consumer subscribe without subscriptionProperties set, it will get 
the old one for a Durable Subscription
+        // it will use the new (empty) set for a NonDurable Subscription
+        // so for Non-Durable subscriptions you have always to re-connect with 
the same properties
+        consumer4 = 
pulsarClient.newConsumer().topic(topic).subscriptionMode(subscriptionMode).receiverQueueSize(1)
                 .subscriptionName(subName).subscribe();
         subscription4 = (PersistentSubscription) pulsar.getBrokerService()
                 .getTopicReference(topic).get().getSubscription(subName);
         properties4 = subscription4.getSubscriptionProperties();
-        assertTrue(properties4.containsKey("3"));
-        assertTrue(properties4.containsKey("4"));
-        assertEquals(properties4.get("3"), "3");
-        assertEquals(properties4.get("4"), "4");
+        if (subscriptionMode == SubscriptionMode.Durable) {
+            assertTrue(properties4.containsKey("3"));
+            assertTrue(properties4.containsKey("4"));
+            assertEquals(properties4.get("3"), "3");
+            assertEquals(properties4.get("4"), "4");
+        } else {
+            assertTrue(properties4.isEmpty());
+        }
         consumer4.close();
 
         //restart broker, it won't get any properties
         restartBroker();
-        consumer4 = 
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+        consumer4 = 
pulsarClient.newConsumer().topic(topic).subscriptionMode(subscriptionMode)
+                .receiverQueueSize(1)
                 .subscriptionName(subName).subscribe();
         subscription4 = (PersistentSubscription) pulsar.getBrokerService()
                 .getTopicReference(topic).get().getSubscription(subName);
@@ -297,6 +316,7 @@ public class CreateSubscriptionTest extends 
ProducerConsumerBase {
         //restart broker and create a new consumer with new properties, the 
properties will be updated
         restartBroker();
         consumer4 = 
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+                .subscriptionMode(subscriptionMode)
                 .subscriptionProperties(map)
                 .subscriptionName(subName).subscribe();
         PersistentSubscription subscription5 = (PersistentSubscription) 
pulsar.getBrokerService()
@@ -309,6 +329,48 @@ public class CreateSubscriptionTest extends 
ProducerConsumerBase {
         consumer4.close();
 
 
+        String subNameShared = "my-sub-shared";
+        Map<String, String> mapShared = new HashMap<>();
+        // open two consumers with a Shared Subscription
+        Consumer consumerShared1 = 
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+                .subscriptionMode(subscriptionMode)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionProperties(mapShared)
+                .subscriptionName(subNameShared).subscribe();
+        PersistentSubscription subscriptionShared = (PersistentSubscription) 
pulsar.getBrokerService()
+                .getTopicReference(topic).get().getSubscription(subNameShared);
+        properties = subscriptionShared.getSubscriptionProperties();
+        assertEquals(properties, mapShared);
+
+        // add a new consumer, the properties are updated because they were 
empty
+        mapShared = new HashMap<>();
+        mapShared.put("6", "7");
+        mapShared.put("8", "9");
+        Consumer consumerShared2 = 
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+                .subscriptionMode(subscriptionMode)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionProperties(mapShared)
+                .subscriptionName(subNameShared).subscribe();
+
+        properties = subscriptionShared.getSubscriptionProperties();
+        assertEquals(properties, mapShared);
+
+        // add a third consumer, the properties are NOT updated because they 
are not empty
+        Map<String, String> mapShared2 = new HashMap<>();
+        mapShared2.put("10", "11");
+        Consumer consumerShared3 = 
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+                .subscriptionMode(subscriptionMode)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionProperties(mapShared2)
+                .subscriptionName(subNameShared).subscribe();
+
+        properties = subscriptionShared.getSubscriptionProperties();
+        // verify that the properties are not updated
+        assertEquals(properties, mapShared);
+
+        consumerShared1.close();
+        consumerShared2.close();
+        consumerShared3.close();
     }
 
     @Test

Reply via email to