This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e956db729f5 [improve] Make subscriptions on NonPersistentTopic
non-durable (#19741)
e956db729f5 is described below
commit e956db729f5098ff319237fd4220dbfb234c1b18
Author: 道君 <[email protected]>
AuthorDate: Mon May 8 22:41:37 2023 +0800
[improve] Make subscriptions on NonPersistentTopic non-durable (#19741)
---
.../nonpersistent/NonPersistentSubscription.java | 24 +---
.../service/nonpersistent/NonPersistentTopic.java | 44 ++-----
.../apache/pulsar/broker/admin/AdminApi2Test.java | 2 +-
.../pulsar/broker/admin/v1/V1_AdminApi2Test.java | 2 +-
.../service/ManagedLedgerCompressionTest.java | 2 +-
.../broker/service/NonPersistentTopicE2ETest.java | 93 ++++++++++++---
.../nonpersistent/NonPersistentTopicTest.java | 132 +++++++++++++++++++++
.../client/api/NonDurableSubscriptionTest.java | 43 -------
.../org/apache/pulsar/client/impl/ReaderTest.java | 2 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 7 ++
.../client/impl/MultiTopicsConsumerImpl.java | 1 +
11 files changed, 231 insertions(+), 121 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index fc6df4a3409..1048864ad64 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.AbstractSubscription;
import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
@@ -65,25 +66,17 @@ public class NonPersistentSubscription extends
AbstractSubscription implements S
@SuppressWarnings("unused")
private volatile int isFenced = FALSE;
- // Timestamp of when this subscription was last seen active
- private volatile long lastActive;
-
private volatile Map<String, String> subscriptionProperties;
- // If isDurable is false(such as a Reader), remove subscription from topic
when closing this subscription.
- private final boolean isDurable;
-
private KeySharedMode keySharedMode = null;
- public NonPersistentSubscription(NonPersistentTopic topic, String
subscriptionName, boolean isDurable,
+ public NonPersistentSubscription(NonPersistentTopic topic, String
subscriptionName,
Map<String, String> properties) {
this.topic = topic;
this.topicName = topic.getName();
this.subName = subscriptionName;
this.fullName = MoreObjects.toStringHelper(this).add("topic",
topicName).add("name", subName).toString();
IS_FENCED_UPDATER.set(this, FALSE);
- this.lastActive = System.currentTimeMillis();
- this.isDurable = isDurable;
this.subscriptionProperties = properties != null
? Collections.unmodifiableMap(properties) :
Collections.emptyMap();
}
@@ -110,7 +103,6 @@ public class NonPersistentSubscription extends
AbstractSubscription implements S
@Override
public synchronized CompletableFuture<Void> addConsumer(Consumer consumer)
{
- updateLastActive();
if (IS_FENCED_UPDATER.get(this) == TRUE) {
log.warn("Attempting to add consumer {} on a fenced subscription",
consumer);
return FutureUtil.failedFuture(new
SubscriptionFencedException("Subscription is fenced"));
@@ -177,7 +169,6 @@ public class NonPersistentSubscription extends
AbstractSubscription implements S
@Override
public synchronized void removeConsumer(Consumer consumer, boolean
isResetCursor) throws BrokerServiceException {
- updateLastActive();
if (dispatcher != null) {
dispatcher.removeConsumer(consumer);
}
@@ -185,7 +176,8 @@ public class NonPersistentSubscription extends
AbstractSubscription implements S
ConsumerStatsImpl stats = consumer.getStats();
bytesOutFromRemovedConsumers.add(stats.bytesOutCounter);
msgOutFromRemovedConsumer.add(stats.msgOutCounter);
- if (!isDurable) {
+ // Unsubscribe when all the consumers disconnected.
+ if (dispatcher != null &&
CollectionUtils.isEmpty(dispatcher.getConsumers())) {
topic.unsubscribe(subName);
}
@@ -524,14 +516,6 @@ public class NonPersistentSubscription extends
AbstractSubscription implements S
private static final Logger log =
LoggerFactory.getLogger(NonPersistentSubscription.class);
- public long getLastActive() {
- return lastActive;
- }
-
- public void updateLastActive() {
- this.lastActive = System.currentTimeMillis();
- }
-
public Map<String, String> getSubscriptionProperties() {
return subscriptionProperties;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 33258b06726..9fe0a735c90 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -90,7 +90,6 @@ import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;
import org.slf4j.Logger;
@@ -259,8 +258,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
return internalSubscribe(option.getCnx(),
option.getSubscriptionName(), option.getConsumerId(),
option.getSubType(), option.getPriorityLevel(),
option.getConsumerName(),
- option.isDurable(), option.getStartMessageId(),
option.getMetadata(),
- option.isReadCompacted(),
+ option.getStartMessageId(), option.getMetadata(),
option.isReadCompacted(),
option.getStartMessageRollbackDurationSec(),
option.isReplicatedSubscriptionStateArg(),
option.getKeySharedMeta(),
option.getSubscriptionProperties().orElse(null),
option.getSchemaType());
@@ -275,15 +273,14 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
long
resetStartMessageBackInSec, boolean replicateSubscriptionState,
KeySharedMeta keySharedMeta) {
return internalSubscribe(cnx, subscriptionName, consumerId, subType,
priorityLevel, consumerName,
- isDurable, startMessageId, metadata, readCompacted,
resetStartMessageBackInSec,
+ startMessageId, metadata, readCompacted,
resetStartMessageBackInSec,
replicateSubscriptionState, keySharedMeta, null, null);
}
private CompletableFuture<Consumer> internalSubscribe(final TransportCnx
cnx, String subscriptionName,
long consumerId,
SubType subType, int priorityLevel,
- String consumerName,
boolean isDurable,
- MessageId
startMessageId, Map<String, String> metadata,
- boolean
readCompacted,
+ String consumerName,
MessageId startMessageId,
+ Map<String, String>
metadata, boolean readCompacted,
long
resetStartMessageBackInSec,
boolean
replicateSubscriptionState,
KeySharedMeta
keySharedMeta,
@@ -327,7 +324,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
}
NonPersistentSubscription subscription =
subscriptions.computeIfAbsent(subscriptionName,
- name -> new NonPersistentSubscription(this,
subscriptionName, isDurable, subscriptionProperties));
+ name -> new NonPersistentSubscription(this,
subscriptionName, subscriptionProperties));
Consumer consumer = new Consumer(subscription, subType, topic,
consumerId, priorityLevel, consumerName,
false, cnx, cnx.getAuthRole(), metadata, readCompacted,
keySharedMeta, MessageId.latest,
@@ -383,7 +380,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
@Override
public CompletableFuture<Subscription> createSubscription(String
subscriptionName, InitialPosition initialPosition,
boolean replicateSubscriptionState, Map<String, String>
properties) {
- return CompletableFuture.completedFuture(new
NonPersistentSubscription(this, subscriptionName, true,
+ return CompletableFuture.completedFuture(new
NonPersistentSubscription(this, subscriptionName,
properties));
}
@@ -1029,33 +1026,8 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
@Override
public void checkInactiveSubscriptions() {
- TopicName name = TopicName.get(topic);
- try {
- Policies policies =
brokerService.pulsar().getPulsarResources().getNamespaceResources()
- .getPolicies(name.getNamespaceObject())
-
.orElseThrow(MetadataStoreException.NotFoundException::new);
- final int defaultExpirationTime =
brokerService.pulsar().getConfiguration()
- .getSubscriptionExpirationTimeMinutes();
- final Integer nsExpirationTime =
policies.subscription_expiration_time_minutes;
- final long expirationTimeMillis = TimeUnit.MINUTES
- .toMillis(nsExpirationTime == null ? defaultExpirationTime
: nsExpirationTime);
- if (expirationTimeMillis > 0) {
- subscriptions.forEach((subName, sub) -> {
- if (sub.getDispatcher() != null
- && sub.getDispatcher().isConsumerConnected() ||
sub.isReplicated()) {
- return;
- }
- if (System.currentTimeMillis() - sub.getLastActive() >
expirationTimeMillis) {
- sub.delete().thenAccept(v -> log.info("[{}][{}] The
subscription was deleted due to expiration "
- + "with last active [{}]", topic, subName,
sub.getLastActive()));
- }
- });
- }
- } catch (Exception e) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Error getting policies", topic);
- }
- }
+ // no-op
+ // subscriptions will be removed after all the consumers disconnected.
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index fb4f880efff..7ed5fe34ea4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -431,7 +431,7 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
consumer.close();
topicStats = (NonPersistentTopicStats)
admin.topics().getStats(nonPersistentTopicName);
- assertTrue(topicStats.getSubscriptions().containsKey("my-sub"));
+ assertFalse(topicStats.getSubscriptions().containsKey("my-sub"));
assertEquals(topicStats.getPublishers().size(), 0);
// test partitioned-topic
final String partitionedTopicName =
"non-persistent://prop-xyz/ns1/paritioned";
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
index 1aed0be31b1..cd08977a09b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
@@ -251,7 +251,7 @@ public class V1_AdminApi2Test extends
MockedPulsarServiceBaseTest {
client.close();
topicStats = admin.nonPersistentTopics().getStats(persistentTopicName);
- assertTrue(topicStats.getSubscriptions().keySet().contains("my-sub"));
+ assertFalse(topicStats.getSubscriptions().keySet().contains("my-sub"));
assertEquals(topicStats.getPublishers().size(), 0);
// test partitioned-topic
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java
index c13c8bd9fdc..1ecb0d8f5f7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java
@@ -100,7 +100,7 @@ public class ManagedLedgerCompressionTest extends
BrokerTestBase {
producer.newMessage().value("test".getBytes()).send();
}
for (int i = 0; i < messageCnt; i++) {
- Message<byte[]> message = consumer.receive(1000, TimeUnit.SECONDS);
+ Message<byte[]> message = consumer.receive(1000,
TimeUnit.MILLISECONDS);
consumer.acknowledge(message);
Assert.assertNotNull(message);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
index 6ff5e7b2f82..99ba97dff5f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
@@ -44,6 +44,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
public class NonPersistentTopicE2ETest extends BrokerTestBase {
@@ -125,8 +126,8 @@ public class NonPersistentTopicE2ETest extends
BrokerTestBase {
// 2. Topic is not GCed with live connection
final String topicName3 = "non-persistent://prop/ns-abc/topic-2";
- subName = "sub1";
- consumer =
pulsarClient.newConsumer().topic(topicName3).subscriptionName(subName).subscribe();
+ String subName2 = "sub1";
+ consumer =
pulsarClient.newConsumer().topic(topicName3).subscriptionName(subName2).subscribe();
topic = getTopic(topicName3);
assertTrue(topic.isPresent());
topic.get().addSchema(schemaData).join();
@@ -136,23 +137,56 @@ public class NonPersistentTopicE2ETest extends
BrokerTestBase {
assertTrue(getTopic(topicName3).isPresent());
assertTrue(topicHasSchema(topicName3));
- // 3. Topic with subscription is not GCed even with no connections
+ // 3. Topic can be GCed after unsubscribe
consumer.close();
+ assertThrows(() -> admin.topics().deleteSubscription(topicName3,
subName2));
+
+ runGC();
+ Awaitility.await().untilAsserted(() -> {
+ assertFalse(getTopic(topicName3).isPresent());
+ });
+ assertFalse(topicHasSchema(topicName3));
+ }
+
+
+ @Test
+ public void testCloseConsumerWillDeleteSchema() throws Exception {
+ // 1. Simple successful GC
+ final String topicName = "non-persistent://prop/ns-abc/topic-1";
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
+ producer.close();
+
+ Optional<Topic> topic = getTopic(topicName);
+ assertTrue(topic.isPresent());
+
+ byte[] data = JSONSchema.of(SchemaDefinition.builder()
+ .withPojo(Foo.class).build()).getSchemaInfo().getSchema();
+ SchemaData schemaData = SchemaData.builder()
+ .data(data)
+ .type(SchemaType.BYTES)
+ .user("foo").build();
+ topic.get().addSchema(schemaData).join();
+
+ final String topicName3 = "non-persistent://prop/ns-abc/topic-2";
+ String subName = "sub1";
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName3).subscriptionName(subName).subscribe();
+ topic = getTopic(topicName3);
+ assertTrue(topic.isPresent());
+ topic.get().addSchema(schemaData).join();
+ assertTrue(topicHasSchema(topicName3));
runGC();
assertTrue(getTopic(topicName3).isPresent());
assertTrue(topicHasSchema(topicName3));
- // 4. Topic can be GCed after unsubscribe
- admin.topics().deleteSubscription(topicName3, subName);
+ // 2. Close consumer will make the topic GCed
+ consumer.close();
runGC();
- Awaitility.await().untilAsserted(() -> {
- assertFalse(getTopic(topicName3).isPresent());
- });
+ assertFalse(getTopic(topicName3).isPresent());
assertFalse(topicHasSchema(topicName3));
- }
+ }
@Test(groups = "broker")
public void testPatternTopic() throws PulsarClientException,
InterruptedException {
final String topic1 =
"non-persistent://prop/ns-abc/testPatternTopic1-" +
UUID.randomUUID().toString();
@@ -210,18 +244,14 @@ public class NonPersistentTopicE2ETest extends
BrokerTestBase {
runGC();
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
- // 3. Topic with subscription is not GCed even with no connections
+ // 3. Topic can be GCed after unsubscribe
consumer.close();
-
- runGC();
-
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
-
- // 4. Topic can be GCed after unsubscribe
- admin.topics().deleteSubscription(topicName, subName);
+ // subscription will be deleted after consumer#close. so it will be
failed to delete sub.
+ assertThrows(() -> admin.topics().deleteSubscription(topicName,
subName));
runGC();
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
- // 5. Get the topic and make sure it doesn't come back
+ //4. Get the topic and make sure it doesn't come back
admin.lookups().lookupTopic(topicName);
Optional<Topic> topic =
pulsar.getBrokerService().getTopicIfExists(topicName).join();
assertFalse(topic.isPresent());
@@ -233,7 +263,7 @@ public class NonPersistentTopicE2ETest extends
BrokerTestBase {
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
- // 6. Test for partitioned topic to delete the partitioned metadata
+ // 5. Test for partitioned topic to delete the partitioned metadata
String topicGc = "non-persistent://prop/ns-abc/topic-gc";
int partitions = 5;
admin.topics().createPartitionedTopic(topicGc, partitions);
@@ -246,4 +276,31 @@ public class NonPersistentTopicE2ETest extends
BrokerTestBase {
assertEquals(pulsar.getBrokerService().
fetchPartitionedTopicMetadataAsync(TopicName.get(topicGc)).join().partitions,
0));
}
+
+ @Test
+ public void testCloseConsumerThenRunGC() throws Exception {
+ // 1. Simple successful GC
+ String topicName = "non-persistent://prop/ns-abc/topic-10";
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
+ producer.close();
+
+
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+ runGC();
+ Awaitility.await().untilAsserted(() ->
+
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent())
+ );
+
+ // 2. Topic is not GCed with live connection
+ String subName = "sub1";
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
+
+ runGC();
+
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+ // 3. Topic with subscription can be GCed if consumers closed
+ consumer.close();
+
+ runGC();
+
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
index 73a1084f30f..eb25489076b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
@@ -18,18 +18,27 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.SubscriptionOption;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
import org.junit.Assert;
+import org.mockito.Mockito;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -119,4 +128,127 @@ public class NonPersistentTopicTest extends
BrokerTestBase {
}
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
4);
}
+
+
+ @Test
+ public void testSubscriptionsOnNonPersistentTopic() throws Exception {
+ final String topicName = "non-persistent://prop/ns-abc/topic_" +
UUID.randomUUID();
+ final String exclusiveSubName = "exclusive";
+ final String failoverSubName = "failover";
+ final String sharedSubName = "shared";
+ final String keySharedSubName = "key_shared";
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .create();
+
+ producer.send("This is a message");
+ NonPersistentTopic topic = (NonPersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
+
+ NonPersistentTopic mockTopic = Mockito.spy(topic);
+ pulsar.getBrokerService().getTopics().put(topicName,
CompletableFuture.completedFuture(Optional.of(mockTopic)));
+ Mockito
+ .doAnswer(inv -> {
+ SubscriptionOption option = inv.getArgument(0);
+ if (option.isDurable()) {
+ return CompletableFuture.failedFuture(
+ new IllegalArgumentException("isDurable cannot
be true when subscribe " +
+ "on non-persistent topic"));
+ }
+ return inv.callRealMethod();
+ }).when(mockTopic).subscribe(Mockito.any());
+
+ @Cleanup
+ Consumer<String> exclusiveConsumer =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(exclusiveSubName)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionMode(SubscriptionMode.Durable)
+ .subscribe();
+
+ @Cleanup
+ Consumer<String> failoverConsumer1 =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(failoverSubName)
+ .subscriptionType(SubscriptionType.Failover)
+ .subscriptionMode(SubscriptionMode.Durable)
+ .subscribe();
+ @Cleanup
+ Consumer<String> failoverConsumer2 =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(failoverSubName)
+ .subscriptionType(SubscriptionType.Failover)
+ .subscriptionMode(SubscriptionMode.Durable)
+ .subscribe();
+ @Cleanup
+ Consumer<String> sharedConsumer1 =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(sharedSubName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionMode(SubscriptionMode.Durable)
+ .subscribe();
+ @Cleanup
+ Consumer<String> sharedConsumer2 =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(sharedSubName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionMode(SubscriptionMode.Durable)
+ .subscribe();
+
+ @Cleanup
+ Consumer<String> keySharedConsumer1 =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(keySharedSubName)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscriptionMode(SubscriptionMode.Durable)
+ .subscribe();
+ @Cleanup
+ Consumer<String> keySharedConsumer2 =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(keySharedSubName)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscriptionMode(SubscriptionMode.Durable)
+ .subscribe();
+
+ ConcurrentOpenHashMap<String, NonPersistentSubscription>
subscriptionMap = mockTopic.getSubscriptions();
+ Assert.assertEquals(subscriptionMap.size(), 4);
+
+ // Check exclusive subscription
+ NonPersistentSubscription exclusiveSub =
subscriptionMap.get(exclusiveSubName);
+ Assert.assertNotNull(exclusiveSub);
+ exclusiveConsumer.close();
+ Awaitility.waitAtMost(10, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS)
+ .until(() -> subscriptionMap.get(exclusiveSubName) == null);
+
+ // Check failover subscription
+ NonPersistentSubscription failoverSub =
subscriptionMap.get(failoverSubName);
+ Assert.assertNotNull(failoverSub);
+ failoverConsumer1.close();
+ failoverSub = subscriptionMap.get(failoverSubName);
+ Assert.assertNotNull(failoverSub);
+ failoverConsumer2.close();
+ Awaitility.waitAtMost(10, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS)
+ .until(() -> subscriptionMap.get(failoverSubName) == null);
+
+ // Check shared subscription
+ NonPersistentSubscription sharedSub =
subscriptionMap.get(sharedSubName);
+ Assert.assertNotNull(sharedSub);
+ sharedConsumer1.close();
+ sharedSub = subscriptionMap.get(sharedSubName);
+ Assert.assertNotNull(sharedSub);
+ sharedConsumer2.close();
+ Awaitility.waitAtMost(10, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS)
+ .until(() -> subscriptionMap.get(sharedSubName) == null);
+
+ // Check KeyShared subscription
+ NonPersistentSubscription keySharedSub =
subscriptionMap.get(keySharedSubName);
+ Assert.assertNotNull(keySharedSub);
+ keySharedConsumer1.close();
+ keySharedSub = subscriptionMap.get(keySharedSubName);
+ Assert.assertNotNull(keySharedSub);
+ keySharedConsumer2.close();
+ Awaitility.waitAtMost(10, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS)
+ .until(() -> subscriptionMap.get(keySharedSubName) == null);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
index 52b1498ef7d..6375f79bfbb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
@@ -20,11 +20,6 @@ package org.apache.pulsar.client.api;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
-import static org.testng.AssertJUnit.assertFalse;
-import static org.testng.AssertJUnit.assertNull;
-import static org.testng.AssertJUnit.assertTrue;
-import java.lang.reflect.Field;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
@@ -33,8 +28,6 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.PulsarChannelInitializer;
import org.apache.pulsar.broker.service.ServerCnx;
-import
org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription;
-import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.api.proto.CommandFlow;
import org.testng.Assert;
@@ -185,42 +178,6 @@ public class NonDurableSubscriptionTest extends
ProducerConsumerBase {
}
}
- @Test(timeOut = 10000)
- public void testDeleteInactiveNonPersistentSubscription() throws Exception
{
- final String topic = "non-persistent://my-property/my-ns/topic-" +
UUID.randomUUID();
- final String subName = "my-subscriber";
- admin.topics().createNonPartitionedTopic(topic);
- // 1 setup consumer
- Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(topic)
- .subscriptionName(subName).subscribe();
- // 3 due to the existence of consumers, subscriptions will not be
cleaned up
- NonPersistentTopic nonPersistentTopic = (NonPersistentTopic)
pulsar.getBrokerService().getTopicIfExists(topic).get().get();
- NonPersistentSubscription nonPersistentSubscription =
(NonPersistentSubscription) nonPersistentTopic.getSubscription(subName);
- assertNotNull(nonPersistentSubscription);
- assertNotNull(nonPersistentSubscription.getDispatcher());
-
assertTrue(nonPersistentSubscription.getDispatcher().isConsumerConnected());
- assertFalse(nonPersistentSubscription.isReplicated());
-
- nonPersistentTopic.checkInactiveSubscriptions();
- Thread.sleep(500);
- nonPersistentSubscription = (NonPersistentSubscription)
nonPersistentTopic.getSubscription(subName);
- assertNotNull(nonPersistentSubscription);
- // remove consumer and wait for cleanup
- consumer.close();
- Thread.sleep(500);
-
- //change last active time to 5 minutes ago
- Field f =
NonPersistentSubscription.class.getDeclaredField("lastActive");
- f.setAccessible(true);
- f.set(nonPersistentTopic.getSubscription(subName),
System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5));
- //without consumers and last active time is 5 minutes ago,
subscription should be cleaned up
- nonPersistentTopic.checkInactiveSubscriptions();
- Thread.sleep(500);
- nonPersistentSubscription = (NonPersistentSubscription)
nonPersistentTopic.getSubscription(subName);
- assertNull(nonPersistentSubscription);
-
- }
-
@DataProvider(name = "subscriptionTypes")
public static Object[][] subscriptionTypes() {
Object[][] result = new Object[SubscriptionType.values().length][];
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 951f99af1a4..a50c92f7ab8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -600,7 +600,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest
{
.until(() -> {
TopicStats topicStats = admin.topics().getStats(topic);
System.out.println("subscriptions size: " +
topicStats.getSubscriptions().size());
- return topicStats.getSubscriptions().size() == 1;
+ return topicStats.getSubscriptions().size() == 0;
});
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6c64fe04069..2693f12d3ea 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -261,6 +261,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
super(client, topic, conf, conf.getReceiverQueueSize(),
executorProvider, subscribeFuture, schema,
interceptors);
this.consumerId = client.newConsumerId();
+
+ TopicName topicName = TopicName.get(topic);
+ if (!topicName.isPersistent() &&
conf.getSubscriptionMode().equals(SubscriptionMode.Durable)) {
+ conf.setSubscriptionMode(SubscriptionMode.NonDurable);
+ log.warn("[{}] Cannot create a [Durable] subscription for a
NonPersistentTopic, "
+ + "will use [NonDurable] to subscribe. Subscription name:
{}", topic, conf.getSubscriptionName());
+ }
this.subscriptionMode = conf.getSubscriptionMode();
if (startMessageId != null) {
MessageIdAdv firstChunkMessageId = ((MessageIdAdv)
startMessageId).getFirstChunkMessageId();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index ef0345de919..d0607b97c18 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1121,6 +1121,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
.timeout(1, TimeUnit.MILLISECONDS)
.build();
configurationData.setBatchReceivePolicy(internalBatchReceivePolicy);
+ configurationData = configurationData.clone();
return ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData, client.externalExecutorProvider(),
partitionIndex, true, listener != null, subFuture,