saandrews commented on a change in pull request #538: Make broker configurable 
to own non-persistent topic
URL: https://github.com/apache/incubator-pulsar/pull/538#discussion_r127076550
 
 

 ##########
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
 ##########
 @@ -553,6 +568,155 @@ public void testReplicator() throws Exception {
 
     }
 
+    /**
+     * verifies load manager assigns topic only if broker started in 
non-persistent mode
+     * <pre>
+     * 1. Start broker with disable non-persistent topic mode
+     * 2. Create namespace with non-persistency set
+     * 3. Create non-persistent topic
+     * 4. Load-manager should not be able to find broker
+     * 5. Create producer on that topic should fail
+     * </pre>
+     */
+    @Test(dataProvider = "loadManager")
+    public void testLoadManagerAssignmentForNonPersistentTestAssignment(String 
loadManagerName) throws Exception {
+
+        final String namespace = "my-property/use/my-ns";
+        final String topicName = "non-persistent://" + namespace + 
"/loadManager";
+        final String defaultLoadManagerName = conf.getLoadManagerClassName();
+        final boolean defaultENableNonPersistentTopic = 
conf.isEnableNonPersistentTopics();
+        try {
+            // start broker to not own non-persistent namespace and create 
non-persistent namespace
+            stopBroker();
+            conf.setEnableNonPersistentTopics(false);
+            conf.setLoadManagerClassName(loadManagerName);
+            startBroker();
+
+            Field field = PulsarService.class.getDeclaredField("loadManager");
+            field.setAccessible(true);
+            AtomicReference<LoadManager> loadManagerRef = 
(AtomicReference<LoadManager>) field.get(pulsar);
+            LoadManager manager = LoadManager.create(pulsar);
+            manager.start();
+            loadManagerRef.set(manager);
+
+            NamespaceBundle fdqn = 
pulsar.getNamespaceService().getBundle(DestinationName.get(topicName));
+            LoadManager loadManager = pulsar.getLoadManager().get();
+            ResourceUnit broker = null;
+            try {
+                broker = loadManager.getLeastLoaded(fdqn);
+            } catch (Exception e) {
+                // Ok. (ModulearLoadManagerImpl throws RuntimeException incase 
don't find broker)
+            }
+            assertNull(broker);
+
+            ProducerConfiguration producerConf = new ProducerConfiguration();
+            try {
+                Producer producer = 
pulsarClient.createProducerAsync(topicName, producerConf).get(1, 
TimeUnit.SECONDS);
+                producer.close();
+                fail("topic loading should have failed");
+            } catch (Exception e) {
+                // Ok
+            }
+            NonPersistentTopic topicRef = (NonPersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName);
+            assertNull(topicRef);
+
+        } finally {
+            conf.setEnableNonPersistentTopics(defaultENableNonPersistentTopic);
+            conf.setLoadManagerClassName(defaultLoadManagerName);
+        }
+
+    }
+
+    /**
+     * verifies: broker should reject non-persistent topic loading if broker 
is not enable for non-persistent topic
+     * 
+     * @param loadManagerName
+     * @throws Exception
+     */
+    @Test
+    public void testNonPersistentTopicUnderPersistentNamespace() throws 
Exception {
+
+        final String namespace = "my-property/use/my-ns";
+        final String topicName = "non-persistent://" + namespace + 
"/persitentNamespace";
+
+        final boolean defaultENableNonPersistentTopic = 
conf.isEnableNonPersistentTopics();
+        try {
+            conf.setEnableNonPersistentTopics(false);
+            stopBroker();
+            startBroker();
+            ProducerConfiguration producerConf = new ProducerConfiguration();
+            try {
+                Producer producer = 
pulsarClient.createProducerAsync(topicName, producerConf).get(1, 
TimeUnit.SECONDS);
+                producer.close();
+                fail("topic loading should have failed");
+            } catch (Exception e) {
+                // Ok
+            }
+            NonPersistentTopic topicRef = (NonPersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName);
+            assertNull(topicRef);
+        } finally {
+            conf.setEnableNonPersistentTopics(defaultENableNonPersistentTopic);
+        }
+    }
+
+    /**
+     * verifies that broker started with onlyNonPersistent mode doesn't own 
persistent-topic
+     * 
+     * @param loadManagerName
+     * @throws Exception
+     */
+    @Test(dataProvider = "loadManager")
+    public void testNonPersistentBrokerModeRejectPersistentTopic(String 
loadManagerName) throws Exception {
+
+        final String namespace = "my-property/use/my-ns";
+        final String topicName = "persistent://" + namespace + "/loadManager";
+        final String defaultLoadManagerName = conf.getLoadManagerClassName();
+        final boolean defaultEnablePersistentTopic = 
conf.isEnablePersistentTopics();
+        final boolean defaultEnableNonPersistentTopic = 
conf.isEnableNonPersistentTopics();
+        try {
+            // start broker to not own non-persistent namespace and create 
non-persistent namespace
+            stopBroker();
+            conf.setEnableNonPersistentTopics(true);
+            conf.setEnablePersistentTopics(false);
+            conf.setLoadManagerClassName(loadManagerName);
+            startBroker();
+
+            Field field = PulsarService.class.getDeclaredField("loadManager");
+            field.setAccessible(true);
+            AtomicReference<LoadManager> loadManagerRef = 
(AtomicReference<LoadManager>) field.get(pulsar);
+            LoadManager manager = LoadManager.create(pulsar);
+            manager.start();
+            loadManagerRef.set(manager);
+
+            NamespaceBundle fdqn = 
pulsar.getNamespaceService().getBundle(DestinationName.get(topicName));
+            LoadManager loadManager = pulsar.getLoadManager().get();
+            ResourceUnit broker = null;
+            try {
+                broker = loadManager.getLeastLoaded(fdqn);
+            } catch (Exception e) {
+                // Ok. (ModulearLoadManagerImpl throws RuntimeException incase 
don't find broker)
+            }
+            assertNull(broker);
+
+            ProducerConfiguration producerConf = new ProducerConfiguration();
+            try {
+                Producer producer = 
pulsarClient.createProducerAsync(topicName, producerConf).get(1, 
TimeUnit.SECONDS);
+                producer.close();
+                fail("topic loading should have failed");
+            } catch (Exception e) {
+                // Ok
+            }
+            NonPersistentTopic topicRef = (NonPersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName);
+            assertNull(topicRef);
+
+        } finally {
+            conf.setEnablePersistentTopics(defaultEnablePersistentTopic);
+            conf.setEnableNonPersistentTopics(defaultEnableNonPersistentTopic);
+            conf.setLoadManagerClassName(defaultLoadManagerName);
+        }
+
+    }
+    
 
 Review comment:
   Do we have a case where both type is enabled and topic creation succeeds?
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to