This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 95b7040d584 [enh][broker] Support properties on NonDurable
subscriptions (PIP-105) (#15345)
95b7040d584 is described below
commit 95b7040d58492b385fb9cf54c5611908424488e3
Author: Enrico Olivelli <[email protected]>
AuthorDate: Wed Apr 27 15:42:47 2022 +0200
[enh][broker] Support properties on NonDurable subscriptions (PIP-105)
(#15345)
---
.../broker/service/persistent/PersistentTopic.java | 15 +++-
.../broker/admin/CreateSubscriptionTest.java | 98 ++++++++++++++++++----
2 files changed, 91 insertions(+), 22 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index bfe0b644109..7b19181fcd1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -779,7 +779,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
getDurableSubscription(subscriptionName, initialPosition,
startMessageRollbackDurationSec,
replicatedSubscriptionState,
subscriptionProperties)
: getNonDurableSubscription(subscriptionName,
startMessageId, initialPosition,
- startMessageRollbackDurationSec, readCompacted);
+ startMessageRollbackDurationSec, readCompacted,
subscriptionProperties);
CompletableFuture<Consumer> future =
subscriptionFuture.thenCompose(subscription -> {
Consumer consumer = new Consumer(subscription, subType, topic,
consumerId, priorityLevel,
@@ -925,8 +925,9 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
private CompletableFuture<? extends Subscription>
getNonDurableSubscription(String subscriptionName,
MessageId startMessageId, InitialPosition initialPosition, long
startMessageRollbackDurationSec,
- boolean isReadCompacted) {
- log.info("[{}][{}] Creating non-durable subscription at msg id {}",
topic, subscriptionName, startMessageId);
+ boolean isReadCompacted, Map<String, String>
subscriptionProperties) {
+ log.info("[{}][{}] Creating non-durable subscription at msg id {} -
{}",
+ topic, subscriptionName, startMessageId,
subscriptionProperties);
CompletableFuture<Subscription> subscriptionFuture = new
CompletableFuture<>();
if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
@@ -964,7 +965,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return FutureUtil.failedFuture(e);
}
- subscription = new PersistentSubscription(this,
subscriptionName, cursor, false);
+ subscription = new PersistentSubscription(this,
subscriptionName, cursor, false,
+ subscriptionProperties);
subscriptions.put(subscriptionName, subscription);
} else {
// if subscription exists, check if it's a durable subscription
@@ -972,6 +974,11 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return FutureUtil.failedFuture(
new NotAllowedException("Durable subscription with
the same name already exists."));
}
+
+ if (MapUtils.isEmpty(subscription.getSubscriptionProperties())
+ && MapUtils.isNotEmpty(subscriptionProperties)) {
+
subscription.getSubscriptionProperties().putAll(subscriptionProperties);
+ }
}
if (startMessageRollbackDurationSec > 0) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
index 5b43419462a..e0f5e0a77a9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
@@ -58,6 +59,7 @@ import org.awaitility.Awaitility;
import org.eclipse.jetty.http.HttpStatus;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-admin")
@@ -156,15 +158,20 @@ public class CreateSubscriptionTest extends
ProducerConsumerBase {
}
}
- @Test
- public void testSubscriptionPropertiesStats() throws Exception {
+ @DataProvider(name = "subscriptionMode")
+ public Object[][] subscriptionModeProvider() {
+ return new Object[][] { { SubscriptionMode.Durable }, {
SubscriptionMode.NonDurable } };
+ }
+
+ @Test(dataProvider = "subscriptionMode")
+ public void testSubscriptionPropertiesStats(SubscriptionMode
subscriptionMode) throws Exception {
// test non-partitioned topic
final String topic = "persistent://my-property/my-ns/topic" +
UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);
Map<String, String> map = new HashMap<>();
map.put("test-topic", "tag1");
String subName = "my-sub";
- pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+
pulsarClient.newConsumer().subscriptionMode(subscriptionMode).topic(topic).receiverQueueSize(1)
.subscriptionProperties(map).subscriptionName(subName).subscribe();
TopicStats stats = admin.topics().getStats(topic);
Map<String, String> subProperties =
stats.getSubscriptions().get(subName).getSubscriptionProperties();
@@ -178,7 +185,7 @@ public class CreateSubscriptionTest extends
ProducerConsumerBase {
pMap.put("topic2", "tag2");
pMap.put("topic3", "tag3");
String pSubName = "my-sub-1";
- pulsarClient.newConsumer().topic(partitionedTopic).receiverQueueSize(1)
+
pulsarClient.newConsumer().subscriptionMode(subscriptionMode).topic(partitionedTopic).receiverQueueSize(1)
.subscriptionProperties(pMap).subscriptionName(pSubName).subscribe();
PartitionedTopicStats pStats =
admin.topics().getPartitionedStats(partitionedTopic, false);
@@ -192,16 +199,17 @@ public class CreateSubscriptionTest extends
ProducerConsumerBase {
assertEquals(pSubPropForPerPartition, pMap);
}
- @Test
- public void addSubscriptionPropertiesTest() throws Exception {
+ @Test(dataProvider = "subscriptionMode")
+ public void addSubscriptionPropertiesTest(SubscriptionMode
subscriptionMode) throws Exception {
String topic = "persistent://my-property/my-ns/topic" +
UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);
Map<String, String> map = new HashMap<>();
map.put("1", "1");
map.put("2", "2");
String subName = "my-sub";
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
-
.subscriptionProperties(map).subscriptionName(subName).subscribe();
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .subscriptionMode(subscriptionMode)
+
.topic(topic).receiverQueueSize(1).subscriptionProperties(map).subscriptionName(subName).subscribe();
PersistentSubscription subscription = (PersistentSubscription)
pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
Map<String, String> properties =
subscription.getSubscriptionProperties();
@@ -234,7 +242,8 @@ public class CreateSubscriptionTest extends
ProducerConsumerBase {
// restart broker, consumer use old properties
restartBroker();
- Consumer<byte[]> consumer2 =
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+ Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic)
+ .subscriptionMode(subscriptionMode).receiverQueueSize(1)
.subscriptionProperties(map).subscriptionName(subName).subscribe();
PersistentSubscription subscription2 = (PersistentSubscription)
pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
@@ -251,7 +260,8 @@ public class CreateSubscriptionTest extends
ProducerConsumerBase {
Map<String, String> map3 = new HashMap<>();
map3.put("3", "3");
map3.put("4", "4");
- Consumer<byte[]> consumer3 =
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+ Consumer<byte[]> consumer3 =
pulsarClient.newConsumer().topic(topic).subscriptionMode(subscriptionMode)
+ .receiverQueueSize(1)
.subscriptionProperties(map3).subscriptionName(subName).subscribe();
Map<String, String> properties3 =
subscription.getSubscriptionProperties();
assertTrue(properties3.containsKey("1"));
@@ -262,7 +272,8 @@ public class CreateSubscriptionTest extends
ProducerConsumerBase {
//restart and create a new consumer with new properties, the new
properties should be updated
restartBroker();
- Consumer<byte[]> consumer4 =
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+ Consumer<byte[]> consumer4 =
pulsarClient.newConsumer().subscriptionMode(subscriptionMode)
+ .topic(topic).receiverQueueSize(1)
.subscriptionProperties(map3).subscriptionName(subName).subscribe();
PersistentSubscription subscription4 = (PersistentSubscription)
pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
@@ -273,21 +284,29 @@ public class CreateSubscriptionTest extends
ProducerConsumerBase {
assertEquals(properties4.get("4"), "4");
consumer4.close();
- //consumer subscribe without subscriptionProperties set, it will get
the old one
- consumer4 =
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+
+ //consumer subscribe without subscriptionProperties set, it will get
the old one for a Durable Subscription
+ // it will use the new (empty) set for a NonDurable Subscription
+ // so for Non-Durable subscriptions you have always to re-connect with
the same properties
+ consumer4 =
pulsarClient.newConsumer().topic(topic).subscriptionMode(subscriptionMode).receiverQueueSize(1)
.subscriptionName(subName).subscribe();
subscription4 = (PersistentSubscription) pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
properties4 = subscription4.getSubscriptionProperties();
- assertTrue(properties4.containsKey("3"));
- assertTrue(properties4.containsKey("4"));
- assertEquals(properties4.get("3"), "3");
- assertEquals(properties4.get("4"), "4");
+ if (subscriptionMode == SubscriptionMode.Durable) {
+ assertTrue(properties4.containsKey("3"));
+ assertTrue(properties4.containsKey("4"));
+ assertEquals(properties4.get("3"), "3");
+ assertEquals(properties4.get("4"), "4");
+ } else {
+ assertTrue(properties4.isEmpty());
+ }
consumer4.close();
//restart broker, it won't get any properties
restartBroker();
- consumer4 =
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+ consumer4 =
pulsarClient.newConsumer().topic(topic).subscriptionMode(subscriptionMode)
+ .receiverQueueSize(1)
.subscriptionName(subName).subscribe();
subscription4 = (PersistentSubscription) pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
@@ -297,6 +316,7 @@ public class CreateSubscriptionTest extends
ProducerConsumerBase {
//restart broker and create a new consumer with new properties, the
properties will be updated
restartBroker();
consumer4 =
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+ .subscriptionMode(subscriptionMode)
.subscriptionProperties(map)
.subscriptionName(subName).subscribe();
PersistentSubscription subscription5 = (PersistentSubscription)
pulsar.getBrokerService()
@@ -309,6 +329,48 @@ public class CreateSubscriptionTest extends
ProducerConsumerBase {
consumer4.close();
+ String subNameShared = "my-sub-shared";
+ Map<String, String> mapShared = new HashMap<>();
+ // open two consumers with a Shared Subscription
+ Consumer consumerShared1 =
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+ .subscriptionMode(subscriptionMode)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionProperties(mapShared)
+ .subscriptionName(subNameShared).subscribe();
+ PersistentSubscription subscriptionShared = (PersistentSubscription)
pulsar.getBrokerService()
+ .getTopicReference(topic).get().getSubscription(subNameShared);
+ properties = subscriptionShared.getSubscriptionProperties();
+ assertEquals(properties, mapShared);
+
+ // add a new consumer, the properties are updated because they were
empty
+ mapShared = new HashMap<>();
+ mapShared.put("6", "7");
+ mapShared.put("8", "9");
+ Consumer consumerShared2 =
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+ .subscriptionMode(subscriptionMode)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionProperties(mapShared)
+ .subscriptionName(subNameShared).subscribe();
+
+ properties = subscriptionShared.getSubscriptionProperties();
+ assertEquals(properties, mapShared);
+
+ // add a third consumer, the properties are NOT updated because they
are not empty
+ Map<String, String> mapShared2 = new HashMap<>();
+ mapShared2.put("10", "11");
+ Consumer consumerShared3 =
pulsarClient.newConsumer().topic(topic).receiverQueueSize(1)
+ .subscriptionMode(subscriptionMode)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionProperties(mapShared2)
+ .subscriptionName(subNameShared).subscribe();
+
+ properties = subscriptionShared.getSubscriptionProperties();
+ // verify that the properties are not updated
+ assertEquals(properties, mapShared);
+
+ consumerShared1.close();
+ consumerShared2.close();
+ consumerShared3.close();
}
@Test