eolivelli commented on a change in pull request #12235:
URL: https://github.com/apache/pulsar/pull/12235#discussion_r780824370



##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
##########
@@ -1859,4 +1870,47 @@ public void testProducerBusy() throws Exception {
 
         
assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
     }
+
+    @Test(dataProvider = "topic")
+    public void testPersistentTopicFactory(boolean isPersistent) throws 
Exception {
+        conf.setTopicFactoryClassName(MyTopicFactory.class.getName());
+        restartBroker();
+
+        final String topicName = (isPersistent ? "persistent" : 
"non-persistent") + "://prop/ns-abc/factoryTopic"
+                + isPersistent;
+        MyTopicFactory.count.set(0);
+
+        // 1. producer connect
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).enableBatching(false)
+                
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
+
+        assertTrue(MyTopicFactory.count.get() > 0);
+        producer.close();
+        consumer.close();
+    }
+
+    public static class MyTopicFactory implements TopicFactory {
+        private static AtomicInteger count = new AtomicInteger(0);
+
+        @Override
+        public <T extends Topic> T create(String topic, ManagedLedger ledger, 
BrokerService brokerService,
+                Class<T> topicClazz) {
+            try {
+                count.incrementAndGet();
+                if(topicClazz == NonPersistentTopic.class) {
+                    return (T) new NonPersistentTopic(topic, brokerService);
+                }else {
+                    return (T) new PersistentTopic(topic, ledger, 
brokerService); 
+                }
+            } catch (Exception e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            // No-op

Review comment:
       Please verify the number of calls to this method

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -2809,6 +2817,35 @@ public long getPausedConnections() {
         return pausedConnections.longValue();
     }
 
+    @SuppressWarnings("unchecked")
+    private <T extends Topic> T newTopic(String topic, ManagedLedger ledger, 
BrokerService brokerService,
+            Class<T> topicClazz) {
+        if (topicFactory != null) {
+            try {
+                Topic newTopic = topicFactory.create(topic, ledger, 
brokerService, topicClazz);
+                if (newTopic != null) {
+                    return (T) newTopic;
+                }
+            } catch (Throwable e) {
+                log.warn("Failed to create persistent topic using factory {}, 
{}", topic, e.getMessage());

Review comment:
       Here we should fail otherwise the system will be using the default 
implementation, leading to unpredictable behaviour 

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -327,6 +328,7 @@ public BrokerService(PulsarService pulsar, EventLoopGroup 
eventLoopGroup) throws
                 .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("pulsar-backlog-quota-checker"));
         this.authenticationService = new 
AuthenticationService(pulsar.getConfiguration());
         this.blockedDispatchers = new ConcurrentOpenHashSet<>();
+        this.topicFactory = createPersistentTopicFactory();

Review comment:
       We are not shutting down (calling 'close')  the new instance 

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -2809,6 +2817,35 @@ public long getPausedConnections() {
         return pausedConnections.longValue();
     }
 
+    @SuppressWarnings("unchecked")
+    private <T extends Topic> T newTopic(String topic, ManagedLedger ledger, 
BrokerService brokerService,
+            Class<T> topicClazz) {
+        if (topicFactory != null) {
+            try {
+                Topic newTopic = topicFactory.create(topic, ledger, 
brokerService, topicClazz);
+                if (newTopic != null) {
+                    return (T) newTopic;
+                }
+            } catch (Throwable e) {
+                log.warn("Failed to create persistent topic using factory {}, 
{}", topic, e.getMessage());
+            }
+        }
+        return topicClazz == NonPersistentTopic.class ? (T) new 
NonPersistentTopic(topic, BrokerService.this)
+                : (T) new PersistentTopic(topic, ledger, brokerService);
+    }
+
+    private TopicFactory createPersistentTopicFactory() {
+        String topicFactoryClassName = 
pulsar.getConfig().getTopicFactoryClassName();
+        if (StringUtils.isNotBlank(topicFactoryClassName)) {
+            try {
+                return (TopicFactory) 
Class.forName(topicFactoryClassName).newInstance();
+            } catch (Exception e) {
+                log.warn("Failed to initialize topic factory class {}", 
topicFactoryClassName, e);

Review comment:
       We should fail, otherwise the system will use the default configuration, 
leading to unexpected behaviour 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to