This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 253e3e4d8fc [improve][broker] Copy subscription properties during updating the topic partition number. (#19223)
253e3e4d8fc is described below

commit 253e3e4d8fc4dc9afca1851ce91af04352b7fd60
Author: Qiang Zhao <[email protected]>
AuthorDate: Fri Jan 13 20:05:37 2023 +0800

    [improve][broker] Copy subscription properties during updating the topic partition number. (#19223)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  3 +-
 .../broker/admin/IncrementPartitionsTest.java      | 40 +++++++++++++++++++++-
 2 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index e839ec3064e..c090ef0516a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4446,7 +4446,8 @@ public class PersistentTopicsBase extends AdminResource {
                     final String topicNamePartition = topicName.getPartition(i).toString();
                     CompletableFuture<Void> future = new CompletableFuture<>();
                     admin.topics().createSubscriptionAsync(topicNamePartition,
-                                    subscription, MessageId.earliest, replicated).whenComplete((__, ex) -> {
+                                    subscription, MessageId.earliest, replicated, ss.getSubscriptionProperties())
+                            .whenComplete((__, ex) -> {
                         if (ex == null) {
                             future.complete(null);
                         } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
index da753b07159..67997157b65 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
@@ -20,8 +20,11 @@ package org.apache.pulsar.broker.admin;
 
 import static org.testng.Assert.assertEquals;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
@@ -34,7 +37,9 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -60,6 +65,9 @@ public class IncrementPartitionsTest extends MockedPulsarServiceBaseTest {
         TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("use"));
         admin.tenants().createTenant("prop-xyz", tenantInfo);
         admin.namespaces().createNamespace("prop-xyz/use/ns1");
+
+        // Setup v2 namespaces
+        setupDefaultTenantAndNamespace();
     }
 
     @AfterMethod(alwaysRun = true)
@@ -88,7 +96,7 @@ public class IncrementPartitionsTest extends MockedPulsarServiceBaseTest {
         assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 1);
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(partitionedTopicName).subscriptionName("sub-1")
-          .subscribe();
+                    .subscribe();
 
         admin.topics().updatePartitionedTopic(partitionedTopicName, 2);
         assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2);
@@ -105,6 +113,36 @@ public class IncrementPartitionsTest extends MockedPulsarServiceBaseTest {
         consumer.close();
     }
 
+    @Test
+    public void testIncrementPartitionsOfTopicWithSubscriptionProperties() throws Exception {
+        final String partitionedTopicName = UUID.randomUUID()
+                + "-testIncrementPartitionsOfTopicWithSubscriptionProperties";
+
+        admin.topics().createPartitionedTopic(partitionedTopicName, 1);
+        assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 1);
+
+        Map<String, String> properties = new HashMap<>();
+        properties.put("method", "testIncrementPartitionsOfTopic");
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(partitionedTopicName)
+                .subscriptionName("sub-1")
+                .subscriptionProperties(properties)
+                .subscribe();
+
+        admin.topics().updatePartitionedTopic(partitionedTopicName, 20);
+        assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20);
+
+        assertEquals(admin.topics().getSubscriptions(
+                TopicName.get(partitionedTopicName).getPartition(15).toString()), List.of("sub-1"));
+        TopicStats stats = admin.topics()
+                .getStats(TopicName.get(partitionedTopicName).getPartition(15).toString());
+        Map<String, String> subscriptionProperties = stats.getSubscriptions()
+                .get("sub-1").getSubscriptionProperties();
+        Assert.assertEquals(properties, subscriptionProperties);
+    }
+
     @Test
     public void testIncrementPartitionsWithNoSubscriptions() throws Exception {
         final String partitionedTopicName =

Reply via email to