rdhabalia closed pull request #1647: Fix: remove local-cluster from replication 
list of global-namespace should clean topics
URL: https://github.com/apache/incubator-pulsar/pull/1647
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c34ec84ba8..f42525487b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -661,6 +661,21 @@ void removeSubscription(String subscriptionName) {
         return delete(false);
     }
 
+    private CompletableFuture<Void> delete(boolean failIfHasSubscriptions) {
+        return delete(failIfHasSubscriptions, false);
+    }
+
+    /**
+     * Forcefully closes all producers/consumers/replicators and deletes the 
topic. This function is used when the local
+     * cluster is removed from the global-namespace replication list, because the 
broker doesn't allow lookup if the local cluster
+     * is not part of the replication cluster list.
+     * 
+     * @return
+     */
+    private CompletableFuture<Void> deleteForcefully() {
+        return delete(false, true);
+    }
+    
     /**
      * Delete the managed ledger associated with this topic
      *
@@ -668,11 +683,14 @@ void removeSubscription(String subscriptionName) {
      *            Flag indicating whether delete should succeed if topic still 
has unconnected subscriptions. Set to
      *            false when called from admin API (it will delete the subs 
too), and set to true when called from GC
      *            thread
-     *
+     * @param closeIfClientsConnected
+     *            Flag indicating whether to explicitly close connected 
producers/consumers/replicators before trying to delete the topic. If
+     *            any client is connected to the topic and this flag is 
disabled, then this operation fails.
+     * 
      * @return Completable future indicating completion of delete operation 
Completed exceptionally with:
      *         IllegalStateException if topic is still active 
ManagedLedgerException if ledger delete operation fails
      */
-    private CompletableFuture<Void> delete(boolean failIfHasSubscriptions) {
+    private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, 
boolean closeIfClientsConnected) {
         CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
 
         lock.writeLock().lock();
@@ -682,48 +700,73 @@ void removeSubscription(String subscriptionName) {
                 deleteFuture.completeExceptionally(new 
TopicFencedException("Topic is already fenced"));
                 return deleteFuture;
             }
-            if (USAGE_COUNT_UPDATER.get(this) == 0) {
-                isFenced = true;
-
+            
+            CompletableFuture<Void> closeClientFuture = new 
CompletableFuture<>();
+            if (closeIfClientsConnected) {
                 List<CompletableFuture<Void>> futures = Lists.newArrayList();
+                replicators.forEach((cluster, replicator) -> 
futures.add(replicator.disconnect()));
+                producers.forEach(producer -> 
futures.add(producer.disconnect()));
+                subscriptions.forEach((s, sub) -> 
futures.add(sub.disconnect()));
+                FutureUtil.waitForAll(futures).thenRun(() -> {
+                    closeClientFuture.complete(null);
+                }).exceptionally(ex -> {
+                    log.error("[{}] Error closing clients", topic, ex);
+                    isFenced = false;
+                    closeClientFuture.completeExceptionally(ex);
+                    return null;
+                });
+            } else {
+                closeClientFuture.complete(null);
+            }
 
-                if (failIfHasSubscriptions) {
-                    if (!subscriptions.isEmpty()) {
-                        isFenced = false;
-                        deleteFuture.completeExceptionally(new 
TopicBusyException("Topic has subscriptions"));
-                        return deleteFuture;
-                    }
-                } else {
-                    subscriptions.forEach((s, sub) -> 
futures.add(sub.delete()));
-                }
+            closeClientFuture.thenAccept(delete -> {
+                if (USAGE_COUNT_UPDATER.get(this) == 0) {
+                    isFenced = true;
 
-                FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
-                    if (ex != null) {
-                        log.error("[{}] Error deleting topic", topic, ex);
-                        isFenced = false;
-                        deleteFuture.completeExceptionally(ex);
-                    } else {
-                        ledger.asyncDelete(new 
AsyncCallbacks.DeleteLedgerCallback() {
-                            @Override
-                            public void deleteLedgerComplete(Object ctx) {
-                                brokerService.removeTopicFromCache(topic);
-                                log.info("[{}] Topic deleted", topic);
-                                deleteFuture.complete(null);
-                            }
+                    List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
 
-                            @Override
-                            public void 
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                                isFenced = false;
-                                log.error("[{}] Error deleting topic", topic, 
exception);
-                                deleteFuture.completeExceptionally(new 
PersistenceException(exception));
-                            }
-                        }, null);
+                    if (failIfHasSubscriptions) {
+                        if (!subscriptions.isEmpty()) {
+                            isFenced = false;
+                            deleteFuture.completeExceptionally(new 
TopicBusyException("Topic has subscriptions"));
+                            return;
+                        }
+                    } else {
+                        subscriptions.forEach((s, sub) -> 
futures.add(sub.delete()));
                     }
-                });
-            } else {
+
+                    FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
+                        if (ex != null) {
+                            log.error("[{}] Error deleting topic", topic, ex);
+                            isFenced = false;
+                            deleteFuture.completeExceptionally(ex);
+                        } else {
+                            ledger.asyncDelete(new 
AsyncCallbacks.DeleteLedgerCallback() {
+                                @Override
+                                public void deleteLedgerComplete(Object ctx) {
+                                    brokerService.removeTopicFromCache(topic);
+                                    log.info("[{}] Topic deleted", topic);
+                                    deleteFuture.complete(null);
+                                }
+
+                                @Override
+                                public void 
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                                    isFenced = false;
+                                    log.error("[{}] Error deleting topic", 
topic, exception);
+                                    deleteFuture.completeExceptionally(new 
PersistenceException(exception));
+                                }
+                            }, null);
+                        }
+                    });
+                } else {
+                    deleteFuture.completeExceptionally(new TopicBusyException(
+                            "Topic has " + USAGE_COUNT_UPDATER.get(this) + " 
connected producers/consumers"));
+                }
+            }).exceptionally(ex->{
                 deleteFuture.completeExceptionally(
-                        new TopicBusyException("Topic has " + 
USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers"));
-            }
+                        new TopicBusyException("Failed to close clients before 
deleting topic."));
+                return null;
+            });
         } finally {
             lock.writeLock().unlock();
         }
@@ -858,6 +901,14 @@ public void closeFailed(ManagedLedgerException exception, 
Object ctx) {
         }
 
         String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
+        
+        // If the local cluster is removed from the global namespace's cluster list, 
delete the topic forcefully because Pulsar
+        // doesn't serve a global topic without the local repl-cluster configured.
+        if (TopicName.get(topic).isGlobal() && 
!configuredClusters.contains(localCluster)) {
+            log.info("Deleting topic [{}] because local cluster is not part of 
global namespace repl list {}",
+                    configuredClusters);
+            return deleteForcefully();
+        }
 
         List<CompletableFuture<Void>> futures = Lists.newArrayList();
 
@@ -882,6 +933,7 @@ public void closeFailed(ManagedLedgerException exception, 
Object ctx) {
                     futures.add(removeReplicator(cluster));
                 }
             }
+            
         });
 
         return FutureUtil.waitForAll(futures);
@@ -962,7 +1014,7 @@ protected boolean addReplicationCluster(String 
remoteCluster, PersistentTopic pe
     CompletableFuture<Void> removeReplicator(String remoteCluster) {
         log.info("[{}] Removing replicator to {}", topic, remoteCluster);
         final CompletableFuture<Void> future = new CompletableFuture<>();
-
+        
         String name = PersistentReplicator.getReplicatorName(replicatorPrefix, 
remoteCluster);
 
         replicators.get(remoteCluster).disconnect().thenRun(() -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
new file mode 100644
index 0000000000..adf5ce7297
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Sets;
+
+public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
+
+    protected String methodName;
+
+    @BeforeMethod
+    public void beforeMethod(Method m) throws Exception {
+        methodName = m.getName();
+    }
+
+    @Override
+    @BeforeClass(timeOut = 30000)
+    void setup() throws Exception {
+        super.setup();
+    }
+
+    @Override
+    @AfterClass(timeOut = 30000)
+    void shutdown() throws Exception {
+        super.shutdown();
+    }
+
+    @DataProvider(name = "partitionedTopic")
+    public Object[][] partitionedTopicProvider() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+
+    /**
+     * If local cluster is removed from the global namespace then all topics 
under that namespace should be deleted from
+     * the cluster.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testRemoveLocalClusterOnGlobalNamespace() throws Exception {
+        log.info("--- Starting 
ReplicatorTest::testRemoveLocalClusterOnGlobalNamespace ---");
+
+        final String namespace = "pulsar/global/removeClusterTest";
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2", "r3"));
+
+        final String topicName = "persistent://" + namespace + "/topic";
+
+        PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, 
TimeUnit.SECONDS)
+                .build();
+        PulsarClient client2 = 
PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, 
TimeUnit.SECONDS)
+                .build();
+
+        ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) 
client1.newProducer().topic(topicName)
+                
.enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+        ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) 
client1.newConsumer().topic(topicName)
+                .subscriptionName("sub1").subscribe();
+        ConsumerImpl<byte[]> consumer2 = (ConsumerImpl<byte[]>) 
client2.newConsumer().topic(topicName)
+                .subscriptionName("sub1").subscribe();
+
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r2", "r3"));
+
+        MockedPulsarServiceBaseTest
+                .retryStrategically((test) -> 
!pulsar1.getBrokerService().getTopics().containsKey(topicName), 5, 150);
+
+        
Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
+        Assert.assertFalse(producer1.isConnected());
+        Assert.assertFalse(consumer1.isConnected());
+        Assert.assertTrue(consumer2.isConnected());
+
+        client1.close();
+        client2.close();
+    }
+
+    private static final Logger log = 
LoggerFactory.getLogger(ReplicatorGlobalNSTest.class);
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to