This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cd2aa550d0f [improve] [broker] PIP-240: new public method
unloadSubscription in PersistentTopic (#19737)
cd2aa550d0f is described below
commit cd2aa550d0fe4e72b5ff88c4f6c1c2795b3ff2cd
Author: fengyubiao <[email protected]>
AuthorDate: Tue Apr 11 21:38:32 2023 +0800
[improve] [broker] PIP-240: new public method unloadSubscription in
PersistentTopic (#19737)
part-1 of PIP-240: add a new method unloadSubscription( String subName )
for PersistentTopic
---
.../broker/service/BrokerServiceException.java | 12 +
.../broker/service/persistent/PersistentTopic.java | 48 ++++
.../pulsar/client/api/UnloadSubscriptionTest.java | 265 +++++++++++++++++++++
3 files changed, 325 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index fd3a391bca3..3e77588b245 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -160,6 +160,18 @@ public class BrokerServiceException extends Exception {
}
}
+ /**
+ * Raised when an operation does not support the targeted subscription; for example,
+ * PersistentTopic#unloadSubscription rejects the compaction subscription with it.
+ */
+ public static class UnsupportedSubscriptionException extends
BrokerServiceException {
+ public UnsupportedSubscriptionException(String msg) {
+ super(msg);
+ }
+ }
+
+ /**
+ * Raised when a subscription cannot be unloaded right now. PersistentTopic#unloadSubscription
+ * uses it when the topic write lock cannot be acquired immediately, or when the subscription
+ * instance has already been replaced by another unload.
+ */
+ public static class SubscriptionConflictUnloadException extends
BrokerServiceException {
+ public SubscriptionConflictUnloadException(String msg) {
+ super(msg);
+ }
+ }
+
public static class SubscriptionBusyException extends
BrokerServiceException {
public SubscriptionBusyException(String msg) {
super(msg);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 0374fc98212..0f5e6043981 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
import lombok.Getter;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -94,12 +95,14 @@ import
org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
+import
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionConflictUnloadException;
import
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicBacklogQuotaExceededException;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
+import
org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedSubscriptionException;
import
org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
@@ -430,6 +433,51 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return pendingWriteOps;
}
+    /**
+     * Unload a subscription: disconnect it, rewind its cursor and replace it with a new
+     * {@link PersistentSubscription} built on the same cursor.
+     *
+     * <p>Expected failures are not thrown by this method; they are delivered through the returned
+     * future, which completes exceptionally with:
+     * <ul>
+     *   <li>{@link SubscriptionNotFoundException} if the subscription is not found;</li>
+     *   <li>{@link UnsupportedSubscriptionException} if it is the compaction subscription;</li>
+     *   <li>{@link SubscriptionConflictUnloadException} if the topic write lock cannot be acquired
+     *       immediately (conflicting topic-close, topic-delete or another subscription unload), or
+     *       if the subscription was already replaced by another unload;</li>
+     *   <li>{@link TopicFencedException} if the topic is fenced.</li>
+     * </ul>
+     *
+     * @param subName name of the subscription to unload.
+     * @return a future that completes once the subscription has been replaced.
+     */
+    public CompletableFuture<Void> unloadSubscription(@Nonnull String subName) {
+        final PersistentSubscription sub = subscriptions.get(subName);
+        if (sub == null) {
+            return CompletableFuture.failedFuture(
+                    new SubscriptionNotFoundException(String.format("Subscription %s not found", subName)));
+        }
+        if (Compactor.COMPACTION_SUBSCRIPTION.equals(sub.getName())) {
+            return CompletableFuture.failedFuture(
+                    new UnsupportedSubscriptionException(String.format("Unsupported subscription: %s", subName)));
+        }
+        // Fence old subscription -> Rewind cursor -> Replace with a new subscription.
+        return sub.disconnect().thenCompose(ignore -> {
+            // tryLock is non-blocking: if the write lock is not available, fail immediately instead of waiting.
+            if (!lock.writeLock().tryLock()) {
+                // Two placeholders for two arguments: the message names both the topic and the subscription.
+                return CompletableFuture.failedFuture(new SubscriptionConflictUnloadException(String.format(
+                        "Topic[%s] conflict topic-close, topic-delete, another-subscribe-unload,"
+                                + " cannot unload subscription %s now", topic, subName)));
+            }
+            try {
+                if (isFenced) {
+                    return CompletableFuture.failedFuture(new TopicFencedException(String.format(
+                            "Topic[%s] is fenced, can not unload subscription %s now", topic, subName)));
+                }
+                // Identity check: the map no longer holds the instance captured before disconnect().
+                if (sub != subscriptions.get(subName)) {
+                    // Another task already finished.
+                    return CompletableFuture.failedFuture(new SubscriptionConflictUnloadException(String.format(
+                            "Another unload subscriber[%s] has been finished, do not repeat call.", subName)));
+                }
+                sub.getCursor().rewind();
+                PersistentSubscription subNew = PersistentTopic.this.createPersistentSubscription(sub.getName(),
+                        sub.getCursor(), sub.isReplicated(), sub.getSubscriptionProperties());
+                subscriptions.put(subName, subNew);
+                return CompletableFuture.completedFuture(null);
+            } finally {
+                lock.writeLock().unlock();
+            }
+        });
+    }
+
private PersistentSubscription createPersistentSubscription(String
subscriptionName, ManagedCursor cursor,
boolean replicated, Map<String, String> subscriptionProperties) {
Objects.requireNonNull(compactedTopic);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java
new file mode 100644
index 00000000000..93d5bf30ec6
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.apache.pulsar.client.api.SubscriptionType.Shared;
+import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
+import static org.apache.pulsar.client.api.SubscriptionType.Failover;
+import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
+import static org.testng.Assert.assertEquals;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link PersistentTopic#unloadSubscription(String)}.
+ *
+ * <p>Each case publishes messages, receives them all, optionally acknowledges some, unloads the
+ * subscription, and then expects exactly the unacknowledged messages to be received again.
+ */
+@Slf4j
+@Test(groups = "broker-api")
+public class UnloadSubscriptionTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    /** Runs the broker with system topics and the transaction coordinator disabled. */
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setSystemTopicEnabled(false);
+        conf.setTransactionCoordinatorEnabled(false);
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @DataProvider(name = "unloadCases")
+    public Object[][] unloadCases() {
+        // [msgCount, enabledBatch, maxMsgPerBatch, subType, ackMsgCount]
+        return new Object[][]{
+                {100, false, 1, Exclusive, 0},
+                {100, false, 1, Failover, 0},
+                {100, false, 1, Shared, 0},
+                {100, false, 1, Key_Shared, 0},
+                {100, true, 5, Exclusive, 0},
+                {100, true, 5, Failover, 0},
+                {100, true, 5, Shared, 0},
+                {100, true, 5, Key_Shared, 0},
+                {100, false, 1, Exclusive, 50},
+                {100, false, 1, Failover, 50},
+                {100, false, 1, Shared, 50},
+                {100, false, 1, Key_Shared, 50},
+                {100, true, 5, Exclusive, 50},
+                {100, true, 5, Failover, 50},
+                {100, true, 5, Shared, 50},
+                {100, true, 5, Key_Shared, 50},
+        };
+    }
+
+    /**
+     * Single consumer: after the unload, the consumer must receive again every message it did not
+     * acknowledge.
+     */
+    @Test(dataProvider = "unloadCases")
+    public void testSingleConsumer(int msgCount, boolean enabledBatch, int maxMsgPerBatch,
+                                   SubscriptionType subType, int ackMsgCount) throws Exception {
+        final String topicName = "persistent://my-property/my-ns/tp-" + UUID.randomUUID();
+        final String subName = "sub";
+        Consumer<String> consumer = createConsumer(topicName, subName, subType);
+        ProducerAndMessageIds producerAndMessageIds =
+                createProducerAndSendMessages(topicName, msgCount, enabledBatch, maxMsgPerBatch);
+        log.info("send message-ids:{}-{}", producerAndMessageIds.messageIds.size(),
+                toString(producerAndMessageIds.messageIds));
+
+        // Receive all messages and ack some.
+        MessagesEntry messagesEntry = receiveAllMessages(consumer);
+        assertEquals(messagesEntry.messageSet.size(), msgCount);
+        if (ackMsgCount > 0) {
+            LinkedHashSet<MessageId> ackedMessageIds = new LinkedHashSet<>();
+            Iterator<MessageId> messageIdIterator = messagesEntry.messageIdSet.iterator();
+            for (int i = ackMsgCount; i > 0; i--) {
+                ackedMessageIds.add(messageIdIterator.next());
+            }
+            List<MessageId> ackedMessageIdList = ackedMessageIds.stream().toList();
+            consumer.acknowledge(ackedMessageIdList);
+            log.info("ack message-ids: {}", toString(ackedMessageIdList));
+        }
+
+        // Unload subscriber. Wait for the future so that a failed unload fails the test instead of
+        // being silently dropped, and so that the second receive does not race the unload.
+        PersistentTopic persistentTopic = getPersistentTopic(topicName);
+        persistentTopic.unloadSubscription(subName).join();
+
+        // Receive all messages for the second time.
+        MessagesEntry messagesEntryForTheSecondTime = receiveAllMessages(consumer);
+        log.info("received message-ids for the second time: {}",
+                toString(messagesEntryForTheSecondTime.messageIdSet.stream().toList()));
+        assertEquals(messagesEntryForTheSecondTime.messageSet.size(), msgCount - ackMsgCount);
+
+        // cleanup.
+        producerAndMessageIds.producer.close();
+        consumer.close();
+        admin.topics().delete(topicName);
+    }
+
+    /**
+     * Two consumers on the same subscription: after the unload, the consumers together must receive
+     * again every message that was not acknowledged.
+     */
+    @Test(dataProvider = "unloadCases")
+    public void testMultiConsumer(int msgCount, boolean enabledBatch, int maxMsgPerBatch,
+                                  SubscriptionType subType, int ackMsgCount) throws Exception {
+        // This case attaches two consumers to one subscription, so Exclusive cases are not run.
+        if (subType == Exclusive) {
+            return;
+        }
+        final String topicName = "persistent://my-property/my-ns/tp-" + UUID.randomUUID();
+        final String subName = "sub";
+        Consumer<String> consumer1 = createConsumer(topicName, subName, subType);
+        Consumer<String> consumer2 = createConsumer(topicName, subName, subType);
+        ProducerAndMessageIds producerAndMessageIds =
+                createProducerAndSendMessages(topicName, msgCount, enabledBatch, maxMsgPerBatch);
+        log.info("send message-ids:{}-{}", producerAndMessageIds.messageIds.size(),
+                toString(producerAndMessageIds.messageIds));
+
+        // Receive all messages and ack some.
+        MessagesEntry messagesEntry1 = receiveAllMessages(consumer1);
+        MessagesEntry messagesEntry2 = receiveAllMessages(consumer2);
+        LinkedHashSet<String> allMessages = new LinkedHashSet<>();
+        allMessages.addAll(messagesEntry1.messageSet);
+        allMessages.addAll(messagesEntry2.messageSet);
+        assertEquals(allMessages.size(), msgCount);
+        if (ackMsgCount > 0) {
+            LinkedHashSet<MessageId> allMessageIds = new LinkedHashSet<>();
+            LinkedHashSet<MessageId> ackedMessageIds = new LinkedHashSet<>();
+            allMessageIds.addAll(messagesEntry1.messageIdSet);
+            allMessageIds.addAll(messagesEntry2.messageIdSet);
+            Iterator<MessageId> messageIdIterator = allMessageIds.iterator();
+            for (int i = ackMsgCount; i > 0; i--) {
+                ackedMessageIds.add(messageIdIterator.next());
+            }
+            List<MessageId> ackedMessageIdList = ackedMessageIds.stream().toList();
+            consumer1.acknowledge(ackedMessageIdList);
+            log.info("ack message-ids: {}", toString(ackedMessageIdList));
+        }
+
+        // Unload subscriber. Wait for the future so that a failed unload fails the test instead of
+        // being silently dropped, and so that the second receive does not race the unload.
+        PersistentTopic persistentTopic = getPersistentTopic(topicName);
+        persistentTopic.unloadSubscription(subName).join();
+
+        // Receive all messages for the second time.
+        MessagesEntry messagesEntryForTheSecondTime1 = receiveAllMessages(consumer1);
+        MessagesEntry messagesEntryForTheSecondTime2 = receiveAllMessages(consumer2);
+        LinkedHashSet<String> allMessagesForTheSecondTime = new LinkedHashSet<>();
+        allMessagesForTheSecondTime.addAll(messagesEntryForTheSecondTime1.messageSet);
+        allMessagesForTheSecondTime.addAll(messagesEntryForTheSecondTime2.messageSet);
+        // Log the ids received in the second round (not the ids of the first round).
+        LinkedHashSet<MessageId> allMessageIdsForTheSecondTime = new LinkedHashSet<>();
+        allMessageIdsForTheSecondTime.addAll(messagesEntryForTheSecondTime1.messageIdSet);
+        allMessageIdsForTheSecondTime.addAll(messagesEntryForTheSecondTime2.messageIdSet);
+        log.info("received message-ids for the second time: {}",
+                toString(allMessageIdsForTheSecondTime.stream().toList()));
+        assertEquals(allMessagesForTheSecondTime.size(), msgCount - ackMsgCount);
+
+        // cleanup.
+        producerAndMessageIds.producer.close();
+        consumer1.close();
+        consumer2.close();
+        admin.topics().delete(topicName);
+    }
+
+    /**
+     * Render message ids compactly for logging: {@code entryId} for plain ids and
+     * {@code entryId_batchIndex/batchSize} for batch ids.
+     */
+    private static String toString(List<MessageId> messageIds) {
+        List<String> messageIdStrings = new ArrayList<>(messageIds.size());
+        for (MessageId messageId : messageIds) {
+            MessageIdImpl messageIdImpl;
+            if (messageId instanceof TopicMessageIdImpl) {
+                // Unwrap the topic-scoped id to reach the entry id.
+                TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
+                messageIdImpl = (MessageIdImpl) topicMessageId.getInnerMessageId();
+            } else {
+                messageIdImpl = (MessageIdImpl) messageId;
+            }
+            StringBuilder stringBuilder = new StringBuilder(String.valueOf(messageIdImpl.getEntryId()));
+            if (messageIdImpl instanceof BatchMessageIdImpl) {
+                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageIdImpl;
+                stringBuilder.append("_")
+                        .append(batchMessageId.getBatchIndex())
+                        .append("/")
+                        .append(batchMessageId.getBatchSize());
+            }
+            messageIdStrings.add(stringBuilder.toString());
+        }
+        return messageIdStrings.toString();
+    }
+
+    /** Look up the topic in the broker service (second argument {@code false}) and cast it. */
+    private PersistentTopic getPersistentTopic(String topicName) {
+        return (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
+    }
+
+    /**
+     * Create a producer and send {@code msgCount} messages asynchronously; message {@code i} has
+     * key {@code i % 10} and value {@code i}. Returns once every send has completed.
+     */
+    private ProducerAndMessageIds createProducerAndSendMessages(String topicName, int msgCount,
+                                                                boolean enabledBatch, int maxMsgPerBatch)
+            throws Exception {
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(enabledBatch)
+                .batchingMaxMessages(maxMsgPerBatch)
+                .create();
+        List<CompletableFuture<MessageId>> messageIds = new ArrayList<>();
+        for (int i = 0; i < msgCount; i++) {
+            messageIds.add(producer.newMessage().key(String.valueOf(i % 10)).value(String.valueOf(i)).sendAsync());
+        }
+        FutureUtil.waitForAll(messageIds).join();
+        return new ProducerAndMessageIds(producer,
+                messageIds.stream().map(CompletableFuture::join).collect(Collectors.toList()));
+    }
+
+    /** A producer together with the ids of the messages it has sent. */
+    private record ProducerAndMessageIds(Producer<String> producer, List<MessageId> messageIds) {}
+
+    /** Subscribe a String consumer with ack receipts enabled. */
+    private Consumer<String> createConsumer(String topicName, String subName, SubscriptionType subType)
+            throws Exception {
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(subType)
+                .isAckReceiptEnabled(true)
+                .subscribe();
+        return consumer;
+    }
+
+    /**
+     * Receive messages until a receive call returns nothing within 2 seconds. Values and message ids
+     * are collected in arrival order; duplicates collapse because both are sets.
+     */
+    private MessagesEntry receiveAllMessages(Consumer<String> consumer) throws Exception {
+        final Set<String> messageSet = Collections.synchronizedSet(new LinkedHashSet<>());
+        final Set<MessageId> messageIdSet = Collections.synchronizedSet(new LinkedHashSet<>());
+        while (true) {
+            Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+            messageIdSet.add(msg.getMessageId());
+            messageSet.add(msg.getValue());
+        }
+        return new MessagesEntry(messageSet, messageIdSet);
+    }
+
+    /** Values and message ids collected by one round of {@link #receiveAllMessages(Consumer)}. */
+    private record MessagesEntry(Set<String> messageSet, Set<MessageId> messageIdSet) {}
+
+}
\ No newline at end of file