This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 05cbbfd6dd0 [improve][broker] Copy subscription properties during updating the topic partition number. (#19223)
05cbbfd6dd0 is described below

commit 05cbbfd6dd003e7b7d40bf142c68774fc4c06f42
Author: Qiang Zhao <[email protected]>
AuthorDate: Fri Jan 13 20:05:37 2023 +0800

    [improve][broker] Copy subscription properties during updating the topic partition number. (#19223)
    
    (cherry picked from commit 253e3e4d8fc4dc9afca1851ce91af04352b7fd60)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  3 +-
 .../broker/admin/IncrementPartitionsTest.java      | 42 ++++++++++++++++++++--
 2 files changed, 42 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 22f47bf9751..7b897c65898 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4257,7 +4257,8 @@ public class PersistentTopicsBase extends AdminResource {
                         final String topicNamePartition = topicName.getPartition(i).toString();
                         CompletableFuture<Void> future = new CompletableFuture<>();
                         admin.topics().createSubscriptionAsync(topicNamePartition,
-                                        subscription, MessageId.latest, replicated).whenComplete((__, ex) -> {
+                                        subscription, MessageId.latest, replicated, ss.getSubscriptionProperties())
+                                .whenComplete((__, ex) -> {
                             if (ex == null) {
                                 future.complete(null);
                             } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
index 0f21fe3f83b..e156678a394 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
@@ -22,7 +22,10 @@ import static org.testng.Assert.assertEquals;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.util.Collections;
-
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
@@ -35,7 +38,9 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -61,6 +66,9 @@ public class IncrementPartitionsTest extends MockedPulsarServiceBaseTest {
         TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("use"));
         admin.tenants().createTenant("prop-xyz", tenantInfo);
         admin.namespaces().createNamespace("prop-xyz/use/ns1");
+
+        // Setup v2 namespaces
+        setupDefaultTenantAndNamespace();
     }
 
     @AfterMethod(alwaysRun = true)
@@ -89,7 +97,7 @@ public class IncrementPartitionsTest extends MockedPulsarServiceBaseTest {
         assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 1);
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(partitionedTopicName).subscriptionName("sub-1")
-          .subscribe();
+                    .subscribe();
 
         admin.topics().updatePartitionedTopic(partitionedTopicName, 2);
         assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2);
@@ -106,6 +114,36 @@ public class IncrementPartitionsTest extends MockedPulsarServiceBaseTest {
         consumer.close();
     }
 
+    @Test
+    public void testIncrementPartitionsOfTopicWithSubscriptionProperties() throws Exception {
+        final String partitionedTopicName = UUID.randomUUID()
+                + "-testIncrementPartitionsOfTopicWithSubscriptionProperties";
+
+        admin.topics().createPartitionedTopic(partitionedTopicName, 1);
+        assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 1);
+
+        Map<String, String> properties = new HashMap<>();
+        properties.put("method", "testIncrementPartitionsOfTopic");
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(partitionedTopicName)
+                .subscriptionName("sub-1")
+                .subscriptionProperties(properties)
+                .subscribe();
+
+        admin.topics().updatePartitionedTopic(partitionedTopicName, 20);
+        assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20);
+
+        assertEquals(admin.topics().getSubscriptions(
+                TopicName.get(partitionedTopicName).getPartition(15).toString()), List.of("sub-1"));
+        TopicStats stats = admin.topics()
+                .getStats(TopicName.get(partitionedTopicName).getPartition(15).toString());
+        Map<String, String> subscriptionProperties = stats.getSubscriptions()
+                .get("sub-1").getSubscriptionProperties();
+        Assert.assertEquals(properties, subscriptionProperties);
+    }
+
     @Test
     public void testIncrementPartitionsWithNoSubscriptions() throws Exception {
         final String partitionedTopicName =

Reply via email to