This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 18562dab1cd [fix][broker] Create replicated subscriptions for new 
partitions when needed (#18659)
18562dab1cd is described below

commit 18562dab1cd6145152e22638aaca41084c2c1987
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Nov 29 05:12:36 2022 +0200

    [fix][broker] Create replicated subscriptions for new partitions when 
needed (#18659)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  3 +-
 .../pulsar/broker/service/ReplicatorTest.java      | 35 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 657bcc6a55d..789f14633b2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4408,12 +4408,13 @@ public class PersistentTopicsBase extends AdminResource 
{
                         // We must not re-create non-durable subscriptions on 
the new partitions
                         return;
                     }
+                    boolean replicated = ss.isReplicated();
 
                     for (int i = partitionMetadata.partitions; i < 
numPartitions; i++) {
                         final String topicNamePartition = 
topicName.getPartition(i).toString();
                         CompletableFuture<Void> future = new 
CompletableFuture<>();
                         
admin.topics().createSubscriptionAsync(topicNamePartition,
-                                subscription, 
MessageId.latest).whenComplete((__, ex) -> {
+                                        subscription, MessageId.latest, 
replicated).whenComplete((__, ex) -> {
                             if (ex == null) {
                                 future.complete(null);
                             } else {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index bffe0cbf582..63adcfd464d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -36,6 +36,7 @@ import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.SortedSet;
@@ -1171,6 +1172,40 @@ public class ReplicatorTest extends ReplicatorTestBase {
         consumer2.close();
     }
 
+    @Test
+    public void testIncrementPartitionsOfTopicWithReplicatedSubscription() 
throws Exception {
+        final String cluster1 = pulsar1.getConfig().getClusterName();
+        final String cluster2 = pulsar2.getConfig().getClusterName();
+        final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace + "/topic1");
+        int startPartitions = 4;
+        int newPartitions = 8;
+        final String subscriberName = "sub1";
+        admin1.namespaces().createNamespace(namespace, 
Sets.newHashSet(cluster1, cluster2));
+        admin1.topics().createPartitionedTopic(topicName, startPartitions);
+
+        @Cleanup
+        PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, 
TimeUnit.SECONDS)
+                .build();
+
+        Consumer<byte[]> consumer1 = 
client1.newConsumer().topic(topicName).subscriptionName(subscriberName)
+                .replicateSubscriptionState(true)
+                .subscribe();
+
+        admin1.topics().updatePartitionedTopic(topicName, newPartitions);
+
+        
assertEquals(admin1.topics().getPartitionedTopicMetadata(topicName).partitions, 
newPartitions);
+
+        Map<String, Boolean> replicatedSubscriptionStatus =
+                admin1.topics().getReplicatedSubscriptionStatus(topicName, 
subscriberName);
+        assertEquals(replicatedSubscriptionStatus.size(), newPartitions);
+        for (Map.Entry<String, Boolean> replicatedStatusForPartition : 
replicatedSubscriptionStatus.entrySet()) {
+            assertTrue(replicatedStatusForPartition.getValue(),
+                    "Replicated status is invalid for " + 
replicatedStatusForPartition.getKey());
+        }
+        consumer1.close();
+    }
+
     @DataProvider(name = "topicPrefix")
     public static Object[][] topicPrefix() {
         return new Object[][] { { "persistent://", "/persistent" }, { 
"non-persistent://", "/non-persistent" } };

Reply via email to