This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c1b0454614b [fix][broker]Delete subscription and disconnect
replicators after topic migration (#21029)
c1b0454614b is described below
commit c1b0454614b7903913cb0311bdcacf2118893fc9
Author: vineeth1995 <[email protected]>
AuthorDate: Tue Aug 22 09:52:30 2023 -0700
[fix][broker]Delete subscription and disconnect replicators after topic
migration (#21029)
Co-authored-by: Vineeth Polamreddy <[email protected]>
---
.../service/nonpersistent/NonPersistentTopic.java | 21 ++++++++++
.../broker/service/persistent/PersistentTopic.java | 47 +++++++++++++++++-----
.../broker/service/ClusterMigrationTest.java | 9 +++++
3 files changed, 66 insertions(+), 11 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index c764283cb44..836e5655168 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -959,10 +959,31 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
consumer.topicMigrated(url);
});
});
+ return disconnectReplicators().thenCompose(__ ->
checkAndUnsubscribeSubscriptions());
}
return CompletableFuture.completedFuture(null);
}
+ private CompletableFuture<Void> checkAndUnsubscribeSubscriptions() {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ subscriptions.forEach((s, subscription) -> {
+ if (subscription.getConsumers().isEmpty()) {
+ futures.add(subscription.delete());
+ }
+ });
+
+ return FutureUtil.waitForAll(futures);
+ }
+
+ private CompletableFuture<Void> disconnectReplicators() {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators =
getReplicators();
+ replicators.forEach((r, replicator) -> {
+ futures.add(replicator.disconnect());
+ });
+ return FutureUtil.waitForAll(futures);
+ }
+
@Override
public void checkGC() {
if (!isDeleteWhileInactive()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 57a4989b4d3..f5679665d46 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -180,6 +180,7 @@ import org.apache.pulsar.utils.StatsOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class PersistentTopic extends AbstractTopic implements Topic,
AddEntryCallback {
// Managed ledger associated with the topic
@@ -2575,25 +2576,49 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
@Override
public CompletableFuture<Void> checkClusterMigration() {
Optional<ClusterUrl> clusterUrl = getMigratedClusterUrl();
- if (!isMigrated() && clusterUrl.isPresent()) {
- return ledger.asyncMigrate().thenApply(__ -> {
- subscriptions.forEach((name, sub) -> {
- if (sub.isSubsciptionMigrated()) {
-
sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration);
- }
- });
- return null;
- });
- } else {
+ if (!clusterUrl.isPresent()) {
return CompletableFuture.completedFuture(null);
}
+ CompletableFuture<?> migrated = !isMigrated() ? ledger.asyncMigrate() :
+ CompletableFuture.completedFuture(null);
+ return migrated.thenApply(__ -> {
+ subscriptions.forEach((name, sub) -> {
+ if (sub.isSubsciptionMigrated()) {
+
sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration);
+ }
+ });
+ return null;
+ }).thenCompose(__ -> checkAndDisconnectReplicators()).thenCompose(__
-> checkAndUnsubscribeSubscriptions());
+ }
+
+ private CompletableFuture<Void> checkAndUnsubscribeSubscriptions() {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ subscriptions.forEach((s, subscription) -> {
+ if (subscription.getNumberOfEntriesInBacklog(true) == 0
+ && subscription.getConsumers().isEmpty()) {
+ futures.add(subscription.delete());
+ }
+ });
+
+ return FutureUtil.waitForAll(futures);
+ }
+
+ private CompletableFuture<Void> checkAndDisconnectReplicators() {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ ConcurrentOpenHashMap<String, Replicator> replicators =
getReplicators();
+ replicators.forEach((r, replicator) -> {
+ if (replicator.getNumberOfEntriesInBacklog() <= 0) {
+ futures.add(replicator.disconnect());
+ }
+ });
+ return FutureUtil.waitForAll(futures);
}
public boolean isReplicationBacklogExist() {
ConcurrentOpenHashMap<String, Replicator> replicators =
getReplicators();
if (replicators != null) {
for (Replicator replicator : replicators.values()) {
- if (replicator.getNumberOfEntriesInBacklog() != 0) {
+ if (replicator.getNumberOfEntriesInBacklog() > 0) {
return true;
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
index c1807a15661..980a2c01d95 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
@@ -312,6 +313,14 @@ public class ClusterMigrationTest {
retryStrategically((test) -> !topic2.getSubscriptions().isEmpty(), 10,
500);
assertFalse(topic2.getSubscriptions().isEmpty());
+ topic1.checkClusterMigration().get();
+ ConcurrentOpenHashMap<String, ? extends Replicator> replicators =
topic1.getReplicators();
+ replicators.forEach((r, replicator) -> {
+ assertFalse(replicator.isConnected());
+ });
+
+ assertTrue(topic1.getSubscriptions().isEmpty());
+
// not also create a new consumer which should also reconnect to
cluster-2
Consumer<byte[]> consumer2 =
client1.newConsumer().topic(topicName).subscriptionType(subType)
.subscriptionName("s2").subscribe();