saandrews commented on a change in pull request #538: Make broker configurable
to own non-persistent topic
URL: https://github.com/apache/incubator-pulsar/pull/538#discussion_r127076145
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
##########
@@ -553,6 +568,155 @@ public void testReplicator() throws Exception {
}
+ /**
+ * verifies load manager assigns topic only if broker started in
non-persistent mode
+ * <pre>
+ * 1. Start broker with non-persistent topics disabled
+ * 2. Create namespace with non-persistency set
+ * 3. Create non-persistent topic
+ * 4. Load-manager should not be able to find broker
+ * 5. Create producer on that topic should fail
+ * </pre>
+ */
+ @Test(dataProvider = "loadManager")
+ public void testLoadManagerAssignmentForNonPersistentTestAssignment(String
loadManagerName) throws Exception {
+
+ final String namespace = "my-property/use/my-ns";
+ final String topicName = "non-persistent://" + namespace +
"/loadManager";
+ final String defaultLoadManagerName = conf.getLoadManagerClassName();
+ final boolean defaultENableNonPersistentTopic =
conf.isEnableNonPersistentTopics();
+ try {
+ // start broker to not own non-persistent namespace and create
non-persistent namespace
+ stopBroker();
+ conf.setEnableNonPersistentTopics(false);
+ conf.setLoadManagerClassName(loadManagerName);
+ startBroker();
+
+ Field field = PulsarService.class.getDeclaredField("loadManager");
+ field.setAccessible(true);
+ AtomicReference<LoadManager> loadManagerRef =
(AtomicReference<LoadManager>) field.get(pulsar);
+ LoadManager manager = LoadManager.create(pulsar);
+ manager.start();
+ loadManagerRef.set(manager);
+
+ NamespaceBundle fdqn =
pulsar.getNamespaceService().getBundle(DestinationName.get(topicName));
+ LoadManager loadManager = pulsar.getLoadManager().get();
+ ResourceUnit broker = null;
+ try {
+ broker = loadManager.getLeastLoaded(fdqn);
+ } catch (Exception e) {
+ // Ok. (ModularLoadManagerImpl throws RuntimeException in case it
doesn't find a broker)
+ }
+ assertNull(broker);
+
+ ProducerConfiguration producerConf = new ProducerConfiguration();
+ try {
+ Producer producer =
pulsarClient.createProducerAsync(topicName, producerConf).get(1,
TimeUnit.SECONDS);
+ producer.close();
+ fail("topic loading should have failed");
+ } catch (Exception e) {
+ // Ok
+ }
+ NonPersistentTopic topicRef = (NonPersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName);
+ assertNull(topicRef);
+
+ } finally {
+ conf.setEnableNonPersistentTopics(defaultENableNonPersistentTopic);
+ conf.setLoadManagerClassName(defaultLoadManagerName);
+ }
+
+ }
+
+ /**
+ * verifies: broker should reject non-persistent topic loading if broker
is not enabled for non-persistent topics
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testNonPersistentTopicUnderPersistentNamespace() throws
Exception {
+
+ final String namespace = "my-property/use/my-ns";
+ final String topicName = "non-persistent://" + namespace +
"/persitentNamespace";
+
+ final boolean defaultENableNonPersistentTopic =
conf.isEnableNonPersistentTopics();
+ try {
+ conf.setEnableNonPersistentTopics(false);
+ stopBroker();
+ startBroker();
+ ProducerConfiguration producerConf = new ProducerConfiguration();
+ try {
+ Producer producer =
pulsarClient.createProducerAsync(topicName, producerConf).get(1,
TimeUnit.SECONDS);
+ producer.close();
+ fail("topic loading should have failed");
+ } catch (Exception e) {
+ // Ok
+ }
+ NonPersistentTopic topicRef = (NonPersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName);
+ assertNull(topicRef);
+ } finally {
+ conf.setEnableNonPersistentTopics(defaultENableNonPersistentTopic);
+ }
+ }
+
+ /**
+ * verifies that broker started with onlyNonPersistent mode doesn't own
persistent-topic
+ *
+ * @param loadManagerName
+ * @throws Exception
+ */
+ @Test(dataProvider = "loadManager")
+ public void testNonPersistentBrokerModeRejectPersistentTopic(String
loadManagerName) throws Exception {
+
+ final String namespace = "my-property/use/my-ns";
+ final String topicName = "persistent://" + namespace + "/loadManager";
+ final String defaultLoadManagerName = conf.getLoadManagerClassName();
+ final boolean defaultEnablePersistentTopic =
conf.isEnablePersistentTopics();
+ final boolean defaultEnableNonPersistentTopic =
conf.isEnableNonPersistentTopics();
+ try {
+ // start broker with persistent topics disabled so that it only owns
non-persistent topics
+ stopBroker();
+ conf.setEnableNonPersistentTopics(true);
+ conf.setEnablePersistentTopics(false);
+ conf.setLoadManagerClassName(loadManagerName);
+ startBroker();
+
+ Field field = PulsarService.class.getDeclaredField("loadManager");
+ field.setAccessible(true);
+ AtomicReference<LoadManager> loadManagerRef =
(AtomicReference<LoadManager>) field.get(pulsar);
+ LoadManager manager = LoadManager.create(pulsar);
+ manager.start();
+ loadManagerRef.set(manager);
+
+ NamespaceBundle fdqn =
pulsar.getNamespaceService().getBundle(DestinationName.get(topicName));
+ LoadManager loadManager = pulsar.getLoadManager().get();
+ ResourceUnit broker = null;
+ try {
+ broker = loadManager.getLeastLoaded(fdqn);
+ } catch (Exception e) {
+ // Ok. (ModularLoadManagerImpl throws RuntimeException in case it
doesn't find a broker)
+ }
+ assertNull(broker);
+
+ ProducerConfiguration producerConf = new ProducerConfiguration();
+ try {
+ Producer producer =
pulsarClient.createProducerAsync(topicName, producerConf).get(1,
TimeUnit.SECONDS);
+ producer.close();
+ fail("topic loading should have failed");
+ } catch (Exception e) {
+ // Ok
+ }
Review comment:
Shouldn't it attempt to create persistent topic and ensure it is ok?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services