This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6d19109487dad1a4cc8a77b580bacf7f3112ee14
Author: fengyubiao <[email protected]>
AuthorDate: Wed Apr 24 15:06:00 2024 +0800

    [improve] [broker] Create partitioned topics automatically when enable 
topic level replication (#22537)
    
    (cherry picked from commit d4756557bf4328019dd938a56c3135aecc3147e4)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 104 +++++++++++++++------
 .../broker/admin/impl/PersistentTopicsBase.java    |  24 ++++-
 .../broker/service/OneWayReplicatorTest.java       |  87 ++++++++++++++++-
 .../broker/service/OneWayReplicatorTestBase.java   |  31 +++---
 4 files changed, 196 insertions(+), 50 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index b0d66c27d63..514709b58a9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.errorprone.annotations.CanIgnoreReturnValue;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -43,9 +44,11 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.Constants;
@@ -621,35 +624,82 @@ public abstract class AdminResource extends 
PulsarWebResource {
 
+    /**
+     * Fire-and-forget variant: resolves the replication clusters of {@code namespaceName} and passes them to the
+     * overload that takes the cluster set.
+     *
+     * <p>The futures returned by that overload are not awaited here, and the future chain built in this method is
+     * discarded, so nothing is propagated back to the caller of this method.
+     *
+     * @param numPartitions partition count passed through to the overload that takes the cluster set
+     */
     private void 
internalCreatePartitionedTopicToReplicatedClustersInBackground(int 
numPartitions) {
         getNamespaceReplicatedClustersAsync(namespaceName)
-                .thenAccept(clusters -> {
-                    for (String cluster : clusters) {
-                        if 
(!cluster.equals(pulsar().getConfiguration().getClusterName())) {
-                            // this call happens in the background without 
async composition. completion is logged.
-                            pulsar().getPulsarResources().getClusterResources()
-                                    .getClusterAsync(cluster)
-                                    .thenCompose(clusterDataOp ->
-                                            ((TopicsImpl) 
pulsar().getBrokerService()
-                                                    
.getClusterPulsarAdmin(cluster,
-                                                            
clusterDataOp).topics())
-                                                    
.createPartitionedTopicAsync(
-                                                            
topicName.getPartitionedTopicName(),
-                                                            numPartitions,
-                                                            true, null))
-                                    .whenComplete((__, ex) -> {
-                                        if (ex != null) {
-                                            log.error(
-                                                    "[{}] Failed to create 
partitioned topic {} in cluster {}.",
-                                                    clientAppId(), topicName, 
cluster, ex);
-                                        } else {
-                                            log.info(
-                                                    "[{}] Successfully created 
partitioned topic {} in "
-                                                            + "cluster {}",
-                                                    clientAppId(), topicName, 
cluster);
-                                        }
-                                    });
-                        }
+            .thenAccept(clusters -> {
+                // this call happens in the background without async 
composition. completion is logged.
+                
internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, 
numPartitions);
+            });
+    }
+
+    /**
+     * Starts the creation of the partitioned topic on every cluster in {@code clusters} except the local one.
+     *
+     * <p>One future per remote cluster is returned so that a caller can either wait for all of them or ignore
+     * them. A future completes normally when the remote topic was created, or when it already exists on that
+     * cluster with exactly {@code numPartitions} partitions. It completes exceptionally with a
+     * {@link RestException} when the cluster data cannot be loaded, when the remote admin client cannot be
+     * obtained, when the remote call fails, or when the existing remote topic has a different partition count
+     * ({@code PRECONDITION_FAILED}).
+     *
+     * @param clusters replication clusters; the entry equal to the local cluster name is skipped
+     * @param numPartitions number of partitions the remote topic is expected to have
+     * @return map from remote cluster name to the future tracking the creation on that cluster
+     */
+    protected Map<String, CompletableFuture<Void>> internalCreatePartitionedTopicToReplicatedClustersInBackground(
+            Set<String> clusters, int numPartitions) {
+        final String shortTopicName = topicName.getPartitionedTopicName();
+        Map<String, CompletableFuture<Void>> tasksForAllClusters = new HashMap<>();
+        for (String cluster : clusters) {
+            if (cluster.equals(pulsar().getConfiguration().getClusterName())) {
+                continue;
+            }
+            ClusterResources clusterResources = pulsar().getPulsarResources().getClusterResources();
+            CompletableFuture<Void> createRemoteTopicFuture = new CompletableFuture<>();
+            tasksForAllClusters.put(cluster, createRemoteTopicFuture);
+            clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> {
+                if (ex1 != null) {
+                    // Unexpected error, such as NPE. Catch all error to avoid the "createRemoteTopicFuture" stuck.
+                    log.error("[{}] An un-expected error occurs when trying to create partitioned topic {} in cluster"
+                                    + " {}.", clientAppId(), topicName, cluster, ex1);
+                    createRemoteTopicFuture.completeExceptionally(new RestException(ex1));
+                    return;
+                }
+                // Get cluster data success.
+                final TopicsImpl topics;
+                try {
+                    topics = (TopicsImpl) pulsar().getBrokerService()
+                            .getClusterPulsarAdmin(cluster, clusterData).topics();
+                } catch (Exception adminEx) {
+                    // An exception thrown inside this callback only fails the discarded future returned by
+                    // "whenComplete", which would leave "createRemoteTopicFuture" incomplete forever.
+                    log.error("[{}] Failed to get the admin client of cluster {} when creating partitioned"
+                                    + " topic {}.", clientAppId(), cluster, topicName, adminEx);
+                    createRemoteTopicFuture.completeExceptionally(new RestException(adminEx));
+                    return;
+                }
+                topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null)
+                        .whenComplete((ignore, ex2) -> {
+                    if (ex2 == null) {
+                        // Create success.
+                        log.info("[{}] Successfully created partitioned topic {} in cluster {}",
+                                clientAppId(), topicName, cluster);
+                        createRemoteTopicFuture.complete(null);
+                        return;
+                    }
+                    // Create topic on the remote cluster error.
+                    Throwable unwrapEx2 = FutureUtil.unwrapCompletionException(ex2);
+                    // The topic has been created before, check the partitions count is expected.
+                    if (unwrapEx2 instanceof PulsarAdminException.ConflictException) {
+                        topics.getPartitionedTopicMetadataAsync(shortTopicName).whenComplete((topicMeta, ex3) -> {
+                            if (ex3 != null) {
+                                // Unexpected error, such as NPE. Catch all error to avoid the
+                                // "createRemoteTopicFuture" stuck.
+                                log.error("[{}] Failed to check remote-cluster's topic metadata when creating"
+                                                + " partitioned topic {} in cluster {}.",
+                                        clientAppId(), topicName, cluster, ex3);
+                                createRemoteTopicFuture.completeExceptionally(new RestException(ex3));
+                                // "topicMeta" is null when the lookup failed; stop here instead of
+                                // dereferencing it below.
+                                return;
+                            }
+                            // Call get partitioned metadata of remote cluster success.
+                            if (topicMeta.partitions == numPartitions) {
+                                log.info("[{}] Skip created partitioned topic {} in cluster {},  because that {}",
+                                        clientAppId(), topicName, cluster, unwrapEx2.getMessage());
+                                createRemoteTopicFuture.complete(null);
+                            } else {
+                                String errorMsg = String.format("[%s] There is an exists topic %s with different"
+                                                + " partitions %s on the remote cluster %s, you want to create it"
+                                                + " with partitions %s",
+                                        clientAppId(), shortTopicName, topicMeta.partitions, cluster,
+                                        numPartitions);
+                                log.error(errorMsg);
+                                createRemoteTopicFuture.completeExceptionally(
+                                        new RestException(Status.PRECONDITION_FAILED, errorMsg));
+                            }
+                        });
+                    } else {
+                        // An HTTP error was responded from the remote cluster.
+                        log.error("[{}] Failed to create partitioned topic {} in cluster {}.",
+                                clientAppId(), topicName, cluster, ex2);
+                        createRemoteTopicFuture.completeExceptionally(new RestException(unwrapEx2));
                     }
                 });
+            });
+        }
+        return tasksForAllClusters;
     }
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4b29452f98c..d433ee15164 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3395,14 +3395,14 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected CompletableFuture<Void> 
internalSetReplicationClusters(List<String> clusterIds) {
-
+        if (CollectionUtils.isEmpty(clusterIds)) {
+            return CompletableFuture.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                    "ClusterIds should not be null or empty"));
+        }
+        Set<String> replicationClusters = Sets.newHashSet(clusterIds);
         return validateTopicPolicyOperationAsync(topicName, 
PolicyName.REPLICATION, PolicyOperation.WRITE)
                 .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
                 .thenCompose(__ -> {
-                    if (CollectionUtils.isEmpty(clusterIds)) {
-                        throw new RestException(Status.PRECONDITION_FAILED, 
"ClusterIds should not be null or empty");
-                    }
-                    Set<String> replicationClusters = 
Sets.newHashSet(clusterIds);
                     if (replicationClusters.contains("global")) {
                         throw new RestException(Status.PRECONDITION_FAILED,
                                 "Cannot specify global in the list of 
replication clusters");
@@ -3417,6 +3417,20 @@ public class PersistentTopicsBase extends AdminResource {
                         
futures.add(validateClusterForTenantAsync(namespaceName.getTenant(), 
clusterId));
                     }
                     return FutureUtil.waitForAll(futures);
+                }).thenCompose(__ -> {
+                    // Sync to create partitioned topic on the remote cluster 
if needed.
+                    TopicName topicNameWithoutPartition = 
TopicName.get(topicName.getPartitionedTopicName());
+                    return 
pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                        
.getPartitionedTopicMetadataAsync(topicNameWithoutPartition).thenCompose(topicMetaOp
 -> {
+                            // Skip to create topic if the topic is 
non-partitioned, because the replicator will create
+                            // it automatically.
+                            if (topicMetaOp.isEmpty()) {
+                                return CompletableFuture.completedFuture(null);
+                            }
+                            return FutureUtil.waitForAll(
+                                    
internalCreatePartitionedTopicToReplicatedClustersInBackground(replicationClusters,
+                                    topicMetaOp.get().partitions).values());
+                        });
                 }).thenCompose(__ ->
                     getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op 
-> {
                             TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index f9184f2288f..35073575f34 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -25,10 +25,12 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import io.netty.util.concurrent.FastThreadLocalThread;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -59,6 +61,9 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
 import org.mockito.Mockito;
@@ -92,6 +97,20 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         });
     }
 
+    /**
+     * Waits until replication of {@code topicName} towards {@code cluster2} has stopped.
+     *
+     * <p>Two conditions are polled: the topic loaded in {@code pulsar2} has no producers, and the topic loaded in
+     * {@code pulsar1} either has no replicators or its replicator for {@code cluster2} is not connected. The
+     * topic must already be loaded in both brokers; it is looked up without being created.
+     *
+     * @param topicName full topic name (a single partition name for partitioned topics)
+     */
+    private void waitReplicatorStopped(String topicName) {
+        Awaitility.await().untilAsserted(() -> {
+            Optional<Topic> topicOptional2 = pulsar2.getBrokerService().getTopic(topicName, false).get();
+            assertTrue(topicOptional2.isPresent());
+            PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get();
+            assertTrue(persistentTopic2.getProducers().isEmpty());
+            // The replicator lives on the source side, so it must be read from pulsar1, not pulsar2.
+            Optional<Topic> topicOptional1 = pulsar1.getBrokerService().getTopic(topicName, false).get();
+            assertTrue(topicOptional1.isPresent());
+            PersistentTopic persistentTopic1 = (PersistentTopic) topicOptional1.get();
+            assertTrue(persistentTopic1.getReplicators().isEmpty()
+                    || !persistentTopic1.getReplicators().get(cluster2).isConnected());
+        });
+    }
+
     /**
      * Override "AbstractReplicator.producer" by {@param producer} and return 
the original value.
      */
@@ -108,7 +127,7 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
 
     @Test(timeOut = 45 * 1000)
     public void testReplicatorProducerStatInTopic() throws Exception {
-        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ defaultNamespace + "/tp_");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_");
         final String subscribeName = "subscribe_1";
         final byte[] msgValue = "test".getBytes();
 
@@ -134,7 +153,7 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
 
     @Test(timeOut = 45 * 1000)
     public void testCreateRemoteConsumerFirst() throws Exception {
-        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ defaultNamespace + "/tp_");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_");
         Producer<String> producer1 = 
client1.newProducer(Schema.STRING).topic(topicName).create();
 
         // The topic in cluster2 has a replicator created producer(schema 
Auto_Produce), but does not have any schema。
@@ -154,7 +173,7 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
 
     @Test(timeOut = 45 * 1000)
     public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws 
Exception {
-        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ defaultNamespace + "/tp_");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_");
         admin1.topics().createNonPartitionedTopic(topicName);
         // Wait for replicator started.
         waitReplicatorStarted(topicName);
@@ -210,7 +229,7 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         BrokerService brokerService = pulsar1.getBrokerService();
         // Wait for the internal client created.
         final String topicNameTriggerInternalClientCreate =
-                BrokerTestUtil.newUniqueName("persistent://" + 
defaultNamespace + "/tp_");
+                BrokerTestUtil.newUniqueName("persistent://" + 
replicatedNamespace + "/tp_");
         
admin1.topics().createNonPartitionedTopic(topicNameTriggerInternalClientCreate);
         waitReplicatorStarted(topicNameTriggerInternalClientCreate);
         cleanupTopics(() -> {
@@ -338,7 +357,7 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
      */
     @Test(timeOut = 120 * 1000)
     public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws 
Exception {
-        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ defaultNamespace + "/tp_");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_");
         // Inject an error for "replicator.producer" creation.
         // The delay time of next retry to create producer is below:
         //   0.1s, 0.2, 0.4, 0.8, 1.6s, 3.2s, 6.4s...
@@ -409,4 +428,62 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
             admin2.topics().delete(topicName);
         });
     }
+
+    /**
+     * Enables topic-level replication on a 2-partition topic created through {@code admin1} and checks that
+     * {@code admin2} then reports the same partitioned topic.
+     */
+    @Test
+    public void testPartitionedTopicLevelReplication() throws Exception {
+        final int partitionCount = 2;
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
+        final TopicName parsedTopicName = TopicName.get(topicName);
+        admin1.topics().createPartitionedTopic(topicName, partitionCount);
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+
+        // The remote cluster must now hold the partitioned topic with the same partition count.
+        PartitionedTopicMetadata remoteMetadata = admin2.topics().getPartitionedTopicMetadata(topicName);
+        assertEquals(remoteMetadata.partitions, partitionCount);
+
+        // Cleanup: stop replicating, wait for every partition's replicator to stop, then delete on both sides.
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
+        for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++) {
+            waitReplicatorStopped(parsedTopicName.getPartition(partitionIndex).toString());
+        }
+        admin1.topics().deletePartitionedTopic(topicName);
+        admin2.topics().deletePartitionedTopic(topicName);
+    }
+
+    /**
+     * Enables topic-level replication when the same 2-partition topic was already created through both
+     * {@code admin1} and {@code admin2}; the call must succeed and {@code admin2} must still report 2 partitions.
+     */
+    @Test
+    public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception {
+        final int partitionCount = 2;
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
+        final TopicName parsedTopicName = TopicName.get(topicName);
+        // Same partition count on both sides, so the existing remote topic is compatible.
+        admin1.topics().createPartitionedTopic(topicName, partitionCount);
+        admin2.topics().createPartitionedTopic(topicName, partitionCount);
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+
+        // The remote partitioned topic is still there with the expected partition count.
+        PartitionedTopicMetadata remoteMetadata = admin2.topics().getPartitionedTopicMetadata(topicName);
+        assertEquals(remoteMetadata.partitions, partitionCount);
+
+        // Cleanup: stop replicating, wait for every partition's replicator to stop, then delete on both sides.
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
+        for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++) {
+            waitReplicatorStopped(parsedTopicName.getPartition(partitionIndex).toString());
+        }
+        admin1.topics().deletePartitionedTopic(topicName);
+        admin2.topics().deletePartitionedTopic(topicName);
+    }
+
+    /**
+     * Checks that enabling topic-level replication fails when the topic already exists through {@code admin2}
+     * with a different partition count, and that afterwards {@code admin2} still reports 3 partitions and
+     * {@code admin1} reports a single replication cluster.
+     */
+    @Test
+    public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
+        // Same topic name on both sides, but 3 partitions remotely versus 2 locally.
+        admin2.topics().createPartitionedTopic(topicName, 3);
+        admin1.topics().createPartitionedTopic(topicName, 2);
+        try {
+            admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+            fail("Expected error due to a conflict partitioned topic already exists.");
+        } catch (Exception setReplicationError) {
+            String reason = FutureUtil.unwrapCompletionException(setReplicationError).getMessage();
+            assertTrue(reason.contains("with different partitions"));
+        }
+        // Check nothing changed.
+        PartitionedTopicMetadata remoteMetadata = admin2.topics().getPartitionedTopicMetadata(topicName);
+        assertEquals(remoteMetadata.partitions, 3);
+        int appliedClusterCount = admin1.topics().getReplicationClusters(topicName, true).size();
+        assertEquals(appliedClusterCount, 1);
+        // Cleanup.
+        admin1.topics().deletePartitionedTopic(topicName);
+        admin2.topics().deletePartitionedTopic(topicName);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index 24aec851e19..9e4214897fb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -45,7 +45,8 @@ import org.testng.Assert;
 public abstract class OneWayReplicatorTestBase extends TestRetrySupport {
 
     protected final String defaultTenant = "public";
-    protected final String defaultNamespace = defaultTenant + "/default";
+    protected final String replicatedNamespace = defaultTenant + "/default";
+    protected final String nonReplicatedNamespace = defaultTenant + "/ns1";
 
     protected final String cluster1 = "r1";
     protected URL url1;
@@ -142,17 +143,19 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
         admin2.tenants().createTenant(defaultTenant, new 
TenantInfoImpl(Collections.emptySet(),
                 Sets.newHashSet(cluster1, cluster2)));
 
-        admin1.namespaces().createNamespace(defaultNamespace, 
Sets.newHashSet(cluster1, cluster2));
-        admin2.namespaces().createNamespace(defaultNamespace);
+        admin1.namespaces().createNamespace(replicatedNamespace, 
Sets.newHashSet(cluster1, cluster2));
+        admin2.namespaces().createNamespace(replicatedNamespace);
+        admin1.namespaces().createNamespace(nonReplicatedNamespace);
+        admin2.namespaces().createNamespace(nonReplicatedNamespace);
     }
 
+    /**
+     * Runs {@code cleanupTopicAction} while the replicated namespace is temporarily limited to {@code cluster1}.
+     *
+     * <p>Sequence: wait for the namespace's change events to initialize, set its replication clusters to
+     * {@code cluster1} only, unload the namespace, run the action, restore replication to both clusters, and wait
+     * for change events again.
+     *
+     * @param cleanupTopicAction the cleanup to run while replication is limited to {@code cluster1}
+     * @throws Exception declared by this method; the visible body does not catch anything
+     */
     protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws 
Exception {
-        waitChangeEventsInit(defaultNamespace);
-        admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, 
Collections.singleton(cluster1));
-        admin1.namespaces().unload(defaultNamespace);
+        waitChangeEventsInit(replicatedNamespace);
+        
admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, 
Collections.singleton(cluster1));
+        admin1.namespaces().unload(replicatedNamespace);
         cleanupTopicAction.run();
-        admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, 
Sets.newHashSet(cluster1, cluster2));
-        waitChangeEventsInit(defaultNamespace);
+        
admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, 
Sets.newHashSet(cluster1, cluster2));
+        waitChangeEventsInit(replicatedNamespace);
     }
 
     protected void waitChangeEventsInit(String namespace) {
@@ -220,11 +223,13 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
     @Override
     protected void cleanup() throws Exception {
         // delete namespaces.
-        waitChangeEventsInit(defaultNamespace);
-        admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, 
Sets.newHashSet(cluster1));
-        admin1.namespaces().deleteNamespace(defaultNamespace);
-        admin2.namespaces().setNamespaceReplicationClusters(defaultNamespace, 
Sets.newHashSet(cluster2));
-        admin2.namespaces().deleteNamespace(defaultNamespace);
+        waitChangeEventsInit(replicatedNamespace);
+        
admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, 
Sets.newHashSet(cluster1));
+        admin1.namespaces().deleteNamespace(replicatedNamespace);
+        
admin2.namespaces().setNamespaceReplicationClusters(replicatedNamespace, 
Sets.newHashSet(cluster2));
+        admin2.namespaces().deleteNamespace(replicatedNamespace);
+        admin1.namespaces().deleteNamespace(nonReplicatedNamespace);
+        admin2.namespaces().deleteNamespace(nonReplicatedNamespace);
 
         // shutdown.
         markCurrentSetupNumberCleaned();

Reply via email to