This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e956db729f5 [improve] Make subscriptions on NonPersistentTopic 
non-durable (#19741)
e956db729f5 is described below

commit e956db729f5098ff319237fd4220dbfb234c1b18
Author: 道君 <[email protected]>
AuthorDate: Mon May 8 22:41:37 2023 +0800

    [improve] Make subscriptions on NonPersistentTopic non-durable (#19741)
---
 .../nonpersistent/NonPersistentSubscription.java   |  24 +---
 .../service/nonpersistent/NonPersistentTopic.java  |  44 ++-----
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |   2 +-
 .../pulsar/broker/admin/v1/V1_AdminApi2Test.java   |   2 +-
 .../service/ManagedLedgerCompressionTest.java      |   2 +-
 .../broker/service/NonPersistentTopicE2ETest.java  |  93 ++++++++++++---
 .../nonpersistent/NonPersistentTopicTest.java      | 132 +++++++++++++++++++++
 .../client/api/NonDurableSubscriptionTest.java     |  43 -------
 .../org/apache/pulsar/client/impl/ReaderTest.java  |   2 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   7 ++
 .../client/impl/MultiTopicsConsumerImpl.java       |   1 +
 11 files changed, 231 insertions(+), 121 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index fc6df4a3409..1048864ad64 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.AbstractSubscription;
 import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
@@ -65,25 +66,17 @@ public class NonPersistentSubscription extends 
AbstractSubscription implements S
     @SuppressWarnings("unused")
     private volatile int isFenced = FALSE;
 
-    // Timestamp of when this subscription was last seen active
-    private volatile long lastActive;
-
     private volatile Map<String, String> subscriptionProperties;
 
-    // If isDurable is false(such as a Reader), remove subscription from topic 
when closing this subscription.
-    private final boolean isDurable;
-
     private KeySharedMode keySharedMode = null;
 
-    public NonPersistentSubscription(NonPersistentTopic topic, String 
subscriptionName, boolean isDurable,
+    public NonPersistentSubscription(NonPersistentTopic topic, String 
subscriptionName,
                                      Map<String, String> properties) {
         this.topic = topic;
         this.topicName = topic.getName();
         this.subName = subscriptionName;
         this.fullName = MoreObjects.toStringHelper(this).add("topic", 
topicName).add("name", subName).toString();
         IS_FENCED_UPDATER.set(this, FALSE);
-        this.lastActive = System.currentTimeMillis();
-        this.isDurable = isDurable;
         this.subscriptionProperties = properties != null
                 ? Collections.unmodifiableMap(properties) : 
Collections.emptyMap();
     }
@@ -110,7 +103,6 @@ public class NonPersistentSubscription extends 
AbstractSubscription implements S
 
     @Override
     public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) 
{
-        updateLastActive();
         if (IS_FENCED_UPDATER.get(this) == TRUE) {
             log.warn("Attempting to add consumer {} on a fenced subscription", 
consumer);
             return FutureUtil.failedFuture(new 
SubscriptionFencedException("Subscription is fenced"));
@@ -177,7 +169,6 @@ public class NonPersistentSubscription extends 
AbstractSubscription implements S
 
     @Override
     public synchronized void removeConsumer(Consumer consumer, boolean 
isResetCursor) throws BrokerServiceException {
-        updateLastActive();
         if (dispatcher != null) {
             dispatcher.removeConsumer(consumer);
         }
@@ -185,7 +176,8 @@ public class NonPersistentSubscription extends 
AbstractSubscription implements S
         ConsumerStatsImpl stats = consumer.getStats();
         bytesOutFromRemovedConsumers.add(stats.bytesOutCounter);
         msgOutFromRemovedConsumer.add(stats.msgOutCounter);
-        if (!isDurable) {
+        // Unsubscribe when all the consumers disconnected.
+        if (dispatcher != null && 
CollectionUtils.isEmpty(dispatcher.getConsumers())) {
             topic.unsubscribe(subName);
         }
 
@@ -524,14 +516,6 @@ public class NonPersistentSubscription extends 
AbstractSubscription implements S
 
     private static final Logger log = 
LoggerFactory.getLogger(NonPersistentSubscription.class);
 
-    public long getLastActive() {
-        return lastActive;
-    }
-
-    public void updateLastActive() {
-        this.lastActive = System.currentTimeMillis();
-    }
-
     public Map<String, String> getSubscriptionProperties() {
         return subscriptionProperties;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 33258b06726..9fe0a735c90 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -90,7 +90,6 @@ import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.utils.StatsOutputStream;
 import org.slf4j.Logger;
@@ -259,8 +258,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
     public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
         return internalSubscribe(option.getCnx(), 
option.getSubscriptionName(), option.getConsumerId(),
                 option.getSubType(), option.getPriorityLevel(), 
option.getConsumerName(),
-                option.isDurable(), option.getStartMessageId(), 
option.getMetadata(),
-                option.isReadCompacted(),
+                option.getStartMessageId(), option.getMetadata(), 
option.isReadCompacted(),
                 option.getStartMessageRollbackDurationSec(), 
option.isReplicatedSubscriptionStateArg(),
                 option.getKeySharedMeta(), 
option.getSubscriptionProperties().orElse(null),
                 option.getSchemaType());
@@ -275,15 +273,14 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
                                                  long 
resetStartMessageBackInSec, boolean replicateSubscriptionState,
                                                  KeySharedMeta keySharedMeta) {
         return internalSubscribe(cnx, subscriptionName, consumerId, subType, 
priorityLevel, consumerName,
-                isDurable, startMessageId, metadata, readCompacted, 
resetStartMessageBackInSec,
+                startMessageId, metadata, readCompacted, 
resetStartMessageBackInSec,
                 replicateSubscriptionState, keySharedMeta, null, null);
     }
 
     private CompletableFuture<Consumer> internalSubscribe(final TransportCnx 
cnx, String subscriptionName,
                                                           long consumerId, 
SubType subType, int priorityLevel,
-                                                          String consumerName, 
boolean isDurable,
-                                                          MessageId 
startMessageId, Map<String, String> metadata,
-                                                          boolean 
readCompacted,
+                                                          String consumerName, 
MessageId startMessageId,
+                                                          Map<String, String> 
metadata, boolean readCompacted,
                                                           long 
resetStartMessageBackInSec,
                                                           boolean 
replicateSubscriptionState,
                                                           KeySharedMeta 
keySharedMeta,
@@ -327,7 +324,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
             }
 
             NonPersistentSubscription subscription = 
subscriptions.computeIfAbsent(subscriptionName,
-                    name -> new NonPersistentSubscription(this, 
subscriptionName, isDurable, subscriptionProperties));
+                    name -> new NonPersistentSubscription(this, 
subscriptionName, subscriptionProperties));
 
             Consumer consumer = new Consumer(subscription, subType, topic, 
consumerId, priorityLevel, consumerName,
                     false, cnx, cnx.getAuthRole(), metadata, readCompacted, 
keySharedMeta, MessageId.latest,
@@ -383,7 +380,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
     @Override
     public CompletableFuture<Subscription> createSubscription(String 
subscriptionName, InitialPosition initialPosition,
             boolean replicateSubscriptionState, Map<String, String> 
properties) {
-        return CompletableFuture.completedFuture(new 
NonPersistentSubscription(this, subscriptionName, true,
+        return CompletableFuture.completedFuture(new 
NonPersistentSubscription(this, subscriptionName,
                 properties));
     }
 
@@ -1029,33 +1026,8 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
 
     @Override
     public void checkInactiveSubscriptions() {
-        TopicName name = TopicName.get(topic);
-        try {
-            Policies policies = 
brokerService.pulsar().getPulsarResources().getNamespaceResources()
-                    .getPolicies(name.getNamespaceObject())
-                    
.orElseThrow(MetadataStoreException.NotFoundException::new);
-            final int defaultExpirationTime = 
brokerService.pulsar().getConfiguration()
-                    .getSubscriptionExpirationTimeMinutes();
-            final Integer nsExpirationTime = 
policies.subscription_expiration_time_minutes;
-            final long expirationTimeMillis = TimeUnit.MINUTES
-                    .toMillis(nsExpirationTime == null ? defaultExpirationTime 
: nsExpirationTime);
-            if (expirationTimeMillis > 0) {
-                subscriptions.forEach((subName, sub) -> {
-                    if (sub.getDispatcher() != null
-                            && sub.getDispatcher().isConsumerConnected() || 
sub.isReplicated()) {
-                        return;
-                    }
-                    if (System.currentTimeMillis() - sub.getLastActive() > 
expirationTimeMillis) {
-                        sub.delete().thenAccept(v -> log.info("[{}][{}] The 
subscription was deleted due to expiration "
-                                + "with last active [{}]", topic, subName, 
sub.getLastActive()));
-                    }
-                });
-            }
-        } catch (Exception e) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Error getting policies", topic);
-            }
-        }
+        // no-op
+        // subscriptions will be removed after all the consumers disconnected.
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index fb4f880efff..7ed5fe34ea4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -431,7 +431,7 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
 
         consumer.close();
         topicStats = (NonPersistentTopicStats) 
admin.topics().getStats(nonPersistentTopicName);
-        assertTrue(topicStats.getSubscriptions().containsKey("my-sub"));
+        assertFalse(topicStats.getSubscriptions().containsKey("my-sub"));
         assertEquals(topicStats.getPublishers().size(), 0);
         // test partitioned-topic
         final String partitionedTopicName = 
"non-persistent://prop-xyz/ns1/paritioned";
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
index 1aed0be31b1..cd08977a09b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
@@ -251,7 +251,7 @@ public class V1_AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         client.close();
 
         topicStats = admin.nonPersistentTopics().getStats(persistentTopicName);
-        assertTrue(topicStats.getSubscriptions().keySet().contains("my-sub"));
+        assertFalse(topicStats.getSubscriptions().keySet().contains("my-sub"));
         assertEquals(topicStats.getPublishers().size(), 0);
 
         // test partitioned-topic
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java
index c13c8bd9fdc..1ecb0d8f5f7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ManagedLedgerCompressionTest.java
@@ -100,7 +100,7 @@ public class ManagedLedgerCompressionTest extends 
BrokerTestBase {
             producer.newMessage().value("test".getBytes()).send();
         }
         for (int i = 0; i < messageCnt; i++) {
-            Message<byte[]> message = consumer.receive(1000, TimeUnit.SECONDS);
+            Message<byte[]> message = consumer.receive(1000, 
TimeUnit.MILLISECONDS);
             consumer.acknowledge(message);
             Assert.assertNotNull(message);
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
index 6ff5e7b2f82..99ba97dff5f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
@@ -44,6 +44,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 
 public class NonPersistentTopicE2ETest extends BrokerTestBase {
@@ -125,8 +126,8 @@ public class NonPersistentTopicE2ETest extends 
BrokerTestBase {
 
         // 2. Topic is not GCed with live connection
         final String topicName3 = "non-persistent://prop/ns-abc/topic-2";
-        subName = "sub1";
-        consumer = 
pulsarClient.newConsumer().topic(topicName3).subscriptionName(subName).subscribe();
+        String subName2 = "sub1";
+        consumer = 
pulsarClient.newConsumer().topic(topicName3).subscriptionName(subName2).subscribe();
         topic = getTopic(topicName3);
         assertTrue(topic.isPresent());
         topic.get().addSchema(schemaData).join();
@@ -136,23 +137,56 @@ public class NonPersistentTopicE2ETest extends 
BrokerTestBase {
         assertTrue(getTopic(topicName3).isPresent());
         assertTrue(topicHasSchema(topicName3));
 
-        // 3. Topic with subscription is not GCed even with no connections
+        // 3. Topic can be GCed after unsubscribe
         consumer.close();
+        assertThrows(() -> admin.topics().deleteSubscription(topicName3, 
subName2));
+
+        runGC();
+        Awaitility.await().untilAsserted(() -> {
+            assertFalse(getTopic(topicName3).isPresent());
+        });
+        assertFalse(topicHasSchema(topicName3));
+    }
+
+
+    @Test
+    public void testCloseConsumerWillDeleteSchema() throws Exception {
+        // 1. Simple successful GC
+        final String topicName = "non-persistent://prop/ns-abc/topic-1";
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        producer.close();
+
+        Optional<Topic> topic = getTopic(topicName);
+        assertTrue(topic.isPresent());
+
+        byte[] data = JSONSchema.of(SchemaDefinition.builder()
+                .withPojo(Foo.class).build()).getSchemaInfo().getSchema();
+        SchemaData schemaData = SchemaData.builder()
+                .data(data)
+                .type(SchemaType.BYTES)
+                .user("foo").build();
+        topic.get().addSchema(schemaData).join();
+
+        final String topicName3 = "non-persistent://prop/ns-abc/topic-2";
+        String subName = "sub1";
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName3).subscriptionName(subName).subscribe();
+        topic = getTopic(topicName3);
+        assertTrue(topic.isPresent());
+        topic.get().addSchema(schemaData).join();
+        assertTrue(topicHasSchema(topicName3));
 
         runGC();
         assertTrue(getTopic(topicName3).isPresent());
         assertTrue(topicHasSchema(topicName3));
 
-        // 4. Topic can be GCed after unsubscribe
-        admin.topics().deleteSubscription(topicName3, subName);
+        // 2. Close consumer will make the topic GCed
+        consumer.close();
 
         runGC();
-        Awaitility.await().untilAsserted(() -> {
-            assertFalse(getTopic(topicName3).isPresent());
-        });
+        assertFalse(getTopic(topicName3).isPresent());
         assertFalse(topicHasSchema(topicName3));
-    }
 
+    }
     @Test(groups = "broker")
     public void testPatternTopic() throws PulsarClientException, 
InterruptedException {
         final String topic1 = 
"non-persistent://prop/ns-abc/testPatternTopic1-" + 
UUID.randomUUID().toString();
@@ -210,18 +244,14 @@ public class NonPersistentTopicE2ETest extends 
BrokerTestBase {
         runGC();
         
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
 
-        // 3. Topic with subscription is not GCed even with no connections
+        // 3. Topic can be GCed after unsubscribe
         consumer.close();
-
-        runGC();
-        
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
-
-        // 4. Topic can be GCed after unsubscribe
-        admin.topics().deleteSubscription(topicName, subName);
+        // subscription will be deleted after consumer#close. so it will be 
failed to delete sub.
+        assertThrows(() -> admin.topics().deleteSubscription(topicName, 
subName));
 
         runGC();
         
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
-        // 5. Get the topic and make sure it doesn't come back
+        //4. Get the topic and make sure it doesn't come back
         admin.lookups().lookupTopic(topicName);
         Optional<Topic> topic = 
pulsar.getBrokerService().getTopicIfExists(topicName).join();
         assertFalse(topic.isPresent());
@@ -233,7 +263,7 @@ public class NonPersistentTopicE2ETest extends 
BrokerTestBase {
 
         
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
 
-        // 6. Test for partitioned topic to delete the partitioned metadata
+        // 5. Test for partitioned topic to delete the partitioned metadata
         String topicGc = "non-persistent://prop/ns-abc/topic-gc";
         int partitions = 5;
         admin.topics().createPartitionedTopic(topicGc, partitions);
@@ -246,4 +276,31 @@ public class NonPersistentTopicE2ETest extends 
BrokerTestBase {
                 assertEquals(pulsar.getBrokerService().
                         
fetchPartitionedTopicMetadataAsync(TopicName.get(topicGc)).join().partitions, 
0));
     }
+
+    @Test
+    public void testCloseConsumerThenRunGC() throws Exception {
+        // 1. Simple successful GC
+        String topicName = "non-persistent://prop/ns-abc/topic-10";
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        producer.close();
+
+        
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+        runGC();
+        Awaitility.await().untilAsserted(() ->
+                
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent())
+        );
+
+        // 2. Topic is not GCed with live connection
+        String subName = "sub1";
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
+
+        runGC();
+        
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+        // 3. Topic with subscription can be GCed if consumers closed
+        consumer.close();
+
+        runGC();
+        
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
index 73a1084f30f..eb25489076b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
@@ -18,18 +18,27 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.SubscriptionOption;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
+import org.mockito.Mockito;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -119,4 +128,127 @@ public class NonPersistentTopicTest extends 
BrokerTestBase {
         }
         
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
 4);
     }
+
+
+    @Test
+    public void testSubscriptionsOnNonPersistentTopic() throws Exception {
+        final String topicName = "non-persistent://prop/ns-abc/topic_" + 
UUID.randomUUID();
+        final String exclusiveSubName = "exclusive";
+        final String failoverSubName = "failover";
+        final String sharedSubName = "shared";
+        final String keySharedSubName = "key_shared";
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .create();
+
+        producer.send("This is a message");
+        NonPersistentTopic topic = (NonPersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
+
+        NonPersistentTopic mockTopic = Mockito.spy(topic);
+        pulsar.getBrokerService().getTopics().put(topicName, 
CompletableFuture.completedFuture(Optional.of(mockTopic)));
+        Mockito
+                .doAnswer(inv -> {
+                    SubscriptionOption option = inv.getArgument(0);
+                    if (option.isDurable()) {
+                        return CompletableFuture.failedFuture(
+                                new IllegalArgumentException("isDurable cannot 
be true when subscribe " +
+                                        "on non-persistent topic"));
+                    }
+                    return inv.callRealMethod();
+                }).when(mockTopic).subscribe(Mockito.any());
+
+        @Cleanup
+        Consumer<String> exclusiveConsumer = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(exclusiveSubName)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionMode(SubscriptionMode.Durable)
+                .subscribe();
+
+        @Cleanup
+        Consumer<String> failoverConsumer1 = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(failoverSubName)
+                .subscriptionType(SubscriptionType.Failover)
+                .subscriptionMode(SubscriptionMode.Durable)
+                .subscribe();
+        @Cleanup
+        Consumer<String> failoverConsumer2 = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(failoverSubName)
+                .subscriptionType(SubscriptionType.Failover)
+                .subscriptionMode(SubscriptionMode.Durable)
+                .subscribe();
+        @Cleanup
+        Consumer<String> sharedConsumer1 = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(sharedSubName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionMode(SubscriptionMode.Durable)
+                .subscribe();
+        @Cleanup
+        Consumer<String> sharedConsumer2 = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(sharedSubName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionMode(SubscriptionMode.Durable)
+                .subscribe();
+
+        @Cleanup
+        Consumer<String> keySharedConsumer1 = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(keySharedSubName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscriptionMode(SubscriptionMode.Durable)
+                .subscribe();
+        @Cleanup
+        Consumer<String> keySharedConsumer2 = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(keySharedSubName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscriptionMode(SubscriptionMode.Durable)
+                .subscribe();
+
+        ConcurrentOpenHashMap<String, NonPersistentSubscription> 
subscriptionMap = mockTopic.getSubscriptions();
+        Assert.assertEquals(subscriptionMap.size(), 4);
+
+        // Check exclusive subscription
+        NonPersistentSubscription exclusiveSub = 
subscriptionMap.get(exclusiveSubName);
+        Assert.assertNotNull(exclusiveSub);
+        exclusiveConsumer.close();
+        Awaitility.waitAtMost(10, TimeUnit.SECONDS).pollInterval(1, 
TimeUnit.SECONDS)
+                .until(() -> subscriptionMap.get(exclusiveSubName) == null);
+
+        // Check failover subscription
+        NonPersistentSubscription failoverSub = 
subscriptionMap.get(failoverSubName);
+        Assert.assertNotNull(failoverSub);
+        failoverConsumer1.close();
+        failoverSub = subscriptionMap.get(failoverSubName);
+        Assert.assertNotNull(failoverSub);
+        failoverConsumer2.close();
+        Awaitility.waitAtMost(10, TimeUnit.SECONDS).pollInterval(1, 
TimeUnit.SECONDS)
+                .until(() -> subscriptionMap.get(failoverSubName) == null);
+
+        // Check shared subscription
+        NonPersistentSubscription sharedSub = 
subscriptionMap.get(sharedSubName);
+        Assert.assertNotNull(sharedSub);
+        sharedConsumer1.close();
+        sharedSub = subscriptionMap.get(sharedSubName);
+        Assert.assertNotNull(sharedSub);
+        sharedConsumer2.close();
+        Awaitility.waitAtMost(10, TimeUnit.SECONDS).pollInterval(1, 
TimeUnit.SECONDS)
+                .until(() -> subscriptionMap.get(sharedSubName) == null);
+
+        // Check KeyShared subscription
+        NonPersistentSubscription keySharedSub = 
subscriptionMap.get(keySharedSubName);
+        Assert.assertNotNull(keySharedSub);
+        keySharedConsumer1.close();
+        keySharedSub = subscriptionMap.get(keySharedSubName);
+        Assert.assertNotNull(keySharedSub);
+        keySharedConsumer2.close();
+        Awaitility.waitAtMost(10, TimeUnit.SECONDS).pollInterval(1, 
TimeUnit.SECONDS)
+                .until(() -> subscriptionMap.get(keySharedSubName) == null);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
index 52b1498ef7d..6375f79bfbb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
@@ -20,11 +20,6 @@ package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
-import static org.testng.AssertJUnit.assertFalse;
-import static org.testng.AssertJUnit.assertNull;
-import static org.testng.AssertJUnit.assertTrue;
-import java.lang.reflect.Field;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
@@ -33,8 +28,6 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.PulsarChannelInitializer;
 import org.apache.pulsar.broker.service.ServerCnx;
-import 
org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription;
-import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.api.proto.CommandFlow;
 import org.testng.Assert;
@@ -185,42 +178,6 @@ public class NonDurableSubscriptionTest  extends 
ProducerConsumerBase {
         }
     }
 
-    @Test(timeOut = 10000)
-    public void testDeleteInactiveNonPersistentSubscription() throws Exception 
{
-        final String topic = "non-persistent://my-property/my-ns/topic-" + 
UUID.randomUUID();
-        final String subName = "my-subscriber";
-        admin.topics().createNonPartitionedTopic(topic);
-        // 1 setup consumer
-        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
-                .subscriptionName(subName).subscribe();
-        // 3 due to the existence of consumers, subscriptions will not be 
cleaned up
-        NonPersistentTopic nonPersistentTopic = (NonPersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topic).get().get();
-        NonPersistentSubscription nonPersistentSubscription = 
(NonPersistentSubscription) nonPersistentTopic.getSubscription(subName);
-        assertNotNull(nonPersistentSubscription);
-        assertNotNull(nonPersistentSubscription.getDispatcher());
-        
assertTrue(nonPersistentSubscription.getDispatcher().isConsumerConnected());
-        assertFalse(nonPersistentSubscription.isReplicated());
-
-        nonPersistentTopic.checkInactiveSubscriptions();
-        Thread.sleep(500);
-        nonPersistentSubscription = (NonPersistentSubscription) 
nonPersistentTopic.getSubscription(subName);
-        assertNotNull(nonPersistentSubscription);
-        // remove consumer and wait for cleanup
-        consumer.close();
-        Thread.sleep(500);
-
-        //change last active time to 5 minutes ago
-        Field f = 
NonPersistentSubscription.class.getDeclaredField("lastActive");
-        f.setAccessible(true);
-        f.set(nonPersistentTopic.getSubscription(subName), 
System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5));
-        //without consumers and last active time is 5 minutes ago, 
subscription should be cleaned up
-        nonPersistentTopic.checkInactiveSubscriptions();
-        Thread.sleep(500);
-        nonPersistentSubscription = (NonPersistentSubscription) 
nonPersistentTopic.getSubscription(subName);
-        assertNull(nonPersistentSubscription);
-
-    }
-
     @DataProvider(name = "subscriptionTypes")
     public static Object[][] subscriptionTypes() {
         Object[][] result = new Object[SubscriptionType.values().length][];
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 951f99af1a4..a50c92f7ab8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -600,7 +600,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest 
{
                 .until(() -> {
             TopicStats topicStats = admin.topics().getStats(topic);
             System.out.println("subscriptions size: " + 
topicStats.getSubscriptions().size());
-            return topicStats.getSubscriptions().size() == 1;
+            return topicStats.getSubscriptions().size() == 0;
         });
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6c64fe04069..2693f12d3ea 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -261,6 +261,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         super(client, topic, conf, conf.getReceiverQueueSize(), 
executorProvider, subscribeFuture, schema,
                 interceptors);
         this.consumerId = client.newConsumerId();
+
+        TopicName topicName = TopicName.get(topic);
+        if (!topicName.isPersistent() && 
conf.getSubscriptionMode().equals(SubscriptionMode.Durable)) {
+            conf.setSubscriptionMode(SubscriptionMode.NonDurable);
+            log.warn("[{}] Cannot create a [Durable] subscription for a 
NonPersistentTopic, "
+                    + "will use [NonDurable] to subscribe. Subscription name: 
{}", topic, conf.getSubscriptionName());
+        }
         this.subscriptionMode = conf.getSubscriptionMode();
         if (startMessageId != null) {
             MessageIdAdv firstChunkMessageId = ((MessageIdAdv) 
startMessageId).getFirstChunkMessageId();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index ef0345de919..d0607b97c18 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1121,6 +1121,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 .timeout(1, TimeUnit.MILLISECONDS)
                 .build();
         configurationData.setBatchReceivePolicy(internalBatchReceivePolicy);
+        configurationData = configurationData.clone();
         return ConsumerImpl.newConsumerImpl(client, partitionName,
                 configurationData, client.externalExecutorProvider(),
                 partitionIndex, true, listener != null, subFuture,


Reply via email to