This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 149b9f770a4 [fix][broker] Fix pulsar.replicated.subscription checks
(#23782)
149b9f770a4 is described below
commit 149b9f770a44e693729e850689aa215d4d5af25c
Author: Zixuan Liu <[email protected]>
AuthorDate: Tue Dec 31 10:15:41 2024 +0800
[fix][broker] Fix pulsar.replicated.subscription checks (#23782)
Signed-off-by: Zixuan Liu <[email protected]>
---
.../service/persistent/PersistentSubscription.java | 10 ++++--
.../broker/service/persistent/PersistentTopic.java | 9 ++++--
.../persistent/PersistentSubscriptionTest.java | 37 ++++++++++++++++++++++
3 files changed, 50 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 8cebbd52695..b5a1a9db5de 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -118,7 +118,7 @@ public class PersistentSubscription extends
AbstractSubscription {
// for connected subscriptions, message expiry will be checked if the
backlog is greater than this threshold
private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
- private static final String REPLICATED_SUBSCRIPTION_PROPERTY =
"pulsar.replicated.subscription";
+ protected static final String REPLICATED_SUBSCRIPTION_PROPERTY =
"pulsar.replicated.subscription";
// Map of properties that is used to mark this subscription as
"replicated".
// Since this is the only field at this point, we can just keep a static
@@ -140,8 +140,12 @@ public class PersistentSubscription extends
AbstractSubscription {
NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
}
- static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) {
- return
cursor.getProperties().containsKey(REPLICATED_SUBSCRIPTION_PROPERTY);
+ static Optional<Boolean>
getReplicatedSubscriptionConfiguration(ManagedCursor cursor) {
+ Long v = cursor.getProperties().get(REPLICATED_SUBSCRIPTION_PROPERTY);
+ if (v == null || (v < 0L || v > 1L)) {
+ return Optional.empty();
+ }
+ return Optional.of(v == 1L);
}
public PersistentSubscription(PersistentTopic topic, String
subscriptionName, ManagedCursor cursor,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 56aeb9b4a5e..08a481b7051 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -512,9 +512,12 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
// ignore it for now and let the message dedup logic to
take care of it
} else {
final String subscriptionName =
Codec.decode(cursor.getName());
- subscriptions.put(subscriptionName,
createPersistentSubscription(subscriptionName, cursor,
-
PersistentSubscription.isCursorFromReplicatedSubscription(cursor) ? true : null,
- cursor.getCursorProperties()));
+ Optional<Boolean> replicatedSubscriptionConfiguration =
+
PersistentSubscription.getReplicatedSubscriptionConfiguration(cursor);
+ Boolean replicated =
replicatedSubscriptionConfiguration.orElse(null);
+ subscriptions.put(subscriptionName,
+ createPersistentSubscription(subscriptionName,
cursor, replicated,
+ cursor.getCursorProperties()));
// subscription-cursor gets activated by default:
deactivate as there is no active subscription
// right now
subscriptions.get(subscriptionName).deactivateCursor();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index 9a46a2919d1..360be2e435a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doCallRealMethod;
@@ -30,11 +31,13 @@ import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.Position;
@@ -226,6 +229,40 @@ public class PersistentSubscriptionTest {
assertTrue(persistentSubscription.cursor.getLastActive() >
beforeAcknowledgeTimestamp);
}
+ @Test
+ public void testGetReplicatedSubscriptionConfiguration() {
+ Map<String, Long> properties =
PersistentSubscription.getBaseCursorProperties(true);
+
assertThat(properties).containsEntry(PersistentSubscription.REPLICATED_SUBSCRIPTION_PROPERTY,
1L);
+ ManagedCursor cursor = mock(ManagedCursor.class);
+ doReturn(properties).when(cursor).getProperties();
+ Optional<Boolean> replicatedSubscriptionConfiguration =
+
PersistentSubscription.getReplicatedSubscriptionConfiguration(cursor);
+
assertThat(replicatedSubscriptionConfiguration).isNotEmpty().get().isEqualTo(Boolean.TRUE);
+
+ properties =
Map.of(PersistentSubscription.REPLICATED_SUBSCRIPTION_PROPERTY, 10L);
+ doReturn(properties).when(cursor).getProperties();
+ replicatedSubscriptionConfiguration =
+
PersistentSubscription.getReplicatedSubscriptionConfiguration(cursor);
+ assertThat(replicatedSubscriptionConfiguration).isEmpty();
+ properties =
Map.of(PersistentSubscription.REPLICATED_SUBSCRIPTION_PROPERTY, -1L);
+ doReturn(properties).when(cursor).getProperties();
+ replicatedSubscriptionConfiguration =
+
PersistentSubscription.getReplicatedSubscriptionConfiguration(cursor);
+ assertThat(replicatedSubscriptionConfiguration).isEmpty();
+
+ properties = PersistentSubscription.getBaseCursorProperties(false);
+ doReturn(properties).when(cursor).getProperties();
+ replicatedSubscriptionConfiguration =
+
PersistentSubscription.getReplicatedSubscriptionConfiguration(cursor);
+ assertThat(replicatedSubscriptionConfiguration).isEmpty();
+
+ properties = PersistentSubscription.getBaseCursorProperties(null);
+ doReturn(properties).when(cursor).getProperties();
+ replicatedSubscriptionConfiguration =
+
PersistentSubscription.getReplicatedSubscriptionConfiguration(cursor);
+ assertThat(replicatedSubscriptionConfiguration).isEmpty();
+ }
+
public static class CustomTransactionPendingAckStoreProvider implements
TransactionPendingAckStoreProvider {
@Override
public CompletableFuture<PendingAckStore>
newPendingAckStore(PersistentSubscription subscription) {