This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 5c27ca844d1 [fix][broker] Create replicated subscriptions for new
partitions when needed (#18659)
5c27ca844d1 is described below
commit 5c27ca844d1d29710a7e07d008b4ae4211527c35
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Nov 29 05:12:36 2022 +0200
[fix][broker] Create replicated subscriptions for new partitions when
needed (#18659)
(cherry picked from commit a5cbebbbb5f7810b8909202741087556983a0afd)
---
.../broker/admin/impl/PersistentTopicsBase.java | 3 +-
.../pulsar/broker/service/ReplicatorTest.java | 35 ++++++++++++++++++++++
2 files changed, 37 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 085404e755a..f0f8e6a17b1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4200,12 +4200,13 @@ public class PersistentTopicsBase extends AdminResource
{
// We must not re-create non-durable subscriptions on the new partitions
return;
}
+ boolean replicated = ss.isReplicated();
for (int i = partitionMetadata.partitions; i <
numPartitions; i++) {
final String topicNamePartition =
topicName.getPartition(i).toString();
CompletableFuture<Void> future = new
CompletableFuture<>();
admin.topics().createSubscriptionAsync(topicNamePartition,
- subscription, MessageId.latest).whenComplete((__, ex) -> {
+ subscription, MessageId.latest, replicated).whenComplete((__, ex) -> {
if (ex == null) {
future.complete(null);
} else {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 34606871f50..e736aa86d7b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -36,6 +36,7 @@ import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
@@ -1168,6 +1169,40 @@ public class ReplicatorTest extends ReplicatorTestBase {
consumer2.close();
}
+ @Test
+ public void testIncrementPartitionsOfTopicWithReplicatedSubscription() throws Exception {
+ final String cluster1 = pulsar1.getConfig().getClusterName();
+ final String cluster2 = pulsar2.getConfig().getClusterName();
+ final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/topic1");
+ int startPartitions = 4;
+ int newPartitions = 8;
+ final String subscriberName = "sub1";
+ admin1.namespaces().createNamespace(namespace,
Sets.newHashSet(cluster1, cluster2));
+ admin1.topics().createPartitionedTopic(topicName, startPartitions);
+
+ @Cleanup
+ PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0,
TimeUnit.SECONDS)
+ .build();
+
+ Consumer<byte[]> consumer1 =
client1.newConsumer().topic(topicName).subscriptionName(subscriberName)
+ .replicateSubscriptionState(true)
+ .subscribe();
+
+ admin1.topics().updatePartitionedTopic(topicName, newPartitions);
+
+ assertEquals(admin1.topics().getPartitionedTopicMetadata(topicName).partitions, newPartitions);
+
+ Map<String, Boolean> replicatedSubscriptionStatus =
+ admin1.topics().getReplicatedSubscriptionStatus(topicName,
subscriberName);
+ assertEquals(replicatedSubscriptionStatus.size(), newPartitions);
+ for (Map.Entry<String, Boolean> replicatedStatusForPartition :
replicatedSubscriptionStatus.entrySet()) {
+ assertTrue(replicatedStatusForPartition.getValue(),
+ "Replicated status is invalid for " +
replicatedStatusForPartition.getKey());
+ }
+ consumer1.close();
+ }
+
@DataProvider(name = "topicPrefix")
public static Object[][] topicPrefix() {
return new Object[][] { { "persistent://", "/persistent" }, {
"non-persistent://", "/non-persistent" } };