This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 7cd0924bc99 [fix][broker][admin] Fix cannot update properties on
NonDurable subscription. (#22411)
7cd0924bc99 is described below
commit 7cd0924bc9996f95b36601958ae156408614c523
Author: 道君 <[email protected]>
AuthorDate: Fri Apr 5 15:42:40 2024 +0800
[fix][broker][admin] Fix cannot update properties on NonDurable
subscription. (#22411)
(cherry picked from commit 902728ef6590233b87c14d2528590ad7e6fdcc12)
# Conflicts:
#
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 10 ++++--
.../pulsar/broker/admin/PersistentTopicsTest.java | 38 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 3 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8db8a571439..f67f534f86d 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -350,15 +350,19 @@ public class ManagedCursorImpl implements ManagedCursor {
final Function<Map<String, String>, Map<String, String>>
updateFunction) {
CompletableFuture<Void> updateCursorPropertiesResult = new
CompletableFuture<>();
- final Stat lastCursorLedgerStat =
ManagedCursorImpl.this.cursorLedgerStat;
-
Map<String, String> newProperties =
updateFunction.apply(ManagedCursorImpl.this.cursorProperties);
+ if (!isDurable()) {
+ this.cursorProperties = Collections.unmodifiableMap(newProperties);
+ updateCursorPropertiesResult.complete(null);
+ return updateCursorPropertiesResult;
+ }
+
ManagedCursorInfo copy = ManagedCursorInfo
.newBuilder(ManagedCursorImpl.this.managedCursorInfo)
.clearCursorProperties()
.addAllCursorProperties(buildStringPropertiesMap(newProperties))
.build();
-
+ final Stat lastCursorLedgerStat =
ManagedCursorImpl.this.cursorLedgerStat;
ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
name, copy, lastCursorLedgerStat, new MetaStoreCallback<>() {
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 1f755234009..c7b066faea9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
@@ -53,6 +54,8 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.commons.collections4.MapUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.admin.v2.ExtPersistentTopics;
import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
@@ -65,6 +68,8 @@ import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -76,6 +81,7 @@ import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -1773,4 +1779,36 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
assertTrue(namespaces.contains(ns1V2));
assertTrue(namespaces.contains(ns1V1));
}
+
+ @Test
+ public void testUpdatePropertiesOnNonDurableSub() throws Exception {
+ String topic = "persistent://" + testTenant + "/" + testNamespaceLocal
+ "/testUpdatePropertiesOnNonDurableSub";
+ String subscription = "sub";
+ admin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup
+ Reader<String> __ = pulsarClient.newReader(Schema.STRING)
+ .startMessageId(MessageId.earliest)
+ .subscriptionName(subscription)
+ .topic(topic)
+ .create();
+
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar.getBrokerService().getTopic(topic,
false).get().get();
+ PersistentSubscription subscription1 =
persistentTopic.getSubscriptions().get(subscription);
+ assertNotNull(subscription1);
+ ManagedCursor cursor = subscription1.getCursor();
+
+ Map<String, String> properties =
admin.topics().getSubscriptionProperties(topic, subscription);
+ assertEquals(properties.size(), 0);
+ assertTrue(MapUtils.isEmpty(cursor.getCursorProperties()));
+
+ admin.topics().updateSubscriptionProperties(topic, subscription,
Map.of("foo", "bar"));
+ properties = admin.topics().getSubscriptionProperties(topic,
subscription);
+ assertEquals(properties.size(), 1);
+ assertEquals(properties.get("foo"), "bar");
+
+ assertEquals(cursor.getCursorProperties().size(), 1);
+ assertEquals(cursor.getCursorProperties().get("foo"), "bar");
+ }
}