dragosvictor commented on code in PR #21682:
URL: https://github.com/apache/pulsar/pull/21682#discussion_r1419798231
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -308,7 +309,7 @@ public synchronized void removeConsumer(Consumer consumer,
boolean isResetCursor
if (!cursor.isDurable()) {
// If cursor is not durable, we need to clean up the
subscription as well
- this.close().thenRun(() -> {
+ this.close(false).thenRun(() -> {
Review Comment:
Would be redundant, since we already know that there are no consumers in
this code path.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -542,6 +549,10 @@ public SubType getType() {
@Override
public final synchronized void readEntriesComplete(List<Entry> entries,
Object ctx) {
+ if (topic.isTransferring()) {
Review Comment:
Removed the conditional, as we can rely on the duplicate in
`readMoreEntries`. Also, the sending of the messages themselves is prevented by
the check in `trySendMessagesToConsumers`:
https://github.com/apache/pulsar/blob/68731c11142a272895459dcb96070ac14b3319d8/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L684-L687
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java:
##########
@@ -147,12 +151,13 @@ public synchronized void consumerFlow(Consumer consumer,
int additionalNumberOfM
}
@Override
- public synchronized CompletableFuture<Void> disconnectAllConsumers(boolean
isResetCursor) {
+ public synchronized CompletableFuture<Void> disconnectAllConsumers(
+ boolean isResetCursor, Optional<BrokerLookupData>
assignedBrokerLookupData) {
closeFuture = new CompletableFuture<>();
if (consumerList.isEmpty()) {
closeFuture.complete(null);
} else {
- consumerList.forEach(Consumer::disconnect);
Review Comment:
It's a miss, for sure, but the code is not using the flag anyway, as per the
call graph:
https://github.com/apache/pulsar/blob/68731c11142a272895459dcb96070ac14b3319d8/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java#L160
https://github.com/apache/pulsar/blob/117df782bc4c47657ee1e51f827472721e7685a2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L418
https://github.com/apache/pulsar/blob/117df782bc4c47657ee1e51f827472721e7685a2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L402
https://github.com/apache/pulsar/blob/94fdee443c9592fb192d8494715aabeb43add95f/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java#L173
It's only interpreted in `PersistentSubscription`.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -889,53 +890,79 @@ public int getTotalNonContiguousDeletedMessagesRange() {
*
* @return CompletableFuture indicating the completion of delete operation
*/
- @Override
- public CompletableFuture<Void> close() {
- synchronized (this) {
- if (dispatcher != null && dispatcher.isConsumerConnected()) {
- return FutureUtil.failedFuture(new
SubscriptionBusyException("Subscription has active consumers"));
- }
- return this.pendingAckHandle.closeAsync().thenAccept(v -> {
- IS_FENCED_UPDATER.set(this, TRUE);
- log.info("[{}][{}] Successfully closed subscription [{}]",
topicName, subName, cursor);
- });
+ private synchronized CompletableFuture<Void> close(boolean
checkActiveConsumers) {
+ if (checkActiveConsumers && dispatcher != null &&
dispatcher.isConsumerConnected()) {
+ return FutureUtil.failedFuture(new
SubscriptionBusyException("Subscription has active consumers"));
}
+ return this.pendingAckHandle.closeAsync().thenAccept(v -> {
+ IS_FENCED_UPDATER.set(this, TRUE);
+ log.info("[{}][{}] Successfully closed subscription [{}]",
topicName, subName, cursor);
+ });
+ }
+
+
+ /**
+ * Disconnect all consumers from this subscription.
+ *
+ * @return CompletableFuture indicating the completion of the operation.
+ */
+ @Override
+ public synchronized CompletableFuture<Void>
disconnect(Optional<BrokerLookupData> assignedBrokerLookupData) {
+ CompletableFuture<Void> disconnectFuture = new CompletableFuture<>();
+
+ (dispatcher != null
+ ? dispatcher.disconnectAllConsumers(false,
assignedBrokerLookupData)
+ : CompletableFuture.completedFuture(null))
+ .thenRun(() -> {
+ log.info("[{}][{}] Successfully disconnected subscription
consumers", topicName, subName);
+ disconnectFuture.complete(null);
+ }).exceptionally(exception -> {
+ log.error("[{}][{}] Error disconnecting subscription
consumers", topicName, subName, exception);
+ disconnectFuture.completeExceptionally(exception);
+ return null;
+ });
+
+ return disconnectFuture;
}
/**
- * Disconnect all consumers attached to the dispatcher and close this
subscription.
+ * Fence this subscription and optionally disconnect all consumers.
*
- * @return CompletableFuture indicating the completion of disconnect
operation
+ * @return CompletableFuture indicating the completion of the operation.
*/
@Override
- public synchronized CompletableFuture<Void> disconnect() {
- if (fenceFuture != null){
+ public synchronized CompletableFuture<Void> close(boolean
disconnectConsumers,
+
Optional<BrokerLookupData> assignedBrokerLookupData) {
+ if (fenceFuture != null) {
return fenceFuture;
}
+
fenceFuture = new CompletableFuture<>();
// block any further consumers on this subscription
IS_FENCED_UPDATER.set(this, TRUE);
- (dispatcher != null ? dispatcher.close() :
CompletableFuture.completedFuture(null))
- .thenCompose(v -> close()).thenRun(() -> {
- log.info("[{}][{}] Successfully disconnected and closed
subscription", topicName, subName);
+ (dispatcher != null
+ ? dispatcher.close(disconnectConsumers,
assignedBrokerLookupData)
+ : CompletableFuture.completedFuture(null))
+ .thenCompose(v -> close(false)).thenRun(() -> {
Review Comment:
Similarly here, would be redundant, since we already disconnected the
consumers if we wanted to, via flag `disconnectConsumers`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]