This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9746ea4 Fix: remove local-cluster from replication list of
global-namespace should clean topics (#1647)
9746ea4 is described below
commit 9746ea44337044a4c5d4f744d4e84e31ef53d327
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Wed Apr 25 22:57:38 2018 -0700
Fix: remove local-cluster from replication list of global-namespace should
clean topics (#1647)
---
.../broker/service/persistent/PersistentTopic.java | 130 ++++++++++++++-------
.../broker/service/ReplicatorGlobalNSTest.java | 110 +++++++++++++++++
2 files changed, 201 insertions(+), 39 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c34ec84..f425254 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -661,6 +661,21 @@ public class PersistentTopic implements Topic,
AddEntryCallback {
return delete(false);
}
+ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions) {
+ return delete(failIfHasSubscriptions, false);
+ }
+
+ /**
+ * Forcefully close all producers/consumers/replicators and deletes the
topic. this function is used when local
+ * cluster is removed from global-namespace replication list. Because
broker doesn't allow lookup if local cluster
+ * is not part of replication cluster list.
+ *
+ * @return
+ */
+ private CompletableFuture<Void> deleteForcefully() {
+ return delete(false, true);
+ }
+
/**
* Delete the managed ledger associated with this topic
*
@@ -668,11 +683,14 @@ public class PersistentTopic implements Topic,
AddEntryCallback {
* Flag indicating whether delete should succeed if topic still
has unconnected subscriptions. Set to
* false when called from admin API (it will delete the subs
too), and set to true when called from GC
* thread
- *
+ * @param closeIfClientsConnected
+ * Flag indicate whether explicitly close connected
producers/consumers/replicators before trying to delete topic. If
+ * any client is connected to a topic and if this flag is
disable then this operation fails.
+ *
* @return Completable future indicating completion of delete operation
Completed exceptionally with:
* IllegalStateException if topic is still active
ManagedLedgerException if ledger delete operation fails
*/
- private CompletableFuture<Void> delete(boolean failIfHasSubscriptions) {
+ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
boolean closeIfClientsConnected) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
lock.writeLock().lock();
@@ -682,48 +700,73 @@ public class PersistentTopic implements Topic,
AddEntryCallback {
deleteFuture.completeExceptionally(new
TopicFencedException("Topic is already fenced"));
return deleteFuture;
}
- if (USAGE_COUNT_UPDATER.get(this) == 0) {
- isFenced = true;
-
+
+ CompletableFuture<Void> closeClientFuture = new
CompletableFuture<>();
+ if (closeIfClientsConnected) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
+ replicators.forEach((cluster, replicator) ->
futures.add(replicator.disconnect()));
+ producers.forEach(producer ->
futures.add(producer.disconnect()));
+ subscriptions.forEach((s, sub) ->
futures.add(sub.disconnect()));
+ FutureUtil.waitForAll(futures).thenRun(() -> {
+ closeClientFuture.complete(null);
+ }).exceptionally(ex -> {
+ log.error("[{}] Error closing clients", topic, ex);
+ isFenced = false;
+ closeClientFuture.completeExceptionally(ex);
+ return null;
+ });
+ } else {
+ closeClientFuture.complete(null);
+ }
- if (failIfHasSubscriptions) {
- if (!subscriptions.isEmpty()) {
- isFenced = false;
- deleteFuture.completeExceptionally(new
TopicBusyException("Topic has subscriptions"));
- return deleteFuture;
- }
- } else {
- subscriptions.forEach((s, sub) ->
futures.add(sub.delete()));
- }
+ closeClientFuture.thenAccept(delete -> {
+ if (USAGE_COUNT_UPDATER.get(this) == 0) {
+ isFenced = true;
- FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
- if (ex != null) {
- log.error("[{}] Error deleting topic", topic, ex);
- isFenced = false;
- deleteFuture.completeExceptionally(ex);
- } else {
- ledger.asyncDelete(new
AsyncCallbacks.DeleteLedgerCallback() {
- @Override
- public void deleteLedgerComplete(Object ctx) {
- brokerService.removeTopicFromCache(topic);
- log.info("[{}] Topic deleted", topic);
- deleteFuture.complete(null);
- }
+ List<CompletableFuture<Void>> futures =
Lists.newArrayList();
- @Override
- public void
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
- isFenced = false;
- log.error("[{}] Error deleting topic", topic,
exception);
- deleteFuture.completeExceptionally(new
PersistenceException(exception));
- }
- }, null);
+ if (failIfHasSubscriptions) {
+ if (!subscriptions.isEmpty()) {
+ isFenced = false;
+ deleteFuture.completeExceptionally(new
TopicBusyException("Topic has subscriptions"));
+ return;
+ }
+ } else {
+ subscriptions.forEach((s, sub) ->
futures.add(sub.delete()));
}
- });
- } else {
+
+ FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
+ if (ex != null) {
+ log.error("[{}] Error deleting topic", topic, ex);
+ isFenced = false;
+ deleteFuture.completeExceptionally(ex);
+ } else {
+ ledger.asyncDelete(new
AsyncCallbacks.DeleteLedgerCallback() {
+ @Override
+ public void deleteLedgerComplete(Object ctx) {
+ brokerService.removeTopicFromCache(topic);
+ log.info("[{}] Topic deleted", topic);
+ deleteFuture.complete(null);
+ }
+
+ @Override
+ public void
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+ isFenced = false;
+ log.error("[{}] Error deleting topic",
topic, exception);
+ deleteFuture.completeExceptionally(new
PersistenceException(exception));
+ }
+ }, null);
+ }
+ });
+ } else {
+ deleteFuture.completeExceptionally(new TopicBusyException(
+ "Topic has " + USAGE_COUNT_UPDATER.get(this) + "
connected producers/consumers"));
+ }
+ }).exceptionally(ex->{
deleteFuture.completeExceptionally(
- new TopicBusyException("Topic has " +
USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers"));
- }
+ new TopicBusyException("Failed to close clients before
deleting topic."));
+ return null;
+ });
} finally {
lock.writeLock().unlock();
}
@@ -858,6 +901,14 @@ public class PersistentTopic implements Topic,
AddEntryCallback {
}
String localCluster =
brokerService.pulsar().getConfiguration().getClusterName();
+
+ // if local cluster is removed from global namespace cluster-list :
then delete topic forcefully because pulsar
+ // doesn't serve global topic without local repl-cluster configured.
+ if (TopicName.get(topic).isGlobal() &&
!configuredClusters.contains(localCluster)) {
+ log.info("Deleting topic [{}] because local cluster is not part of
global namespace repl list {}",
+ configuredClusters);
+ return deleteForcefully();
+ }
List<CompletableFuture<Void>> futures = Lists.newArrayList();
@@ -882,6 +933,7 @@ public class PersistentTopic implements Topic,
AddEntryCallback {
futures.add(removeReplicator(cluster));
}
}
+
});
return FutureUtil.waitForAll(futures);
@@ -962,7 +1014,7 @@ public class PersistentTopic implements Topic,
AddEntryCallback {
CompletableFuture<Void> removeReplicator(String remoteCluster) {
log.info("[{}] Removing replicator to {}", topic, remoteCluster);
final CompletableFuture<Void> future = new CompletableFuture<>();
-
+
String name = PersistentReplicator.getReplicatorName(replicatorPrefix,
remoteCluster);
replicators.get(remoteCluster).disconnect().thenRun(() -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
new file mode 100644
index 0000000..adf5ce7
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Sets;
+
+public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
+
+ protected String methodName;
+
+ @BeforeMethod
+ public void beforeMethod(Method m) throws Exception {
+ methodName = m.getName();
+ }
+
+ @Override
+ @BeforeClass(timeOut = 30000)
+ void setup() throws Exception {
+ super.setup();
+ }
+
+ @Override
+ @AfterClass(timeOut = 30000)
+ void shutdown() throws Exception {
+ super.shutdown();
+ }
+
+ @DataProvider(name = "partitionedTopic")
+ public Object[][] partitionedTopicProvider() {
+ return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+ }
+
+ /**
+ * If local cluster is removed from the global namespace then all topics
under that namespace should be deleted from
+ * the cluster.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRemoveLocalClusterOnGlobalNamespace() throws Exception {
+ log.info("--- Starting
ReplicatorTest::testRemoveLocalClusterOnGlobalNamespace ---");
+
+ final String namespace = "pulsar/global/removeClusterTest";
+ admin1.namespaces().createNamespace(namespace);
+ admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r1", "r2", "r3"));
+
+ final String topicName = "persistent://" + namespace + "/topic";
+
+ PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0,
TimeUnit.SECONDS)
+ .build();
+ PulsarClient client2 =
PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0,
TimeUnit.SECONDS)
+ .build();
+
+ ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>)
client1.newProducer().topic(topicName)
+
.enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+ ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>)
client1.newConsumer().topic(topicName)
+ .subscriptionName("sub1").subscribe();
+ ConsumerImpl<byte[]> consumer2 = (ConsumerImpl<byte[]>)
client2.newConsumer().topic(topicName)
+ .subscriptionName("sub1").subscribe();
+
+ admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r2", "r3"));
+
+ MockedPulsarServiceBaseTest
+ .retryStrategically((test) ->
!pulsar1.getBrokerService().getTopics().containsKey(topicName), 5, 150);
+
+
Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
+ Assert.assertFalse(producer1.isConnected());
+ Assert.assertFalse(consumer1.isConnected());
+ Assert.assertTrue(consumer2.isConnected());
+
+ client1.close();
+ client2.close();
+ }
+
+ private static final Logger log =
LoggerFactory.getLogger(ReplicatorGlobalNSTest.class);
+
+}
--
To stop receiving notification emails like this one, please contact
[email protected].