This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 8de67dc1dd6 [fix][broker] fix delete_when_subscriptions_caught_up
doesn't work while have active consumers (#18320)
8de67dc1dd6 is described below
commit 8de67dc1dd625e4e73673df20732b149eccee26f
Author: Penghui Li <[email protected]>
AuthorDate: Sat Nov 5 08:45:43 2022 +0800
[fix][broker] fix delete_when_subscriptions_caught_up doesn't work while
have active consumers (#18320)
---
.github/workflows/ci-cpp-build.yaml | 1 +
.github/workflows/ci-cpp.yaml | 1 +
.github/workflows/pulsar-ci-flaky.yaml | 1 +
.github/workflows/pulsar-ci.yaml | 1 +
.../broker/service/persistent/PersistentTopic.java | 179 +++++++++++----------
.../broker/service/InactiveTopicDeleteTest.java | 2 +-
.../client/api/SimpleProducerConsumerStatTest.java | 1 +
.../pulsar/client/api/MultiTopicConsumerStats.java | 2 +-
.../client/api/PartitionedTopicProducerStats.java | 2 +-
.../impl/MultiTopicConsumerStatsRecorderImpl.java | 2 +-
.../PartitionedTopicProducerStatsRecorderImpl.java | 2 +-
11 files changed, 104 insertions(+), 90 deletions(-)
diff --git a/.github/workflows/ci-cpp-build.yaml
b/.github/workflows/ci-cpp-build.yaml
index 7a450efe972..0d16a1b4cd1 100644
--- a/.github/workflows/ci-cpp-build.yaml
+++ b/.github/workflows/ci-cpp-build.yaml
@@ -22,6 +22,7 @@ on:
pull_request:
branches:
- master
+ - branch-2.11
paths:
- '.github/workflows/**'
- 'pulsar-client-cpp/**'
diff --git a/.github/workflows/ci-cpp.yaml b/.github/workflows/ci-cpp.yaml
index 135c41474f5..ddd3ea68b9b 100644
--- a/.github/workflows/ci-cpp.yaml
+++ b/.github/workflows/ci-cpp.yaml
@@ -22,6 +22,7 @@ on:
pull_request:
branches:
- master
+ - branch-2.11
workflow_dispatch:
concurrency:
diff --git a/.github/workflows/pulsar-ci-flaky.yaml
b/.github/workflows/pulsar-ci-flaky.yaml
index 03e3adff33a..f4a4609cb4c 100644
--- a/.github/workflows/pulsar-ci-flaky.yaml
+++ b/.github/workflows/pulsar-ci-flaky.yaml
@@ -22,6 +22,7 @@ on:
pull_request:
branches:
- master
+ - branch-2.11
workflow_dispatch:
concurrency:
diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml
index e7cf347c06c..b0b5d63349c 100644
--- a/.github/workflows/pulsar-ci.yaml
+++ b/.github/workflows/pulsar-ci.yaml
@@ -22,6 +22,7 @@ on:
pull_request:
branches:
- master
+ - branch-2.11
workflow_dispatch:
concurrency:
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5114ddcee54..866cb3d5cc0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1111,6 +1111,9 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
* Flag indicating whether delete should succeed if topic still
has unconnected subscriptions. Set to
* false when called from admin API (it will delete the subs
too), and set to true when called from GC
* thread
+ * @param failIfHasBacklogs
+ * Flag indicating whether delete should fail if the topic has
backlogs. Set to false when called from
+ * admin API (it will delete the subs too), and set to true
when called from GC thread
* @param closeIfClientsConnected
* Flag indicate whether explicitly close connected
* producers/consumers/replicators before trying to delete
topic.
@@ -1129,102 +1132,108 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
if (isClosingOrDeleting) {
log.warn("[{}] Topic is already being closed or deleted",
topic);
return FutureUtil.failedFuture(new TopicFencedException("Topic
is already fenced"));
- } else if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
- return FutureUtil.failedFuture(
- new TopicBusyException("Topic has subscriptions: " +
subscriptions.keys()));
- } else if (failIfHasBacklogs && hasBacklogs()) {
- List<String> backlogSubs =
- subscriptions.values().stream()
- .filter(sub ->
sub.getNumberOfEntriesInBacklog(false) > 0)
- .map(PersistentSubscription::getName).toList();
- return FutureUtil.failedFuture(
- new TopicBusyException("Topic has subscriptions did
not catch up: " + backlogSubs));
+ }
+ // We can proceed with the deletion if either:
+ // 1. No one is connected and no subscriptions
+ // 2. The topic has subscriptions but no backlogs for any
subscription
+ // if delete_when_subscriptions_caught_up is applied
+ // 3. We want to kick out everyone and forcefully delete the
topic.
+ // In this case, we shouldn't care if the usageCount is 0 or
not, just proceed
+ if (!closeIfClientsConnected) {
+ if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
+ return FutureUtil.failedFuture(
+ new TopicBusyException("Topic has subscriptions: "
+ subscriptions.keys()));
+ } else if (failIfHasBacklogs) {
+ if (hasBacklogs()) {
+ List<String> backlogSubs =
+ subscriptions.values().stream()
+ .filter(sub ->
sub.getNumberOfEntriesInBacklog(false) > 0)
+
.map(PersistentSubscription::getName).toList();
+ return FutureUtil.failedFuture(
+ new TopicBusyException("Topic has
subscriptions did not catch up: " + backlogSubs));
+ } else if (!producers.isEmpty()) {
+ return FutureUtil.failedFuture(new TopicBusyException(
+ "Topic has " + producers.size() + " connected
producers"));
+ }
+ } else if (currentUsageCount() > 0) {
+ return FutureUtil.failedFuture(new TopicBusyException(
+ "Topic has " + currentUsageCount() + " connected
producers/consumers"));
+ }
}
fenceTopicToCloseOrDelete(); // Avoid clients reconnections while
deleting
CompletableFuture<Void> closeClientFuture = new
CompletableFuture<>();
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
if (closeIfClientsConnected) {
- List<CompletableFuture<Void>> futures = Lists.newArrayList();
replicators.forEach((cluster, replicator) ->
futures.add(replicator.disconnect()));
producers.values().forEach(producer ->
futures.add(producer.disconnect()));
- subscriptions.forEach((s, sub) ->
futures.add(sub.disconnect()));
- FutureUtil.waitForAll(futures).thenRun(() -> {
- closeClientFuture.complete(null);
- }).exceptionally(ex -> {
- log.error("[{}] Error closing clients", topic, ex);
- unfenceTopicToResume();
- closeClientFuture.completeExceptionally(ex);
- return null;
- });
- } else {
- closeClientFuture.complete(null);
}
+ FutureUtil.waitForAll(futures).thenRun(() -> {
+ closeClientFuture.complete(null);
+ }).exceptionally(ex -> {
+ log.error("[{}] Error closing clients", topic, ex);
+ unfenceTopicToResume();
+ closeClientFuture.completeExceptionally(ex);
+ return null;
+ });
closeClientFuture.thenAccept(delete -> {
- // We can proceed with the deletion if either:
- // 1. No one is connected
- // 2. We want to kick out everyone and forcefully delete the
topic.
- // In this case, we shouldn't care if the usageCount is 0
or not, just proceed
- if (currentUsageCount() == 0 || (closeIfClientsConnected &&
!failIfHasSubscriptions)) {
- CompletableFuture<Void> deleteTopicAuthenticationFuture =
new CompletableFuture<>();
- brokerService.deleteTopicAuthenticationWithRetry(topic,
deleteTopicAuthenticationFuture, 5);
-
- deleteTopicAuthenticationFuture.thenCompose(__ ->
deleteSchema())
- .thenCompose(__ -> deleteTopicPolicies())
- .thenCompose(__ ->
transactionBufferCleanupAndClose())
- .whenComplete((v, ex) -> {
- if (ex != null) {
- log.error("[{}] Error deleting topic", topic, ex);
- unfenceTopicToResume();
- deleteFuture.completeExceptionally(ex);
- } else {
- List<CompletableFuture<Void>> subsDeleteFutures =
new ArrayList<>();
- subscriptions.forEach((sub, p) ->
subsDeleteFutures.add(unsubscribe(sub)));
-
-
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
- if (e != null) {
- log.error("[{}] Error deleting topic",
topic, e);
- unfenceTopicToResume();
- deleteFuture.completeExceptionally(e);
- } else {
- ledger.asyncDelete(new
AsyncCallbacks.DeleteLedgerCallback() {
- @Override
- public void
deleteLedgerComplete(Object ctx) {
-
brokerService.removeTopicFromCache(PersistentTopic.this);
-
-
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
-
-
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
-
- unregisterTopicPolicyListener();
-
- log.info("[{}] Topic deleted",
topic);
- deleteFuture.complete(null);
- }
-
- @Override
- public void
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
- if (exception.getCause()
- instanceof
MetadataStoreException.NotFoundException) {
- log.info("[{}] Topic is
already deleted {}",
- topic,
exception.getMessage());
- deleteLedgerComplete(ctx);
- } else {
- unfenceTopicToResume();
- log.error("[{}] Error deleting
topic", topic, exception);
-
deleteFuture.completeExceptionally(new PersistenceException(exception));
+ CompletableFuture<Void> deleteTopicAuthenticationFuture = new
CompletableFuture<>();
+ brokerService.deleteTopicAuthenticationWithRetry(topic,
deleteTopicAuthenticationFuture, 5);
+ deleteTopicAuthenticationFuture.thenCompose(__ ->
deleteSchema())
+ .thenCompose(__ -> deleteTopicPolicies())
+ .thenCompose(__ -> transactionBufferCleanupAndClose())
+ .whenComplete((v, ex) -> {
+ if (ex != null) {
+ log.error("[{}] Error deleting topic", topic,
ex);
+ unfenceTopicToResume();
+ deleteFuture.completeExceptionally(ex);
+ } else {
+ List<CompletableFuture<Void>>
subsDeleteFutures = new ArrayList<>();
+ subscriptions.forEach((sub, p) ->
subsDeleteFutures.add(unsubscribe(sub)));
+
+
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
+ if (e != null) {
+ log.error("[{}] Error deleting topic",
topic, e);
+ unfenceTopicToResume();
+ deleteFuture.completeExceptionally(e);
+ } else {
+ ledger.asyncDelete(new
AsyncCallbacks.DeleteLedgerCallback() {
+ @Override
+ public void
deleteLedgerComplete(Object ctx) {
+
brokerService.removeTopicFromCache(PersistentTopic.this);
+
+
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
+
+
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
+
+
unregisterTopicPolicyListener();
+
+ log.info("[{}] Topic deleted",
topic);
+ deleteFuture.complete(null);
}
- }
- }, null);
- }
- });
- }
- });
- } else {
- unfenceTopicToResume();
- deleteFuture.completeExceptionally(new TopicBusyException(
- "Topic has " + currentUsageCount() + " connected
producers/consumers"));
- }
+
+ @Override
+ public void
deleteLedgerFailed(ManagedLedgerException exception,
+
Object ctx) {
+ if (exception.getCause()
+ instanceof
MetadataStoreException.NotFoundException) {
+ log.info("[{}] Topic is
already deleted {}",
+ topic,
exception.getMessage());
+ deleteLedgerComplete(ctx);
+ } else {
+ unfenceTopicToResume();
+ log.error("[{}] Error
deleting topic", topic, exception);
+
deleteFuture.completeExceptionally(
+ new
PersistenceException(exception));
+ }
+ }
+ }, null);
+ }
+ });
+ }
+ });
}).exceptionally(ex->{
unfenceTopicToResume();
deleteFuture.completeExceptionally(
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index 4a372c31b15..25089fdfdbd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -328,7 +328,6 @@ public class InactiveTopicDeleteTest extends BrokerTestBase
{
producer.send("Pulsar".getBytes());
}
- consumer.close();
producer.close();
Thread.sleep(2000);
@@ -338,6 +337,7 @@ public class InactiveTopicDeleteTest extends BrokerTestBase
{
admin.topics().skipAllMessages(topic, "sub");
Awaitility.await()
.untilAsserted(() ->
Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic)));
+ consumer.close();
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 1c02f2ea57e..0adeca4bcdb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -25,6 +25,7 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MultiTopicConsumerStats.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MultiTopicConsumerStats.java
index e1c1d3372c3..552f0954c19 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MultiTopicConsumerStats.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MultiTopicConsumerStats.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PartitionedTopicProducerStats.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PartitionedTopicProducerStats.java
index 5fd3c1f34a0..70c11a06dfb 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PartitionedTopicProducerStats.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PartitionedTopicProducerStats.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
index 17018be02be..94a93fc1db1 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java
index 2f73a6af406..d396e7e693a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information