This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 18562dab1cd [fix][broker] Create replicated subscriptions for new partitions when needed (#18659)
18562dab1cd is described below
commit 18562dab1cd6145152e22638aaca41084c2c1987
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Nov 29 05:12:36 2022 +0200
[fix][broker] Create replicated subscriptions for new partitions when needed (#18659)
---
.../broker/admin/impl/PersistentTopicsBase.java | 3 +-
.../pulsar/broker/service/ReplicatorTest.java | 35 ++++++++++++++++++++++
2 files changed, 37 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 657bcc6a55d..789f14633b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4408,12 +4408,13 @@ public class PersistentTopicsBase extends AdminResource {
// We must not re-create non-durable subscriptions on
the new partitions
return;
}
+ boolean replicated = ss.isReplicated();
for (int i = partitionMetadata.partitions; i <
numPartitions; i++) {
final String topicNamePartition =
topicName.getPartition(i).toString();
CompletableFuture<Void> future = new
CompletableFuture<>();
admin.topics().createSubscriptionAsync(topicNamePartition,
- subscription,
MessageId.latest).whenComplete((__, ex) -> {
+ subscription, MessageId.latest,
replicated).whenComplete((__, ex) -> {
if (ex == null) {
future.complete(null);
} else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index bffe0cbf582..63adcfd464d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -36,6 +36,7 @@ import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
@@ -1171,6 +1172,40 @@ public class ReplicatorTest extends ReplicatorTestBase {
consumer2.close();
}
+ @Test
+ public void testIncrementPartitionsOfTopicWithReplicatedSubscription()
throws Exception {
+ final String cluster1 = pulsar1.getConfig().getClusterName();
+ final String cluster2 = pulsar2.getConfig().getClusterName();
+ final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ namespace + "/topic1");
+ int startPartitions = 4;
+ int newPartitions = 8;
+ final String subscriberName = "sub1";
+ admin1.namespaces().createNamespace(namespace,
Sets.newHashSet(cluster1, cluster2));
+ admin1.topics().createPartitionedTopic(topicName, startPartitions);
+
+ @Cleanup
+ PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0,
TimeUnit.SECONDS)
+ .build();
+
+ Consumer<byte[]> consumer1 =
client1.newConsumer().topic(topicName).subscriptionName(subscriberName)
+ .replicateSubscriptionState(true)
+ .subscribe();
+
+ admin1.topics().updatePartitionedTopic(topicName, newPartitions);
+
+
assertEquals(admin1.topics().getPartitionedTopicMetadata(topicName).partitions,
newPartitions);
+
+ Map<String, Boolean> replicatedSubscriptionStatus =
+ admin1.topics().getReplicatedSubscriptionStatus(topicName,
subscriberName);
+ assertEquals(replicatedSubscriptionStatus.size(), newPartitions);
+ for (Map.Entry<String, Boolean> replicatedStatusForPartition :
replicatedSubscriptionStatus.entrySet()) {
+ assertTrue(replicatedStatusForPartition.getValue(),
+ "Replicated status is invalid for " +
replicatedStatusForPartition.getKey());
+ }
+ consumer1.close();
+ }
+
@DataProvider(name = "topicPrefix")
public static Object[][] topicPrefix() {
return new Object[][] { { "persistent://", "/persistent" }, {
"non-persistent://", "/non-persistent" } };