This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f38a003  Broker should not start replicator for root partitioned-topic 
(#1262)
f38a003 is described below

commit f38a003a2a8331da94996fc7ea871abfc779ae24
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Thu Feb 22 18:30:58 2018 -0800

    Broker should not start replicator for root partitioned-topic (#1262)
    
    * Broker should not start replicator for root partitioned-topic
    
    * address comment
---
 .../pulsar/broker/service/AbstractReplicator.java  | 51 +++++++++++++--
 .../pulsar/broker/service/BrokerService.java       | 53 +++++++++-------
 .../nonpersistent/NonPersistentReplicator.java     |  3 +-
 .../service/nonpersistent/NonPersistentTopic.java  | 35 +++++++++--
 .../service/persistent/PersistentReplicator.java   |  3 +-
 .../broker/service/persistent/PersistentTopic.java | 39 ++++++++++--
 .../pulsar/broker/service/ReplicatorTest.java      | 72 ++++++++++++++++++++++
 .../pulsar/broker/service/ReplicatorTestBase.java  |  3 +
 8 files changed, 220 insertions(+), 39 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 49213c9..4642a85 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -18,17 +18,21 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.apache.pulsar.broker.web.PulsarWebResource.path;
+
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import org.apache.bookkeeper.mledger.Position;
-import org.apache.pulsar.broker.service.AbstractReplicator.State;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.DestinationName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,8 +61,9 @@ public abstract class AbstractReplicator {
         Stopped, Starting, Started, Stopping
     }
 
-    public AbstractReplicator(String topicName, String replicatorPrefix, 
String localCluster,
-            String remoteCluster, BrokerService brokerService) {
+    public AbstractReplicator(String topicName, String replicatorPrefix, 
String localCluster, String remoteCluster,
+            BrokerService brokerService) throws NamingException {
+        validatePartitionedTopic(topicName, brokerService);
         this.brokerService = brokerService;
         this.topicName = topicName;
         this.replicatorPrefix = replicatorPrefix;
@@ -69,8 +74,7 @@ public abstract class AbstractReplicator {
         this.producerQueueSize = 
brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
 
         this.producerBuilder = client.newProducer() //
-                .topic(topicName)
-                .sendTimeout(0, TimeUnit.SECONDS) //
+                .topic(topicName).sendTimeout(0, TimeUnit.SECONDS) //
                 .maxPendingMessages(producerQueueSize) //
                 .producerName(getReplicatorName(replicatorPrefix, 
localCluster));
         STATE_UPDATER.set(this, State.Stopped);
@@ -211,5 +215,42 @@ public abstract class AbstractReplicator {
         return (replicatorPrefix + "." + cluster).intern();
     }
 
+    /**
+     * Replication can't be started on root-partitioned-topic to avoid 
producer startup conflict.
+     * 
+     * <pre>
+     * eg:
+     * if topic : persistent://prop/cluster/ns/my-topic is a partitioned topic 
with 2 partitions then
+     * broker explicitly creates replicator producer for: 
"my-topic-partition-1" and "my-topic-partition-2".
+     * 
+     * However, if broker tries to start producer with root topic "my-topic" 
then client-lib internally creates individual 
+     * producers for "my-topic-partition-1" and "my-topic-partition-2" which 
creates conflict with existing 
+     * replicator producers.
+     * </pre>
+     * 
+     * Therefore, replicator can't be started on root-partition topic which 
can internally create multiple partitioned
+     * producers.
+     * 
+     * @param topicName
+     * @param brokerService
+     */
+    private void validatePartitionedTopic(String topicName, BrokerService 
brokerService) throws NamingException {
+        DestinationName destination = DestinationName.get(topicName);
+        String partitionedTopicPath = 
path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE,
+                destination.getNamespace().toString(), 
destination.getDomain().toString(),
+                destination.getEncodedLocalName());
+        boolean isPartitionedTopic = false;
+        try {
+            isPartitionedTopic = 
brokerService.pulsar().getConfigurationCache().policiesCache()
+                    .get(partitionedTopicPath).isPresent();
+        } catch (Exception e) {
+            log.warn("Failed to verify partitioned topic {}-{}", topicName, 
e.getMessage());
+        }
+        if (isPartitionedTopic) {
+            throw new NamingException(
+                    topicName + " is a partitioned-topic and replication can't 
be started for partitioned-producer ");
+        }
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(AbstractReplicator.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1eff83b..c12527d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -65,6 +65,7 @@ import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -583,29 +584,37 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
                     new OpenLedgerCallback() {
                         @Override
                         public void openLedgerComplete(ManagedLedger ledger, 
Object ctx) {
-                            PersistentTopic persistentTopic = new 
PersistentTopic(topic, ledger, BrokerService.this);
-
-                            CompletableFuture<Void> replicationFuture = 
persistentTopic.checkReplication();
-                            replicationFuture.thenCompose(v -> {
-                                // Also check dedup status
-                                return 
persistentTopic.checkDeduplicationStatus();
-                            }).thenRun(() -> {
-                                log.info("Created topic {} - dedup is {}", 
topic,
-                                        
persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
-                                long topicLoadLatencyMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
-                                        - topicCreateTimeMs;
-                                pulsarStats.recordTopicLoadTimeValue(topic, 
topicLoadLatencyMs);
-                                addTopicToStatsMaps(destinationName, 
persistentTopic);
-                                topicFuture.complete(persistentTopic);
-                            }).exceptionally((ex) -> {
-                                log.warn("Replication or dedup check failed. 
Removing topic from topics list {}, {}", topic, ex);
-                                
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
-                                    topics.remove(topic, topicFuture);
-                                    topicFuture.completeExceptionally(ex);
+                            try {
+                                PersistentTopic persistentTopic = new 
PersistentTopic(topic, ledger,
+                                        BrokerService.this);
+                                CompletableFuture<Void> replicationFuture = 
persistentTopic.checkReplication();
+                                replicationFuture.thenCompose(v -> {
+                                    // Also check dedup status
+                                    return 
persistentTopic.checkDeduplicationStatus();
+                                }).thenRun(() -> {
+                                    log.info("Created topic {} - dedup is {}", 
topic,
+                                            
persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
+                                    long topicLoadLatencyMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
+                                            - topicCreateTimeMs;
+                                    
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
+                                    addTopicToStatsMaps(destinationName, 
persistentTopic);
+                                    topicFuture.complete(persistentTopic);
+                                }).exceptionally((ex) -> {
+                                    log.warn(
+                                            "Replication or dedup check 
failed. Removing topic from topics list {}, {}",
+                                            topic, ex);
+                                    
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
+                                        topics.remove(topic, topicFuture);
+                                        topicFuture.completeExceptionally(ex);
+                                    });
+
+                                    return null;
                                 });
-
-                                return null;
-                            });
+                            } catch (NamingException e) {
+                                log.warn("Failed to create topic {}-{}", 
topic, e.getMessage());
+                                pulsar.getExecutor().submit(() -> 
topics.remove(topic, topicFuture));
+                                topicFuture.completeExceptionally(e);
+                            }
                         }
 
                         @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index e9219a6..74af8b6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -26,6 +26,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.util.Rate;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.client.api.MessageId;
@@ -49,7 +50,7 @@ public class NonPersistentReplicator extends 
AbstractReplicator implements Repli
     private final NonPersistentReplicatorStats stats = new 
NonPersistentReplicatorStats();
 
     public NonPersistentReplicator(NonPersistentTopic topic, String 
localCluster, String remoteCluster,
-            BrokerService brokerService) {
+            BrokerService brokerService) throws NamingException {
         super(topic.getName(), topic.replicatorPrefix, localCluster, 
remoteCluster, brokerService);
 
         producerBuilder.blockIfQueueFull(false);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index fdbadc6..5941768 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
@@ -49,6 +50,8 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyE
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
+import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Producer;
 import org.apache.pulsar.broker.service.Replicator;
@@ -535,7 +538,12 @@ public class NonPersistentTopic implements Topic {
             }
 
             if (!replicators.containsKey(cluster)) {
-                startReplicator(cluster);
+                if (!startReplicator(cluster)) {
+                    // it happens when global topic is a partitioned topic and 
replicator can't start on original
+                    // non partitioned-topic (topic without partition prefix)
+                    return FutureUtil
+                            .failedFuture(new NamingException(topic + " failed 
to start replicator for " + cluster));
+                }
             }
         }
 
@@ -550,13 +558,30 @@ public class NonPersistentTopic implements Topic {
         return FutureUtil.waitForAll(futures);
     }
 
-    void startReplicator(String remoteCluster) {
+    boolean startReplicator(String remoteCluster) {
         log.info("[{}] Starting replicator to remote: {}", topic, 
remoteCluster);
         String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
-        replicators.computeIfAbsent(remoteCluster,
-                r -> new NonPersistentReplicator(NonPersistentTopic.this, 
localCluster, remoteCluster, brokerService));
+        return addReplicationCluster(remoteCluster,NonPersistentTopic.this, 
localCluster);
     }
 
+    protected boolean addReplicationCluster(String remoteCluster, 
NonPersistentTopic nonPersistentTopic, String localCluster) {
+        AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
+        replicators.computeIfAbsent(remoteCluster, r -> {
+            try {
+                return new NonPersistentReplicator(NonPersistentTopic.this, 
localCluster, remoteCluster, brokerService);
+            } catch (NamingException e) {
+                isReplicatorStarted.set(false);
+                log.error("[{}] Replicator startup failed due to 
partitioned-topic {}", topic, remoteCluster);
+            }
+            return null;
+        });
+        // clean up replicator if startup is failed
+        if (!isReplicatorStarted.get()) {
+            replicators.remove(remoteCluster);
+        }
+        return isReplicatorStarted.get();
+    }
+    
     CompletableFuture<Void> removeReplicator(String remoteCluster) {
         log.info("[{}] Removing replicator to {}", topic, remoteCluster);
         final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -941,5 +966,7 @@ public class NonPersistentTopic implements Topic {
         this.hasBatchMessagePublished = true;
     }
 
+    
+    
     private static final Logger log = 
LoggerFactory.getLogger(NonPersistentTopic.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 6211819..52ed4b4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -38,6 +38,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.util.Rate;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.Backoff;
@@ -89,7 +90,7 @@ public class PersistentReplicator extends AbstractReplicator 
implements Replicat
     private final ReplicatorStats stats = new ReplicatorStats();
 
     public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, 
String localCluster, String remoteCluster,
-            BrokerService brokerService) {
+            BrokerService brokerService) throws NamingException {
         super(topic.getName(), topic.replicatorPrefix, localCluster, 
remoteCluster, brokerService);
         this.topic = topic;
         this.cursor = cursor;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 0ce3925..88813ea 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -194,7 +194,7 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
         }
     }
 
-    public PersistentTopic(String topic, ManagedLedger ledger, BrokerService 
brokerService) {
+    public PersistentTopic(String topic, ManagedLedger ledger, BrokerService 
brokerService) throws NamingException {
         this.topic = topic;
         this.ledger = ledger;
         this.brokerService = brokerService;
@@ -213,8 +213,11 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
             if (cursor.getName().startsWith(replicatorPrefix)) {
                 String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
                 String remoteCluster = 
PersistentReplicator.getRemoteCluster(cursor.getName());
-                replicators.put(remoteCluster,
-                        new PersistentReplicator(this, cursor, localCluster, 
remoteCluster, brokerService));
+                boolean isReplicatorStarted = 
addReplicationCluster(remoteCluster, this, cursor, localCluster);
+                if (!isReplicatorStarted) {
+                    throw new NamingException(
+                            PersistentTopic.this.getName() + " Failed to start 
replicator " + remoteCluster);
+                }
             } else if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)) {
                 // This is not a regular subscription, we are going to ignore 
it for now and let the message dedup logic
                 // to take care of it
@@ -896,9 +899,13 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
             @Override
             public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
-                replicators.computeIfAbsent(remoteCluster, r -> new 
PersistentReplicator(PersistentTopic.this, cursor, localCluster,
-                        remoteCluster, brokerService));
-                future.complete(null);
+                boolean isReplicatorStarted = 
addReplicationCluster(remoteCluster, PersistentTopic.this, cursor, 
localCluster);
+                if (isReplicatorStarted) {
+                    future.complete(null);    
+                } else {
+                    future.completeExceptionally(new NamingException(
+                            PersistentTopic.this.getName() + " Failed to start 
replicator " + remoteCluster));
+                }
             }
 
             @Override
@@ -911,6 +918,26 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
         return future;
     }
 
+    protected boolean addReplicationCluster(String remoteCluster, 
PersistentTopic persistentTopic, ManagedCursor cursor,
+            String localCluster) {
+        AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
+        replicators.computeIfAbsent(remoteCluster, r -> {
+            try {
+                return new PersistentReplicator(PersistentTopic.this, cursor, 
localCluster, remoteCluster,
+                        brokerService);
+            } catch (NamingException e) {
+                isReplicatorStarted.set(false);
+                log.error("[{}] Replicator startup failed due to 
partitioned-topic {}", topic, remoteCluster);
+            }
+            return null;
+        });
+        // clean up replicator if startup is failed
+        if (!isReplicatorStarted.get()) {
+            replicators.remove(remoteCluster);
+        }
+        return isReplicatorStarted.get();
+    }
+
     CompletableFuture<Void> removeReplicator(String remoteCluster) {
         log.info("[{}] Removing replicator to {}", topic, remoteCluster);
         final CompletableFuture<Void> future = new CompletableFuture<>();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index cbef1d0..922f6ee 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -48,6 +48,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.namespace.OwnedBundle;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
+import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.checksum.utils.Crc32cChecksum;
@@ -74,6 +75,8 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
 
@@ -84,6 +87,14 @@ import io.netty.buffer.ByteBuf;
  */
 public class ReplicatorTest extends ReplicatorTestBase {
 
+    protected String methodName;
+
+    @BeforeMethod
+    public void beforeMethod(Method m) throws Exception {
+        methodName = m.getName();
+    }
+
+    
     @Override
     @BeforeClass(timeOut = 30000)
     void setup() throws Exception {
@@ -102,6 +113,11 @@ public class ReplicatorTest extends ReplicatorTestBase {
         });
     }
 
+    @DataProvider(name = "partitionedTopic")
+    public Object[][] partitionedTopicProvider() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+    
     @Test(enabled = true, timeOut = 30000)
     public void testConfigChange() throws Exception {
         log.info("--- Starting ReplicatorTest::testConfigChange ---");
@@ -856,6 +872,62 @@ public class ReplicatorTest extends ReplicatorTestBase {
         reader2.closeAsync().get();
     }
 
+    /**
+     * It verifies that broker should not start replicator for 
partitioned-topic (topic without -partition postfix)
+     * 
+     * @param isPartitionedTopic
+     * @throws Exception
+     */
+    @Test(dataProvider = "partitionedTopic")
+    public void testReplicatorOnPartitionedTopic(boolean isPartitionedTopic) 
throws Exception {
+
+        log.info("--- Starting ReplicatorTest::{} --- ", methodName);
+
+        final String namespace = "pulsar/global/partitionedNs-" + 
isPartitionedTopic;
+        final String persistentTopicName = "persistent://" + namespace + 
"/partTopic-" + isPartitionedTopic;
+        final String nonPersistentTopicName = "non-persistent://" + namespace 
+ "/partTopic-" + isPartitionedTopic;
+        BrokerService brokerService = pulsar1.getBrokerService();
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Lists.newArrayList("r1", "r2", "r3"));
+
+        if (isPartitionedTopic) {
+            
admin1.persistentTopics().createPartitionedTopic(persistentTopicName, 5);
+            
admin1.nonPersistentTopics().createPartitionedTopic(nonPersistentTopicName, 5);
+        }
+
+        // load namespace with dummy topic on ns
+        PulsarClient client = PulsarClient.create(url1.toString());
+        client.createProducer("persistent://" + namespace + "/dummyTopic");
+
+        // persistent topic test
+        try {
+            brokerService.getTopic(persistentTopicName).get();
+            if (isPartitionedTopic) {
+                fail("Topic creation fails with partitioned topic as 
replicator init fails");
+            }
+        } catch (Exception e) {
+            if (!isPartitionedTopic) {
+                fail("Topic creation should not fail without any partitioned 
topic");
+            }
+            assertTrue(e.getCause() instanceof NamingException);
+        }
+
+        // non-persistent topic test
+        try {
+            brokerService.getTopic(nonPersistentTopicName).get();
+            if (isPartitionedTopic) {
+                fail("Topic creation fails with partitioned topic as 
replicator init fails");
+            }
+        } catch (Exception e) {
+            if (!isPartitionedTopic) {
+                fail("Topic creation should not fail without any partitioned 
topic");
+            }
+            assertTrue(e.getCause() instanceof NamingException);
+        }
+
+    }
+    
     private static final Logger log = 
LoggerFactory.getLogger(ReplicatorTest.class);
 
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 344ad23..4320428 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -132,6 +132,7 @@ public class ReplicatorTestBase {
         config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
         
config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+        config1.setDefaultNumberOfNamespaceBundles(1);
         pulsar1 = new PulsarService(config1);
         pulsar1.start();
         ns1 = pulsar1.getBrokerService();
@@ -165,6 +166,7 @@ public class ReplicatorTestBase {
         config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
         
config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+        config2.setDefaultNumberOfNamespaceBundles(1);
         pulsar2 = new PulsarService(config2);
         pulsar2.start();
         ns2 = pulsar2.getBrokerService();
@@ -197,6 +199,7 @@ public class ReplicatorTestBase {
         config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
         config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config3.setDefaultNumberOfNamespaceBundles(1);
         pulsar3 = new PulsarService(config3);
         pulsar3.start();
         ns3 = pulsar3.getBrokerService();

-- 
To stop receiving notification emails like this one, please contact
rdhaba...@apache.org.

Reply via email to