This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 224ac1d803c [fix][broker][admin] Fix cannot update properties on 
NonDurable subscription. (#22411)
224ac1d803c is described below

commit 224ac1d803c1d65ab8ea15a84a734f9481342a14
Author: 道君 <[email protected]>
AuthorDate: Fri Apr 5 15:42:40 2024 +0800

    [fix][broker][admin] Fix cannot update properties on NonDurable 
subscription. (#22411)
    
    (cherry picked from commit 902728ef6590233b87c14d2528590ad7e6fdcc12)
    
    # Conflicts:
    #       
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 10 ++++--
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 40 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 3 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index b5c16317f2b..995f980953b 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -350,15 +350,19 @@ public class ManagedCursorImpl implements ManagedCursor {
             final Function<Map<String, String>, Map<String, String>> 
updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new 
CompletableFuture<>();
 
-        final Stat lastCursorLedgerStat = 
ManagedCursorImpl.this.cursorLedgerStat;
-
         Map<String, String> newProperties = 
updateFunction.apply(ManagedCursorImpl.this.cursorProperties);
+        if (!isDurable()) {
+            this.cursorProperties = Collections.unmodifiableMap(newProperties);
+            updateCursorPropertiesResult.complete(null);
+            return updateCursorPropertiesResult;
+        }
+
         ManagedCursorInfo copy = ManagedCursorInfo
                 .newBuilder(ManagedCursorImpl.this.managedCursorInfo)
                 .clearCursorProperties()
                 
.addAllCursorProperties(buildStringPropertiesMap(newProperties))
                 .build();
-
+        final Stat lastCursorLedgerStat = 
ManagedCursorImpl.this.cursorLedgerStat;
         ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
                 name, copy, lastCursorLedgerStat, new MetaStoreCallback<>() {
                     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 4cf23bc9e6c..59c3dbf6ff3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -31,7 +31,10 @@ import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -51,6 +54,8 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriInfo;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
 import org.apache.pulsar.broker.admin.v2.PersistentTopics;
@@ -63,6 +68,8 @@ import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.resources.TopicResources;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -73,6 +80,7 @@ import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -1687,4 +1695,36 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         String topicName = "persistent://" + testTenant + "/" + 
testNamespaceLocal + "/testCreateMissingPartitions";
         assertThrows(PulsarAdminException.NotFoundException.class, () -> 
admin.topics().createMissedPartitions(topicName));
     }
+
+    @Test
+    public void testUpdatePropertiesOnNonDurableSub() throws Exception {
+        String topic = "persistent://" + testTenant + "/" + testNamespaceLocal 
+ "/testUpdatePropertiesOnNonDurableSub";
+        String subscription = "sub";
+        admin.topics().createNonPartitionedTopic(topic);
+
+        @Cleanup
+        Reader<String> __ = pulsarClient.newReader(Schema.STRING)
+                .startMessageId(MessageId.earliest)
+                .subscriptionName(subscription)
+                .topic(topic)
+                .create();
+
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, 
false).get().get();
+        PersistentSubscription subscription1 = 
persistentTopic.getSubscriptions().get(subscription);
+        assertNotNull(subscription1);
+        ManagedCursor cursor = subscription1.getCursor();
+
+        Map<String, String> properties = 
admin.topics().getSubscriptionProperties(topic, subscription);
+        assertEquals(properties.size(), 0);
+        assertTrue(MapUtils.isEmpty(cursor.getCursorProperties()));
+
+        admin.topics().updateSubscriptionProperties(topic, subscription, 
Map.of("foo", "bar"));
+        properties = admin.topics().getSubscriptionProperties(topic, 
subscription);
+        assertEquals(properties.size(), 1);
+        assertEquals(properties.get("foo"), "bar");
+
+        assertEquals(cursor.getCursorProperties().size(), 1);
+        assertEquals(cursor.getCursorProperties().get("foo"), "bar");
+    }
 }

Reply via email to