This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new acf3a56ba96 [fix][broker] Fix creation of replicated subscriptions for partitioned topics (#24997)
acf3a56ba96 is described below

commit acf3a56ba96eba2040912ffb4b7405d8f54dc3a2
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Nov 19 10:51:05 2025 +0200

    [fix][broker] Fix creation of replicated subscriptions for partitioned topics (#24997)
---
 .../broker/admin/impl/PersistentTopicsBase.java     |  2 +-
 .../apache/pulsar/broker/admin/AdminApi2Test.java   | 21 +++++++++++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 5a365b13995..85387ccc267 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2220,7 +2220,7 @@ public class PersistentTopicsBase extends AdminResource {
                                     try {
                                         pulsar().getAdminClient().topics()
                                                 .createSubscriptionAsync(topicNamePartition.toString(),
-                                                        subscriptionName, targetMessageId, false, properties)
+                                                        subscriptionName, targetMessageId, replicated, properties)
                                                 .handle((r, ex) -> {
                                                     if (ex != null) {
                                                         // fail the operation on unknown exception or
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index f889f1375a2..b358c672c6c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -2543,6 +2543,27 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test(dataProvider = "trueFalse")
+    public void testCreateReplicatedSubscriptionForPartitionedTopic(boolean replicated) throws Exception {
+        final String topic = newUniqueName("persistent://" + defaultNamespace + "/topic");
+        admin.topics().createPartitionedTopic(topic, 10);
+        admin.topics().createSubscription(topic, "sub", MessageId.earliest, replicated);
+        for (int i = 0; i < 10; i++) {
+            String individualPartition = TopicName.get(topic).getPartition(i).toString();
+            TopicStats stats = admin.topics().getStats(individualPartition);
+            assertEquals(stats.getSubscriptions().get("sub").isReplicated(), replicated);
+        }
+    }
+
+    @Test(dataProvider = "trueFalse")
+    public void testCreateReplicatedSubscriptionForNonPartitionedTopic(boolean replicated) throws Exception {
+        final String topic = newUniqueName("persistent://" + defaultNamespace + "/topic");
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().createSubscription(topic, "sub", MessageId.earliest, replicated);
+        TopicStats stats = admin.topics().getStats(topic);
+        assertEquals(stats.getSubscriptions().get("sub").isReplicated(), replicated);
+    }
+
     @Test
     public void testMaxSubscriptionsPerTopic() throws Exception {
         restartClusterAfterTest();

Reply via email to