This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new acf3a56ba96 [fix][broker] Fix creation of replicated subscriptions for partitioned topics (#24997)
acf3a56ba96 is described below
commit acf3a56ba96eba2040912ffb4b7405d8f54dc3a2
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Nov 19 10:51:05 2025 +0200
[fix][broker] Fix creation of replicated subscriptions for partitioned topics (#24997)
---
.../broker/admin/impl/PersistentTopicsBase.java | 2 +-
.../apache/pulsar/broker/admin/AdminApi2Test.java | 21 +++++++++++++++++++++
2 files changed, 22 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 5a365b13995..85387ccc267 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2220,7 +2220,7 @@ public class PersistentTopicsBase extends AdminResource {
try {
pulsar().getAdminClient().topics()
.createSubscriptionAsync(topicNamePartition.toString(),
- subscriptionName, targetMessageId, false, properties)
+ subscriptionName, targetMessageId, replicated, properties)
.handle((r, ex) -> {
if (ex != null) {
// fail the operation on unknown exception or
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index f889f1375a2..b358c672c6c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -2543,6 +2543,27 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
}
}
+ @Test(dataProvider = "trueFalse")
+ public void testCreateReplicatedSubscriptionForPartitionedTopic(boolean replicated) throws Exception {
+ final String topic = newUniqueName("persistent://" + defaultNamespace + "/topic");
+ admin.topics().createPartitionedTopic(topic, 10);
+ admin.topics().createSubscription(topic, "sub", MessageId.earliest, replicated);
+ for (int i = 0; i < 10; i++) {
+ String individualPartition = TopicName.get(topic).getPartition(i).toString();
+ TopicStats stats = admin.topics().getStats(individualPartition);
+ assertEquals(stats.getSubscriptions().get("sub").isReplicated(), replicated);
+ }
+ }
+
+ @Test(dataProvider = "trueFalse")
+ public void testCreateReplicatedSubscriptionForNonPartitionedTopic(boolean replicated) throws Exception {
+ final String topic = newUniqueName("persistent://" + defaultNamespace + "/topic");
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topics().createSubscription(topic, "sub", MessageId.earliest, replicated);
+ TopicStats stats = admin.topics().getStats(topic);
+ assertEquals(stats.getSubscriptions().get("sub").isReplicated(), replicated);
+ }
+
@Test
public void testMaxSubscriptionsPerTopic() throws Exception {
restartClusterAfterTest();