This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 9746ea4 Fix: remove local-cluster from replication list of global-namespace should clean topics (#1647) 9746ea4 is described below commit 9746ea44337044a4c5d4f744d4e84e31ef53d327 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Wed Apr 25 22:57:38 2018 -0700 Fix: remove local-cluster from replication list of global-namespace should clean topics (#1647) --- .../broker/service/persistent/PersistentTopic.java | 130 ++++++++++++++------- .../broker/service/ReplicatorGlobalNSTest.java | 110 +++++++++++++++++ 2 files changed, 201 insertions(+), 39 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c34ec84..f425254 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -661,6 +661,21 @@ public class PersistentTopic implements Topic, AddEntryCallback { return delete(false); } + private CompletableFuture<Void> delete(boolean failIfHasSubscriptions) { + return delete(failIfHasSubscriptions, false); + } + + /** + * Forcefully close all producers/consumers/replicators and deletes the topic. this function is used when local + * cluster is removed from global-namespace replication list. Because broker doesn't allow lookup if local cluster + * is not part of replication cluster list. + * + * @return + */ + private CompletableFuture<Void> deleteForcefully() { + return delete(false, true); + } + /** * Delete the managed ledger associated with this topic * @@ -668,11 +683,14 @@ public class PersistentTopic implements Topic, AddEntryCallback { * Flag indicating whether delete should succeed if topic still has unconnected subscriptions. 
Set to * false when called from admin API (it will delete the subs too), and set to true when called from GC * thread - * + * @param closeIfClientsConnected + * Flag indicate whether explicitly close connected producers/consumers/replicators before trying to delete topic. If + * any client is connected to a topic and if this flag is disable then this operation fails. + * * @return Completable future indicating completion of delete operation Completed exceptionally with: * IllegalStateException if topic is still active ManagedLedgerException if ledger delete operation fails */ - private CompletableFuture<Void> delete(boolean failIfHasSubscriptions) { + private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean closeIfClientsConnected) { CompletableFuture<Void> deleteFuture = new CompletableFuture<>(); lock.writeLock().lock(); @@ -682,48 +700,73 @@ public class PersistentTopic implements Topic, AddEntryCallback { deleteFuture.completeExceptionally(new TopicFencedException("Topic is already fenced")); return deleteFuture; } - if (USAGE_COUNT_UPDATER.get(this) == 0) { - isFenced = true; - + + CompletableFuture<Void> closeClientFuture = new CompletableFuture<>(); + if (closeIfClientsConnected) { List<CompletableFuture<Void>> futures = Lists.newArrayList(); + replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); + producers.forEach(producer -> futures.add(producer.disconnect())); + subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); + FutureUtil.waitForAll(futures).thenRun(() -> { + closeClientFuture.complete(null); + }).exceptionally(ex -> { + log.error("[{}] Error closing clients", topic, ex); + isFenced = false; + closeClientFuture.completeExceptionally(ex); + return null; + }); + } else { + closeClientFuture.complete(null); + } - if (failIfHasSubscriptions) { - if (!subscriptions.isEmpty()) { - isFenced = false; - deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions")); - 
return deleteFuture; - } - } else { - subscriptions.forEach((s, sub) -> futures.add(sub.delete())); - } + closeClientFuture.thenAccept(delete -> { + if (USAGE_COUNT_UPDATER.get(this) == 0) { + isFenced = true; - FutureUtil.waitForAll(futures).whenComplete((v, ex) -> { - if (ex != null) { - log.error("[{}] Error deleting topic", topic, ex); - isFenced = false; - deleteFuture.completeExceptionally(ex); - } else { - ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { - @Override - public void deleteLedgerComplete(Object ctx) { - brokerService.removeTopicFromCache(topic); - log.info("[{}] Topic deleted", topic); - deleteFuture.complete(null); - } + List<CompletableFuture<Void>> futures = Lists.newArrayList(); - @Override - public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { - isFenced = false; - log.error("[{}] Error deleting topic", topic, exception); - deleteFuture.completeExceptionally(new PersistenceException(exception)); - } - }, null); + if (failIfHasSubscriptions) { + if (!subscriptions.isEmpty()) { + isFenced = false; + deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions")); + return; + } + } else { + subscriptions.forEach((s, sub) -> futures.add(sub.delete())); } - }); - } else { + + FutureUtil.waitForAll(futures).whenComplete((v, ex) -> { + if (ex != null) { + log.error("[{}] Error deleting topic", topic, ex); + isFenced = false; + deleteFuture.completeExceptionally(ex); + } else { + ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { + @Override + public void deleteLedgerComplete(Object ctx) { + brokerService.removeTopicFromCache(topic); + log.info("[{}] Topic deleted", topic); + deleteFuture.complete(null); + } + + @Override + public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { + isFenced = false; + log.error("[{}] Error deleting topic", topic, exception); + deleteFuture.completeExceptionally(new PersistenceException(exception)); + } + }, null); + } + 
}); + } else { + deleteFuture.completeExceptionally(new TopicBusyException( + "Topic has " + USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers")); + } + }).exceptionally(ex->{ deleteFuture.completeExceptionally( - new TopicBusyException("Topic has " + USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers")); - } + new TopicBusyException("Failed to close clients before deleting topic.")); + return null; + }); } finally { lock.writeLock().unlock(); } @@ -858,6 +901,14 @@ public class PersistentTopic implements Topic, AddEntryCallback { } String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); + + // if local cluster is removed from global namespace cluster-list : then delete topic forcefully because pulsar + // doesn't serve global topic without local repl-cluster configured. + if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) { + log.info("Deleting topic [{}] because local cluster is not part of global namespace repl list {}", + configuredClusters); + return deleteForcefully(); + } List<CompletableFuture<Void>> futures = Lists.newArrayList(); @@ -882,6 +933,7 @@ public class PersistentTopic implements Topic, AddEntryCallback { futures.add(removeReplicator(cluster)); } } + }); return FutureUtil.waitForAll(futures); @@ -962,7 +1014,7 @@ public class PersistentTopic implements Topic, AddEntryCallback { CompletableFuture<Void> removeReplicator(String remoteCluster) { log.info("[{}] Removing replicator to {}", topic, remoteCluster); final CompletableFuture<Void> future = new CompletableFuture<>(); - + String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster); replicators.get(remoteCluster).disconnect().thenRun(() -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java new file mode 100644 index 0000000..adf5ce7 --- 
/dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import java.lang.reflect.Method; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.ProducerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import com.google.common.collect.Sets; + +public class ReplicatorGlobalNSTest extends ReplicatorTestBase { + + protected String methodName; + + @BeforeMethod + public void beforeMethod(Method m) throws Exception { + methodName = m.getName(); + } + + @Override + @BeforeClass(timeOut = 30000) + void setup() throws Exception { + super.setup(); + } + + 
@Override + @AfterClass(timeOut = 30000) + void shutdown() throws Exception { + super.shutdown(); + } + + @DataProvider(name = "partitionedTopic") + public Object[][] partitionedTopicProvider() { + return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; + } + + /** + * If local cluster is removed from the global namespace then all topics under that namespace should be deleted from + * the cluster. + * + * @throws Exception + */ + @Test + public void testRemoveLocalClusterOnGlobalNamespace() throws Exception { + log.info("--- Starting ReplicatorTest::testRemoveLocalClusterOnGlobalNamespace ---"); + + final String namespace = "pulsar/global/removeClusterTest"; + admin1.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3")); + + final String topicName = "persistent://" + namespace + "/topic"; + + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + + ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) client1.newProducer().topic(topicName) + .enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) client1.newConsumer().topic(topicName) + .subscriptionName("sub1").subscribe(); + ConsumerImpl<byte[]> consumer2 = (ConsumerImpl<byte[]>) client2.newConsumer().topic(topicName) + .subscriptionName("sub1").subscribe(); + + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r2", "r3")); + + MockedPulsarServiceBaseTest + .retryStrategically((test) -> !pulsar1.getBrokerService().getTopics().containsKey(topicName), 5, 150); + + Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName)); + Assert.assertFalse(producer1.isConnected()); + 
Assert.assertFalse(consumer1.isConnected()); + Assert.assertTrue(consumer2.isConnected()); + + client1.close(); + client2.close(); + } + + private static final Logger log = LoggerFactory.getLogger(ReplicatorGlobalNSTest.class); + +} -- To stop receiving notification emails like this one, please contact rdhaba...@apache.org.