rdhabalia closed pull request #1647: Fix: remove local-cluster from replication
list of global-namespace should clean topics
URL: https://github.com/apache/incubator-pulsar/pull/1647
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c34ec84ba8..f42525487b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -661,6 +661,21 @@ void removeSubscription(String subscriptionName) {
return delete(false);
}
+ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions) {
+ return delete(failIfHasSubscriptions, false);
+ }
+
+ /**
+ * Forcefully close all producers/consumers/replicators and deletes the
topic. this function is used when local
+ * cluster is removed from global-namespace replication list. Because
broker doesn't allow lookup if local cluster
+ * is not part of replication cluster list.
+ *
+ * @return
+ */
+ private CompletableFuture<Void> deleteForcefully() {
+ return delete(false, true);
+ }
+
/**
* Delete the managed ledger associated with this topic
*
@@ -668,11 +683,14 @@ void removeSubscription(String subscriptionName) {
* Flag indicating whether delete should succeed if topic still
has unconnected subscriptions. Set to
* false when called from admin API (it will delete the subs
too), and set to true when called from GC
* thread
- *
+ * @param closeIfClientsConnected
+ * Flag indicate whether explicitly close connected
producers/consumers/replicators before trying to delete topic. If
+ * any client is connected to a topic and if this flag is
disable then this operation fails.
+ *
* @return Completable future indicating completion of delete operation
Completed exceptionally with:
* IllegalStateException if topic is still active
ManagedLedgerException if ledger delete operation fails
*/
- private CompletableFuture<Void> delete(boolean failIfHasSubscriptions) {
+ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
boolean closeIfClientsConnected) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
lock.writeLock().lock();
@@ -682,48 +700,73 @@ void removeSubscription(String subscriptionName) {
deleteFuture.completeExceptionally(new
TopicFencedException("Topic is already fenced"));
return deleteFuture;
}
- if (USAGE_COUNT_UPDATER.get(this) == 0) {
- isFenced = true;
-
+
+ CompletableFuture<Void> closeClientFuture = new
CompletableFuture<>();
+ if (closeIfClientsConnected) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
+ replicators.forEach((cluster, replicator) ->
futures.add(replicator.disconnect()));
+ producers.forEach(producer ->
futures.add(producer.disconnect()));
+ subscriptions.forEach((s, sub) ->
futures.add(sub.disconnect()));
+ FutureUtil.waitForAll(futures).thenRun(() -> {
+ closeClientFuture.complete(null);
+ }).exceptionally(ex -> {
+ log.error("[{}] Error closing clients", topic, ex);
+ isFenced = false;
+ closeClientFuture.completeExceptionally(ex);
+ return null;
+ });
+ } else {
+ closeClientFuture.complete(null);
+ }
- if (failIfHasSubscriptions) {
- if (!subscriptions.isEmpty()) {
- isFenced = false;
- deleteFuture.completeExceptionally(new
TopicBusyException("Topic has subscriptions"));
- return deleteFuture;
- }
- } else {
- subscriptions.forEach((s, sub) ->
futures.add(sub.delete()));
- }
+ closeClientFuture.thenAccept(delete -> {
+ if (USAGE_COUNT_UPDATER.get(this) == 0) {
+ isFenced = true;
- FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
- if (ex != null) {
- log.error("[{}] Error deleting topic", topic, ex);
- isFenced = false;
- deleteFuture.completeExceptionally(ex);
- } else {
- ledger.asyncDelete(new
AsyncCallbacks.DeleteLedgerCallback() {
- @Override
- public void deleteLedgerComplete(Object ctx) {
- brokerService.removeTopicFromCache(topic);
- log.info("[{}] Topic deleted", topic);
- deleteFuture.complete(null);
- }
+ List<CompletableFuture<Void>> futures =
Lists.newArrayList();
- @Override
- public void
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
- isFenced = false;
- log.error("[{}] Error deleting topic", topic,
exception);
- deleteFuture.completeExceptionally(new
PersistenceException(exception));
- }
- }, null);
+ if (failIfHasSubscriptions) {
+ if (!subscriptions.isEmpty()) {
+ isFenced = false;
+ deleteFuture.completeExceptionally(new
TopicBusyException("Topic has subscriptions"));
+ return;
+ }
+ } else {
+ subscriptions.forEach((s, sub) ->
futures.add(sub.delete()));
}
- });
- } else {
+
+ FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
+ if (ex != null) {
+ log.error("[{}] Error deleting topic", topic, ex);
+ isFenced = false;
+ deleteFuture.completeExceptionally(ex);
+ } else {
+ ledger.asyncDelete(new
AsyncCallbacks.DeleteLedgerCallback() {
+ @Override
+ public void deleteLedgerComplete(Object ctx) {
+ brokerService.removeTopicFromCache(topic);
+ log.info("[{}] Topic deleted", topic);
+ deleteFuture.complete(null);
+ }
+
+ @Override
+ public void
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+ isFenced = false;
+ log.error("[{}] Error deleting topic",
topic, exception);
+ deleteFuture.completeExceptionally(new
PersistenceException(exception));
+ }
+ }, null);
+ }
+ });
+ } else {
+ deleteFuture.completeExceptionally(new TopicBusyException(
+ "Topic has " + USAGE_COUNT_UPDATER.get(this) + "
connected producers/consumers"));
+ }
+ }).exceptionally(ex->{
deleteFuture.completeExceptionally(
- new TopicBusyException("Topic has " +
USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers"));
- }
+ new TopicBusyException("Failed to close clients before
deleting topic."));
+ return null;
+ });
} finally {
lock.writeLock().unlock();
}
@@ -858,6 +901,14 @@ public void closeFailed(ManagedLedgerException exception,
Object ctx) {
}
String localCluster =
brokerService.pulsar().getConfiguration().getClusterName();
+
+ // if local cluster is removed from global namespace cluster-list :
then delete topic forcefully because pulsar
+ // doesn't serve global topic without local repl-cluster configured.
+ if (TopicName.get(topic).isGlobal() &&
!configuredClusters.contains(localCluster)) {
+ log.info("Deleting topic [{}] because local cluster is not part of
global namespace repl list {}",
+ configuredClusters);
+ return deleteForcefully();
+ }
List<CompletableFuture<Void>> futures = Lists.newArrayList();
@@ -882,6 +933,7 @@ public void closeFailed(ManagedLedgerException exception,
Object ctx) {
futures.add(removeReplicator(cluster));
}
}
+
});
return FutureUtil.waitForAll(futures);
@@ -962,7 +1014,7 @@ protected boolean addReplicationCluster(String
remoteCluster, PersistentTopic pe
CompletableFuture<Void> removeReplicator(String remoteCluster) {
log.info("[{}] Removing replicator to {}", topic, remoteCluster);
final CompletableFuture<Void> future = new CompletableFuture<>();
-
+
String name = PersistentReplicator.getReplicatorName(replicatorPrefix,
remoteCluster);
replicators.get(remoteCluster).disconnect().thenRun(() -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
new file mode 100644
index 0000000000..adf5ce7297
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Sets;
+
+public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
+
+ protected String methodName;
+
+ @BeforeMethod
+ public void beforeMethod(Method m) throws Exception {
+ methodName = m.getName();
+ }
+
+ @Override
+ @BeforeClass(timeOut = 30000)
+ void setup() throws Exception {
+ super.setup();
+ }
+
+ @Override
+ @AfterClass(timeOut = 30000)
+ void shutdown() throws Exception {
+ super.shutdown();
+ }
+
+ @DataProvider(name = "partitionedTopic")
+ public Object[][] partitionedTopicProvider() {
+ return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+ }
+
+ /**
+ * If local cluster is removed from the global namespace then all topics
under that namespace should be deleted from
+ * the cluster.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRemoveLocalClusterOnGlobalNamespace() throws Exception {
+ log.info("--- Starting
ReplicatorTest::testRemoveLocalClusterOnGlobalNamespace ---");
+
+ final String namespace = "pulsar/global/removeClusterTest";
+ admin1.namespaces().createNamespace(namespace);
+ admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r1", "r2", "r3"));
+
+ final String topicName = "persistent://" + namespace + "/topic";
+
+ PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0,
TimeUnit.SECONDS)
+ .build();
+ PulsarClient client2 =
PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0,
TimeUnit.SECONDS)
+ .build();
+
+ ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>)
client1.newProducer().topic(topicName)
+
.enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+ ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>)
client1.newConsumer().topic(topicName)
+ .subscriptionName("sub1").subscribe();
+ ConsumerImpl<byte[]> consumer2 = (ConsumerImpl<byte[]>)
client2.newConsumer().topic(topicName)
+ .subscriptionName("sub1").subscribe();
+
+ admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r2", "r3"));
+
+ MockedPulsarServiceBaseTest
+ .retryStrategically((test) ->
!pulsar1.getBrokerService().getTopics().containsKey(topicName), 5, 150);
+
+
Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
+ Assert.assertFalse(producer1.isConnected());
+ Assert.assertFalse(consumer1.isConnected());
+ Assert.assertTrue(consumer2.isConnected());
+
+ client1.close();
+ client2.close();
+ }
+
+ private static final Logger log =
LoggerFactory.getLogger(ReplicatorGlobalNSTest.class);
+
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services