congbobo184 commented on code in PR #16247:
URL: https://github.com/apache/pulsar/pull/16247#discussion_r957967551
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1953,15 +1953,50 @@ public AuthorizationService getAuthorizationService() {
return authorizationService;
}
- public CompletableFuture<Void> removeTopicFromCache(String topic) {
+ public CompletableFuture<Void> removeTopicFromCache(String
topicNameString, Topic topic) {
+ if (topic == null){
+ return removeTopicFromCache(topicNameString);
+ }
+ final CompletableFuture<Optional<Topic>> createTopicFuture =
topics.get(topicNameString);
+ // If not exists in cache, do nothing.
+ if (createTopicFuture == null){
+ return CompletableFuture.completedFuture(null);
+ }
+ // We don't need to wait for the future complete, because we already
have the topic reference here.
+ if (!createTopicFuture.isDone()){
+ return CompletableFuture.completedFuture(null);
+ }
+ return createTopicFuture.thenCompose(topicOptional -> {
+ Topic topicInCache = topicOptional.orElse(null);
+ // If @param topic is not equals with cached, do nothing.
+ if (topicInCache == null || topicInCache != topic){
+ return CompletableFuture.completedFuture(null);
+ } else {
+ // Do remove.
+ return removeTopicFromCache(topicNameString,
createTopicFuture);
+ }
+ });
+ }
+
+ public CompletableFuture<Void> removeTopicFromCache(String topic){
+ return removeTopicFromCache(topic, (CompletableFuture) null);
Review Comment:
```suggestion
return removeTopicFromCache(topic,
(CompletableFuture<Optional<Topic>>) null);
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,145 @@ public CompletableFuture<Void> close() {
}
/**
- * Close this topic - close all producers and subscriptions associated
with this topic.
- *
+ * Close this topic - close all resources associated with this topic:
+ * 1.clients (producers and consumers) {@link #asyncCloseClients()}.
+ * 2.managed ledger {@link #asyncCloseLedger()}.
+ * 3.limiters {@link #closeLimiters()}.
* @param closeWithoutWaitingClientDisconnect don't wait for client
disconnect and forcefully close managed-ledger
* @return Completable future indicating completion of close operation
*/
@Override
public CompletableFuture<Void> close(boolean
closeWithoutWaitingClientDisconnect) {
- CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
lock.writeLock().lock();
try {
// closing managed-ledger waits until all
producers/consumers/replicators get closed. Sometimes, broker
// forcefully wants to close managed-ledger without waiting all
resources to be closed.
- if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
- fenceTopicToCloseOrDelete();
+ if (isClosingOrDeleting){
+ if (closeWithoutWaitingClientDisconnect){
+ return this.fullyCloseFuture;
+ } else {
+ // Why not return this.fullyCloseFuture ?
+ // Just keep the same implementation as before.
+ log.warn("[{}] Topic is already being closed or deleted",
topic);
+ return CompletableFuture.failedFuture(new
TopicFencedException("Topic is already fenced"));
+ }
} else {
- log.warn("[{}] Topic is already being closed or deleted",
topic);
- closeFuture.completeExceptionally(new
TopicFencedException("Topic is already fenced"));
- return closeFuture;
+ fenceTopicToCloseOrDelete();
+ this.fullyCloseFuture = new CompletableFuture<>();
}
} finally {
lock.writeLock().unlock();
}
+ // Declare result.
+ CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+ // Close limiters.
+ try {
+ closeLimiters();
+ } catch (Exception t){
+ log.error("[{}] Error closing topic", topic, t);
+ unfenceTopicToResumeWithLock();
+ this.fullyCloseFuture.completeExceptionally(t);
+ return CompletableFuture.failedFuture(t);
+ }
+
+ // Close client components.
+ CompletableFuture<Void> closeClientsFuture = asyncCloseClients();
Review Comment:
```suggestion
CompletableFuture<Void> closeClientsFuture =
asyncCloseClients(boolean closeWithoutWaitingClientDisconnect);
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,145 @@ public CompletableFuture<Void> close() {
}
/**
- * Close this topic - close all producers and subscriptions associated
with this topic.
- *
+ * Close this topic - close all resources associated with this topic:
+ * 1.clients (producers and consumers) {@link #asyncCloseClients()}.
+ * 2.managed ledger {@link #asyncCloseLedger()}.
+ * 3.limiters {@link #closeLimiters()}.
* @param closeWithoutWaitingClientDisconnect don't wait for client
disconnect and forcefully close managed-ledger
* @return Completable future indicating completion of close operation
*/
@Override
public CompletableFuture<Void> close(boolean
closeWithoutWaitingClientDisconnect) {
- CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
lock.writeLock().lock();
try {
// closing managed-ledger waits until all
producers/consumers/replicators get closed. Sometimes, broker
// forcefully wants to close managed-ledger without waiting all
resources to be closed.
- if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
- fenceTopicToCloseOrDelete();
+ if (isClosingOrDeleting){
+ if (closeWithoutWaitingClientDisconnect){
+ return this.fullyCloseFuture;
+ } else {
+ // Why not return this.fullyCloseFuture ?
+ // Just keep the same implementation as before.
+ log.warn("[{}] Topic is already being closed or deleted",
topic);
+ return CompletableFuture.failedFuture(new
TopicFencedException("Topic is already fenced"));
+ }
} else {
- log.warn("[{}] Topic is already being closed or deleted",
topic);
- closeFuture.completeExceptionally(new
TopicFencedException("Topic is already fenced"));
- return closeFuture;
+ fenceTopicToCloseOrDelete();
+ this.fullyCloseFuture = new CompletableFuture<>();
}
} finally {
lock.writeLock().unlock();
}
+ // Declare result.
+ CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+ // Close limiters.
+ try {
+ closeLimiters();
+ } catch (Exception t){
Review Comment:
why should catch the Exception?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1243,86 +1247,145 @@ public CompletableFuture<Void> close() {
}
/**
- * Close this topic - close all producers and subscriptions associated
with this topic.
- *
+ * Close this topic - close all resources associated with this topic:
+ * 1.clients (producers and consumers) {@link #asyncCloseClients()}.
+ * 2.managed ledger {@link #asyncCloseLedger()}.
+ * 3.limiters {@link #closeLimiters()}.
* @param closeWithoutWaitingClientDisconnect don't wait for client
disconnect and forcefully close managed-ledger
* @return Completable future indicating completion of close operation
*/
@Override
public CompletableFuture<Void> close(boolean
closeWithoutWaitingClientDisconnect) {
- CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
lock.writeLock().lock();
try {
// closing managed-ledger waits until all
producers/consumers/replicators get closed. Sometimes, broker
// forcefully wants to close managed-ledger without waiting all
resources to be closed.
- if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
- fenceTopicToCloseOrDelete();
+ if (isClosingOrDeleting){
+ if (closeWithoutWaitingClientDisconnect){
+ return this.fullyCloseFuture;
+ } else {
+ // Why not return this.fullyCloseFuture ?
+ // Just keep the same implementation as before.
+ log.warn("[{}] Topic is already being closed or deleted",
topic);
+ return CompletableFuture.failedFuture(new
TopicFencedException("Topic is already fenced"));
+ }
} else {
- log.warn("[{}] Topic is already being closed or deleted",
topic);
- closeFuture.completeExceptionally(new
TopicFencedException("Topic is already fenced"));
- return closeFuture;
+ fenceTopicToCloseOrDelete();
+ this.fullyCloseFuture = new CompletableFuture<>();
}
} finally {
lock.writeLock().unlock();
}
+ // Declare result.
+ CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+ // Close limiters.
+ try {
+ closeLimiters();
+ } catch (Exception t){
+ log.error("[{}] Error closing topic", topic, t);
+ unfenceTopicToResumeWithLock();
+ this.fullyCloseFuture.completeExceptionally(t);
+ return CompletableFuture.failedFuture(t);
+ }
+
+ // Close client components.
+ CompletableFuture<Void> closeClientsFuture = asyncCloseClients();
+ // Close managed ledger.
+ CompletableFuture<Void> closePhase2Future;
+ if (closeWithoutWaitingClientDisconnect){
+ closePhase2Future = asyncCloseLedger();
+ } else {
+ closePhase2Future = closeClientsFuture.thenCompose(__ ->
asyncCloseLedger());
+ }
+ // Complete resultFuture. If managed ledger close failure, reset topic
to resume.
+ closePhase2Future.whenComplete((__, ex) -> {
+ if (ex == null){
+ resultFuture.complete(null);
+ } else {
+ log.error("[{}] Error closing topic", topic, ex);
+ // Restart rate-limiter after close managed ledger failure.
Success is not guaranteed.
+ try {
+ restartLimitersAfterCloseTopicFail();
Review Comment:
why should catch the Exception?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]