This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new bb202d57369 [broker][fix]Fix update topic remove properties (#17231)
bb202d57369 is described below

commit bb202d57369cb032c1f555e9c38395baf45355f6
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Wed Aug 24 16:11:30 2022 +0800

    [broker][fix]Fix update topic remove properties (#17231)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  6 ++--
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 42 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index d9adc4ac20b..231168e35e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -481,7 +481,8 @@ public class PersistentTopicsBase extends AdminResource {
                                     return 
updatePartitionInOtherCluster(numPartitions, clusters)
                                         .thenCompose(v -> 
namespaceResources().getPartitionedTopicResources()
                                                         
.updatePartitionedTopicAsync(topicName, p ->
-                                                                new 
PartitionedTopicMetadata(numPartitions)
+                                                                new 
PartitionedTopicMetadata(numPartitions,
+                                                                    
p.properties)
                                                         ));
                                 } else {
                                     return 
CompletableFuture.completedFuture(null);
@@ -4361,7 +4362,8 @@ public class PersistentTopicsBase extends AdminResource {
         CompletableFuture<Void> result = new CompletableFuture<>();
         createSubscriptions(topicName, numPartitions).thenCompose(__ -> {
             CompletableFuture<Void> future = 
namespaceResources().getPartitionedTopicResources()
-                    .updatePartitionedTopicAsync(topicName, p -> new 
PartitionedTopicMetadata(numPartitions));
+                    .updatePartitionedTopicAsync(topicName, p ->
+                        new PartitionedTopicMetadata(numPartitions, 
p.properties));
             future.exceptionally(ex -> {
                 // If the update operation fails, clean up the partitions that 
were created
                 getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 81c35f4db77..aba6697f2cd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -598,6 +598,48 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), 
Response.Status.CONFLICT.getStatusCode());
     }
 
+    @Test
+    public void testUpdatePartitionedTopicHavingProperties() throws Exception {
+        final String tenant = 
"tenant-testUpdatePartitionedTopicHavingProperties";
+        final String namespace = 
"ns-testUpdatePartitionedTopicHavingProperties";
+        final String topic = 
"topic-testUpdatePartitionedTopicHavingProperties";
+        Map<String, String> topicMetadata = new HashMap<>();
+        topicMetadata.put("key1", "value1");
+
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", 
"role2"), Set.of("test"));
+        admin.tenants().createTenant(tenant, tenantInfo);
+        admin.namespaces().createNamespace(tenant + "/" + namespace, 
Set.of("test"));
+
+        // create a 2 partition topic with properties key1->value1
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<PartitionedTopicMetadata> responseCaptor =
+            ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(2, 
topicMetadata);
+        persistentTopics.createPartitionedTopic(response, tenant, namespace, 
topic, metadata, true);
+        Awaitility.await().untilAsserted(() -> {
+            persistentTopics.getPartitionedMetadata(response,
+                tenant, namespace, topic, true, false);
+            verify(response, 
timeout(5000).atLeast(1)).resume(responseCaptor.capture());
+            Assert.assertEquals(responseCaptor.getValue().properties.size(), 
1);
+            Assert.assertEquals(responseCaptor.getValue().properties, 
topicMetadata);
+        });
+
+        // update partition to 5
+        final int updatedPartition = 5;
+        AsyncResponse response2 = mock(AsyncResponse.class);
+        ArgumentCaptor<PartitionedTopicMetadata> responseCaptor2 =
+            ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.updatePartitionedTopic(response2, tenant, namespace, 
topic, false, false, false, updatedPartition);
+        Awaitility.await().untilAsserted(() -> {
+            persistentTopics.getPartitionedMetadata(response2,
+                tenant, namespace, topic, true, false);
+            verify(response2, 
timeout(5000).atLeast(1)).resume(responseCaptor2.capture());
+            Assert.assertEquals(responseCaptor2.getValue().partitions, 
updatedPartition);
+            Assert.assertEquals(responseCaptor2.getValue().properties.size(), 
1);
+            Assert.assertEquals(responseCaptor2.getValue().properties, 
topicMetadata);
+        });
+    }
+
     @Test
     public void 
testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws 
Exception {
         // Already have non partition topic special-topic-partition-10, 
shouldn't able to update number of partitioned topic to more than 10.

Reply via email to