This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ec86a9de598853fb38c2122730f035e8345b6165 Author: feynmanlin <[email protected]> AuthorDate: Thu Jun 17 10:20:45 2021 +0800 When the Replicator is enabled, no managedLedger is created when updating the number of partitions (#10910) Fixes #10673 ### Motivation When updating the number of partitions, we need to update the data in two places in ZooKeeper: ``` /admin/partitioned-topics /managed-ledgers/ ``` Currently we only update the number of partitions in `/admin/partitioned-topics`, so if no Producer or Consumer is created, the data obtained in another cluster will be incorrect. ### Modifications 1) Try to create managedLedger when updating the number of partitions 2) Ensure that the number of partitions in `/admin/partitioned-topics` is updated every time (cherry picked from commit 202da117b529b24bdf9c994750266dac597294a8) --- .../broker/admin/impl/PersistentTopicsBase.java | 3 +- .../pulsar/broker/service/ReplicatorTest.java | 37 ++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 8bf5a7c..97f4a8d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -422,6 +422,7 @@ public class PersistentTopicsBase extends AdminResource { throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate cluster list"); } try { + tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS); createSubscriptions(topicName, numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS); } catch (Exception e) { if (e.getCause() instanceof RestException) { @@ -435,7 +436,7 @@ public class PersistentTopicsBase extends AdminResource { if (!updateLocalTopicOnly) {
CompletableFuture<Void> updatePartition = new CompletableFuture<>(); final String path = ZkAdminPaths.partitionedTopicPath(topicName); - updatePartitionInOtherCluster(numPartitions, clusters).thenAccept((res) -> { + updatePartitionInOtherCluster(numPartitions, clusters).thenRun(() -> { try { namespaceResources().getPartitionedTopicResources().setAsync(path, (p) -> { return new PartitionedTopicMetadata(numPartitions); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index ead91a5..e5219d5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -36,6 +36,7 @@ import java.lang.reflect.Method; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -803,6 +804,42 @@ public class ReplicatorTest extends ReplicatorTestBase { reader2.closeAsync().get(); } + @Test + public void testReplicatorWithPartitionedTopic() throws Exception { + final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); + final String persistentTopicName = "persistent://" + namespace + "/partTopic" + UUID.randomUUID(); + + admin1.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3")); + // Create partitioned-topic from R1 + admin1.topics().createPartitionedTopic(persistentTopicName, 3); + // List partitioned topics from R2 + Awaitility.await().untilAsserted(() -> assertNotNull(admin2.topics().getPartitionedTopicList(namespace))); + Awaitility.await().untilAsserted(() -> assertEquals( + admin2.topics().getPartitionedTopicList(namespace).get(0), persistentTopicName)); + 
assertEquals(admin1.topics().getList(namespace).size(), 3); + // List partitioned topics from R3 + Awaitility.await().untilAsserted(() -> assertNotNull(admin3.topics().getPartitionedTopicList(namespace))); + Awaitility.await().untilAsserted(() -> assertEquals( + admin3.topics().getPartitionedTopicList(namespace).get(0), persistentTopicName)); + // Update partitioned topic from R2 + admin2.topics().updatePartitionedTopic(persistentTopicName, 5); + assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 5); + assertEquals(admin2.topics().getList(namespace).size(), 5); + // Update partitioned topic from R3 + admin3.topics().updatePartitionedTopic(persistentTopicName, 5); + assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 5); + assertEquals(admin3.topics().getList(namespace).size(), 5); + // Update partitioned topic from R1 + admin1.topics().updatePartitionedTopic(persistentTopicName, 6); + assertEquals(admin1.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6); + assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6); + assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6); + assertEquals(admin1.topics().getList(namespace).size(), 6); + assertEquals(admin2.topics().getList(namespace).size(), 6); + assertEquals(admin3.topics().getList(namespace).size(), 6); + } + /** * It verifies that broker should not start replicator for partitioned-topic (topic without -partition postfix) *
