This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new bb202d57369 [broker][fix]Fix update topic remove properties (#17231)
bb202d57369 is described below
commit bb202d57369cb032c1f555e9c38395baf45355f6
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Wed Aug 24 16:11:30 2022 +0800
[broker][fix]Fix update topic remove properties (#17231)
---
.../broker/admin/impl/PersistentTopicsBase.java | 6 ++--
.../pulsar/broker/admin/PersistentTopicsTest.java | 42 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index d9adc4ac20b..231168e35e9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -481,7 +481,8 @@ public class PersistentTopicsBase extends AdminResource {
return
updatePartitionInOtherCluster(numPartitions, clusters)
.thenCompose(v ->
namespaceResources().getPartitionedTopicResources()
.updatePartitionedTopicAsync(topicName, p ->
- new
PartitionedTopicMetadata(numPartitions)
+ new
PartitionedTopicMetadata(numPartitions,
+
p.properties)
));
} else {
return
CompletableFuture.completedFuture(null);
@@ -4361,7 +4362,8 @@ public class PersistentTopicsBase extends AdminResource {
CompletableFuture<Void> result = new CompletableFuture<>();
createSubscriptions(topicName, numPartitions).thenCompose(__ -> {
CompletableFuture<Void> future =
namespaceResources().getPartitionedTopicResources()
- .updatePartitionedTopicAsync(topicName, p -> new
PartitionedTopicMetadata(numPartitions));
+ .updatePartitionedTopicAsync(topicName, p ->
+ new PartitionedTopicMetadata(numPartitions,
p.properties));
future.exceptionally(ex -> {
// If the update operation fails, clean up the partitions that
were created
getPartitionedTopicMetadataAsync(topicName, false,
false).thenAccept(metadata -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 81c35f4db77..aba6697f2cd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -598,6 +598,48 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(),
Response.Status.CONFLICT.getStatusCode());
}
+ @Test
+ public void testUpdatePartitionedTopicHavingProperties() throws Exception {
+ final String tenant =
"tenant-testUpdatePartitionedTopicHavingProperties";
+ final String namespace =
"ns-testUpdatePartitionedTopicHavingProperties";
+ final String topic =
"topic-testUpdatePartitionedTopicHavingProperties";
+ Map<String, String> topicMetadata = new HashMap<>();
+ topicMetadata.put("key1", "value1");
+
+ TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1",
"role2"), Set.of("test"));
+ admin.tenants().createTenant(tenant, tenantInfo);
+ admin.namespaces().createNamespace(tenant + "/" + namespace,
Set.of("test"));
+
+ // create a 2 partition topic with properties key1->value1
+ AsyncResponse response = mock(AsyncResponse.class);
+ ArgumentCaptor<PartitionedTopicMetadata> responseCaptor =
+ ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+ PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(2,
topicMetadata);
+ persistentTopics.createPartitionedTopic(response, tenant, namespace,
topic, metadata, true);
+ Awaitility.await().untilAsserted(() -> {
+ persistentTopics.getPartitionedMetadata(response,
+ tenant, namespace, topic, true, false);
+ verify(response,
timeout(5000).atLeast(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().properties.size(),
1);
+ Assert.assertEquals(responseCaptor.getValue().properties,
topicMetadata);
+ });
+
+ // update partition to 5
+ final int updatedPartition = 5;
+ AsyncResponse response2 = mock(AsyncResponse.class);
+ ArgumentCaptor<PartitionedTopicMetadata> responseCaptor2 =
+ ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+ persistentTopics.updatePartitionedTopic(response2, tenant, namespace,
topic, false, false, false, updatedPartition);
+ Awaitility.await().untilAsserted(() -> {
+ persistentTopics.getPartitionedMetadata(response2,
+ tenant, namespace, topic, true, false);
+ verify(response2,
timeout(5000).atLeast(1)).resume(responseCaptor2.capture());
+ Assert.assertEquals(responseCaptor2.getValue().partitions,
updatedPartition);
+ Assert.assertEquals(responseCaptor2.getValue().properties.size(),
1);
+ Assert.assertEquals(responseCaptor2.getValue().properties,
topicMetadata);
+ });
+ }
+
@Test
public void
testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws
Exception {
// Already have non partition topic special-topic-partition-10,
shouldn't able to update number of partitioned topic to more than 10.