This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 631b13ad23d [improve][client] PIP-313 Support force unsubscribe using
consumer api (#21687)
631b13ad23d is described below
commit 631b13ad23d7e48c6e82d38f97c23d129062cb7c
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Dec 18 21:23:18 2023 -0800
[improve][client] PIP-313 Support force unsubscribe using consumer api
(#21687)
Co-authored-by: Jiwe Guo <[email protected]>
---
.../org/apache/pulsar/broker/service/Consumer.java | 4 +--
.../apache/pulsar/broker/service/ServerCnx.java | 2 +-
.../apache/pulsar/broker/service/Subscription.java | 2 ++
.../nonpersistent/NonPersistentSubscription.java | 17 ++++++++--
.../service/persistent/PersistentSubscription.java | 20 ++++++++++--
.../client/impl/BrokerClientIntegrationTest.java | 37 ++++++++++++++++++++++
.../org/apache/pulsar/client/api/Consumer.java | 25 +++++++++++++++
.../pulsar/client/api/PulsarClientException.java | 4 +++
.../apache/pulsar/client/impl/ConsumerBase.java | 14 ++++++--
.../apache/pulsar/client/impl/ConsumerImpl.java | 6 ++--
.../client/impl/MultiTopicsConsumerImpl.java | 4 +--
.../apache/pulsar/common/protocol/Commands.java | 5 +--
pulsar-common/src/main/proto/PulsarApi.proto | 1 +
13 files changed, 125 insertions(+), 16 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 5ec76d07feb..83dcd8d6c16 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -421,8 +421,8 @@ public class Consumer {
}
}
- public void doUnsubscribe(final long requestId) {
- subscription.doUnsubscribe(this).thenAccept(v -> {
+ public void doUnsubscribe(final long requestId, boolean force) {
+ subscription.doUnsubscribe(this, force).thenAccept(v -> {
log.info("Unsubscribed successfully from {}", subscription);
cnx.removedConsumer(this);
cnx.getCommandSender().sendSuccessResponse(requestId);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 2baa55b80e7..9f2b98aeb40 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1958,7 +1958,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
CompletableFuture<Consumer> consumerFuture =
consumers.get(unsubscribe.getConsumerId());
if (consumerFuture != null && consumerFuture.isDone() &&
!consumerFuture.isCompletedExceptionally()) {
-
consumerFuture.getNow(null).doUnsubscribe(unsubscribe.getRequestId());
+
consumerFuture.getNow(null).doUnsubscribe(unsubscribe.getRequestId(),
unsubscribe.isForce());
} else {
commandSender.sendErrorResponse(unsubscribe.getRequestId(),
ServerError.MetadataError,
"Consumer not found");
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 6805d197521..61107b7b0db 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -75,6 +75,8 @@ public interface Subscription extends MessageExpirer {
CompletableFuture<Void> doUnsubscribe(Consumer consumer);
+ CompletableFuture<Void> doUnsubscribe(Consumer consumer, boolean
forcefully);
+
CompletableFuture<Void> clearBacklog();
CompletableFuture<Void> skipMessages(int numMessagesToSkip);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 28ea9f39ac8..92aba6221da 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -429,11 +429,24 @@ public class NonPersistentSubscription extends
AbstractSubscription implements S
*/
@Override
public CompletableFuture<Void> doUnsubscribe(Consumer consumer) {
+ return doUnsubscribe(consumer, false);
+ }
+
+ /**
+ * Handle unsubscribe command from the client API Check with the
dispatcher is this consumer can proceed with
+ * unsubscribe.
+ *
+ * @param consumer consumer object that is initiating the unsubscribe
operation
+ * @param force unsubscribe forcefully by disconnecting consumers and
closing subscription
+ * @return CompletableFuture indicating the completion of ubsubscribe
operation
+ */
+ @Override
+ public CompletableFuture<Void> doUnsubscribe(Consumer consumer, boolean
force) {
CompletableFuture<Void> future = new CompletableFuture<>();
try {
- if (dispatcher.canUnsubscribe(consumer)) {
+ if (force || dispatcher.canUnsubscribe(consumer)) {
consumer.close();
- return delete();
+ return delete(force);
}
future.completeExceptionally(
new ServerMetadataException("Unconnected or shared
consumer attempting to unsubscribe"));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 86e3558f550..dc79146110f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1074,11 +1074,27 @@ public class PersistentSubscription extends
AbstractSubscription implements Subs
*/
@Override
public CompletableFuture<Void> doUnsubscribe(Consumer consumer) {
+ return doUnsubscribe(consumer, false);
+ }
+
+ /**
+ * Handle unsubscribe command from the client API Check with the
dispatcher is this consumer can proceed with
+ * unsubscribe.
+ *
+ * @param consumer consumer object that is initiating the unsubscribe
operation
+ * @param force unsubscribe forcefully by disconnecting consumers and
closing subscription
+ * @return CompletableFuture indicating the completion of unsubscribe
operation
+ */
+ @Override
+ public CompletableFuture<Void> doUnsubscribe(Consumer consumer, boolean
force) {
CompletableFuture<Void> future = new CompletableFuture<>();
try {
- if (dispatcher.canUnsubscribe(consumer)) {
+ if (force || dispatcher.canUnsubscribe(consumer)) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] unsubscribing forcefully {}-{}",
topicName, subName, consumer.consumerName());
+ }
consumer.close();
- return delete();
+ return delete(force);
}
future.completeExceptionally(
new ServerMetadataException("Unconnected or shared
consumer attempting to unsubscribe"));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 0395c59d583..c2715de986a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -1073,4 +1073,41 @@ public class BrokerClientIntegrationTest extends
ProducerConsumerBase {
});
}
+ @Test
+ public void testSharedConsumerUnsubscribe() throws Exception {
+ String topic = "persistent://my-property/my-ns/sharedUnsubscribe";
+ String sub = "my-subscriber-name";
+ @Cleanup
+ Consumer<byte[]> consumer1 =
pulsarClient.newConsumer().topic(topic).subscriptionType(SubscriptionType.Shared)
+ .subscriptionName(sub).subscribe();
+ @Cleanup
+ Consumer<byte[]> consumer2 =
pulsarClient.newConsumer().topic(topic).subscriptionType(SubscriptionType.Shared)
+ .subscriptionName(sub).subscribe();
+ try {
+ consumer1.unsubscribe();
+ fail("should have failed as consumer-2 is already connected");
+ } catch (Exception e) {
+ // Ok
+ }
+
+ consumer1.unsubscribe(true);
+ try {
+ consumer2.unsubscribe(true);
+ } catch (PulsarClientException.NotConnectedException e) {
+ // Ok. consumer-2 is already disconnected with force unsubscription
+ }
+ assertFalse(consumer1.isConnected());
+ assertFalse(consumer2.isConnected());
+ }
+
+ @Test(dataProvider = "subType")
+ public void testUnsubscribeForce(SubscriptionType type) throws Exception {
+ String topic = "persistent://my-property/my-ns/sharedUnsubscribe";
+ String sub = "my-subscriber-name";
+ @Cleanup
+ Consumer<byte[]> consumer1 =
pulsarClient.newConsumer().topic(topic).subscriptionType(type)
+ .subscriptionName(sub).subscribe();
+ consumer1.unsubscribe(true);
+ assertFalse(consumer1.isConnected());
+ }
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index c67ad08c836..d24d674c018 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -73,6 +73,31 @@ public interface Consumer<T> extends Closeable,
MessageAcknowledger {
*/
CompletableFuture<Void> unsubscribeAsync();
+
+ /**
+ * Unsubscribe the consumer.
+ *
+ * <p>This call blocks until the consumer is unsubscribed.
+ *
+ * <p>Unsubscribing will the subscription to be deleted and all the
+ * data retained can potentially be deleted as well.
+ *
+ * <p>The operation will fail when performed on a shared subscription
+ * where multiple consumers are currently connected.
+ *
+ * @param force forcefully unsubscribe by disconnecting connected
consumers.
+ * @throws PulsarClientException if the operation fails
+ */
+ void unsubscribe(boolean force) throws PulsarClientException;
+
+ /**
+ * Asynchronously unsubscribe the consumer.
+ *
+ * @see Consumer#unsubscribe()
+ * @param force forcefully unsubscribe by disconnecting connected
consumers.
+ * @return {@link CompletableFuture} to track the operation
+ */
+ CompletableFuture<Void> unsubscribeAsync(boolean force);
/**
* Receives a single message.
*
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 9409eefe2e0..007308ec7ab 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -658,6 +658,10 @@ public class PulsarClientException extends IOException {
public NotConnectedException(long sequenceId) {
super("Not connected to broker", sequenceId);
}
+
+ public NotConnectedException(String msg) {
+ super(msg);
+ }
}
/**
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 6e27701fcea..4f29c0aa76c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -711,8 +711,13 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
@Override
public void unsubscribe() throws PulsarClientException {
+ unsubscribe(false);
+ }
+
+ @Override
+ public void unsubscribe(boolean force) throws PulsarClientException {
try {
- unsubscribeAsync().get();
+ unsubscribeAsync(force).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw PulsarClientException.unwrap(e);
@@ -722,7 +727,12 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
}
@Override
- public abstract CompletableFuture<Void> unsubscribeAsync();
+ public CompletableFuture<Void> unsubscribeAsync() {
+ return unsubscribeAsync(false);
+ }
+
+ @Override
+ public abstract CompletableFuture<Void> unsubscribeAsync(boolean force);
@Override
public void close() throws PulsarClientException {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e7be0b2dbd4..b43cd79959c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -404,7 +404,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
@Override
- public CompletableFuture<Void> unsubscribeAsync() {
+ public CompletableFuture<Void> unsubscribeAsync(boolean force) {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil
.failedFuture(new
PulsarClientException.AlreadyClosedException("Consumer was already closed"));
@@ -413,7 +413,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (isConnected()) {
setState(State.Closing);
long requestId = client.newRequestId();
- ByteBuf unsubscribe = Commands.newUnsubscribe(consumerId,
requestId);
+ ByteBuf unsubscribe = Commands.newUnsubscribe(consumerId,
requestId, force);
ClientCnx cnx = cnx();
cnx.sendRequestWithId(unsubscribe, requestId).thenRun(() -> {
closeConsumerTasks();
@@ -433,7 +433,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
});
} else {
unsubscribeFuture.completeExceptionally(
- new PulsarClientException(
+ new PulsarClientException.NotConnectedException(
String.format("The client is not connected to the broker
when unsubscribing the "
+ "subscription %s of the topic %s", subscription,
topicName.toString())));
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 8a515a9f9b8..6ba3aaaaa46 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -559,7 +559,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
}
@Override
- public CompletableFuture<Void> unsubscribeAsync() {
+ public CompletableFuture<Void> unsubscribeAsync(boolean force) {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Topics
Consumer was already closed"));
@@ -568,7 +568,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
List<CompletableFuture<Void>> futureList = consumers.values().stream()
- .map(ConsumerImpl::unsubscribeAsync).collect(Collectors.toList());
+ .map(c -> c.unsubscribeAsync(force)).collect(Collectors.toList());
FutureUtil.waitForAll(futureList)
.thenComposeAsync((r) -> {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index e715173be52..34d47e2836b 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -698,11 +698,12 @@ public class Commands {
}
}
- public static ByteBuf newUnsubscribe(long consumerId, long requestId) {
+ public static ByteBuf newUnsubscribe(long consumerId, long requestId,
boolean force) {
BaseCommand cmd = localCmd(Type.UNSUBSCRIBE);
cmd.setUnsubscribe()
.setConsumerId(consumerId)
- .setRequestId(requestId);
+ .setRequestId(requestId)
+ .setForce(force);
return serializeWithSize(cmd);
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index 819c6dfd594..387e4e3ff67 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -607,6 +607,7 @@ message CommandFlow {
message CommandUnsubscribe {
required uint64 consumer_id = 1;
required uint64 request_id = 2;
+ optional bool force = 3 [default = false];
}
// Reset an existing consumer to a particular message id