This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 05cbbfd6dd0 [improve][broker] Copy subscription properties during updating the topic partition number. (#19223)
05cbbfd6dd0 is described below
commit 05cbbfd6dd003e7b7d40bf142c68774fc4c06f42
Author: Qiang Zhao <[email protected]>
AuthorDate: Fri Jan 13 20:05:37 2023 +0800
[improve][broker] Copy subscription properties during updating the topic partition number. (#19223)
(cherry picked from commit 253e3e4d8fc4dc9afca1851ce91af04352b7fd60)
---
.../broker/admin/impl/PersistentTopicsBase.java | 3 +-
.../broker/admin/IncrementPartitionsTest.java | 42 ++++++++++++++++++++--
2 files changed, 42 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 22f47bf9751..7b897c65898 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4257,7 +4257,8 @@ public class PersistentTopicsBase extends AdminResource {
final String topicNamePartition = topicName.getPartition(i).toString();
CompletableFuture<Void> future = new CompletableFuture<>();
admin.topics().createSubscriptionAsync(topicNamePartition,
- subscription, MessageId.latest, replicated).whenComplete((__, ex) -> {
+ subscription, MessageId.latest, replicated, ss.getSubscriptionProperties())
+ .whenComplete((__, ex) -> {
if (ex == null) {
future.complete(null);
} else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
index 0f21fe3f83b..e156678a394 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
@@ -22,7 +22,10 @@ import static org.testng.Assert.assertEquals;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.Collections;
-
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
@@ -35,7 +38,9 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -61,6 +66,9 @@ public class IncrementPartitionsTest extends MockedPulsarServiceBaseTest {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("use"));
admin.tenants().createTenant("prop-xyz", tenantInfo);
admin.namespaces().createNamespace("prop-xyz/use/ns1");
+
+ // Setup v2 namespaces
+ setupDefaultTenantAndNamespace();
}
@AfterMethod(alwaysRun = true)
@@ -89,7 +97,7 @@ public class IncrementPartitionsTest extends MockedPulsarServiceBaseTest {
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 1);
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(partitionedTopicName).subscriptionName("sub-1")
- .subscribe();
+ .subscribe();
admin.topics().updatePartitionedTopic(partitionedTopicName, 2);
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2);
@@ -106,6 +114,36 @@ public class IncrementPartitionsTest extends MockedPulsarServiceBaseTest {
consumer.close();
}
+ @Test
+ public void testIncrementPartitionsOfTopicWithSubscriptionProperties() throws Exception {
+ final String partitionedTopicName = UUID.randomUUID()
+ + "-testIncrementPartitionsOfTopicWithSubscriptionProperties";
+
+ admin.topics().createPartitionedTopic(partitionedTopicName, 1);
+ assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 1);
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("method", "testIncrementPartitionsOfTopic");
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(partitionedTopicName)
+ .subscriptionName("sub-1")
+ .subscriptionProperties(properties)
+ .subscribe();
+
+ admin.topics().updatePartitionedTopic(partitionedTopicName, 20);
+ assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20);
+
+ assertEquals(admin.topics().getSubscriptions(
+ TopicName.get(partitionedTopicName).getPartition(15).toString()), List.of("sub-1"));
+ TopicStats stats = admin.topics()
+ .getStats(TopicName.get(partitionedTopicName).getPartition(15).toString());
+ Map<String, String> subscriptionProperties = stats.getSubscriptions()
+ .get("sub-1").getSubscriptionProperties();
+ Assert.assertEquals(properties, subscriptionProperties);
+ }
+
@Test
public void testIncrementPartitionsWithNoSubscriptions() throws Exception {
final String partitionedTopicName =